解锁Go并发编程的新姿势 -- conc项目解析

编写高并发程序常常让人头疼:代码冗余、死锁风险、逻辑复杂等等。Go语言的标准库提供了sync.WaitGroup、channel等工具来实现并发控制,但使用起来还是有一些困难。conc项目就是为了解决这些痛点而生的。

主要功能

1. WaitGroup

在编写并发demo时,我们经常需要用到sync.WaitGroup来等待所有goroutine执行完毕。每次都要添加Add(),在goroutine退出前调用Done(),最后Wait()等待,非常繁琐。

标准库示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // 如果doSomething()panic,程序会崩溃
            doSomething()
        }()
    }
    wg.Wait()
}

使用conc后:

1
2
3
4
5
6
7
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}

conc对WaitGroup进行了封装,自动执行Add和Done操作。更重要的是,如果任一goroutine发生panic,错误会被捕获,在Wait时安全重新panic,而不会直接导致程序崩溃。

2. ForEach

在并发处理数据时,我们往往需要创建一个channel,启动多个goroutine从channel读取数据并处理。代码十分冗长:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}

使用conc的ForEach,实现变得十分精简:

1
2
3
func process(values []int) {
    iter.ForEach(values, handle)
}

ForEach会自动启动足够数量的goroutine,并发读取slice中的数据,应用handle函数进行处理。

3. Map

另一个常见场景是从外部接口获取数据,并并行处理这些数据生成新的结果。传统实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}

使用conc的Map方法就简洁多了:

1
2
3
4
5
6
func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}

Map会自动创建一组goroutine,并发地对slice中每个元素应用f函数,最终返回结果slice。

代码分析

conc项目的核心是错误捕获和并发控制,下面分析其实现原理。

1. Catcher

Catcher用于捕获并保存panic产生的错误信息。它的核心是CompareAndSwap原子操作,确保多个goroutine同时捕获panic时也能保证数据安全。

1
2
3
4
5
6
7
8
9
type Catcher struct {
    recovered atomic.Pointer[Recovered]
}

type Recovered struct {
    Value   any
    Callers []uintptr 
    Stack   []byte
}

Catcher的Try方法用于执行传入的函数,在panic时通过recover捕获错误:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (p *Catcher) Try(f func()) {
    defer p.tryRecover()
    f()
}

func (p *Catcher) tryRecover() {
    if val := recover(); val != nil {
        rp := NewRecovered(1, val)
        p.recovered.CompareAndSwap(nil, &rp)
    }
}

Repanic则检查之前是否捕获了panic,如果有就重新panic,确保主goroutine能获取完整的错误堆栈信息。

2. WaitGroup

WaitGroup通过组合Catcher和sync.WaitGroup,实现自动错误捕获:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
type WaitGroup struct {
    wg sync.WaitGroup
    pc panics.Catcher  
}

func (h *WaitGroup) Go(f func()) {
    h.wg.Add(1)
    go func() {
        defer h.wg.Done()
        h.pc.Try(f)
    }()
}

Go方法启动新的goroutine执行f,受Try保护,发生panic时被捕获到Catcher中。Wait则首先等待所有goroutine退出,之后检查Catcher中是否存在被捕获的panic,如果有就重新panic。

3. ForEach和Map

ForEach和Map的实现核心其实就是对slice的并发遍历,应用指定的处理函数。通过原子操作cnt确保索引唯一,goroutine安全地读取不同的slice元素。这里就不再赘述了。

总的来说,conc项目通过对WaitGroup、channel等并发原语的封装,简化了并发程序的编写。同时其错误捕获机制能最大程度地避免由panic导致的程序崩溃,极大提高了程序的健壮性。这些设计思路和技术细节,无疑能给Go并发编程带来极大的启发。