论如何设计一个简单的守护进程,并增加一些自己的需求
业务中经常性遇到希望实时监听的需求, 最后硬生生改成了定时任务
- 场景
需要实施监听,满足某种需求,如到点执行ps: 一般秒杀,到点执行,可通过限制设置时间(只能选择整点/每5/10分钟等)来定时扫描达到目的
-
我想要的守护进程
1.指定任意时刻执行程序
2.通过任务队列来执行
3.新task来临时,队列更新(指针即可),总体维持固定携程数量进行执行 - 如何设计
1.建立模型,定义单例模式
将需要执行的内容做成一个函数进行执行
// daemon progress
type Daemon struct {
Task chan *Task // 当前频道任务,即正在执行的任务
tasks []*Task // 任务队列
Num int // 监听连接池数量
}
type Task struct {
//ID uint64 // job id, 0++
daemonFunc DaemonFunc // exec func
Time *time.CTime // ctime
}
type TimeTask struct {
Task
Time time.CTime // ctime
}
type DaemonFunc func()
......
// ======= Singleton ========
// 单例模式
var (
daemon *Daemon
onceDaemon sync.Once
)
func Daemoner() *Daemon {
onceDaemon.Do(func() {
daemon = newDaemon()
go daemon.task()
go daemon.taskQueue()
})
return daemon
}
2.将新的任务加入队列
维持一个全局任务队列,不断的将任务加入其中
// add daemon task
func (d *Daemon) AddTask(params ...Param) *Daemon {
task := &Task{}
for _, p := range params {
p(task)
}
// d.Task <- task
d.tasks = append(d.tasks, task)
return d
}
3.任务执行队列,以及扫描新加入的队列任务
从全局队列中进行不断的扫描,扫描过程中,通过chan进行通信,执行任务,当任务连接池任务满员后,任务队列进入阻塞等待状态
一旦任务连接池出现空闲等待队列,阻塞状态结束,发送任务执行,进行下一个任务阻塞/发送
my: 其中加入了时间的扫描,符合需求则执行时间任务,执行结束,将该时间任务移除队列
// running task queue
func (d *Daemon) taskQueue() {
for {
for k := 0; k < len(d.tasks); k++ {
task := d.tasks[k]
if task.Time != nil {
if time2.Time(*task.Time).After(time2.Now()) {
//fmt.Println("time early")
continue
} else {
d.Task <- task
d.tasks = append(d.tasks[:k], d.tasks[k+1:]...)
k--
continue
}
}
d.Task <- task
}
//fmt.Println("==split line===")
}
}
4.连接池执行监听任务
启动指定数量的goroutine等待监听任务
// daemon goroutine pool nun
func (d *Daemon) task() {
for i := 0; i < d.Num; i++ {
//i := i
go func() {
for {
//fmt.Println("协程", i)
task := <-d.Task
task.daemonFunc()
}
}()
}
}
ps: 源码摘自: gt/daemon
- 来看上面函数作用
1.定义守护携程模型,从第一次引用开始创建单例模式守护进程
2.通过单例模式,分别启动了队列扫描传递任务和goroutine pool连接池执行任务两个后台进程
3.后续不断通过AddTask()添加任务到队列中,等待任务扫描执行即可