用Go编写一个简单的定时器,首先需要考虑定时器需要实现哪些功能?

  1. 定时执行
  2. 拦截执行错误
  3. 批量添加执行,检测执行存活
  4. 批量停止执行,检测执行停止

定义结构体

要解决我们第1.3.4点的需求,需要定义一个结构体,用于存储任务信息,以及任务状态

1
2
3
4
5
6
7
8
9
10
11
12
type _Job struct {
// 名称
Name string
// 是否运行
Running bool
// 执行间隔
Ticker *time.Ticker
// 任务方法
Call func()
// 是否已关机
IsShutdown bool
}

在执行过程中,通过不断获取结构体中的信息,来检测状态以及执行任务

初始化任务

为了在程序启动时自动开始任务,我们定义了一个jobMap用来全局存储任务状态,以及定义了一个初始化函数initJob,并在其中添加了个任务myJob。这些任务会以一定的频率(例如每分钟一次)执行。如果你不希望系统执行任务,可以在配置中关闭它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var jobMap sync.Map

func initJob() {
if !global.Config.System.Job {
return
}
fmt.Println("========start job =================")

myJob(time.Minute * 1)
}

func myJob(d time.Duration) {
addJob(d, func() {
if err := job_service.exampleJob(); err != nil {
global.Logger.Error("myJob_exampleJob_err:", zap.Error(err))
}
})
}

添加任务

为了批量添加任务,我们定义了一个addJob函数,它接受一个执行间隔和一个函数作为参数,将它们封装为一个_Job实例,并将实例存储在jobMap中。然后,这个函数启动一个新的goroutine来执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func addJob(d time.Duration, call func()) {
ticker := time.NewTicker(d)
name := runtime.FuncForPC(reflect.ValueOf(call).Pointer()).Name()
job := _Job{
Name: name,
Running: false,
Ticker: ticker,
Call: call,
IsShutdown: false,
}
jobMap.Store(name, job)
go func(do _Job) {
defer func() {
if err := recover(); err != nil {
global.Logger.Error("job_goroutine_error",
zap.Any("error msg:", err),
zap.Stack("addJob_goroutine_stack"),
)
}
}()

for range ticker.C {
v, _ := jobMap.Load(do.Name)
runJob := v.(_Job)
if runJob.IsShutdown == true {
return
}
runJob.Running = true
jobMap.Store(do.Name, runJob)
do.Call()
runJob.Running = false
jobMap.Store(do.Name, runJob)
}

}(job)
}

停止任务

在某些情况下,我们可能需要停止正在运行的任务。例如,当我们想要关闭系统时,我们需要优雅地关闭所有正在运行的任务。为此,我们定义了一个stopJob函数,它将遍历jobMap中的所有任务,停止它们的定时器,并将IsShutdown字段设置为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func stopJob() {
fmt.Println("========stop job =================")
jobList := make([]_Job, 0)

jobMap.Range(func(k, v interface{}) bool {
jobList = append(jobList, v.(_Job))
return true
})
// 先停止定时器
for i := range jobList {
job := jobList[i]
job.Ticker.Stop()
job.IsShutdown = true
jobMap.Store(job.Name, job)
}
// 查看定时器是否在运行
// 等待运行中结束
for i := range jobList {

for {
v, _ := jobMap.Load(jobList[i].Name)
job := v.(_Job)
if !job.Running {
break
}
global.Logger.Info("job is running", zap.String("name", job.Name))


// 睡眠一秒
time.Sleep(time.Second)
}
}

}

这种方式的定时任务设计,使得我们可以很方便地对任务进行控制和管理,同时能够保证任务的正常执行和优雅的停止。

异常捕获

在执行定时任务时,我们不可避免地会遇到一些错误或异常。为了避免一个任务的异常影响到其他任务或者整个程序,我们需要对每个任务的执行进行异常捕获。这个逻辑在我们的 addJob 函数的 goroutine 中实现。

1
2
3
4
5
6
7
8
9
10
11
go func(do _Job) {
defer func() {
if err := recover(); err != nil {
global.Logger.Error("job_goroutine_error",
zap.Any("error msg:", err),
zap.Stack("addJob_goroutine_stack"),
)
}
}()
// ....
}(job)

在这段代码中,我们使用 deferrecover 函数来捕获可能产生的 panic。如果有 panic 发生,我们就记录下来,并打印相应的堆栈信息。

总结

在此,我们实现了一个简单的定时任务系统,它包括:

  • 使用 sync.Map 保存所有任务
  • 定义 _Job 结构体用来描述一个任务
  • 提供 initJob 函数来启动任务
  • 提供 addJob 函数来新增任务
  • 提供 stopJob 函数来停止所有任务
  • 异常捕获以保证任务出错时不会影响整个程序的运行

这个系统还有很多可以改进的地方。比如,我们可以增加任务的删除和修改功能,可以提供统一的任务错误处理机制,可以增加任务的优先级设置,等等。但是,基本的设计思路和实现方式已经在这里了,你可以根据你的需要进行修改和扩展。