最近重看MinIO的源代码,发现纠删码模式下读取数据盘的时候,使用了更简单的并发读取方式,以前看的时候没发现,查了下Git历史记录,发现是19年新改的,新的使用channel做标记的方式的确非常巧妙,简化了代码逻辑,值得我们学习。所以今天就开篇文章,介绍下channel在并发下的两个使用技巧。

赢者为王模式

这种并发模式并不稀奇,相信很多朋友都用到过。它的核心思想就是同时开几个协程做同样的事情,谁先搞定,我们就用谁的结果。在Go语言的channel支持下,我们很容易实现这种并发方式。

假设我们把同一份资源,存储在网络上的5个服务器上(镜像、备份等),然后我们现在需要获取这个资源,我们就可以同时开5个协程,访问这5个服务器上的资源,谁先获取到,我们就用谁的,这样就可以最快速度获取,排除掉网络慢的服务器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
	txtResult := make(chan string, 5)
	go func() {txtResult <- getTxt("res1.flysnow.org")}()
	go func() {txtResult <- getTxt("res2.flysnow.org")}()
	go func() {txtResult <- getTxt("res3.flysnow.org")}()
	go func() {txtResult <- getTxt("res4.flysnow.org")}()
	go func() {txtResult <- getTxt("res5.flysnow.org")}()
	println(<-txtResult)
}

func getTxt(host string) string{
	//省略网络访问逻辑,直接返回模拟结果
	//http.Get(host+"/1.txt")
	return host+":模拟结果"
}

其中getTxt没有真实实现,只是一个模拟,但是通过以上示例已经可以说明赢者为王这种并发模式的使用。这种并发模式适合多个协程对同一种资源的读取,更概括的讲就是做同一件事情,只要有一个协程干成了就OK了。这种模式的优点主要有两个:1.可以最大程度减少耗时;提高成功率。

最终成功模式

这种并发模式我们自己可能遇到过,但是可能不是叫这个名字,这个名字是我自己起的,我觉得比较贴切。比如同时并发的从10个文件中成功读取任意5个文件,你可以开启5个协程,也可以开启3个,都随意,但是必须是成功读取了5个才算成功,否则就是失败。

这种模式MinIO也遇到了,它的解决方式就是我在开篇提到的非常好的技巧,现在我们就来介绍这种技巧。在介绍这种技巧前,我们先列举下其他的思路。

第一种思路: 先并发获取,存放起来,然后再一个个判断是否获取成功,如果有的没有成功再重新获取,而且获取的文件不能重复。这种方式是取到结果后进行判断是否成功,然后根据情况再决定是否重新获取,要去重,要判断,业务逻辑比较复杂。

第二种思路: 并发的时候就保证成功,里面可能是个for循环,直到成功为止,然后再返回结果。这种思路缺陷也很明显,如果这个文件损坏,那么就会一直死循环下去,要避免死循环,就要加上重试次数。

而MinIO的实现方式比较巧妙,它也是多协程,但是发现如果有文件读取不成功,他会通过channel的方式标记,换一个文件读取。因为一共10个文件呢,这个不行,换一个,不能在一个文件上等死,只要成功读取5个就可以了。

现在我们看下MinIO的这段代码,代码比较长,我尽可能删除一些无用的,但是为了保证可读性,还是会长一些,大家耐心看完,就学到了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
	newBuf := dst
	//省略不太相关代码
	var newBufLK sync.RWMutex

	//省略无关
	//channel开始创建,要发挥作用了。这里记住几个数字:
	//readTriggerCh大小是10,p.dataBlocks大小是5
	readTriggerCh := make(chan bool, len(p.readers))
	for i := 0; i < p.dataBlocks; i++ {
		// Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
		readTriggerCh <- true
	}

	healRequired := int32(0) // Atomic bool flag.
	readerIndex := 0
	var wg sync.WaitGroup
	// readTrigger 为 true, 意味着需要用disk.ReadAt() 读取下一个数据
	// readTrigger 为 false, 意味着读取成功了,不再需要读取
	for readTrigger := range readTriggerCh {
		newBufLK.RLock()
		canDecode := p.canDecode(newBuf)
		newBufLK.RUnlock()
		//判断是否有5个成功的,如果有,退出for循环
		if canDecode {
			break
		}
		//读取次数上限,不能大于10
		if readerIndex == len(p.readers) {
			break
		}
		//成功了,退出本次读取
		if !readTrigger {
			continue
		}
		wg.Add(1)
		//并发读取数据
		go func(i int) {
			defer wg.Done()
			//省略不太相关代码
			_, err := rr.ReadAt(p.buf[bufIdx], p.offset)
			if err != nil {
				//省略不太相关代码
				// 失败了,标记为true,触发下一个读取.
				readTriggerCh <- true
				return
			}
			newBufLK.Lock()
			newBuf[bufIdx] = p.buf[bufIdx]
			newBufLK.Unlock()
			// 成功了,标记为false,不再读取
			readTriggerCh <- false
		}(readerIndex)
		//控制次数,同时用来作为索引获取和存储数据
		readerIndex++
	}
	wg.Wait()

    //最终结果判断,如果OK了就正确返回,如果有失败的,返回error信息。
	if p.canDecode(newBuf) {
		p.offset += p.shardSize
		if healRequired != 0 {
			return newBuf, errHealRequired
		}
		return newBuf, nil
	}

	return nil, errErasureReadQuorum
}

以上代码虽然长,但是我做了注释,也比较容易理解了。现在再对这段逻辑进行解释下:

  1. 前提是从10个数据里读取任意5个
  2. 初始化的chan大小是10,但是通过for循环只存放了5个true
  3. 然后对chan循环读取数据,如果是true就开启go协程获取数据,如果是false就终止这次循环
  4. 当前在这之前还会判断下是否已经成功获取了5个,如果是的话,直接跳出整个for循环
  5. 通过readerIndex每次尝试获取一个数据,如果成功赛一个false到chan中,如果失败则塞个true
  6. 这样不成功的readerIndex不再尝试读取,失败了就通过true标记尝试读取下一个readerIndex
  7. 通过chan这种巧妙的方式不断循环,直到成功读取5个,或者把10个数据都读一遍为止
  8. 最终再基于是否成功读取到5个数据,做最终的判断,是返回成功数据,还是错误

利用channel来做标记和循环取数据,是一种非常好的方式,简化了代码逻辑,整体看起来非常清晰了,有兴趣的朋友可以看下MinIO原来的代码,感受会更强烈。

小结

以上主要是两种使用channel的技巧,这些技巧一些会靠自己熟能生巧,一些需要看别人的源代码学习,而阅读开源代码是一个非常不错的途径。

本文为原创文章,转载注明出处,欢迎扫码关注公众号flysnow_org或者网站 https://www.flysnow.org/ ,第一时间看后续精彩文章。觉得好的话,请顺手点个赞吧。

扫码关注