深入浅出 Go Context

0x01 前言

Go ContextGo 语言标准库中用于处理请求作用域的包,它提供了一种优雅的方式来传递请求相关的上下文信息,例如请求的截止时间用户认证信息等。

0x02 为什么需要 Context?

Go中,最重要的一个概念就是并发协程,它只需用一个关键字go就可以开启一个协程并运行。在并发协程中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作,比如在实际中当服务器端收到一个请求时,很可能需要发送几个请求去请求其他服务的数据,由于Go语法上的同步阻塞写法,我们一般会创建几个goroutine并发去做一些事情;那么这时候很可能几个goroutine之间需要共享数据,还有当request被取消时,创建的几个goroutine也应该被取消掉,那么这就是context的用武之地。

在讲context之前,先说说channel,使用 channel也可以解决这类问题。举个例子:

func main() {
	// 接收完成动作的信道
	stop := make(chan bool)
	// 接收消息的缓存信道,数量为 10
	messages := make(chan string, 10)

	// 释放资源(养成使用资源后释放资源的好习惯~)
	defer close(stop)
	defer close(messages)

	// consumer 新建一个协程,负责消费完成信道
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			// select 阻塞监听 channel 的 io 操作
			select {
			case <-stop:
				fmt.Println("Job has stop...")
				return
			default:
				fmt.Printf("Received message: %s\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 10; i++ {
		messages <- fmt.Sprintf("I AM PRODUCER-%v", i)
	}
	time.Sleep(5 * time.Second)
	// 告诉 stop 信道,我要中止了
	stop <- true
	time.Sleep(1 * time.Second)
	fmt.Println("Process exit!")
}

上述例子中定义了一个 buffer 为0的 stop channel , 子协程运行着定时任务。如果主协程需要在某个时刻发送消息通知子协程中断任务退出,那么就可以让子协程监听这个 stop channel ,一旦主协程发出信号或者关闭 stop channel ,那么子协程就可以退出了,这样就实现了主协程通知子协程的需求。

但如果是父子任务的同步取消机制,层级更深次(比如子任务包含子任务,而这一层的子任务又包含下一层子任务)的情况下,使用 channel将变得复杂繁琐。

context 原理

先让我们来看看 context的实现原理

context 接口

type Context interface {
    // 返回与此上下文关联的取消函数。
    Done() <-chan struct{}

    // 返回此上下文的截止时间(如果有)。
    // 如果没有截止时间,则ok为false。
    Deadline() (deadline time.Time, ok bool)

    // 返回此上下文的键值对数据。
    Value(key interface{}) interface{}
}

context接口四个具体实现方法:

  • Deadline 返回绑定当前 context 的任务被取消的截止时间;如果没有设定期限,将返回 ok == false 。
  • Done 当绑定当前 context 的任务被取消时,将返回一个关闭的 channel ;如果当前 context 不会被取消,将返回 nil 。
  • Err 如果 Done 返回的 channel 没有关闭,将返回 nil ;如果 Done 返回的 channel 已经关闭,将返回非空的值表示任务结束的原因。如果是 context 被取消, Err 将返回 Canceled ;如果是 context 超时, Err 将返回 DeadlineExceeded
  • Value 返回 context 存储的键值对中当前 key 对应的值,如果没有对应的 key ,则返回 nil 。

contxxt接口六个函数:

  • Background
  • TODO
  • WithCancel
  • WithDeadline
  • WithTimeout
  • WithValue

context重要的结构体

emptyCtx

emptyCtx本质是一个整型类型,他对Context接口的实现,非常简单,其实是什么也没做,都是一堆空方法。

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key any) any {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
	return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
	return todo
}

可以看到上述代码中使用new了两个结构体为 emptyCtx 的变量background 和 todo ,String()方法得意 case对比得出对象。

Background和TODO这两个公共方法是返回background和todo;官方建议Background用做顶层的context,todo看起来用来占位使用,不过实现来说两个没区别

cancelCtx

通过WithCancel来创建的就是cancelCtx,WithCancel返回一个ctx和cancel方法,通过调用cancel方法,可以将Context取消,来控制协程,具体看下面例子:

func fixLeakingByContext() {
	// 创建一个带有取消方法的上下文
	ctxWithCancel, cancel := context.WithCancel(context.Background())
	// 释放资源
	defer cancel()

	// 创建一个信道
	doChan := make(chan interface{})

	go doJob(ctxWithCancel, doChan)
	go doJob(ctxWithCancel, doChan)
	go doJob(ctxWithCancel, doChan)
	go doJob(ctxWithCancel, doChan)
	go doJob(ctxWithCancel, doChan)

	// 异步取消
	go func() {
		cancel()
	}()
	// 随机触发某个子协程
	doChan <- 1
	// 休眠一下方便看到效果
	time.Sleep(1 * time.Second)
  // 执行后输出结果
  /*
	Cancelled
	Received:  1
	Cancelled
	Cancelled
	Cancelled
	*/
}

func doJob(ctx context.Context, doChan chan interface{}) {
	select {
	case <-ctx.Done():
		fmt.Println("Cancelled")
		return
	case res := <-doChan:
		fmt.Println("Received: ", res)
		return
	}
}

可以看到上述代码执行后的输出结果只有一个 Received,其他的子协程由于收到了取消的信号,就直接 return ,不往下执行了,可以避免协程资源没有被回收引起的内存泄露。

再看看 WithCancel的源码:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
         // WithCancel通过一个父级Context来创建出一个cancelCtx
	c := newCancelCtx(parent)
         // 调用propagateCancel根据父级context的状态来关联cancelCtx的cancel行为
	propagateCancel(parent, &c)
         // 返回c和一个方法,方法中调用c.cancel并传递Canceled变量
	return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}
var Canceled = errors.New("context canceled")

WithCancel通过一个父级Context来创建出一个cancelCtx,然后调用propagateCancel根据父级context的状态来关联cancelCtxcancel行为(感觉这里不应该叫propagate,冒泡一般理解是自下向上,这个函数明显是自下向上,应该叫cascade更为合理一些)。随后返回c和一个方法,方法中调用c.cancel并传递Canceled变量(其实是一个error实例);

cancelCtxWidthDeadlineWidthTimeout的基石,所以cancelCtx的实现相对复杂。

newCancelCtx方法可以看到是创建了一个cancelCtx实例

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

再看看 cancelCtx 的定义

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	Context // 内嵌结构体

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

// 函数返回一个只读channel,而且没有地方向这个channel里写数据。所以直接调用这个只读channel会被阻塞。一般通过搭配 select 来使用。一旦关闭,就会立即读出零值。
func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
        c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
}

// 这个函数功能就是关闭channel:c.done();
// 递归取消它的所有子节点;最后从父节点删除自己。
// 通过关闭channel,将取消信号传递给了它的所有子节点。
// goroutine 接收到取消信号的方式就是 select 语句中的 读c.done 被选中
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {// 必须传一个err值,后面判断用
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled 已经被其他协程取消了
    }
    c.err = err
    
    // 关闭channel,通知其他协程
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    //遍历它是所有子节点
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)// 递归地取消所有子节点
    }
    // 将子节点清空
    c.children = nil
    c.mu.Unlock()
    if removeFromParent {
       // 从父节点中移除自己
        removeChild(c.Context, c)
    }
}

cancelCtx有一个内嵌的Context类型,实际存储的都是父级上下文对象,还有四个独立的字段:

  • mu:一个互斥量,用来加锁保证某些操作的线程安全性
  • done:atomic.Value一个可以对任意类型进行原子型操作的结构;提供Load和Store方法;看Go源码这里存的是一个struct{}类型的channel
  • children:一个key为canceler值为struct{}的map类型;
  • err:存放error的字段

这里的canceler是一个接口,代表可以直接被cancelContext类型,基本指的是 *cancelCtx*timerCtx两种context,也被他俩实现

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

下面看下propagateCancel,据父级context的状态来关联cancelCtxcancel行为

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
   // 父节点是空的,直接返回
    if parent.Done() == nil {
        return // parent is never canceled
    }
    
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)//父节点已经取消,它的子节点也需要取消
        } else {
            //父节点未取消
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            // 把这个child放到父节点上
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
      // 如果没有找到可取消的父 context。新启动一个协程监控父节点或子节点取消信号
        go func() {
            select {          
             // 保证父节点被取消的时候子节点会被取消
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

parentCancelCtx

这个函数识别三种类型的ContextcancelCtxtimerCtxvalueCtx

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    for {
        switch c := parent.(type) {
        case *cancelCtx:
            return c, true    // 找到最近支持cancel的parent,由parent进行取消操作的调用
        case *timerCtx:
            return &c.cancelCtx, true // 找到最近支持cancel的parent,由parent进行取消操作的调用
        case *valueCtx:
            parent = c.Context // 递归
        default:
            return nil, false
        }
    }
}

timerCtx

timerCtx嵌入了cancelCtx结构体,所以cancelCtx的方法也可以使用。
timerCtx主要是用于实现WithDeadlineWithTimeout两个context实现,其继承了cancelCtx结构体,同时还包含一个timer.Timer定时器和一个deadline终止实现。Timer会在deadline到来时,自动取消context

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
	*cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
   c.cancelCtx.cancel(false, err) //由于继承了cancelCtx,这里调用了cancelCtx的cancel()方法
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()//停止定时器
        c.timer = nil
    }
    c.mu.Unlock()
}

上方 cancel函数继承了cancelCtx的方法cancel(),然后后面进行自身定时器Stop()的操作,这样就可以实现取消操作了。

valueCtx

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}
func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

通过键值对的方式保存(注意value是非线程安全)

func (c *valueCtx) String() string {
	return contextName(c.Context) + ".WithValue(type " +
		reflectlite.TypeOf(c.key).String() +
		", val " + stringify(c.val) + ")"
}

func (c *valueCtx) Value(key any) any {
	if c.key == key {
		return c.val
	}
	return value(c.Context, key)
}

如果keyWithValue调用时相同,则返回对应的val,否则进入value方法,在内嵌的Context中查找key对应的值,根据Context类型先做一些类型判断,来判断一些关键的keycancelCtxKey,不然继续在内嵌Context中查找。

0x03 使用示例

带超时的上下文

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel() // 在函数结束时调用 cancel 函数释放资源
	// 使用 ctx 执行一些需要在 5 秒内完成的操作

	done := make(chan bool)

	go func(doneCh chan bool) {
		// 模拟超时行为
		time.Sleep(10 * time.Second)
		doneCh <- true
	}(done)

	select {
	case <-ctx.Done():
		// 结束,释放资源
		fmt.Println("Cancelled")
		return
	case <-done:
		fmt.Println("Received")
		return
	}
}

带截止时间的上下文

deadline := time.Now().Add(time.Second * 10)  // 10 秒后的截止时间
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

// 使用 ctx 执行一些需要在 10 秒内完成的操作

传递值的上下文

ctx := context.WithValue(context.Background(), key, value)

// 在其他函数中可以使用 ctx.Value(key) 获取传递的值

并发控制

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    // 执行一些耗时的操作

    // 在操作完成或需要提前退出时调用 cancel 函数
    cancel()
}()

// 在主函数或其他需要等待的地方,使用 <-ctx.Done() 接收信号
select {
case <-ctx.Done():
    // 收到信号后的处理逻辑
}