0x01 前言
Go Context
是 Go
语言标准库中用于处理请求作用域的包,它提供了一种优雅的方式来传递请求相关的上下文信息
,例如请求的截止时间
、用户认证信息
等。
0x02 为什么需要 Context?
在 Go
中,最重要的一个概念就是并发协程,它只需用一个关键字go
就可以开启一个协程并运行。在并发协程中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作,比如在实际中当服务器端收到一个请求时,很可能需要发送几个请求去请求其他服务的数据,由于Go
语法上的同步阻塞写法,我们一般会创建几个goroutine
并发去做一些事情;那么这时候很可能几个goroutine
之间需要共享数据,还有当request
被取消时,创建的几个goroutine
也应该被取消掉,那么这就是context
的用武之地。
在讲context
之前,先说说channel
,使用 channel
也可以解决这类问题。举个例子:
func main() {
// 接收完成动作的信道
stop := make(chan bool)
// 接收消息的缓存信道,数量为 10
messages := make(chan string, 10)
// 释放资源(养成使用资源后释放资源的好习惯~)
defer close(stop)
defer close(messages)
// consumer 新建一个协程,负责消费完成信道
go func() {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
// select 阻塞监听 channel 的 io 操作
select {
case <-stop:
fmt.Println("Job has stop...")
return
default:
fmt.Printf("Received message: %s\n", <-messages)
}
}
}()
// producer
for i := 0; i < 10; i++ {
messages <- fmt.Sprintf("I AM PRODUCER-%v", i)
}
time.Sleep(5 * time.Second)
// 告诉 stop 信道,我要中止了
stop <- true
time.Sleep(1 * time.Second)
fmt.Println("Process exit!")
}
上述例子中定义了一个 buffer
为0的 stop channel
, 子协程
运行着定时任务。如果主协程需要在某个时刻发送消息通知子协程
中断任务退出,那么就可以让子协程
监听这个 stop channel
,一旦主协程
发出信号
或者关闭 stop channel
,那么子协程
就可以退出了,这样就实现了主协程
通知子协程
的需求。
但如果是父子任务的同步取消机制,层级更深次(比如子任务包含子任务,而这一层的子任务又包含下一层子任务)的情况下,使用 channel
将变得复杂繁琐。
context 原理
先让我们来看看 context
的实现原理
context 接口
type Context interface {
// 返回与此上下文关联的取消函数。
Done() <-chan struct{}
// 返回此上下文的截止时间(如果有)。
// 如果没有截止时间,则ok为false。
Deadline() (deadline time.Time, ok bool)
// 返回此上下文的键值对数据。
Value(key interface{}) interface{}
}
context
接口四个具体实现方法:
Deadline
返回绑定当前context
的任务被取消的截止时间;如果没有设定期限,将返回 ok == false 。Done
当绑定当前context
的任务被取消时,将返回一个关闭的channel
;如果当前context
不会被取消,将返回 nil 。Err
如果Done
返回的channel
没有关闭,将返回 nil ;如果Done
返回的channel
已经关闭,将返回非空的值表示任务结束的原因。如果是context
被取消,Err
将返回Canceled
;如果是context
超时,Err
将返回DeadlineExceeded
。Value
返回context
存储的键值对中当前key
对应的值,如果没有对应的key
,则返回 nil 。
contxxt
接口六个函数:
- Background
- TODO
- WithCancel
- WithDeadline
- WithTimeout
- WithValue
context重要的结构体
emptyCtx
emptyCtx本质是一个整型类型,他对Context接口的实现,非常简单,其实是什么也没做,都是一堆空方法。
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
可以看到上述代码中使用new了两个结构体为
emptyCtx
的变量background 和 todo ,String()方法得意 case对比得出对象。Background和TODO这两个公共方法是返回background和todo;官方建议Background用做顶层的context,todo看起来用来占位使用,不过实现来说两个没区别
cancelCtx
通过WithCancel来创建的就是cancelCtx,WithCancel返回一个ctx和cancel方法,通过调用cancel方法,可以将Context取消,来控制协程,具体看下面例子:
func fixLeakingByContext() {
// 创建一个带有取消方法的上下文
ctxWithCancel, cancel := context.WithCancel(context.Background())
// 释放资源
defer cancel()
// 创建一个信道
doChan := make(chan interface{})
go doJob(ctxWithCancel, doChan)
go doJob(ctxWithCancel, doChan)
go doJob(ctxWithCancel, doChan)
go doJob(ctxWithCancel, doChan)
go doJob(ctxWithCancel, doChan)
// 异步取消
go func() {
cancel()
}()
// 随机触发某个子协程
doChan <- 1
// 休眠一下方便看到效果
time.Sleep(1 * time.Second)
// 执行后输出结果
/*
Cancelled
Received: 1
Cancelled
Cancelled
Cancelled
*/
}
func doJob(ctx context.Context, doChan chan interface{}) {
select {
case <-ctx.Done():
fmt.Println("Cancelled")
return
case res := <-doChan:
fmt.Println("Received: ", res)
return
}
}
可以看到上述代码执行后的输出结果只有一个 Received
,其他的子协程由于收到了取消的信号,就直接 return
,不往下执行了,可以避免协程资源没有被回收引起的内存泄露。
再看看 WithCancel
的源码:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
// WithCancel通过一个父级Context来创建出一个cancelCtx
c := newCancelCtx(parent)
// 调用propagateCancel根据父级context的状态来关联cancelCtx的cancel行为
propagateCancel(parent, &c)
// 返回c和一个方法,方法中调用c.cancel并传递Canceled变量
return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
var Canceled = errors.New("context canceled")
WithCancel
通过一个父级Context
来创建出一个cancelCtx
,然后调用propagateCancel
根据父级context
的状态来关联cancelCtx
的cancel
行为(感觉这里不应该叫propagate,冒泡一般理解是自下向上,这个函数明显是自下向上,应该叫cascade更为合理一些)。随后返回c和一个方法,方法中调用c.cancel并传递Canceled
变量(其实是一个error
实例);
cancelCtx
是WidthDeadline
和WidthTimeout
的基石,所以cancelCtx
的实现相对复杂。
newCancelCtx
方法可以看到是创建了一个cancelCtx
实例
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
再看看 cancelCtx
的定义
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context // 内嵌结构体
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
// 函数返回一个只读channel,而且没有地方向这个channel里写数据。所以直接调用这个只读channel会被阻塞。一般通过搭配 select 来使用。一旦关闭,就会立即读出零值。
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
// 这个函数功能就是关闭channel:c.done();
// 递归取消它的所有子节点;最后从父节点删除自己。
// 通过关闭channel,将取消信号传递给了它的所有子节点。
// goroutine 接收到取消信号的方式就是 select 语句中的 读c.done 被选中
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {// 必须传一个err值,后面判断用
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled 已经被其他协程取消了
}
c.err = err
// 关闭channel,通知其他协程
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
//遍历它是所有子节点
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)// 递归地取消所有子节点
}
// 将子节点清空
c.children = nil
c.mu.Unlock()
if removeFromParent {
// 从父节点中移除自己
removeChild(c.Context, c)
}
}
cancelCtx
有一个内嵌的Context
类型,实际存储的都是父级上下文对象,还有四个独立的字段:
- mu:一个互斥量,用来加锁保证某些操作的线程安全性
- done:atomic.Value一个可以对任意类型进行原子型操作的结构;提供Load和Store方法;看Go源码这里存的是一个struct{}类型的channel
- children:一个key为canceler值为struct{}的map类型;
- err:存放error的字段
这里的canceler
是一个接口,代表可以直接被cancel
的Context
类型,基本指的是 *cancelCtx
和 *timerCtx
两种context
,也被他俩实现
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
下面看下propagateCancel
,据父级context
的状态来关联cancelCtx
的cancel
行为
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
// 父节点是空的,直接返回
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)//父节点已经取消,它的子节点也需要取消
} else {
//父节点未取消
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// 把这个child放到父节点上
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 如果没有找到可取消的父 context。新启动一个协程监控父节点或子节点取消信号
go func() {
select {
// 保证父节点被取消的时候子节点会被取消
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
parentCancelCtx
这个函数识别三种类型的Context
:cancelCtx
,timerCtx
,valueCtx
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true // 找到最近支持cancel的parent,由parent进行取消操作的调用
case *timerCtx:
return &c.cancelCtx, true // 找到最近支持cancel的parent,由parent进行取消操作的调用
case *valueCtx:
parent = c.Context // 递归
default:
return nil, false
}
}
}
timerCtx
timerCtx
嵌入了cancelCtx
结构体,所以cancelCtx
的方法也可以使用。
timerCtx
主要是用于实现WithDeadline
和WithTimeout
两个context
实现,其继承了cancelCtx
结构体,同时还包含一个timer.Timer
定时器和一个deadline
终止实现。Timer
会在deadline
到来时,自动取消context
。
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
*cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err) //由于继承了cancelCtx,这里调用了cancelCtx的cancel()方法
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()//停止定时器
c.timer = nil
}
c.mu.Unlock()
}
上方 cancel
函数继承了cancelCtx
的方法cancel(),然后后面进行自身定时器Stop()
的操作,这样就可以实现取消操作了。
valueCtx
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
通过键值对的方式保存(注意value是非线程安全)
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
如果key
与WithValue
调用时相同,则返回对应的val
,否则进入value
方法,在内嵌的Context
中查找key
对应的值,根据Context
类型先做一些类型判断,来判断一些关键的key
如cancelCtxKey
,不然继续在内嵌Context
中查找。
0x03 使用示例
带超时的上下文
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() // 在函数结束时调用 cancel 函数释放资源
// 使用 ctx 执行一些需要在 5 秒内完成的操作
done := make(chan bool)
go func(doneCh chan bool) {
// 模拟超时行为
time.Sleep(10 * time.Second)
doneCh <- true
}(done)
select {
case <-ctx.Done():
// 结束,释放资源
fmt.Println("Cancelled")
return
case <-done:
fmt.Println("Received")
return
}
}
带截止时间的上下文
deadline := time.Now().Add(time.Second * 10) // 10 秒后的截止时间
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// 使用 ctx 执行一些需要在 10 秒内完成的操作
传递值的上下文
ctx := context.WithValue(context.Background(), key, value)
// 在其他函数中可以使用 ctx.Value(key) 获取传递的值
并发控制
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
// 执行一些耗时的操作
// 在操作完成或需要提前退出时调用 cancel 函数
cancel()
}()
// 在主函数或其他需要等待的地方,使用 <-ctx.Done() 接收信号
select {
case <-ctx.Done():
// 收到信号后的处理逻辑
}