srs源码分析2-浅析state_threads

2021/10/2 20:42:12

本文主要是介绍srs源码分析2-浅析state_threads,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

本文分析的state-threads的版本是1.9

srs源码分析1-搭建环境

srs源码分析2-浅析state_threads

srs源码分析3-srs的启动

srs源码分析4-客户端的连接

srs源码分析5-handshake

srs源码分析6-connect

以下正在写作中。。。

srs源码分析7-create stream

srs源码分析8-推流-publish

srs源码分析9-推流-unpublish

srs源码分析10-拉流-play

srs源码分析11-拉流-pause

srs源码分析12-转发-forward


srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。

以下state_thread会被简称为st。

使用示例-echo server

使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。

#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>

#define LISTEN_PORT 9000

#define ERR_EXIT(m) \
  do {              \
    perror(m);      \
    exit(-1);       \
  } while (0)

void *client_thread(void *arg) {
  st_netfd_t client_st_fd = (st_netfd_t)arg;
  int client_fd = st_netfd_fileno(client_st_fd);

  sockaddr_in client_addr;
  socklen_t client_addr_len = sizeof(client_addr);
  int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);
  if (ret == -1) {
    printf("[WARN] Failed to get client ip: %s\n", strerror(ret));
  }

  char ip_buf[INET_ADDRSTRLEN];
  bzero(ip_buf, sizeof(ip_buf));
  inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,
            sizeof(ip_buf));

  while (1) {
    char buf[1024] = {0};
    ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);
    if (ret == -1) {
      printf("client st_read error\n");
      break;
    } else if (ret == 0) {
      printf("client quit, ip = %s\n", ip_buf);
      break;
    }

    printf("recv from %s, data = %s", ip_buf, buf);

    ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);
    if (ret == -1) {
      printf("client st_write error\n");
    }
  }
}

void *listen_thread(void *arg) {
  while (1) {
    st_netfd_t client_st_fd =
        st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);
    if (client_st_fd == NULL) {
      continue;
    }

    printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));

    st_thread_t client_tid =
        st_thread_create(client_thread, (void *)client_st_fd, 0, 0);
    if (client_tid == NULL) {
      printf("Failed to st create client thread\n");
    }
  }
}

int main() {
  int ret = st_set_eventsys(ST_EVENTSYS_ALT);
  if (ret == -1) {
    printf("st_set_eventsys use linux epoll failed\n");
  }

  ret = st_init();
  if (ret != 0) {
    printf("st_init failed. ret = %d\n", ret);
    return -1;
  }

  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  if (listen_fd == -1) {
    ERR_EXIT("socket");
  }

  int reuse_socket = 1;
  ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,
                   sizeof(int));
  if (ret == -1) {
    ERR_EXIT("setsockopt");
  }

  struct sockaddr_in server_addr;
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(LISTEN_PORT);
  server_addr.sin_addr.s_addr = INADDR_ANY;

  ret =
      bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));
  if (ret == -1) {
    ERR_EXIT("bind");
  }

  ret = listen(listen_fd, 128);
  if (ret == -1) {
    ERR_EXIT("listen");
  }

  st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);
  if (!st_listen_fd) {
    printf("st_netfd_open_socket open socket failed.\n");
    return -1;
  }

  st_thread_t listen_tid =
      st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);
  if (listen_tid == NULL) {
    printf("Failed to st create listen thread\n");
  }

  while (1) {
    st_sleep(1);   /*用于让出CPU执行权,重新调度就绪的协程。*/
  }

  return 0;
}
root@learner:~/tmp/st# gcc main.cpp -lst
root@learner-Lenovo:~/tmp/st# ./a.out 
get a new client, fd = 4
recv from 192.168.30.17, data = hello world
client quit, ip = 192.168.30.17
^C
root@learner:~# nc 192.168.30.17 9000
hello world
hello world
^C

创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。

协程的切换

st中协程的切换提供了两种方式:一种是使用系统提供的setjmplongjmp接口,另一种是使用汇编实现的_st_md_cxt_save_st_md_cxt_restore接口,这两个函数从用法上同setjmp和longjmp。

这两种方式的切换本质上都是栈帧的切换。

setjmp和longjmp

C语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。

setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。

以下是从《Linux/UNIX系统编程手册》摘抄的示例:

#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>

jmp_buf env;

void f2(int num)
{
    longjmp(env, num);
}

void f1(int num)
{
    if(num == 1){
        longjmp(env, num);
    }

    f2(num);
}

int main(int argc, char** argv)
{
    if(argc != 2){
        printf("Usage: %s [1|2]\n", argv[0]);
        return -1;
    }

    switch(setjmp(env)){
        case 0:
            printf("Calling f1() after initial setjmp()\n");
            f1(atoi(argv[1]));
            break;
        case 1:
            printf("We jumped back from f1()\n");
            break;
        case 2:
            printf("We jumped back from f2()\n");
            break;
    }

    return 0;
}

这个示例我稍微做了一些修改,运行结果及分析如下:

root@learner:~/tmp# ./a.out 1
Calling f1() after initial setjmp()
We jumped back from f1()

在这里插入图片描述

root@learner:~/tmp# ./a.out 2
Calling f1() after initial setjmp()
We jumped back from f2()

在这里插入图片描述

_st_md_cxt_save和_st_md_cxt_restore

这两个函数是通过汇编实现的,代码如下:

#define JB_BX  0
#define JB_SI  1
#define JB_DI  2
#define JB_BP  3
#define JB_SP  4
#define JB_PC  5

.file "md.S"
.text

/* _st_md_cxt_save(__jmp_buf env)              存储函数栈帧   */
.globl _st_md_cxt_save
    .type _st_md_cxt_save, @function
    .align 16
_st_md_cxt_save:
    movl 4(%esp), %eax                /*取得参数env的地址,保存到eax中。*/

    movl %ebx, (JB_BX*4)(%eax)        /*保存ebx*/
    movl %esi, (JB_SI*4)(%eax)        /*保存esi*/
    movl %edi, (JB_DI*4)(%eax)        /*保存edi*/
    
    /*保存esp,即栈顶,保存的栈顶是没有调用_st_md_cxt_save()函数之前的栈顶*/
    leal 4(%esp), %ecx                /
    movl %ecx, (JB_SP*4)(%eax)        /*保存ecx*/
    movl 0(%esp), %ecx                      
    movl %ecx, (JB_PC*4)(%eax)        /*保存引用计数器pc*/
    movl %ebp, (JB_BP*4)(%eax)        /*保存ebp 即调用_st_md_cxt_save()的函数的ebp*/
    xorl %eax, %eax                   /*清空eax 作为_st_md_cxt_save()的返回值*/
    ret
.size _st_md_cxt_save, .-_st_md_cxt_save

/* _st_md_cxt_restore(__jmp_buf env, int val)     恢复函数栈帧  */
.globl _st_md_cxt_restore
    .type _st_md_cxt_restore, @function
    .align 16
_st_md_cxt_restore:
    movl 4(%esp), %ecx            /*获取第一个参数的地址,即env的地址。*/
    movl 8(%esp), %eax            /*获取第二个参数的地址,即val的地址。*/
    movl (JB_PC*4)(%ecx), %edx    /*将原pc寄存器的值保存到edx中*/

    movl (JB_BX*4)(%ecx), %ebx    /*恢复ebx*/
    movl (JB_SI*4)(%ecx), %esi    /*恢复esi*/
    movl (JB_DI*4)(%ecx), %edi    /*恢复edi*/
    movl (JB_BP*4)(%ecx), %ebp    /*恢复ebp*/
    movl (JB_SP*4)(%ecx), %esp    /*恢复esp*/
    testl %eax, %eax              /*测试eax的值是否为0,也就是第二个参数是否为0。*/
    jnz  1f                       /*如果第二个参数不为0,则直接跳转到1:执行。*/
    incl %eax                     /*将返回值置为1*/
    1: jmp *%edx                  /*跳转到之前pc处*/
.size _st_md_cxt_restore, .-_st_md_cxt_restore

_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。

st中协程的切换宏

#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP)   
    #define MD_SETJMP(env) _st_md_cxt_save(env)
    #define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val)

    extern int _st_md_cxt_save(jmp_buf env);
    extern void _st_md_cxt_restore(jmp_buf env, int val);
#else
    #define MD_SETJMP(env) setjmp(env)  
    #define MD_LONGJMP(env, val) longjmp(env, val)
#endif

如果定义了MD_USE_BUILTIN_SETJMP宏,且没有定义USE_LIBC_SETJMP宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。

#define _ST_SWITCH_CONTEXT(_thread)       \
    ST_BEGIN_MACRO                        \
    ST_SWITCH_OUT_CB(_thread);            \
    if (!MD_SETJMP((_thread)->context)) { \   /*调出协程返回0,调入协程返回1。*/ 
        _st_vp_schedule();                \   /*选择下一个需要调度的协程*/
    }                                     \
    ST_DEBUG_ITERATE_THREADS();           \
    ST_SWITCH_IN_CB(_thread);             \
    ST_END_MACRO

_ST_SWITCH_CONTEXT用于将协程的CPU执行权让出去,重新调度一个新的协程。

当协程调用_ST_SWITCH_CONTEXT时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。

#define _ST_RESTORE_CONTEXT(_thread)   \
    ST_BEGIN_MACRO                     \
    _ST_SET_CURRENT_THREAD(_thread);   \      /*标记此协程为当前运行的协程*/
    MD_LONGJMP((_thread)->context, 1); \      /*执行协程切换     恢复之前挂起的协程*/
    ST_END_MACRO

_ST_RESTORE_CONTEXT用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。

void _st_vp_schedule(void)
{
    _st_thread_t *thread;
    
    /*从就绪的协程队列中取出一个协程*/
    if (_ST_RUNQ.next != &_ST_RUNQ) {
        thread = _ST_THREAD_PTR(_ST_RUNQ.next);
        _ST_DEL_RUNQ(thread);    /*从就绪协程队列删除*/
    } else {   /*如果就绪的协程队列为空,说明所有的就绪协程都处理完毕了。*/
        thread = _st_this_vp.idle_thread;     /*现在切换至idle协程*/
    }
    ST_ASSERT(thread->state == _ST_ST_RUNNABLE);    /*该协程必须处于可运行状态*/

    thread->state = _ST_ST_RUNNING;     /*将即将运行协程的状态标记为正在运行*/
    _ST_RESTORE_CONTEXT(thread);        /*切换至新的协程*/
}

在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。

调度器

所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件写事件定时器事件时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。

st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。

typedef struct _st_eventsys_ops {
    const char *name;                          /* Name of this event system */
    int  val;                                  /* Type of this event system */
    int  (*init)(void);                        /* Initialization */
    void (*dispatch)(void);                    /* Dispatch function */
    int  (*pollset_add)(struct pollfd *, int); /* Add descriptor set */
    void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */
    int  (*fd_new)(int);                       /* New descriptor allocated */
    int  (*fd_close)(int);                     /* Descriptor closed */
    int  (*fd_getlimit)(void);                 /* Descriptor hard limit */
} _st_eventsys_t;

这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。

static _st_eventsys_t _st_epoll_eventsys = {
    "epoll",
    ST_EVENTSYS_ALT,
    _st_epoll_init,
    _st_epoll_dispatch,
    _st_epoll_pollset_add,
    _st_epoll_pollset_del,
    _st_epoll_fd_new,
    _st_epoll_fd_close,
    _st_epoll_fd_getlimit
};

st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。

ST_HIDDEN void _st_epoll_dispatch(void)
{
...
    if (_ST_SLEEPQ == NULL) {     
     /* 定时队列为空,说明没有定时器事件,则epoll_wait的超时时间为-1,
        即没有事件触发时,epoll_wait一直阻塞。*/
        timeout = -1;
    } else {                      
        /*从定时队列获取最小定时器*/
        min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
        /*换算epoll_wait的超时时间  单位:us */
        timeout = (int) (min_timeout / 1000);
...
    }

    /*进入epoll等待事件的触发,也可能因为超时而退出。*/
    nfd = epoll_wait(..., ..., ..., timeout);
...
    
    pq->thread->state = _ST_ST_RUNNABLE;  /*把协程的状态设置为可运行状态*/
    _ST_ADD_RUNQ(pq->thread);             /*将协程添加到运行队列,等待新一轮的调度。*/
...
}

在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。

若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。

创建协程

_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
    _st_thread_t *thread;
    _st_stack_t *stack;
    void **ptds;
    char *sp;
    
    /* Adjust stack size   调整栈大小*/
    if (stk_size == 0)
        stk_size = ST_DEFAULT_STACK_SIZE;    /*默认栈大小是128KB*/

    /*页大小对齐*/
    stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;    
    /*申请栈空间*/
    stack = _st_stack_new(stk_size);
    if (!stack)
        return NULL;
    
    sp = stack->stk_top;                     /*栈顶*/
    sp = sp - (ST_KEYS_MAX * sizeof(void *));/*栈顶空出一块区域,用于存放私有的数据。*/
    ptds = (void **) sp;
    sp = sp - sizeof(_st_thread_t);          /*再空出一个_st_thread_t大小*/
    thread = (_st_thread_t *) sp;
    
    if ((unsigned long)sp & 0x3f)
        sp = sp - ((unsigned long)sp & 0x3f);
    
    stack->sp = sp - _ST_STACK_PAD_SIZE;     /*栈顶再空出128字节的填充区域*/

    memset(thread, 0, sizeof(_st_thread_t));
    memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
    
    thread->private_data = ptds;     /*指向协程私有数据*/
    thread->stack = stack;           /*指向协程栈*/
    thread->start = start;           /*协程入口函数*/
    thread->arg = arg;               /*入口函数参数*/
    
    /*保存切换上下文,打上还原点,当本协程下次获取到执行权限时,从这个还原点接着执行。*/
    _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
    
    /*如果需要主动回收协程,则需要协程创建一个条件变量,用于阻塞等待回收协程。*/
    if (joinable) {     
        thread->term = st_cond_new();
        if (thread->term == NULL) {
            _st_stack_free(thread->stack);
            return NULL;
        }
    }
    
    thread->state = _ST_ST_RUNNABLE;   /*标记协程为可运行状态*/
    _st_active_count++;                /*增加活跃协程的个数*/
    _ST_ADD_RUNQ(thread);              /*将协程插入到运行队列*/

    return thread;
}

创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。

除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。

#define _ST_INIT_CONTEXT MD_INIT_CONTEXT

#define MD_INIT_CONTEXT(_thread, _sp, _main) \
	ST_BEGIN_MACRO                           \
	if (MD_SETJMP((_thread)->context))       \ /*设置还原点,或从还原点返回。*/
	      _main();                           \ 
	MD_GET_SP(_thread) = (long) (_sp);       \ /*设置ctx中sp寄存器的值,设置新的栈帧*/
	ST_END_MACRO

在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()函数。

void _st_thread_main(void)
{
    _st_thread_t *thread = _ST_CURRENT_THREAD();      /*获取当前协程的句柄*/
    
    MD_CAP_STACK(&thread);
    
    thread->retval = (*thread->start)(thread->arg);   /*执行协程入口函数*/
    
    st_thread_exit(thread->retval);                   /*协程退出*/
}

新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。

st的初始化

int st_init(void)
{
    _st_thread_t *thread;
    
    if (_st_active_count) {                /*如果已经初始化,则直接返回。*/
        return 0;
    }
    
    st_set_eventsys(ST_EVENTSYS_DEFAULT);  /*设置epoll封装的接口 */
    
    if (_st_io_init() < 0)
        return -1;
    
    memset(&_st_this_vp, 0, sizeof(_st_vp_t));
    
    /*三个队列的初始化*/
    ST_INIT_CLIST(&_ST_RUNQ);        /*可运行队列*/
    ST_INIT_CLIST(&_ST_IOQ);         /*io队列*/
    ST_INIT_CLIST(&_ST_ZOMBIEQ);     /*僵尸态队列*/
    
    if ((*_st_eventsys->init)() < 0)  /*epoll的初始化*/
        return -1;
    
    _st_this_vp.pagesize = getpagesize();     /*页大小*/
    _st_this_vp.last_clock = st_utime();      /*时钟时间*/
    
    /* 创建一个idle协程 */
    _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
    if (!_st_this_vp.idle_thread)
        return -1;

    _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;    /*标识为idle协程*/
    _st_active_count--;      

    _ST_DEL_RUNQ(_st_this_vp.idle_thread);    /*从可运行队列中删除idle协程*/
    
    /*为主协程封装一个_st_thread_t */
    thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *)));    
    if (!thread)
        return -1;

    thread->private_data = (void **) (thread + 1);    /*指向协程私有数据*/
    thread->state = _ST_ST_RUNNING;                   /*设置协程为可运行状态*/
    thread->flags = _ST_FL_PRIMORDIAL;                /*标识为主协程*/

    _ST_SET_CURRENT_THREAD(thread);       /*设置当前工作的协程*/
    _st_active_count++;                   /*增加活跃协程个数*/

    return 0;
}

在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程

主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。

idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。

void *_st_idle_thread_start(void *arg)
{
    _st_thread_t *me = _ST_CURRENT_THREAD();
    
    while (_st_active_count > 0) {
        _ST_VP_IDLE();                /*进入epoll_wait,监听读写事件*/
        
        _st_vp_check_clock();         /*处理定时器事件*/
        
        me->state = _ST_ST_RUNNABLE;  /*将idle线程标记为可运行状态*/
        _ST_SWITCH_CONTEXT(me);       /*让出CPU执行权,重新开始调度。*/
    }
    
    exit(0);
    
    return NULL;
}

当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。

#define _ST_VP_IDLE()                   (*_st_eventsys->dispatch)()

_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。

void _st_vp_check_clock(void)
{
    _st_thread_t *thread;
    st_utime_t now;

    now = st_utime();        /*获取当前时间*/
    _ST_LAST_CLOCK = now;
    
    if (_st_curr_time && now - _st_last_tset > 999000) {
        _st_curr_time = time(NULL);
        _st_last_tset = now;
    }
    
    while (_ST_SLEEPQ != NULL) {    /*睡眠队列不为空*/
        thread = _ST_SLEEPQ;        /*获取最小堆上的最小的定时器*/
        ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);

        if (thread->due > now)    
            break;                  /*协程的定时器还没有到,立即返回。*/

        /*协程的定时器触发了*/
        _ST_DEL_SLEEPQ(thread);  /*从睡眠队列中删除*/
        
        /*协程是因为条件变量而睡眠的,现在条件变量超时了。*/
        if (thread->state == _ST_ST_COND_WAIT)    
            thread->flags |= _ST_FL_TIMEDOUT;     
        
        ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));
        
        thread->state = _ST_ST_RUNNABLE;    /*标记协程为可运行状态*/
        _ST_INSERT_RUNQ(thread);            /*将协程送至就绪队列,等待调度。*/
    }
}

从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。

所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。

协程的exit、join和yield

_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
...
	if (joinable) {     /*如果协程需要主动回收,则为协程创建一个条件变量。*/
		thread->term = st_cond_new();        /*创建条件变量*/
		if (thread->term == NULL) {
			_st_stack_free(thread->stack);
			return NULL;
		}
    }
...
}

在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。

void _st_thread_main(void)
{
    _st_thread_t *thread = _ST_CURRENT_THREAD();      /*获取当前协程的句柄*/
    
    MD_CAP_STACK(&thread);
    
    thread->retval = (*thread->start)(thread->arg);   /*执行协程入口函数*/
    
    st_thread_exit(thread->retval);                   /*退出协程*/
}

当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。

void st_thread_exit(void *retval)
{
    _st_thread_t *thread = _ST_CURRENT_THREAD();    /*获取当前协程句柄*/
    
    thread->retval = retval;        /*保存返回值*/
    _st_thread_cleanup(thread);     /*释放协程的私有数据*/
    _st_active_count--;             /*活跃协程数减一*/
    if (thread->term) {    /*如果需要主动回收此协程*/
        thread->state = _ST_ST_ZOMBIE;     /*设置协程为僵尸态*/
        _ST_ADD_ZOMBIEQ(thread);           /*添加到僵尸态队列*/
        
        st_cond_signal(thread->term);      /*通知阻塞等待回收的协程*/
        
        _ST_SWITCH_CONTEXT(thread);        /*让出执行权*/
        
        st_cond_destroy(thread->term);     /*销毁条件变量*/
        thread->term = NULL;
    }
    
    /*如果是主协程,则无需释放其对应的栈,否则释放在堆上申请的栈空间。*/
    if (!(thread->flags & _ST_FL_PRIMORDIAL))
        _st_stack_free(thread->stack);
    
    _ST_SWITCH_CONTEXT(thread);            /*销毁完毕让出CPU执行权*/
}

若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。

int st_thread_join(_st_thread_t *thread, void **retvalp)
{
    _st_cond_t *term = thread->term;         /*获取协程的条件变量*/
    if (term == NULL) {                  
        errno = EINVAL;
        return -1;
    }
    
    if (_ST_CURRENT_THREAD() == thread) {    /*不能是当前协程*/
        errno = EDEADLK;
        return -1;
    }
    
	/*不能多个线程回收同时回收同一个协程*/
    if (term->wait_q.next != &term->wait_q) {
        errno = EINVAL;
        return -1;
    }
    
    /*如果协程的状态不是僵尸态,则用于回收的线程将进入条件变量等待。*/
    while (thread->state != _ST_ST_ZOMBIE) {
        if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0)
            return -1;
    }
    
    if (retvalp)
        *retvalp = thread->retval;       /*获取待回收协程的返回值*/
    
    thread->state = _ST_ST_RUNNABLE;     /*将待回收的协程设置为可运行状态*/
    _ST_DEL_ZOMBIEQ(thread);             /*从僵尸态队列删除*/
    _ST_ADD_RUNQ(thread);                /*加入就绪运行队列*/
    
    return 0;
}

协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。

主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。

void st_thread_yield()
{
    _st_thread_t *me = _ST_CURRENT_THREAD();    /*获取当前协程句柄*/

    /*检查是否有定时器事件触发*/
    _st_vp_check_clock();

    /*就绪队列为空,则直接返回。*/
    if (_ST_RUNQ.next == &_ST_RUNQ) {
        return;
    }

    me->state = _ST_ST_RUNNABLE;  /*将本协程标记为可运行状态*/
    _ST_ADD_RUNQ(me);             /*把本协程添加到就绪队列中*/

    /*将执行权切换给就绪队列中的其他协程*/
    _ST_SWITCH_CONTEXT(me);
}

协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。

socket的处理

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
    struct pollfd *pd;
    struct pollfd *epd = pds + npds;     /*指向数组末尾*/
    _st_pollq_t pq;
    _st_thread_t *me = _ST_CURRENT_THREAD();
    int n;
    
    if (me->flags & _ST_FL_INTERRUPT) {
        me->flags &= ~_ST_FL_INTERRUPT;
        errno = EINTR;
        return -1;
    }
    
    if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
        return -1;
    
    pq.pds = pds;
    pq.npds = npds;
    pq.thread = me;
    pq.on_ioq = 1;
    _ST_ADD_IOQ(pq);
    if (timeout != ST_UTIME_NO_TIMEOUT)
        _ST_ADD_SLEEPQ(me, timeout);
    me->state = _ST_ST_IO_WAIT;
    
    /*主动切出协程,交出执行权。*/
    _ST_SWITCH_CONTEXT(me);
    
    n = 0;
    if (pq.on_ioq) {
        _ST_DEL_IOQ(pq);
        (*_st_eventsys->pollset_del)(pds, npds);
    } else {
        for (pd = pds; pd < epd; pd++) {
            if (pd->revents)
                n++;
        }
    }
    
    if (me->flags & _ST_FL_INTERRUPT) {
        me->flags &= ~_ST_FL_INTERRUPT;
        errno = EINTR;
        return -1;
    }
    
    return n;
}

注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
    struct pollfd pd;
    int n;
    
    pd.fd = fd->osfd;
    pd.events = (short) how;
    pd.revents = 0;
    
    if ((n = st_poll(&pd, 1, timeout)) < 0)     /*单一fd*/
        return -1;
    if (n == 0) {
        errno = ETIME;
        return -1;
    }
    if (pd.revents & POLLNVAL) {
        errno = EBADF;
        return -1;
    }
    
    return 0;
}

对监听一个文件描述符的封装

_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{
    int osfd, err;
    _st_netfd_t *newfd;
    
    /*执行accept函数,如果没有client连接,则accept立即返回。*/
    while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) {
        if (errno == EINTR)
            continue;
        if (!_IO_NOT_READY_ERROR)
            return NULL;

        /*进入poll函数,注册读事件,同时让出CPU的执行权,等待读事件触发。*/
        if (st_netfd_poll(fd, POLLIN, timeout) < 0)
            return NULL;
    }
    
    /*accept返回的client socket fd,将其进行封装。*/
    newfd = _st_netfd_new(osfd, 1, 1);
    if (!newfd) {
        err = errno;
        close(osfd);
        errno = err;
    }
    
    return newfd;
}

fd被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAINEWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd注册读事件,同时会让出CPU的执行权。当fd的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。

ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{
    ssize_t n;
    
    while ((n = read(fd->osfd, buf, nbyte)) < 0) {   /*非阻塞的读取*/
        if (errno == EINTR)            /*被信号中断了*/
            continue;
        if (!_IO_NOT_READY_ERROR)      /*不是EAGAIN或EWOULDBLOCK错误*/
            return -1;

        /*执行到这里说明发生了EAGAIN或EWOULDBLOCK错误,此时没有数据可读,让出执行权。*/
        if (st_netfd_poll(fd, POLLIN, timeout) < 0)
            return -1;
    }
    
    return n;
}

read的原理同accept,不再赘述。



这篇关于srs源码分析2-浅析state_threads的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程