Go语言里线程池的实现

Go语言里的标准库里没有线程池的概念,不过我们可以利用Go强大的并发特性自己构建,而且在其它语言里也有成熟的设计模型可供借鉴,这里我们参考Java的线程池模型实现部分功能。

另外,确切地说,这里的线程并非真正意义上的线程,用“协程”这个概念称呼更为合适。

Future的定义

首先我需要实现一个Future的接口, 它是一个任务执行的载体。

1
2
3
4
5
type Future interface {
Run() error
Success()
Failed(error)
}

以后,每个要执行的任务都要实现这个接口的方法。

下面我们来通过具体的代码实现这些方法。

Future的具体实现

我们通过两个具体的结构体实现接口。

第一个结构体的作用,是在它初始化的时候给它传进一个整型数组和一个权重值,在它执行时,把这个整型数组的每个元素都乘以这个权重值再输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type NumTask struct {
Name string
Elements []int
Weight int
IsStop bool
}
func (n NumTask) Run() error {
if len(n.Elements) == 0 {
return errors.New("The args should not be null!")
}
for index, element := range n.Elements {
n.Elements[index] = element*n.Weight
}
return nil
}
func (n NumTask)Success() {
for _,element := range n.Elements {
fmt.Printf("The element in task %s is: %d\n",n.Name, element)
}
fmt.Println(n.Name, "has done!")
}
func (n NumTask)Failed(err error) {
if err != nil {
beego.Error(n.Name, err)
}
}

第二个结构体的作用,是在初始化时传入一个字符串,在它执行时,把字符串里的字母全部转化成小写字母再输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type LowerTask struct {
Name string
Content string
}
func (n LowerTask) Run() error {
if n.Content == "" {
return errors.New("The args should not be null!")
}
n.Content = strings.ToLower(n.Content)
return nil
}
func (n LowerTask)Success() {
fmt.Println(n.Content)
fmt.Println(n.Name, "has done!")
}
func (n LowerTask)Failed(err error) {
if err != nil {
beego.Error(n.Name, err)
}
}

Executor的实现

Executor的结构很简单,我们力求精简,里面只有一个成员 tasks, 它是一个通道类型,它存储了以后我们要执行的任务。

1
2
3
type Executor struct {
tasks chan Future
}

NewExecutor 函数返回一个初始化后的Executor,这里固定成可以存储10个Future。

通过Executor 的Submit 方法可以提交一个任务,当任务提交完后,调用Executor 的Start 方法后,Executor 会不断轮循任务队列,依次执行所有 的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (e *Executor) Submit(future Future) {
e.tasks <- future
}
func (e *Executor) Start() {
for {
select {
case task := <- e.tasks:
go e.exec(task)
}
}
}
func NewExecutor() *Executor {
return &Executor{tasks: make(chan Future, 10)}
}

在 Executor 的Start 方法中,会通过它另外一个方法exec 再调用每个Future的Run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
func (e *Executor) exec(future Future) {
defer func() {
if err:=recover(); err!=nil {}
fmt.Errorf("Task execute error!")
}()
err := future.Run()
if err != nil {
future.Failed(err)
} else {
future.Success()
}
}

当Future 的Run 方法 执行失败后,有错误返回,会继续调用Future的Failed方法;如果没有错误返回,则调用Future的Success 方法。

启动Executor 执行任务

​ 在main 函数里执行代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
exec := executor.NewExecutor();
future1 := executor.NumTask{
Name: "Number task",
Elements: []int{1 , 2, 3 ,4},
Weight: 3,
}
exec.Submit(future1)
future2 := executor.LowerTask{
Name: "String convert task",
Content: "I am just thirsty!",
}
exec.Submit(future2)
exec.Start()
}
//output:
// I am just thirsty!
// String convert task has done!
// The element in task Number task is: 3
// The element in task Number task is: 6
// The element in task Number task is: 9
// The element in task Number task is: 12
// Number task has done!