goroutine 进程池

不知道你有没有这种需求了,有大量的数据需要处理,但是系统的资源有限,所有只有开一个固定的进程数进行处理,一个进程结束下一个进程直接顶上。可能在其他语言中称这种处理方式为异步,但是在 go 中叫进程池更准确。

我之前获取了大概 两百万的数据需要处理,这两百万调数据涉及网络访问,网络响应大概在一毫秒到十秒之间。之前是在云服务器上处理数据,因为是并发的效率较低。大概是 1k goroutine 在处理数据,比较占用 cpu 所以打算迁移到位于本地的树莓派上处理。当调用 1k goroutine 时,程序大概会在过几秒钟后崩溃,调低 goroutine 数后正常了这个数字我调到了 200。这意味着效率降低了 5倍 。俗话说的好,穷则思变,或者说之前的程序只是打草稿,我本来就打算改进。改用了进程池来处理数据。改用进程池后,虽然由于树莓派的性能限制,goroutine 数降低了5倍,但是效率在原来1k goroutine 上还提升了 5 倍。

关于 维持一个稳定的 goroutine 我第一个想到的就是使用缓冲 channel 。需要多少 goroutine 就声明多大的缓冲,然后将它塞满。起一个 goroutine 取一个值,然后在 goroutine 结束时还回去。当 channel 里的值取完后自然会阻塞不会新建 goroutine。最后当数据读取完后再抽出 channel 里的所有值以堵塞主进程等待数据处理完成。我是这么想的也是这么做的。

package main

import(
    "fmt"
)

func main(){
    var coroutine int = 3 //进程池数量

    ch := make(chan bool,coroutine)

    for i := 0; i < coroutine;i++{ // 将进程池填满
            ch <- true
    }

    num := 0 //单纯的为本次演示创建的控制条件,在实际使用中可以替换为其他

    for{
            num ++
            if num > 10{ //条件满足时退出循环,我的原版代码是 c == io.EOF
                    break
            }
            <-ch //在起 goroutine 时取走一个值,注意最好放在 goroutine 启动的上一行退
            go func(in int){
                    fmt.Println(in) //这里处理数据
                    ch <- true //在 goroutine 的最后将值还回去
            }(num)
    }

    for i := 0;i < coroutine;i++{ // 当数据读取完后,须抽干进程池,以等待进程的退出
            <-ch
    }
}

大概控制流程类似于上面的演示代码,当然实际代码并不是这样的,如果你也在 telegram go 中文群的话你应该见过我的代码。后来 dalao 推荐了两个官方开发库 golang.org/x/time/rate 和 golang.org/x/sync/semaphore 。golang.org/x/time/rate 我没怎么看过,golang.org/x/sync/semaphore 从名称来看比较感兴趣所以查询了一下资料。

关于 golang.org/x/sync/semaphore 的文档请查看 https://godoc.org/golang.org/x/sync/semaphore

我简单的查看了一下示例感觉非常的熟悉,感觉和我自己做的那个结构差不多。如果你对原版文档比较感兴趣也可以查看原版文档,接下来贴出的是官方的示例

package main

import (
    "context"
    "fmt"
    "log"
    "runtime"

    "golang.org/x/sync/semaphore"
)

// Example_workerPool demonstrates how to use a semaphore to limit the number of
// goroutines working on parallel tasks.
//
// This use of a semaphore mimics a typical “worker pool” pattern, but without
// the need to explicitly shut down idle workers when the work is done.
func main() {
    ctx := context.TODO()

    var (
        maxWorkers = runtime.GOMAXPROCS(0)
        sem        = semaphore.NewWeighted(int64(maxWorkers))
        out        = make([]int, 32)
    )

    // Compute the output using up to maxWorkers goroutines at a time.
    for i := range out {
        // When maxWorkers goroutines are in flight, Acquire blocks until one of the
        // workers finishes.
        if err := sem.Acquire(ctx, 1); err != nil {
            log.Printf("Failed to acquire semaphore: %v", err)
            break
        }

        go func(i int) {
            defer sem.Release(1)
            out[i] = collatzSteps(i + 1)
        }(i)
    }

    // Acquire all of the tokens to wait for any remaining workers to finish.
    //
    // If you are already waiting for the workers by some other means (such as an
    // errgroup.Group), you can omit this final Acquire call.
    if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
        log.Printf("Failed to acquire semaphore: %v", err)
    }

    fmt.Println(out)

}

// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
    if n <= 0 {
        panic("nonpositive input")
    }

    for ; n > 1; steps++ {
        if steps < 0 {
            panic("too many steps")
        }

        if n%2 == 0 {
            n /= 2
            continue
        }

        const maxInt = int(^uint(0) >> 1)
        if n > (maxInt-1)/3 {
            panic("overflow")
        }
        n = 3*n + 1
    }

    return steps
}

以上的示例有两个关键参数 AcquireRelease。 使用 Acquire 从进程池中取走需要使用的进程, 使用 Release 在 goroutine 中归还进程。官方的示例可能稍微有一点复杂。我们可以对他进行一下简化

package main

import (
    "context" //context 库在 go1.7 合并到主线
    "fmt"
    "golang.org/x/sync/semaphore"
    "log"
)

func main() {
    var coroutine int = 3 //进程池总数

    ctx := context.TODO()
    sem := semaphore.NewWeighted(int64(coroutine)) //创建进程池

    num := 0 //单纯的控制条件,可能在实际中并没有什么意义

    for {
        num++

        if num > 10 { //条件满足后退出循环
            break
        }

        if err := sem.Acquire(ctx, 1); err != nil { //从进程池中取出进程使用,还是之前讲的原则,最好放在 goroutine 开始的前一行
            log.Printf("start 无法获取信号量: %v\n", err)
            break
        }
        go func(in int) {
            defer sem.Release(1) //在结束时归还进程数

            fmt.Println(in)
        }(num)

    }

    if err := sem.Acquire(ctx, int64(coroutine)); err != nil { //退出循环后阻塞主进程以等待最后的 goroutine 执行完成
        log.Printf("exit 无法获取信号量: %v\n", err)
    }
}

这样看住是不是简单多了。

注意事项

我在之前的代码注释中说过,从进程池中取进程最好放在 goroutine 开始的前一行,那是因为我们在数据处理中可能数据不合格并不会启动 goroutine 。如果值取走了没有归还那最后抽干进程池等待 goroutine,结束时。就抽多了导致进程永远无法自动结束。如果抽少了会导致程序提前结束。

0%