【SWOOLE系列】谈谈reactor

2020/7/27 14:04:11

本文主要是介绍【SWOOLE系列】谈谈reactor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言

环境说明

php --ri swoole

swoole

Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.4.4
Built => Aug 27 2019 11:

php -version
PHP 7.3.7 (cli) (built: Jul  5 2019 12:44:05) ( NTS )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.3.7, Copyright (c) 1998-2018 Zend Technologies
    with Zend OPcache v7.3.7, Copyright (c) 1999-2018, by Zend Technologies

通过swoole如何不用nginxphp-fpm快速启动一个http服务

<?php
$http = new Swoole\Http\Server("0.0.0.0", 9501);

$http->on('request', function ($request, $response) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $response->end("立的flag,含着泪,吃着屎也要做到");
});

$http->start();

cli下用php server.php启动服务

curl http://localhost:9501
立的flag,含着泪,吃着屎也要做到

服务是启动完了,性能如何?
那么问题来了!不卖关子了,直接给个结论吧 swoole启动的server 远大于 fpm下的http服务。(可以自行本地ab看结果)

php做web的瓶颈在哪?

大家在论坛、贴吧、社区经常看到以下对话
狗蛋A:php是世界最好的语言
狗蛋B:去你的,php性能差
狗蛋C:哟!php竟然还活着。
狗蛋D:php的最终结果都是转向java

。。。。。
这几年我也看淡了,这里我只想说php的确是世界最好的语言。
1538363198814.gif
至于php有没有问题,我只能说的确有问题,每个语言都有自己的或多或少的问题,你让那些天天吹go好的人,你问问他们用go CURD的开心不开心!
php被嘲讽最多的还是性能问题,你说的一点都没有错,追求极致的性能php还真不行(除非无脑堆机器)

lnmp

在具体说php性能问题之前,我们可以再来回顾下 lnmp
L:linux
N:nginx
M:mysql
P:php
让我们来康康这四个东西,我们发现不管啥语言基本上都依赖LinuxNginxMysql。那么这仨肯定没锅,问题就落到了php上。
nginx是没法直接解析php,那么借助是php-fpm
整个工作流程如下:
image.png

php-fpm是一个多进程单线程的模型,如果一个请求卡了60s吗,那么这个php-fpm就属于占着茅坑不拉屎待岗状态。那么我们得出结论,服务器承载的最高并发的短板是fpm的数量,那么有人会说无脑设置上线的fpm的数量不就好了吗?那么恭喜你可以直接找下一份工作了!
每个fpm平均大概占用内存20到30mb 那么机器支持最高的fpm数量公式如下:
fpm数量 = 机器内存 * 1024M * 0.8 / 30 (20)

乘0.8 主要考虑做人留一线,非要榨干服务器内存干啥,影响多不好啊

我们试着想一个问题nginx的是怎么演化的? php做web是否可以跟nginx一样
select -> poll -> epoll

没错swoole可以!php-fpm做不到的,但是swoole做到了。
image.png

reactor

关于reactor的基本概念网络一大堆,可以总结以下几点
1.I/O对路复用
2.事件注册、分发、调度
3.异步非堵塞
基于reactor实现的大家做熟悉nginxredis等(主要我也就知道这两个)
我们接下来来康康swoole怎么结合reactor

swoole 的 server的运行流程图

阅读源码之前我们先看看server的运行流程图

process.jpg
swoole.jpg
3.png

来自https://wiki.swoole.com/wiki/...

探索真相

根据上述的demo我可以看到new Swoole\Http\Server("0.0.0.0", 9501)创建一个server

我们可以定位源码到swoole_server.ccstatic PHP_METHOD(swoole_server, __construct)

static PHP_METHOD(swoole_server, __construct)
{
    ......
    // 初始化
    zval *zserv = ZEND_THIS;
    char *host;
    size_t host_len = 0;
    zend_long sock_type = SW_SOCK_TCP;
    zend_long serv_port = 0;
    zend_long serv_mode = SW_MODE_PROCESS;

    // 看到木有 只能cli执行
    if (!SWOOLE_G(cli))
    {
        zend_throw_exception_ex(swoole_exception_ce, -1, "%s can only be used in CLI mode", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (sw_server() != NULL)
    {
        zend_throw_exception_ex(swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....
    // serv_mode SWOOLE_BASE、SWOOLE_PROCESS 具体看官方wiki解释

    if (serv_mode != SW_MODE_BASE && serv_mode != SW_MODE_PROCESS)
    {
        php_swoole_fatal_error(E_ERROR, "invalid $mode parameters %d", (int) serv_mode);
        RETURN_FALSE;
    }

    // 申请内存
    serv = (swServer *) sw_malloc(sizeof(swServer));
    if (!serv)
    {
        zend_throw_exception_ex(swoole_exception_ce, errno, "malloc(%ld) failed", sizeof(swServer));
        RETURN_FALSE;
    }

    swServer_init(serv);
   
    ....
}

在构造方法里 我们并没有看到reactor相关的代码,我们继续往下追$http->start()定位到
static PHP_METHOD(swoole_server, start)

static PHP_METHOD(swoole_server, start)
{
    zval *zserv = ZEND_THIS;
    // 读一读什么叫标准的命名 获取server 并且 检测这个服务 多么通熟易懂
    swServer *serv = php_swoole_server_get_and_check_server(zserv);

    if (serv->gs->start > 0)
    {
        php_swoole_fatal_error(E_WARNING, "server is running, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }
    if (serv->gs->shutdown > 0)
    {
        php_swoole_fatal_error(E_WARNING, "server have been shutdown, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (SwooleTG.reactor)
    {
        php_swoole_fatal_error(E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....

    // swoole服务启动前的前置工作
    php_swoole_server_before_start(serv, zserv);
    // 启动sever
    if (swServer_start(serv) < 0)
    {
        php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
    }

    RETURN_TRUE;
}

按照我多年的搬屎山的直觉来说,再启动之前肯定需要做准备工作,那么reactor的初始化肯定在php_swoole_server_before_start

void php_swoole_server_before_start(swServer *serv, zval *zobject)
{
    /**
     * create swoole server
     */
    if (swServer_create(serv) < 0)
    {
        php_swoole_fatal_error(E_ERROR, "failed to create the server. Error: %s", sw_error);
        return;
    }
    .....
int swServer_create(swServer *serv)
{
    serv->factory.ptr = serv;

    serv->session_list = (swSession *) sw_shm_calloc(SW_SESSION_LIST_SIZE, sizeof(swSession));
    if (serv->session_list == NULL)
    {
        swError("sw_shm_calloc(%ld) for session_list failed", SW_SESSION_LIST_SIZE * sizeof(swSession));
        return SW_ERR;
    }

    if (serv->enable_static_handler && serv->locations == nullptr)
    {
        serv->locations = new std::unordered_set<std::string>;
    }

    if (serv->factory_mode == SW_MODE_BASE)
    {
        return swReactorProcess_create(serv);
    }
    else
    {
        return swReactorThread_create(serv);
    }
}

可以发现根据不同的执行模式 创建reactor也是不同,不过这次我们只看swReactorProcess_create 开始分析瞎逼逼这块代码
5.jpg

reactor

int swReactorProcess_create(swServer *serv)
{
    serv->reactor_num = serv->worker_num;
    serv->connection_list = (swConnection *) sw_calloc(serv->max_connection, sizeof(swConnection));
    if (serv->connection_list == NULL)
    {
        swSysWarn("calloc[2](%d) failed", (int )(serv->max_connection * sizeof(swConnection)));
        return SW_ERR;
    }
    //create factry object
    if (swFactory_create(&(serv->factory)) < 0)
    {
        swError("create factory failed");
        return SW_ERR;
    }
    serv->factory.finish = swReactorProcess_send2client;
    return SW_OK;
}

其实这个函数只是将swserver这个结构体初始化对应的属性和回调函数
1.reactor_num
2.conection_list 初始化好空间
3.finish的结束回调函数
4.factory

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

再初始化swServer后就可以看swServer_start 参数是之前初始化构造好的swServer

int swServer_start(swServer *serv)
{
    // factory 可以定位到swFactory_create
    swFactory *factory = &serv->factory;
    int ret;

    // 启动前的检测 判断不同mode下的参数 和 php上游回调函数的是否构造 比如onTask等。。
    ret = swServer_start_check(serv);
    if (ret < 0)
    {
        return SW_ERR;
    }
    // 检测钩子
    if (SwooleG.hooks[SW_GLOBAL_HOOK_BEFORE_SERVER_START])
    {
        swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_START, serv);
    }
    // sw_atomic_cmp_set 此处理解成一个锁 也就是同时时间只能存在一个服务 
    //cannot start 2 servers at the same time, please use process->exec.
    if (!sw_atomic_cmp_set(&serv->gs->start, 0, 1))
    {
        swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_ONLY_START_ONE, "must only start one server");
        return SW_ERR;
    }
    // 这块标准输出 大家应该都懂 跳过跳过
    //run as daemon
    if (serv->daemonize > 0)
    {
        /**
         * redirect STDOUT to log file
         */
        if (SwooleG.log_fd > STDOUT_FILENO)
        {
            swoole_redirect_stdout(SwooleG.log_fd);
        }
        /**
         * redirect STDOUT_FILENO/STDERR_FILENO to /dev/null
         */
        else
        {
            serv->null_fd = open("/dev/null", O_WRONLY);
            if (serv->null_fd > 0)
            {
                swoole_redirect_stdout(serv->null_fd);
            }
            else
            {
                swSysWarn("open(/dev/null) failed");
            }
        }

        if (swoole_daemon(0, 1) < 0)
        {
            return SW_ERR;
        }
    }

    //master pid
    // 获取对应的master进程和启动时间
    serv->gs->master_pid = getpid();
    serv->stats->start_time = time(NULL);

    /**
     * init method
     */
     // 继续初始化关于tcp相关函数
    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;
    // 申请worker的对应的内存空间
    serv->workers = (swWorker *) SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
    if (serv->workers == NULL)
    {
        swSysWarn("gmalloc[server->workers] failed");
        return SW_ERR;
    }

    if (swMutex_create(&serv->lock, 0) < 0)
    {
        return SW_ERR;
    }

    /**
     * store to swProcessPool object
     */
    
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.workers = serv->workers;
    serv->gs->event_workers.worker_num = serv->worker_num;
    serv->gs->event_workers.use_msgqueue = 0;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {
        serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    /*
     * For swoole_server->taskwait, create notify pipe and result shared memory.
     */
    if (serv->task_worker_num > 0 && serv->worker_num > 0)
    {
        serv->task_result = (swEventData *) sw_shm_calloc(serv->worker_num, sizeof(swEventData));
        if (!serv->task_result)
        {
            swWarn("malloc[serv->task_result] failed");
            return SW_ERR;
        }
        serv->task_notify = (swPipe *) sw_calloc(serv->worker_num, sizeof(swPipe));
        if (!serv->task_notify)
        {
            swWarn("malloc[serv->task_notify] failed");
            sw_shm_free(serv->task_result);
            return SW_ERR;
        }
        for (i = 0; i < serv->worker_num; i++)
        {
            if (swPipeNotify_auto(&serv->task_notify[i], 1, 0))
            {
                sw_shm_free(serv->task_result);
                sw_free(serv->task_notify);
                return SW_ERR;
            }
        }
    }

    /**
     * user worker process
     */
    if (serv->user_worker_list)
    {
        i = 0;
        for (auto worker : *serv->user_worker_list)
        {
        // 此处可以看看 worker id的生成机制 有没有课代表解释下 要两个woker_num
            worker->id = serv->worker_num + serv->task_worker_num + i;
            i++;
        }
    }
    serv->running = 1;
    //factory start
    if (factory->start(factory) < 0)
    {
        return SW_ERR;
    }
    // 注册信号机制
    swServer_signal_init(serv);

    //write PID file
    if (serv->pid_file)
    {
        ret = sw_snprintf(SwooleTG.buffer_stack->str, SwooleTG.buffer_stack->size, "%d", getpid());
        swoole_file_put_contents(serv->pid_file, SwooleTG.buffer_stack->str, ret);
    }
    if (serv->factory_mode == SW_MODE_BASE)
    {
        ret = swReactorProcess_start(serv);
    }
    else
    {
        ret = swReactorThread_start(serv);
    }
    //failed to start
    if (ret < 0)
    {
        return SW_ERR;
    }
    swServer_destory(serv);
    //remove PID file
    if (serv->pid_file)
    {
        unlink(serv->pid_file);
    }
    return SW_OK;
}

代码很长,我们概括为几件事情

  1. 启动检测
  2. 检查钩子 & 执行钩子
  3. 锁判断
  4. 如果daemon启动更改标准输出
  5. 初始化函数和初始化基本的信息(pid、时间、woker的内存等)
  6. 创建管道、共享内存
  7. 创建信号处理机制
  8. 创建 & 启动 woker、task、reactor
  9. 撒花完结

所以我们就单独看看8

    if (serv->factory_mode == SW_MODE_BASE)
    {
        ret = swReactorProcess_start(serv);
    }
    else
    {
        ret = swReactorThread_start(serv);
    }

同样我们还是只看SW_MODE_BASE

int swReactorProcess_start(swServer *serv)
{
    // 此处需要主要下 SW_MODE_BASE下是单线程模式
    serv->single_thread = 1;

    //监听tcp
    if (serv->have_stream_sock == 1)
    {
        for (auto ls : *serv->listen_list)
        {
        // 过滤udp
            if (swSocket_is_dgram(ls->type))
            {
                continue;
            }
            // 复用端口的处理 
#ifdef HAVE_REUSEPORT
            if (serv->enable_reuse_port)
            {
                if (close(ls->socket->fd) < 0)
                {
                    swSysWarn("close(%d) failed", ls->socket->fd);
                }
                continue;
            }
            else
#endif
            {
                //监听socket
                if (swPort_listen(ls) < 0)
                {
                    return SW_ERR;
                }
            }
        }
    }

    swProcessPool *pool = &serv->gs->event_workers;
    if (swProcessPool_create(pool, serv->worker_num, 0, SW_IPC_UNIXSOCK) < 0)
    {
        return SW_ERR;
    }
    swProcessPool_set_max_request(pool, serv->max_request, serv->max_request_grace);

    /**
     * store to swProcessPool object
     */
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.max_wait_time = serv->max_wait_time;
    serv->gs->event_workers.use_msgqueue = 0;
    serv->gs->event_workers.main_loop = swReactorProcess_loop;
    serv->gs->event_workers.onWorkerNotFound = swManager_wait_other_worker;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {
        serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    //single worker
    if (swServer_is_single(serv))
    {
        return swReactorProcess_loop(&serv->gs->event_workers, &serv->gs->event_workers.workers[0]);
    }

    for (i = 0; i < serv->worker_num; i++)
    {
        if (swServer_worker_create(serv, &serv->gs->event_workers.workers[i]) < 0)
        {
            return SW_ERR;
        }
    }

    //task workers
    if (serv->task_worker_num > 0)
    {
        if (swServer_create_task_workers(serv) < 0)
        {
            return SW_ERR;
        }
        swTaskWorker_init(serv);
        if (swProcessPool_start(&serv->gs->task_workers) < 0)
        {
            return SW_ERR;
        }
    }

    /**
     * create user worker process
     */
    if (serv->user_worker_list)
    {
        serv->user_workers = (swWorker *) sw_malloc(serv->user_worker_num * sizeof(swWorker));
        if (serv->user_workers == NULL)
        {
            swSysWarn("gmalloc[server->user_workers] failed");
            return SW_ERR;
        }
        for (auto worker : *serv->user_worker_list)
        {
            /**
             * store the pipe object
             */
            if (worker->pipe_object)
            {
                swServer_store_pipe_fd(serv, worker->pipe_object);
            }
            swManager_spawn_user_worker(serv, worker);
        }
    }

    /**
     * manager process is the same as the master process
     */
    SwooleG.pid = serv->gs->manager_pid = getpid();
    SwooleG.process_type = SW_PROCESS_MANAGER;

    /**
     * manager process can not use signalfd
     */
    SwooleG.use_signalfd = 0;

    swProcessPool_start(&serv->gs->event_workers);
    swServer_signal_init(serv);

    if (serv->onStart)
    {
        swWarn("The onStart event with SWOOLE_BASE is deprecated");
        serv->onStart(serv);
    }

    if (serv->onManagerStart)
    {
        serv->onManagerStart(serv);
    }

    swProcessPool_wait(&serv->gs->event_workers);
    swProcessPool_shutdown(&serv->gs->event_workers);

    swManager_kill_user_workers(serv);

    if (serv->onManagerStop)
    {
        serv->onManagerStop(serv);
    }

    return SW_OK;
}

继续总结该函做的事情

  1. 监听tcp 端口 & 端口复用
  2. 创建woker进程
  3. 创建task进程
  4. 信号处理
  5. 等待wokers进程的结束
  6. wokers进程技术的后shuntdown处理
  7. onManagerStop的处理
  8. 撒花结束

关键server的启动代码完了,肯定也是雨里雾里的,别问我,我也是,我们其实想看reactorswoole的server服务承担什么角色 我们看这些干嘛
7.jpg
其实只有看到完启动的代码才能知道一些我们想要的 如果根据epoll的模型核心就在notify,我们可以回过头再看下总结下

1.关于reactor的线程创建

int swReactor_create(swReactor *reactor, int max_event)
{
    int ret;
    bzero(reactor, sizeof(swReactor));

#ifdef HAVE_EPOLL
    ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
    ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
    ret = swReactorPoll_create(reactor, max_event);
#else
    ret = swReactorSelect_create(reactor);
#endif

    reactor->running = 1;

    reactor->onFinish = reactor_finish;
    reactor->onTimeout = reactor_timeout;
    reactor->is_empty = swReactor_empty;
    reactor->can_exit = SwooleG.reactor_can_exit;

    reactor->write = swReactor_write;
    reactor->close = swReactor_close;

    reactor->defer = defer_task_add;
    reactor->defer_tasks = nullptr;

    reactor->default_write_handler = swReactor_onWrite;

    Socket::init_reactor(reactor);
    System::init_reactor(reactor);
    swClient_init_reactor(reactor);

    if (SwooleG.hooks[SW_GLOBAL_HOOK_ON_REACTOR_CREATE])
    {
        swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, reactor);
    }

    return ret;
}

2.关于tcp的函数

    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;

3.关于swServer_tcp_notify

/**
 * use in master process
 */
static int swServer_tcp_notify(swServer *serv, swConnection *conn, int event)
{
    swDataHead notify_event = {};
    notify_event.type = event;
    notify_event.reactor_id = conn->reactor_id;
    notify_event.fd = conn->fd;
    notify_event.server_fd = conn->server_fd;
    return serv->factory.notify(&serv->factory, &notify_event);
}

4.关于swFactory_create

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

5.关于notify

/**
 * only stream fd
 */
static int swFactory_notify(swFactory *factory, swDataHead *info)
{
    swServer *serv = (swServer *) factory->ptr;
    swConnection *conn = swServer_connection_get(serv, info->fd);
    if (conn == NULL || conn->active == 0)
    {
        swWarn("dispatch[type=%d] failed, connection#%d is not active", info->type, info->fd);
        return SW_ERR;
    }
    //server active close, discard data.
    if (conn->closed)
    {
        swWarn("dispatch[type=%d] failed, connection#%d is closed by server", info->type, info->fd);
        return SW_OK;
    }
    //converted fd to session_id
    info->fd = conn->session_id;
    info->server_fd = conn->server_fd;
    info->flags = SW_EVENT_DATA_NORMAL;

    return swWorker_onTask(factory, (swEventData *) info);
}

6.关于onReceiveonTaskonFinish

可以开始记笔记了
8.jpg

Reactor

1.reactor是线程形态,可以单线程也可以多线程 取决woker数量
2.负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
3.不执行任何PHP代码

Worker

1.worker是进程形态,可以单进程也可以多进程
2.接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
3.生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端

总结就是再swoolereactor就是nginx, worker就是php-fpm,不同的有了协程后woker可以是异步的,那么理论上承载的并发是无上限的。(有人开喷了:你当文件句柄打开限制是吃素的吗!)

写给最后

swoole的源码的分析的文章可能要先缓缓,因为越看发现越吃力,还是因为自己底子不够好。
9.jpg
BUT 立的flag还是要完成。接下来可能先从redis入手,因为书籍比较全面(copy更加的顺畅丝滑)。
看完上述给个大家一个小小的问题
写过swoole的小伙伴都知道 为什么mysqlredis连接需要在onStart的时候进行处理?



这篇关于【SWOOLE系列】谈谈reactor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程