编写高并发程序常常让人头疼:代码冗余、死锁风险、逻辑复杂等等。Go语言的标准库提供了sync.WaitGroup、channel等工具来实现并发控制,但使用起来还是有一些困难。conc项目就是为了解决这些痛点而生的。
主要功能
1. WaitGroup
在编写并发demo时,我们经常需要用到sync.WaitGroup来等待所有goroutine执行完毕。每次都要添加Add(),在goroutine退出前调用Done(),最后Wait()等待,非常繁琐。
标准库示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 如果doSomething()panic,程序会崩溃
doSomething()
}()
}
wg.Wait()
}
|
使用conc后:
1
2
3
4
5
6
7
|
func main() {
var wg conc.WaitGroup
for i := 0; i < 10; i++ {
wg.Go(doSomething)
}
wg.Wait()
}
|
conc对WaitGroup进行了封装,自动执行Add和Done操作。更重要的是,如果任一goroutine发生panic,错误会被捕获,在Wait时安全重新panic,而不会直接导致程序崩溃。
2. ForEach
在并发处理数据时,我们往往需要创建一个channel,启动多个goroutine从channel读取数据并处理。代码十分冗长:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func process(values []int) {
feeder := make(chan int, 8)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for elem := range feeder {
handle(elem)
}
}()
}
for _, value := range values {
feeder <- value
}
close(feeder)
wg.Wait()
}
|
使用conc的ForEach,实现变得十分精简:
1
2
3
|
func process(values []int) {
iter.ForEach(values, handle)
}
|
ForEach会自动启动足够数量的goroutine,并发读取slice中的数据,应用handle函数进行处理。
3. Map
另一个常见场景是从外部接口获取数据,并并行处理这些数据生成新的结果。传统实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func concMap(
input []int,
f func(int) int,
) []int {
res := make([]int, len(input))
var idx atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
i := int(idx.Add(1) - 1)
if i >= len(input) {
return
}
res[i] = f(input[i])
}
}()
}
wg.Wait()
return res
}
|
使用conc的Map方法就简洁多了:
1
2
3
4
5
6
|
func concMap(
input []int,
f func(*int) int,
) []int {
return iter.Map(input, f)
}
|
Map会自动创建一组goroutine,并发地对slice中每个元素应用f函数,最终返回结果slice。
代码分析
conc项目的核心是错误捕获和并发控制,下面分析其实现原理。
1. Catcher
Catcher用于捕获并保存panic产生的错误信息。它的核心是CompareAndSwap原子操作,确保多个goroutine同时捕获panic时也能保证数据安全。
1
2
3
4
5
6
7
8
9
|
type Catcher struct {
recovered atomic.Pointer[Recovered]
}
type Recovered struct {
Value any
Callers []uintptr
Stack []byte
}
|
Catcher的Try方法用于执行传入的函数,在panic时通过recover捕获错误:
1
2
3
4
5
6
7
8
9
10
11
|
func (p *Catcher) Try(f func()) {
defer p.tryRecover()
f()
}
func (p *Catcher) tryRecover() {
if val := recover(); val != nil {
rp := NewRecovered(1, val)
p.recovered.CompareAndSwap(nil, &rp)
}
}
|
Repanic则检查之前是否捕获了panic,如果有就重新panic,确保主goroutine能获取完整的错误堆栈信息。
2. WaitGroup
WaitGroup通过组合Catcher和sync.WaitGroup,实现自动错误捕获:
1
2
3
4
5
6
7
8
9
10
11
12
|
type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher
}
func (h *WaitGroup) Go(f func()) {
h.wg.Add(1)
go func() {
defer h.wg.Done()
h.pc.Try(f)
}()
}
|
Go方法启动新的goroutine执行f,受Try保护,发生panic时被捕获到Catcher中。Wait则首先等待所有goroutine退出,之后检查Catcher中是否存在被捕获的panic,如果有就重新panic。
3. ForEach和Map
ForEach和Map的实现核心其实就是对slice的并发遍历,应用指定的处理函数。通过原子操作cnt确保索引唯一,goroutine安全地读取不同的slice元素。这里就不再赘述了。
总的来说,conc项目通过对WaitGroup、channel等并发原语的封装,简化了并发程序的编写。同时其错误捕获机制能最大程度地避免由panic导致的程序崩溃,极大提高了程序的健壮性。这些设计思路和技术细节,无疑能给Go并发编程带来极大的启发。