WRY

Where Are You?
You are on the brave land,
To experience, to remember...


The Go context Standard Library

Why It Was Created

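Goroutines in Go have no parent-child relationship, so there is no built-in exit-notification mechanism: all goroutines are scheduled as peers. Cooperation between them involves communication, synchronization, notification, and exit.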

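  • Communication: passing data between goroutines, built on channels (chan).
  • Synchronization: achievable with an unbuffered chan or with sync.WaitGroup.
  • Notification: unlike communication, a notification carries management and control-flow information rather than ordinary data. It can also be implemented with a chan, but that is not a general-purpose solution.
  • Exit: goroutines can be told to exit through the broadcast behavior of chan plus select (closing a chan broadcasts to every receiver). This is not a general-purpose solution either.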
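As a minimal sketch of the close-to-broadcast exit described above (plain channels only, no context), closing one shared `quit` channel releases every goroutine waiting on it at once. The names `worker`, `broadcastExit`, and `quit` are made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// worker blocks until quit is closed, then reports its id.
// A receive from a closed channel never blocks, so one close reaches every worker.
func worker(id int, quit <-chan struct{}, wg *sync.WaitGroup, exited chan<- int) {
	defer wg.Done()
	<-quit
	exited <- id
}

// broadcastExit starts n workers, closes quit once, and counts how many exited.
func broadcastExit(n int) int {
	quit := make(chan struct{})
	exited := make(chan int, n) // buffered so no worker blocks while reporting
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(i, quit, &wg, exited)
	}
	close(quit) // the broadcast: every pending <-quit returns
	wg.Wait()
	close(exited)
	count := 0
	for range exited {
		count++
	}
	return count
}

func main() {
	fmt.Println("workers exited:", broadcastExit(5))
}
```

This works for a single level, but `quit` has to be threaded through every goroutine by hand, and stopping only one branch of a goroutine tree needs extra channels and bookkeeping.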

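In short, communication, synchronization, notification, and exit all come down to passing information, and thanks to how capable chan is, every requirement above can be met with one channel feature or another. But being powerful does not make chan the best general-purpose solution for all of them. In complex applications the goroutines form intricate tree-shaped relationships, and implementing these features by hand takes careful design and painstaking debugging, which is clearly not the Go way.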

Design Goals

The context library is designed to track the goroutine call tree and to propagate notifications and metadata through that tree.

  • Exit notification: a notification can reach every goroutine in the call tree.
  • Data passing: data can be passed to every goroutine in the call tree.

Implementation Idea

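A Context has to be created explicitly by a goroutine. The first goroutine to create one is the root node: it creates a concrete object that implements the Context interface and passes it to the goroutines it starts, which derive their own Contexts from it. The contexts thus form a tree, and messages are passed along that tree.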

Under the hood, the messages are still passed through channels.
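To make the tree idea concrete, the sketch below (the variable and function names are illustrative) builds root → child → grandchild → leaf with `context.WithCancel`. Canceling the leaf leaves its ancestors untouched, while canceling the root closes every descendant's `Done` channel. In the standard library this downward propagation between cancelCtx nodes happens synchronously inside the cancel call, so the non-blocking checks right after each cancel are meaningful.

```go
package main

import (
	"context"
	"fmt"
)

// isDone reports, without blocking, whether ctx's Done channel is closed.
func isDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// treeDemo builds root -> child -> grandchild -> leaf, cancels the leaf and
// then the root, and records which nodes are done after each step.
func treeDemo() (afterLeaf, afterRoot [4]bool) {
	root, cancelRoot := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(root)
	defer cancelChild()
	grandchild, cancelGrand := context.WithCancel(child)
	defer cancelGrand()
	leaf, cancelLeaf := context.WithCancel(grandchild)

	cancelLeaf() // affects only the leaf
	afterLeaf = [4]bool{isDone(root), isDone(child), isDone(grandchild), isDone(leaf)}

	cancelRoot() // propagates to every descendant
	afterRoot = [4]bool{isDone(root), isDone(child), isDone(grandchild), isDone(leaf)}
	return
}

func main() {
	afterLeaf, afterRoot := treeDemo()
	fmt.Println(afterLeaf, afterRoot)
}
```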

Interface Design

The Context interface is defined in the source as follows; I have condensed its doc comments:

// Context carries a deadline, a cancellation signal, and other values. Its methods must be safe for concurrent use.
type Context interface {
	// Deadline supports timeouts: when a deadline is set, ok == true and deadline is the time at which the context will be canceled.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a chan; when it yields (in practice, when it is closed), the current goroutine should stop.
	// A typical use is in a select statement:
	//
	//	// Stream reads values from DoSomething and sends them to out.
	//	// It returns when DoSomething fails or when ctx is done.
	//	func Stream(ctx context.Context, out chan<- Value) error {
	//		for {
	//			v, err := DoSomething(ctx)
	//			if err != nil {
	//				return err
	//			}
	//			select {
	//			case <-ctx.Done():
	//				return ctx.Err()
	//			case out <- v:
	//			}
	//		}
	//	}
	//
	// See https://blog.golang.org/pipelines for more examples.
	Done() <-chan struct{}

	// Err returns nil while Done is still open. After Done is closed, it reports why:
	// * Canceled if the context was canceled
	// * DeadlineExceeded if the deadline passed
	Err() error

	// Value returns the value associated with key, or nil if there is none; it is how
	// downstream code reads values passed down from upstream.
	// Use it only for request-scoped data that transits processes and API boundaries,
	// not for passing optional parameters to functions. (My reading: don't use it for
	// data unrelated to controlling the request.)
	// The values are not stored in a map: each WithValue call adds one key/value pair
	// (a valueCtx) to a chain, and keys are compared with ==. Since keys from every
	// package share this lookup, define an unexported key type, as in this example:
	//
	//	// Package user defines a User type that's stored in Contexts.
	//	package user
	//
	//	import "context"
	//
	//	// User is the type of value stored in the Contexts.
	//	type User struct {...}
	//
	//	// key is an unexported type for keys defined in this package.
	//	// This prevents collisions with keys defined in other packages.
	//	type key int
	//
	//	// userKey is the key for user.User values in Contexts. It is
	//	// unexported; clients use user.NewContext and user.FromContext
	//	// instead of using this key directly.
	//	var userKey key
	//
	//	// NewContext returns a new Context that carries value u.
	//	func NewContext(ctx context.Context, u *User) context.Context {
	//		return context.WithValue(ctx, userKey, u)
	//	}
	//
	//	// FromContext returns the User value stored in ctx, if any.
	//	func FromContext(ctx context.Context) (*User, bool) {
	//		u, ok := ctx.Value(userKey).(*User)
	//		return u, ok
	//	}
	Value(key interface{}) interface{}
}

Another interface worth mentioning is the unexported extension interface canceler; *cancelCtx and *timerCtx both implement it.

type canceler interface {
	// cancel lets the goroutine that created the canceler stop the goroutines started after it (its descendants).
	cancel(removeFromParent bool, err error)
	// Downstream goroutines watch Done and exit promptly once it is closed.
	Done() <-chan struct{}
}

Implementation

The standard library's own implementations, *cancelCtx, *timerCtx, and *valueCtx, show how context works internally.

cancelCtx

cancelCtx implements both the Context and canceler interfaces. Its source is:

// A cancelCtx can be canceled. When canceled, it also cancels any children that implement canceler.
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call; the map is used as a set and forms the tree of children
	err      error                 // set to non-nil by the first cancel call
}

func (c *cancelCtx) Value(key interface{}) interface{} {
	if key == &cancelCtxKey { // &cancelCtxKey is the key that a cancelCtx returns itself for.
		return c
	}
	return c.Context.Value(key)
}

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock() // locking first ensures done is never created twice
	if c.done == nil { // created lazily
		c.done = make(chan struct{})
	}
	d := c.done // copies the chan value, but a chan is a pointer to the runtime channel, so d is the same channel
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

type stringer interface {
	String() string
}

func contextName(c Context) string {
	if s, ok := c.(stringer); ok {
		return s.String()
	}
	return reflectlite.TypeOf(c).String()
}

func (c *cancelCtx) String() string {
	return contextName(c.Context) + ".WithCancel"
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil { // a non-nil c.err means the context has already been canceled
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		c.done = closedchan // closedchan is a reusable closed channel created by the package
	} else {
		close(c.done)
	}
	for child := range c.children { // cancel down the tree
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c) // detach this Context from its parent; see removeChild in the notes below
	}
}
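To make these mechanics concrete, here is a deliberately simplified, hypothetical re-implementation of the same idea (`node`, `newNode`, and `errCanceled` are invented names, not standard-library API): a lazily created done channel, a `children` map used as a set, and a `cancel` that records the error, closes the channel, and recurses into the children while holding the parent's lock. Unlike the real cancelCtx it has no Value, no deadline, and no removeFromParent step.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errCanceled plays the role of context.Canceled in this toy.
var errCanceled = errors.New("canceled")

// closedchan is a reusable, already-closed channel (same trick as the stdlib).
var closedchan = make(chan struct{})

func init() { close(closedchan) }

// node is a hypothetical, stripped-down stand-in for cancelCtx.
type node struct {
	mu       sync.Mutex
	done     chan struct{}      // created lazily
	children map[*node]struct{} // used as a set
	err      error              // non-nil once canceled
}

// newNode creates a node and registers it with parent, if there is one.
func newNode(parent *node) *node {
	n := &node{}
	if parent == nil {
		return n
	}
	parent.mu.Lock()
	if err := parent.err; err != nil {
		parent.mu.Unlock()
		n.cancel(err) // parent already canceled: start out canceled
		return n
	}
	if parent.children == nil {
		parent.children = make(map[*node]struct{})
	}
	parent.children[n] = struct{}{}
	parent.mu.Unlock()
	return n
}

// Done returns the done channel, creating it under the lock on first use.
func (n *node) Done() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.done == nil {
		n.done = make(chan struct{})
	}
	return n.done
}

// Err returns nil until the node is canceled.
func (n *node) Err() error {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.err
}

// cancel records err, closes done, and cancels every child (parent lock held).
func (n *node) cancel(err error) {
	n.mu.Lock()
	if n.err != nil { // already canceled
		n.mu.Unlock()
		return
	}
	n.err = err
	if n.done == nil {
		n.done = closedchan
	} else {
		close(n.done)
	}
	for child := range n.children {
		child.cancel(err)
	}
	n.children = nil
	n.mu.Unlock()
}

func main() {
	root := newNode(nil)
	a := newNode(root)
	b := newNode(a)
	root.cancel(errCanceled)
	<-b.Done() // does not block: b was canceled through a
	fmt.Println(a.Err(), b.Err())
}
```

Locks are always taken parent first, then child, which is what keeps the recursive cancel free of lock-order deadlocks.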

Notes:

  • For the concrete type behind a chan, see the runtime function makechan; its signature is:

    func makechan(t *chantype, size int) *hchan
  • removeChild is implemented as follows:

    // removeChild removes a context from its parent.
    func removeChild(parent Context, child canceler) {
    	p, ok := parentCancelCtx(parent) // is parent a *cancelCtx, or does it wrap one? See parentCancelCtx in the API section
    	if !ok {
    		return
    	}
    	p.mu.Lock()
    	if p.children != nil {
    		delete(p.children, child)
    	}
    	p.mu.Unlock()
    }

timerCtx

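timerCtx is a concrete implementation of Context. It embeds a cancelCtx and adds a timer and a deadline, which together make the context exit automatically when the deadline is reached.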

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) String() string {
	return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
		c.deadline.String() + " [" +
		time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

valueCtx

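valueCtx carries data, a single key/value pair, down the tree. It adds no cancellation behavior of its own: Done, Err, and Deadline are delegated to the embedded parent Context.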

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val interface{}
}

// stringify tries a bit to stringify v, without using fmt, since we don't
// want context depending on the unicode tables. This is only used by
// *valueCtx.String().
func stringify(v interface{}) string {
	switch s := v.(type) {
	case stringer:
		return s.String()
	case string:
		return s
	}
	return "<not Stringer>"
}

func (c *valueCtx) String() string {
	return contextName(c.Context) + ".WithValue(type " +
		reflectlite.TypeOf(c.key).String() +
		", val " + stringify(c.val) + ")"
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
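Because each `WithValue` call just wraps its parent in a new valueCtx, `Value` is a linear walk up the chain: the nearest matching key wins, and a key of a different type never matches even if it looks the same. The key types `userKey` and `traceKey` and the function `lookup` are invented for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical key types; distinct types keep keys from different packages apart.
type userKey struct{}
type traceKey string

// lookup builds a small valueCtx chain and queries it from two levels.
func lookup() (outerUser, innerUser, trace, plainString interface{}) {
	outer := context.WithValue(context.Background(), userKey{}, "alice")
	outer = context.WithValue(outer, traceKey("id"), "t-1")
	inner := context.WithValue(outer, userKey{}, "bob") // shadows "alice" from inner downward

	outerUser = outer.Value(userKey{})  // "alice": found one level up
	innerUser = inner.Value(userKey{})  // "bob": the nearest match wins
	trace = inner.Value(traceKey("id")) // "t-1": lookup walks past inner
	plainString = inner.Value("id")     // nil: string "id" != traceKey("id")
	return
}

func main() {
	fmt.Println(lookup())
}
```

The cost of this design is that a lookup is O(depth), which is acceptable because context values are meant for a handful of request-scoped items.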

API

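Of everything above, only the Context interface is exported; canceler and the concrete types cancelCtx, timerCtx, valueCtx, and emptyCtx are not visible to users. Users start from a root node (an emptyCtx) obtained through the API functions below.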

emptyCtx is defined as follows:

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

The functions that return the emptyCtx instances:

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
	return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
	return todo
}

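The root returned by Background or TODO is passed as the parent argument to the With* functions, which create the nodes of the cancellation tree. Their signatures are: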

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)               // Context with a cancellation notification
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)          // Context that is canceled at deadline d
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) // calls WithDeadline internally
func WithValue(parent Context, key, val interface{}) Context                  // Context that carries a key/value pair

Taking WithCancel as an example, here are the implementation details:

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled, so the child being created is the root of a cancellation tree
	}
	// From here on, parent is a node that can be canceled.
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	// Look for the nearest *cancelCtx ancestor.
	// If there is one, register child in its children set.
	// If not, start a goroutine that watches both parent's and child's Done channels.
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil { // p, the nearest *cancelCtx ancestor, may already be canceled
			// parent has already been canceled
			child.cancel(false, p.err)
		} else { // p is still alive: register child with it
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		atomic.AddInt32(&goroutines, +1) // goroutines counts the number of goroutines ever created; for testing.
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

parentCancelCtx determines whether parent is a *cancelCtx or wraps one (for example, by embedding it):

// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	// closedchan is the package's reusable, already-closed channel.
	if done == closedchan || done == nil { // nil: parent is never canceled; closedchan: parent is already canceled
		return nil, false
	}
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) // a cancelCtx answers Value(&cancelCtxKey) with itself
	if !ok {
		return nil, false
	}
	p.mu.Lock()
	ok = p.done == done // parent.Done() must be p's own channel; a custom Context wrapping p with a different Done channel must not be bypassed
	p.mu.Unlock()
	if !ok {
		return nil, false
	}
	return p, true
}

Demo

package main

import (
	"context"
	"fmt"
	"time"
)

// otherContext is a user-defined type that embeds a Context.
type otherContext struct {
	context.Context
}

// ctxKey is an unexported key type; a bare string such as "key" could collide
// with keys set by other packages.
type ctxKey string

func main() {
	// Construct a *cancelCtx.
	ctxa, cancel := context.WithCancel(context.Background())
	go work(ctxa, "work1")

	// Construct a *timerCtx whose parent is the *cancelCtx ctxa.
	tm := time.Now().Add(3 * time.Second)
	ctxb, cancelB := context.WithDeadline(ctxa, tm)
	defer cancelB() // go vet flags a discarded cancel func; calling it releases the timer
	go work(ctxb, "work2")

	oc := otherContext{ctxb}
	// Construct a *valueCtx whose parent is oc, which wraps ctxb.
	ctxc := context.WithValue(oc, ctxKey("key"), "god andes,pass from main ")
	go workWithValue(ctxc, "work3")

	time.Sleep(10 * time.Second)
	cancel()
	time.Sleep(5 * time.Second)
	fmt.Println("main stop")
}

// work prints a message every second until ctx is canceled.
func work(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("%s get msg to cancel\n", name)
			return
		default:
			fmt.Printf("%s is running \n", name)
			time.Sleep(1 * time.Second)
		}
	}
}

// workWithValue does the same and also reads a value passed down through ctx.
func workWithValue(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("%s get msg to cancel\n", name)
			return
		default:
			value := ctx.Value(ctxKey("key")).(string)
			fmt.Printf("%s is running value=%s \n", name, value)
			time.Sleep(1 * time.Second)
		}
	}
}

Output

work2 is running 
work3 is running value=god andes,pass from main
work1 is running
work2 is running
work1 is running
work3 is running value=god andes,pass from main
work3 is running value=god andes,pass from main
work1 is running
work2 is running
// work2 and work3 exit on timeout
work2 get msg to cancel
work3 get msg to cancel
work1 is running
work1 is running
work1 is running
work1 is running
work1 is running
work1 is running
work1 is running
// work1 exits after being canceled
work1 get msg to cancel
main stop

How cancellation flows through the demo's context tree:

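When ctxb's deadline passes, ctxb is canceled together with everything derived from it. ctxc (a valueCtx over oc, which wraps ctxb) has no Done channel of its own and simply returns ctxb's, so work2 and work3 exit on timeout at the same time.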

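When the cancel function returned together with ctxa is called, ctxa and all of its remaining descendants are canceled; by then only work1 is still running, so it is the one that exits.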

References

  • 《Go语言核心编程》 (Go Language Core Programming), Li Wenta (李文塔)
  • The Go standard library source code