今天看到一段代码,是用 Prometheus 的 client_go 暴露 metrics 的代码。里面不是简单的将对应的一个 metric counter inc()的操作,而是自己实现了一个非常奇怪的逻辑:
- 当程序需要将 counter +1 的时候,没有直接操作对应的 metrics,而是将要增加的 metrics 用自己的格式打包出一个对象,然后将对象发送到一个 channel,每一个 metric 都对应一个 channel
 - 程序启动之初就会启动全局唯一的 worker goroutine, 这个 goroutine 负责操作所有的 metrics:它从不同的 channel 中拿到消息,然后解包出来,找到对应的应该增加的 metrics,然后执行最终的增加操作。
 
实际的操作还要复杂的多,先创建一个 MetricsBuilder,然后 MetricsBuilder 有 Add() 函数,实际上是往 Channel 里面发送了一条信息,Channel 里面读出来,又通过一些系列的层层调用执行到 metrics + 1 上。
事先声明, 本人不会写 golang,本文可能写的不对,请读者指正。
感觉本身就是一行 metrics.Add() 的事情,为什么要搞这么复杂呢?思来想去,我觉得唯一可能的解释就是:这是一个负载极高的系统,希望将 metrics 的操作变成异步的操作,不占用业务处理的时间。但是用 channel 还涉及到打包和解包,真的能快吗?
一开始我以为 channel 可能是一种高性能的无锁的操作,但是看了 golang 的 runtime 部分,发现也是有锁的,如果多个线程同时往一个 channel 里面写,也是存在竞争条件的。
而 Prometheus 的 client_golang 只是执行了一个 Add 操作:atomic.AddUint64(&c.valInt, ival)
虽然 atomic 也是一个 CAS 操作,但直觉上我觉得用 channel 是不会比 atomic 快的。
写了两段代码来比较这两种情况(测试代码和运行方式可以在 atomic_or_channel 这里找到):
直接用 atomic 代码:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19  | 
						func AtomicAdd() uint64 { 	var wg sync.WaitGroup 	var count uint64 	count = 0 	for i := 1; i <= CLIENTS; i++ { 		wg.Add(1) 		go func() { 			defer wg.Done() 			for l := 0; l < LOOP; l++ { 				atomic.AddUint64(&count, 1) 			} 		}() 	} 	wg.Wait() 	return count }  | 
					
模拟开一个 channel 负责增加的情况:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | 
						func ChannelAdd() uint64 { 	var wg sync.WaitGroup 	var count uint64 	count = 0 	numCh := make(chan *uint64, 10240) 	// start worker 	go func() { 		for value := range numCh { 			atomic.AddUint64(&count, *value) 		} 	}() 	one := uint64(1) 	for i := 1; i <= CLIENTS; i++ { 		wg.Add(1) 		go func() { 			defer wg.Done() 			for l := 0; l < LOOP; l++ { 				numCh <- &one 			} 		}() 	} 	wg.Wait() 	close(numCh) 	return count }  | 
					
参数如下,意在模拟 100 个并行连接,需要增加1百万次:
| 
					 1 2  | 
						var LOOP = 1000000 var CLIENTS = 100  | 
					
实际运行的结果也和我想的一样,atomic 要比 channel 的方式快了 15 倍。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | 
						➜ go run . ChannelAdd() Done! count=99991670 took 33.95414173ss AtomicAdd() Done! count=100000000 took 1.92393346ss ➜ go test -bench=. -count=10 goos: darwin goarch: amd64 pkg: github.com/laixintao/atomic_or_channel cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz BenchmarkAtomic-12             1        1947990739 ns/op BenchmarkAtomic-12             1        1957558508 ns/op BenchmarkAtomic-12             1        1984851549 ns/op BenchmarkAtomic-12             1        1957678020 ns/op BenchmarkAtomic-12             1        1953746756 ns/op BenchmarkAtomic-12             1        1957197155 ns/op BenchmarkAtomic-12             1        1964173594 ns/op BenchmarkAtomic-12             1        1962058130 ns/op BenchmarkAtomic-12             1        2342306683 ns/op BenchmarkAtomic-12             1        2313845157 ns/op BenchmarkChannel-12            1        35211801199 ns/op BenchmarkChannel-12            1        40597364557 ns/op BenchmarkChannel-12            1        38452709531 ns/op BenchmarkChannel-12            1        40201893971 ns/op BenchmarkChannel-12            1        41802617846 ns/op BenchmarkChannel-12            1        41463031707 ns/op BenchmarkChannel-12            1        41985476702 ns/op BenchmarkChannel-12            1        43106978329 ns/op BenchmarkChannel-12            1        45582670783 ns/op BenchmarkChannel-12            1        43751655673 ns/op PASS ok      github.com/laixintao/atomic_or_channel  432.885s  | 
					
atomic 2s 就可以完成模拟 100 个客户端并行增加1百万次,即可以支持5千万的 QPS (还只是在我的笔记本上),而相同的操作用上文描述的 channel 的方式需要 30-40s。慢了15倍。
虽然我在有些地方说 atomic 很慢,但是这个速度对于 metrics 统计的这个场景来说,是完全足够了。
Twitter 上的 @Kontinuation 提醒有一种优化的方式,我觉得这个很好:
每一个 thread 维护自己的 threadlocal 变量,这样完全不需要锁。只是在 collect metrics 的时候采取收集每一个 thread 的 counter 等。TiKV 中就是使用这个方法实现的 Local 指标(@_yeya24),即每一个线程保存自己的指标在 Thread Local Storage,然后每 1s 刷新到全局变量(其实我觉得可以只有在 metrics 被收集的时候才刷新?),这样可以减少锁的次数。
但是在 golang 里面,从这篇文章发现 go 语言官方是不想让你用 thread local 的东西的,而且为此还专门让 go id 不容易被获取到。那我就像不到什么比较好的实现方法了。
可以加一个协程在ChannelAdd中,会发现其实绝大多数情况下,numCh是一个阻塞态;
// check chan len
go func() {
for {
println(“chan len”, len(numCh))
time.Sleep(20 * time.Millisecond)
}
}()