协程库coroutine源代码解析

赵星宇

本来是想分析微信的libco的所有实现,但是发现其可读性并不高,因而选择了云风原来写过的玩具协程库coroutine https://github.com/cloudwu/coroutine 。和libco一样,coroutine也使用了共享栈,即多个协程的栈共用一块内存。

唯一的调度协程使用系统分配的栈空间,多个工作协程使用调度协程在堆中分配出来的一块内存作为它们的栈。

本文一共分为以下三部分进行叙述:

1、使用方法

2、需要用到的数据结构与宏定义

3、具体库函数分析

1、使用方法

先看看其使用方法:https://github.com/cloudwu/coroutine/blob/master/main.c 

int main() {

        struct schedule * S = coroutine_open();

struct args arg1 = { 0 };

        struct args arg2 = { 100 };

        int co1 = coroutine_new(S, foo, &arg1);

        int co2 = coroutine_new(S, foo, &arg2);

        printf("main start\n");

        while (coroutine_status(S,co1) && coroutine_status(S,co2)) {

                coroutine_resume(S,co1);

                coroutine_resume(S,co2);

        }

        printf("main end\n");

        coroutine_close(S);

        return 0;

}

首先调用coroutine_open函数打开一个调度器。

创建协程使用coroutine_new函数传给它调度器,协程要执行的函数和指向函数需要的参数的指针,返回创建的协程的协程号。

然后分别创建了两个协程co1和co2。

查看协程状态使用coroutine_status函数,它会返回指定协程的状态信息,为0表示协程已经运行完毕。

将当前运行协程切换到指定协程使用coroutine_resume恢复协程函数。

然后循环判断协程1和协程2是否运行完毕,如果都没有运行完毕则继续循环,否则跳出循环。在循环中首先恢复协程1运行,协程1阶段性完成后回到主协程,恢复协程2运行,协程2阶段性完成后回到主协程继续循环。

最后调用coroutine_close函数关闭打开的调度器。

协程函数和协程函数需要的参数定义:

struct args {

        int n;

};

static void foo(struct schedule * S, void *ud) {

        struct args * arg = ud;

        int start = arg->n;

        int i;

        for (i=0;i<5;i++) {

                printf("coroutine %d : %d\n",coroutine_running(S) , start + i);

                coroutine_yield(S);

        }

}

参数类型无需多言可以自行定义。

将当前运行协程恢复到主协程使用coroutine_yield阶段性完成函数。

协程函数需要传入调度器指针和指向协程参数的指针。本协程函数中,首先取出参数中包裹的数据,然后进行了5次打印和阶段性完成操作coroutine_yield。

程序的运行结果为:

0:0

1:100

0:1

1:101

0:2

1:102

0:3

1:103

0:4

1:104

调用流程是:

主协程调用coroutine_resume 0当前运行协程变为0号协程

0号协程输出0:0

0号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 1当前运行协程变为1号协程

1号协程输出1:100

1号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 0当前运行协程变为0号协程

0号协程输出0:1

0号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 1当前运行协程变为1号协程

1号协程输出1:101

1号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 0当前运行协程变为0号协程

0号协程输出0:2

0号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 1当前运行协程变为1号协程

1号协程输出1:102

1号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 0当前运行协程变为0号协程

0号协程输出0:3

0号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 1当前运行协程变为1号协程

1号协程输出1:103

1号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 0当前运行协程变为0号协程

0号协程输出0:4

0号协程调用coroutine_yield 当前运行协程变为主协程

主协程调用coroutine_resume 1当前运行协程变为1号协程

1号协程输出1:104

1号协程调用coroutine_yield 当前运行协程变为主协程

2、需要用到的数据结构与宏定义

#define COROUTINE_DEAD 0

#define COROUTINE_READY 1

#define COROUTINE_RUNNING 2

#define COROUTINE_SUSPEND 3

上面四个宏分别定义了协程的不同状态。dead表示协程运行完毕,ready表示协程创建完毕还没有运行,running表示协程正在运行,suspend表示协程被挂起。

#define STACK_SIZE (1024*1024)

栈大小定义了所有工作协程使用的栈的最大值,主协程会按照这个大小去堆中分配一块内存。

#define DEFAULT_COROUTINE 16

默认协程数指定了调度器中存放协程指针的容器的初始值

struct schedule {

        char stack[STACK_SIZE];

        ucontext_t main;

        int nco;

        int cap;

        int running;

        struct coroutine **co;

};

调度器结构体,其中存放了:

stack 多个工作协程共用的栈空间

main 主协程的上下文

nco 工作协程的数量

cap 存放工作协程的容器的容量,如果超过容量会重新分配内存

running 正在运行的工作协程号

co 工作协程的指针数组,即存放指向工作协程的指针的容器

typedef void (*coroutine_func)(struct schedule *, void *ud);

协程函数类型定义

struct coroutine {

        coroutine_func func;

        void *ud;

        ucontext_t ctx;

        struct schedule * sch;

        ptrdiff_t cap;

        ptrdiff_t size;

        int status;

        char *stack;

};

协程结构体,其中存放了:

func 协程函数的函数指针

ud 指向协程函数参数的指针

ctx 协程上下文

sch 指向调度器的指针

stack 当协程换出时用于存放当前栈内容的容器

cap stack容器的容量

size stack容器目前使用的大小

status 当前协程的状态

3、具体库函数分析

先看简单的打开和关闭调度器函数:

struct schedule * coroutine_open(void) {

        struct schedule *S = malloc(sizeof(*S));

        S->nco = 0;

        S->cap = DEFAULT_COROUTINE;

        S->running = -1;

        S->co = malloc(sizeof(struct coroutine *) * S->cap);

        memset(S->co, 0, sizeof(struct coroutine *) * S->cap);

        return S;

}

打开调度器函数做的事情基本上就是分配一个调度器对象,然后将其线程数设置为0,将其线程容量设置为默认容量,将其正在运行的协程号设置为-1即主协程,将其存放指向工作协程的指针的容器按照容量分配好。这里没有提到,其实在分配调度器的时候其中的主协程上下文和多个工作协程共用的栈空间也分配好了。

void coroutine_close(struct schedule *S) {

        int i;

        for (i=0;i<S->cap;i++) {

                struct coroutine * co = S->co[i];

                if (co) {

                        _co_delete(co);

                }

        }

        free(S->co);

        S->co = NULL;

        free(S);

}

关闭调度器函数做的事情是:将调度器里面的存放指向工作协程的指针的容器中的协程占用的内存调用_co_delete释放掉,并且将容器占用的内存也释放掉并且置空,最后将调度器占用的内存也释放掉。

coroutine_close函数中调用的_co_delete函数实现如下:

void _co_delete(struct coroutine *co) {

        free(co->stack);

        free(co);

}

删除协程函数做的事情是:将协程中用于换出后存放栈的内存释放掉,最后将协程也释放掉。

再看看创建协程的函数:

struct coroutine * _co_new(struct schedule *S , coroutine_func func, void *ud) {

        struct coroutine * co = malloc(sizeof(*co));

        co->func = func;

        co->ud = ud;

        co->sch = S;

        co->cap = 0;

        co->size = 0;

        co->status = COROUTINE_READY;

        co->stack = NULL;

        return co;

}

协程构造函数做的事情是:分配协程所需要的内存,并且将其中的协程函数,指向协程函数参数的指针,和调度器指针按照参数进行初始化,然后将其换出后的栈空间置空,其容量和大小都设置为0,最后将协程状态置为就绪。

int coroutine_new(struct schedule *S, coroutine_func func, void *ud) {

        struct coroutine *co = _co_new(S, func , ud);

        if (S->nco >= S->cap) {

                int id = S->cap;

                S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *));

                memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap);

                S->co[S->cap] = co;

                S->cap *= 2;

                ++S->nco;

                return id;

        } else {

                int i;

                for (i=0;i<S->cap;i++) {

                        int id = (i+S->nco) % S->cap;

                        if (S->co[id] == NULL) {

                                S->co[id] = co;

                                ++S->nco;

                                return id;

                        }

                }

        }

        assert(0);

        return -1;

}

创建协程函数做的事情是:首先调用_co_new创建一个协程对象,然后将其插入调度器的协程指针数组容器中,并且返回其在容器中的编号。当协程数达到最大容量的时候会进行扩容。

到了最关键的时候,我们来分析resume函数和yield函数:

void coroutine_resume(struct schedule * S, int id) {

        assert(S->running == -1);

        assert(id >=0 && id < S->cap);

        struct coroutine *C = S->co[id];

        if (C == NULL)

                return;

        int status = C->status;

        switch(status) {

        case COROUTINE_READY:

                getcontext(&C->ctx);

                C->ctx.uc_stack.ss_sp = S->stack;

                C->ctx.uc_stack.ss_size = STACK_SIZE;

                C->ctx.uc_link = &S->main;

                S->running = id;

                C->status = COROUTINE_RUNNING;

                uintptr_t ptr = (uintptr_t)S;

                makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));

                swapcontext(&S->main, &C->ctx);

                break;

        case COROUTINE_SUSPEND:

                memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);

                S->running = id;

                C->status = COROUTINE_RUNNING;

                swapcontext(&S->main, &C->ctx);

                break;

        default:

                assert(0);

        }

}

首先分析resume函数,其主要做的是:首先从调度器的协程指针容器中根据协程号获取对应的协程。然后判断该协程是否为空,如果为空则直接返回。否则根据协程的状态信息进行处理,此时协程只可能有两个状态就绪和挂起。不可能是dead是因为这样会找不到该协程,不可能是正在运行是因为当前正在运行的是主协程。

如果是就绪状态则,首先调用getcontext给协程的上下文做好初始化工作。然后给其ss_sp设置为共享栈的最大栈顶,给其ss_size设置为共享栈的最大大小,给其link设置为指向主协程上下文的指针。然后给协程的状态设置为运行状态,并且将调度器中运行的协程号设置为这个协程。然后调用了makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32)),这个函数做的工作可以使用前面在分析libco里面的coctx_make来解释。

int coctx_make(coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1)

{

        //make room for coctx_param

        char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);

        sp = (char*)((unsigned long)sp & -16L);

        

        coctx_param_t* param = (coctx_param_t*)sp ;

        param->s1 = s;

        param->s2 = s1;

        memset(ctx->regs, 0, sizeof(ctx->regs));

        ctx->regs[ kESP ] = (char*)(sp) - sizeof(void*);

        ctx->regs[ kEIP ] = (char*)pfn;

        return 0;

}

coctx_make函数做的事情是:这里分析i386版本,首先将sp指向栈底保留的param对象,并且对其内存到16字节,将参数s,s1设置到param对象中模仿参数传递,然后对该上下文所有寄存器清空内存,将esp设置为指向当前栈顶,将eip设置为传递进来的函数指针,以备上下文切换。这里没有讨论link的问题,后面会继续说。这里为什么要传递两个参数呢?是为了与x86-64的edi, esi两个寄存器必须传递两个参数做兼容。

套用在makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32))上面就是说,会将mainfunc函数指针设置为协程上下文中的eip,将ptr和ptr>>32当作参数放到上下文中指定的栈上,并且设置上下文中的esp。

最后调用swapcontext,将当前上下文保存到主协程的上下文中,并且将目标协程的上下文置为当前上下文,即跳转到eip指向的地址,esp为栈顶指针,进入函数后会初始化ebp。具体分析见我的上一篇博客https://zxylvlp.github.io/blog/coctx_swap.html 。这时我们就调度了该协程,当该协程yield或者完成时会回到主协程上下文继续执行。

刚刚讨论了就绪状态的处理,现在来看看挂起状态的处理。首先将协程保存的换出前的栈复制到调度器的共享栈中。然后给协程的状态设置为运行状态,并且将调度器中运行的协程号设置为这个协程。最后调用了swapcontext,将当前上下文保存到主协程的上下文中,并且将目标协程的上下文置为当前上下文,跳入协程。

其中用到了mainfunc函数,现在讨论具体细节:

static void mainfunc(uint32_t low32, uint32_t hi32) {

        uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);

        struct schedule *S = (struct schedule *)ptr;

        int id = S->running;

        struct coroutine *C = S->co[id];

        C->func(S,C->ud);

        _co_delete(C);

        S->co[id] = NULL;

        --S->nco;

        S->running = -1;

}

根据参数拼出调度器指针,从中找到当前协程,并且调用当前协程的协程函数,调用完毕后调用_co_delete析构协程,并且将该协程在调度器容器中的指针置空,将协程数减1,将运行的协程置为主协程。

注意在当前实现中当mainfunc返回的时候,其没有返回地址并且pop ebp也会给其清零,如果返回会出错,所以当其返回的时候需要加1条setcontext,将上下文设置为主协程的上下文,如果不加可以通过多包裹1层加了的函数来实现。这里的link就是这个作用,指定最后setcontext的上下文。

void coroutine_yield(struct schedule * S) {

        int id = S->running;

        assert(id >= 0);

        struct coroutine * C = S->co[id];

        assert((char *)&C > S->stack);

        _save_stack(C,S->stack + STACK_SIZE);

        C->status = COROUTINE_SUSPEND;

        S->running = -1;

        swapcontext(&C->ctx , &S->main);

}

coroutine_yield函数做的事情是:根据当前运行的协程号即自己的协程号去调度器中找到指向自己协程的指针,然后调用_save_stack将当前使用的栈拷贝到协程中。将协程的状态置为挂起状态,并且将调度器运行的协程置为主协程。最后调用swapcontext将当前上下文保存到协程的上下文中,并且将主协程的上下文设置为当前上下文,跳回主协程。

在coroutine_yield函数中用到了_save_stack保存当前协程使用的栈。具体分析如下:

static void _save_stack(struct coroutine *C, char *top) {

        char dummy = 0;

        assert(top - &dummy <= STACK_SIZE);

        if (C->cap < top - &dummy) {

                free(C->stack);

                C->cap = top-&dummy;

                C->stack = malloc(C->cap);

        }

        C->size = top - &dummy;

        memcpy(C->stack, &dummy, C->size);

}

首先创建一个局部变量,并且获取其地址作为栈顶地址,然后判断协程中用于存放栈的容器的容量是否足够,如果不够则重新申请。将容器使用量置为栈的大小,并且将整个栈拷贝到容器中。

最后还有两个简单的函数:

int coroutine_status(struct schedule * S, int id) {

        assert(id>=0 && id < S->cap);

        if (S->co[id] == NULL) {

                return COROUTINE_DEAD;

        }

        return S->co[id]->status;

}

coroutine_status函数会返回指定协程的状态,它会去调度器的协程容器中根据协程号去找,不过找不到的返回dead状态,如果找到了则返回该协程的状态。

int coroutine_running(struct schedule * S) {

        return S->running;

}

coroutine_running函数会返回当前协程的协程号。