Golang协程池gopool设计与实现
Goroutine
Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。
Goroutine 的优势:
- 与线程相比,Goroutines 成本很低。
它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。
- Goroutine 被多路复用到更少数量的 OS 线程。
一个包含数千个 Goroutine 的程序中可能只有一个线程。如果该线程中的任何 Goroutine 阻塞等待用户输入,则创建另一个 OS 线程并将剩余的 Goroutine 移动到新的 OS 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 API 来支持并发。
- Goroutines 使用 channel 进行通信。
channel 的设计有效防止了在使用 Goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 Goroutine 进行通信的管道。
下文中我们会以「协程」来代指 Goroutine。
协程池
在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。
最近抽空了解了字节官方开源的 gopkg 库提供的 gopool
协程池实现,感觉还是很高质量的,代码也非常简洁清晰,而且 Kitex
底层也在使用 gopool
来管理协程,这里我们梳理一下设计和实现。
gopool
Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool
gopool
is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to thego
keyword.
了解官方 README 就会发现gopool
的用法其实非常简单,将曾经我们经常使用的 go func(){...}
替换为 gopool.Go(func(){...})
即可。
此时 gopool
将会使用默认的配置来管理你启动的协程,你也可以选择针对业务场景配置池子大小,以及扩容上限。
old:
go func() { // do your job }()
new:
import ( "github.com/bytedance/gopkg/util/gopool" ) gopool.Go(func(){ /// do your job })
核心实现
下面我们来看看gopool
是怎样实现协程池管理的。
Pool
Pool
是一个定义了协程池能力的接口。
type Pool interface { // 池子的名称 Name() string // 设置池子内Goroutine的容量 SetCap(cap int32) // 执行 f 函数 Go(f func()) // 带 ctx,执行 f 函数 CtxGo(ctx context.Context, f func()) // 设置发生panic时调用的函数 SetPanicHandler(f func(context.Context, interface{})) }
gopool
提供了这个接口的默认实现(即下面即将介绍的pool
),当我们直接调用 gopool.CtxGo 时依赖的就是这个。
这样的设计模式在 Kitex
中也经常出现,所有的依赖均设计为接口,便于随后扩展,底层提供一个默认的实现暴露出去,这样对调用方也很友好。
type pool struct { // 池子名称 name string // 池子的容量, 即最大并发工作的 goroutine 的数量 cap int32 // 池子配置 config *Config // task 链表 taskHead *task taskTail *task taskLock sync.Mutex taskCount int32 // 记录当前正在运行的 worker 的数量 workerCount int32 // 当 worker 出现panic时被调用 panicHandler func(context.Context, interface{}) } // NewPool 创建一个新的协程池,初始化名称,容量,配置 func NewPool(name string, cap int32, config *Config) Pool { p := &pool{ name: name, cap: cap, config: config, } return p }
调用 NewPool
获取了以 Pool
的形式返回的 pool
结构体。
Task
type task struct { ctx context.Context f func() next *task }
task
是一个链表结构,可以把它理解为一个待执行的任务,它包含了当前节点需要执行的函数f
, 以及指向下一个task
的指针。
综合前一节 pool
的定义,我们可以看到,一个协程池 pool
对应了一组task
。
pool
维护了指向链表的头尾的两个指针:taskHead
和 taskTail
,以及链表的长度taskCount
和对应的锁 taskLock
。
Worker
type worker struct { pool *pool }
一个 worker
就是逻辑上的一个执行器,它唯一对应到一个协程池 pool
。当一个worker
被唤起,将会开启一个goroutine
,不断地从 pool
中的 task
链表获取任务并执行。
func (w *worker) run() { go func() { for { // 声明即将执行的 task var t *task // 操作 pool 中的 task 链表,加锁 w.pool.taskLock.Lock() if w.pool.taskHead != nil { // 拿到 taskHead 准备执行 t = w.pool.taskHead // 更新链表的 head 以及数量 w.pool.taskHead = w.pool.taskHead.next atomic.AddInt32(&w.pool.taskCount, -1) } // 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回 if t == nil { w.close() w.pool.taskLock.Unlock() w.Recycle() return } w.pool.taskLock.Unlock() // 执行任务,针对 panic 会recover,并调用配置的 handler func() { defer func() { if r := recover(); r != nil { msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack()) logger.CtxErrorf(t.ctx, msg) if w.pool.panicHandler != nil { w.pool.panicHandler(t.ctx, r) } } }() t.f() }() t.Recycle() } }() }
整体来看
看到这里,其实就能把整个流程串起来了。我们来看看对外的接口 CtxGo(context.Context, f func())
到底做了什么?
func Go(f func()) { CtxGo(context.Background(), f) } func CtxGo(ctx context.Context, f func()) { defaultPool.CtxGo(ctx, f) } func (p *pool) CtxGo(ctx context.Context, f func()) { // 创建一个 task 对象,将 ctx 和待执行的函数赋值 t := taskPool.Get().(*task) t.ctx = ctx t.f = f // 将 task 插入 pool 的链表的尾部,更新链表数量 p.taskLock.Lock() if p.taskHead == nil { p.taskHead = t p.taskTail = t } else { p.taskTail.next = t p.taskTail = t } p.taskLock.Unlock() atomic.AddInt32(&p.taskCount, 1) // 以下两个条件满足时,创建新的 worker 并唤起执行: // 1. task的数量超过了配置的限制 // 2. 当前运行的worker数量小于上限(或无worker运行) if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 { // worker数量+1 p.incWorkerCount() // 创建一个新的worker,并把当前 pool 赋值 w := workerPool.Get().(*worker) w.pool = p // 唤起worker执行 w.run() } }
相信看了代码注释,大家就能理解发生了什么。
gopool
会自行维护一个 defaultPool
,这是一个默认的 pool
结构体,在引入包的时候就进行初始化。当我们直接调用 gopool.CtxGo()
时,本质上是调用了 defaultPool
的同名方法
func init() { defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig()) } const ( defaultScalaThreshold = 1 ) // Config is used to config pool. type Config struct { // 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker ScaleThreshold int32 } // NewConfig creates a default Config. func NewConfig() *Config { c := &Config{ ScaleThreshold: defaultScalaThreshold, } return c }
defaultPool
的名称为 gopool.DefaultPool
,池子容量一万,扩容下限为 1。
当我们调用 CtxGo
时,gopool
就会更新维护的任务链表,并且判断是否需要扩容 worker
:
- 若此时已经有很多
worker
启动(底层一个worker
对应一个goroutine
),不需要扩容,就直接返回。 - 若判断需要扩容,就创建一个新的
worker
,并调用worker.run()
方法启动,各个worker
会异步地检查pool
里面的任务链表是否还有待执行的任务,如果有就执行。
三个角色的定位
task
是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;worker
是一个实际执行任务的执行器,它会异步启动一个goroutine
执行协程池里面未执行的task
;pool
是一个逻辑上的协程池,对应了一个task
链表,同时负责维护task
状态的更新,以及在需要的时候创建新的worker
。
使用 sync.Pool 进行性能优化
其实到这个地方,gopool
已经是一个代码简洁清晰的协程池库了,但是性能上显然有改进空间,所以gopool
的作者应用了多次 sync.Pool
来池化对象的创建,复用woker和task对象。
这里建议大家直接看源码,其实在上面的代码中已经有所涉及。
- task 池化
var taskPool sync.Pool func init() { taskPool.New = newTask } func newTask() interface{} { return &task{} } func (t *task) Recycle() { t.zero() taskPool.Put(t) }
- worker 池化
var workerPool sync.Pool func init() { workerPool.New = newWorker } func newWorker() interface{} { return &worker{} } func (w *worker) Recycle() { w.zero() workerPool.Put(w) }
到此这篇关于Golang协程池gopool设计与实现的文章就介绍到这了,更多相关Golang协程池gopool内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!
原文出处:https://juejin.cn/post/7086443265309818894
相关文章
- 这篇文章主要介绍了golang 调用 php7详解及实例的相关资料,需要的朋友可以参考下...2017-01-15
- 这篇文章主要介绍了用golang实现替换某个文件中的字符串操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-04-25
- 这篇文章主要介绍了golang在GRPC中设置client的超时时间,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-04-27
- 这篇文章主要介绍了解决Golang json序列化字符串时多了\的情况,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-24
Golang中的自定义类型之间的转换的实现(type conversion)
这篇文章主要介绍了Golang中的自定义类型之间的转换的实现(type conversion),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-21- 这篇文章主要介绍了golang与php实现计算两个经纬度之间距离的方法,结合实例形式对比分析了Go语言与php进行经纬度计算的相关数学运算技巧,需要的朋友可以参考下...2016-07-29
解决golang处理http response碰到的问题和需要注意的点
这篇文章主要介绍了解决golang处理http response碰到的问题和需要注意的点,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-16- 这篇文章主要介绍了golang http使用踩过的坑与填坑指南,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-04-27
- 这篇文章主要介绍了golang文件读取-按指定BUFF大小读取方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-22
- Go(又称Golang)是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。这篇文章给大家介绍golang 正则表达式的相关知识,感兴趣的朋友跟随小编一起看看吧...2021-05-07
- 这次文章为大家带来的是一个比较实用的示例:利用Golang生成整数随机数,对此感兴趣的可以一起来看看。 php随机数生成一个给定范围的随机数,用 PHP 就太简单不过了,而...2017-07-06
- 这篇文章主要介绍了golang DNS服务器的简单实现操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-01
- 这篇文章主要介绍了golang中json和struct的使用说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-01
Fedora14 Linux系统安装Golang开发环境笔记
这篇文章主要介绍了Fedora14 Linux系统安装Golang开发环境笔记,本文讲解了2种安装方法,需要的朋友可以参考下...2020-05-01- 这篇文章主要介绍了golang去除多余空白字符(含制表符)的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-04-25
golang elasticsearch Client的使用详解
这篇文章主要介绍了golang elasticsearch Client的使用详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-04- 这篇文章主要介绍了解决golang json解析出现值为空的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-24
- 这篇文章主要介绍了golang中的空接口使用,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-03-30
- 这篇文章主要介绍了Golang Cron 定时任务的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-05-11
- 这篇文章主要为大家介绍了golang开发安装go-torch火焰图操作步骤...2021-11-16