0%

goroutine 进程池

不知道你有没有这种需求了,有大量的数据需要处理,但是系统的资源有限,所有只有开一个固定的进程数进行处理,一个进程结束下一个进程直接顶上。可能在其他语言中称这种处理方式为异步,但是在 go 中叫进程池更准确。

我之前获取了大概 两百万的数据需要处理,这两百万调数据涉及网络访问,网络响应大概在一毫秒到十秒之间。之前是在云服务器上处理数据,因为是并发的效率较低。大概是 1k goroutine 在处理数据,比较占用 cpu 所以打算迁移到位于本地的树莓派上处理。当调用 1k goroutine 时,程序大概会在过几秒钟后崩溃,调低 goroutine 数后正常了这个数字我调到了 200。这意味着效率降低了 5倍 。俗话说的好,穷则思变,或者说之前的程序只是打草稿,我本来就打算改进。改用了进程池来处理数据。改用进程池后,虽然由于树莓派的性能限制,goroutine 数降低了5倍,但是效率在原来1k goroutine 上还提升了 5 倍。

关于 维持一个稳定的 goroutine 我第一个想到的就是使用缓冲 channel 。需要多少 goroutine 就声明多大的缓冲,然后将它塞满。起一个 goroutine 取一个值,然后在 goroutine 结束时还回去。当 channel 里的值取完后自然会阻塞不会新建 goroutine。最后当数据读取完后再抽出 channel 里的所有值以堵塞主进程等待数据处理完成。我是这么想的也是这么做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import(
"fmt"
)

func main(){
var coroutine int = 3 //进程池数量

ch := make(chan bool,coroutine)

for i := 0; i < coroutine;i++{ // 将进程池填满
ch <- true
}

num := 0 //单纯的为本次演示创建的控制条件,在实际使用中可以替换为其他

for{
num ++
if num > 10{ //条件满足时退出循环,我的原版代码是 c == io.EOF
break
}
<-ch //在起 goroutine 时取走一个值,注意最好放在 goroutine 启动的上一行退
go func(in int){
fmt.Println(in) //这里处理数据
ch <- true //在 goroutine 的最后将值还回去
}(num)
}

for i := 0;i < coroutine;i++{ // 当数据读取完后,须抽干进程池,以等待进程的退出
<-ch
}
}

大概控制流程类似于上面的演示代码,当然实际代码并不是这样的,如果你也在 telegram go 中文群的话你应该见过我的代码。后来 dalao 推荐了两个官方开发库 golang.org/x/time/rate 和 golang.org/x/sync/semaphore 。golang.org/x/time/rate 我没怎么看过,golang.org/x/sync/semaphore 从名称来看比较感兴趣所以查询了一下资料。

关于 golang.org/x/sync/semaphore 的文档请查看 https://godoc.org/golang.org/x/sync/semaphore

我简单的查看了一下示例感觉非常的熟悉,感觉和我自己做的那个结构差不多。如果你对原版文档比较感兴趣也可以查看原版文档,接下来贴出的是官方的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
"context"
"fmt"
"log"
"runtime"

"golang.org/x/sync/semaphore"
)

// Example_workerPool demonstrates how to use a semaphore to limit the number of
// goroutines working on parallel tasks.
//
// This use of a semaphore mimics a typical “worker pool” pattern, but without
// the need to explicitly shut down idle workers when the work is done.
func main() {
ctx := context.TODO()

var (
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
out = make([]int, 32)
)

// Compute the output using up to maxWorkers goroutines at a time.
for i := range out {
// When maxWorkers goroutines are in flight, Acquire blocks until one of the
// workers finishes.
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}

go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
}

// Acquire all of the tokens to wait for any remaining workers to finish.
//
// If you are already waiting for the workers by some other means (such as an
// errgroup.Group), you can omit this final Acquire call.
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}

fmt.Println(out)

}

// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}

for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}

if n%2 == 0 {
n /= 2
continue
}

const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}

return steps
}

以上的示例有两个关键参数 AcquireRelease。 使用 Acquire 从进程池中取走需要使用的进程, 使用 Release 在 goroutine 中归还进程。官方的示例可能稍微有一点复杂。我们可以对他进行一下简化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"context" // context 库在 go1.7 合并到主线
"fmt"
"log"

"golang.org/x/sync/semaphore"
)

func main() {
var coroutine int64 = 3 // 进程池总数

ctx := context.Background()
sem := semaphore.NewWeighted(coroutine) // 创建进程池

index := 0 // 单纯的控制条件,可能在实际中并没有什么意义
for {
index++
if index > 10 { // 条件满足后退出循环
break
}

if err := sem.Acquire(ctx, 1); err != nil { // 从进程池中取出进程使用,还是之前讲的原则,最好放在 goroutine 开始的前一行
log.Printf("start 无法获取信号量: %v\n", err)
break
}
go func(i int) {
defer sem.Release(1) // 在结束时归还进程数

fmt.Println(i)
}(index)
}

if err := sem.Acquire(ctx, coroutine); err != nil { //退出循环后阻塞主进程以等待最后的 goroutine 执行完成
log.Printf("exit 无法获取信号量: %v\n", err)
}
}

这样看住是不是简单多了。

注意事项

我在之前的代码注释中说过,从进程池中取进程最好放在 goroutine 开始的前一行,那是因为我们在数据处理中可能数据不合格并不会启动 goroutine 。如果值取走了没有归还那最后抽干进程池等待 goroutine,结束时。就抽多了导致进程永远无法自动结束。如果抽少了会导致程序提前结束。