【SWOOLE系列】谈谈reactor
2020/7/27 14:04:11
本文主要是介绍【SWOOLE系列】谈谈reactor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
前言
环境说明
php --ri swoole swoole Swoole => enabled Author => Swoole Team <team@swoole.com> Version => 4.4.4 Built => Aug 27 2019 11: php -version PHP 7.3.7 (cli) (built: Jul 5 2019 12:44:05) ( NTS ) Copyright (c) 1997-2018 The PHP Group Zend Engine v3.3.7, Copyright (c) 1998-2018 Zend Technologies with Zend OPcache v7.3.7, Copyright (c) 1999-2018, by Zend Technologies
通过swoole
如何不用nginx
和php-fpm
快速启动一个http服务
<?php $http = new Swoole\Http\Server("0.0.0.0", 9501); $http->on('request', function ($request, $response) { $response->header("Content-Type", "text/html; charset=utf-8"); $response->end("立的flag,含着泪,吃着屎也要做到"); }); $http->start();
cli下用php server.php
启动服务
curl http://localhost:9501 立的flag,含着泪,吃着屎也要做到
服务是启动完了,性能如何?那么问题来了!不卖关子了,直接给个结论吧 swoole启动的server 远大于 fpm下的http服务。(可以自行本地ab看结果)
php做web的瓶颈在哪?
大家在论坛、贴吧、社区经常看到以下对话
狗蛋A:php是世界最好的语言
狗蛋B:去你的,php性能差
狗蛋C:哟!php竟然还活着。
狗蛋D:php的最终结果都是转向java
。。。。。
这几年我也看淡了,这里我只想说php的确是世界最好的语言。
至于php有没有问题,我只能说的确有问题,每个语言都有自己的或多或少的问题,你让那些天天吹go好的人,你问问他们用go CURD的开心不开心!
php被嘲讽最多的还是性能问题,你说的一点都没有错,追求极致
的性能php还真不行(除非无脑堆机器)
lnmp
在具体说php性能问题之前,我们可以再来回顾下 lnmp
L:linux
N:nginx
M:mysql
P:php
让我们来康康这四个东西,我们发现不管啥语言基本上都依赖Linux
、Nginx
、Mysql
。那么这仨肯定没锅,问题就落到了php
上。nginx
是没法直接解析php,那么借助是php-fpm
。
整个工作流程如下:
php-fpm
是一个多进程单线程的模型,如果一个请求卡了60s吗,那么这个php-fpm
就属于占着茅坑不拉屎待岗状态。那么我们得出结论,服务器承载的最高并发的短板是fpm的数量,那么有人会说无脑设置上线的fpm的数量不就好了吗?那么恭喜你可以直接找下一份工作了!
每个fpm平均大概占用内存20到30mb 那么机器支持最高的fpm数量公式如下:fpm数量 = 机器内存 * 1024M * 0.8 / 30 (20)
乘0.8 主要考虑做人留一线,非要榨干服务器内存干啥,影响多不好啊
我们试着想一个问题nginx的是怎么演化的? php做web是否可以跟nginx一样select -> poll -> epoll
没错swoole可以!php-fpm做不到的,但是swoole做到了。
reactor
关于reactor的基本概念网络一大堆,可以总结以下几点
1.I/O对路复用
2.事件注册、分发、调度
3.异步非堵塞
基于reactor实现的大家做熟悉nginx
、redis
等(主要我也就知道这两个)
我们接下来来康康swoole
怎么结合reactor
swoole 的 server的运行流程图
阅读源码之前我们先看看server的运行流程图
来自https://wiki.swoole.com/wiki/...
探索真相
根据上述的demo我可以看到new Swoole\Http\Server("0.0.0.0", 9501)
创建一个server
我们可以定位源码到swoole_server.cc
的static PHP_METHOD(swoole_server, __construct)
static PHP_METHOD(swoole_server, __construct) { ...... // 初始化 zval *zserv = ZEND_THIS; char *host; size_t host_len = 0; zend_long sock_type = SW_SOCK_TCP; zend_long serv_port = 0; zend_long serv_mode = SW_MODE_PROCESS; // 看到木有 只能cli执行 if (!SWOOLE_G(cli)) { zend_throw_exception_ex(swoole_exception_ce, -1, "%s can only be used in CLI mode", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; } if (sw_server() != NULL) { zend_throw_exception_ex(swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; } ..... // serv_mode SWOOLE_BASE、SWOOLE_PROCESS 具体看官方wiki解释 if (serv_mode != SW_MODE_BASE && serv_mode != SW_MODE_PROCESS) { php_swoole_fatal_error(E_ERROR, "invalid $mode parameters %d", (int) serv_mode); RETURN_FALSE; } // 申请内存 serv = (swServer *) sw_malloc(sizeof(swServer)); if (!serv) { zend_throw_exception_ex(swoole_exception_ce, errno, "malloc(%ld) failed", sizeof(swServer)); RETURN_FALSE; } swServer_init(serv); .... }
在构造方法里 我们并没有看到reactor
相关的代码,我们继续往下追$http->start()
定位到static PHP_METHOD(swoole_server, start)
static PHP_METHOD(swoole_server, start) { zval *zserv = ZEND_THIS; // 读一读什么叫标准的命名 获取server 并且 检测这个服务 多么通熟易懂 swServer *serv = php_swoole_server_get_and_check_server(zserv); if (serv->gs->start > 0) { php_swoole_fatal_error(E_WARNING, "server is running, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; } if (serv->gs->shutdown > 0) { php_swoole_fatal_error(E_WARNING, "server have been shutdown, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; } if (SwooleTG.reactor) { php_swoole_fatal_error(E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; } ..... // swoole服务启动前的前置工作 php_swoole_server_before_start(serv, zserv); // 启动sever if (swServer_start(serv) < 0) { php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error); } RETURN_TRUE; }
按照我多年的搬屎山的直觉来说,再启动之前肯定需要做准备工作,那么reactor的初始化肯定在php_swoole_server_before_start
void php_swoole_server_before_start(swServer *serv, zval *zobject) { /** * create swoole server */ if (swServer_create(serv) < 0) { php_swoole_fatal_error(E_ERROR, "failed to create the server. Error: %s", sw_error); return; } .....
int swServer_create(swServer *serv) { serv->factory.ptr = serv; serv->session_list = (swSession *) sw_shm_calloc(SW_SESSION_LIST_SIZE, sizeof(swSession)); if (serv->session_list == NULL) { swError("sw_shm_calloc(%ld) for session_list failed", SW_SESSION_LIST_SIZE * sizeof(swSession)); return SW_ERR; } if (serv->enable_static_handler && serv->locations == nullptr) { serv->locations = new std::unordered_set<std::string>; } if (serv->factory_mode == SW_MODE_BASE) { return swReactorProcess_create(serv); } else { return swReactorThread_create(serv); } }
可以发现根据不同的执行模式 创建reactor也是不同,不过这次我们只看swReactorProcess_create
开始分析瞎逼逼这块代码
reactor
int swReactorProcess_create(swServer *serv) { serv->reactor_num = serv->worker_num; serv->connection_list = (swConnection *) sw_calloc(serv->max_connection, sizeof(swConnection)); if (serv->connection_list == NULL) { swSysWarn("calloc[2](%d) failed", (int )(serv->max_connection * sizeof(swConnection))); return SW_ERR; } //create factry object if (swFactory_create(&(serv->factory)) < 0) { swError("create factory failed"); return SW_ERR; } serv->factory.finish = swReactorProcess_send2client; return SW_OK; }
其实这个函数只是将swserver
这个结构体初始化对应的属性和回调函数
1.reactor_num
2.conection_list 初始化好空间
3.finish的结束回调函数
4.factory
int swFactory_create(swFactory *factory) { factory->dispatch = swFactory_dispatch; factory->finish = swFactory_finish; factory->start = swFactory_start; factory->shutdown = swFactory_shutdown; factory->end = swFactory_end; factory->notify = swFactory_notify; factory->free = swFactory_free; return SW_OK; }
再初始化swServer
后就可以看swServer_start
参数是之前初始化构造好的swServer
int swServer_start(swServer *serv) { // factory 可以定位到swFactory_create swFactory *factory = &serv->factory; int ret; // 启动前的检测 判断不同mode下的参数 和 php上游回调函数的是否构造 比如onTask等。。 ret = swServer_start_check(serv); if (ret < 0) { return SW_ERR; } // 检测钩子 if (SwooleG.hooks[SW_GLOBAL_HOOK_BEFORE_SERVER_START]) { swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_START, serv); } // sw_atomic_cmp_set 此处理解成一个锁 也就是同时时间只能存在一个服务 //cannot start 2 servers at the same time, please use process->exec. if (!sw_atomic_cmp_set(&serv->gs->start, 0, 1)) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_ONLY_START_ONE, "must only start one server"); return SW_ERR; } // 这块标准输出 大家应该都懂 跳过跳过 //run as daemon if (serv->daemonize > 0) { /** * redirect STDOUT to log file */ if (SwooleG.log_fd > STDOUT_FILENO) { swoole_redirect_stdout(SwooleG.log_fd); } /** * redirect STDOUT_FILENO/STDERR_FILENO to /dev/null */ else { serv->null_fd = open("/dev/null", O_WRONLY); if (serv->null_fd > 0) { swoole_redirect_stdout(serv->null_fd); } else { swSysWarn("open(/dev/null) failed"); } } if (swoole_daemon(0, 1) < 0) { return SW_ERR; } } //master pid // 获取对应的master进程和启动时间 serv->gs->master_pid = getpid(); serv->stats->start_time = time(NULL); /** * init method */ // 继续初始化关于tcp相关函数 serv->send = swServer_tcp_send; serv->sendwait = swServer_tcp_sendwait; serv->sendfile = swServer_tcp_sendfile; serv->close = swServer_tcp_close; serv->notify = swServer_tcp_notify; serv->feedback = swServer_tcp_feedback; // 申请worker的对应的内存空间 serv->workers = (swWorker *) SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker)); if (serv->workers == NULL) { swSysWarn("gmalloc[server->workers] failed"); return SW_ERR; } if (swMutex_create(&serv->lock, 0) < 0) { return SW_ERR; } /** * store to swProcessPool object */ serv->gs->event_workers.ptr = serv; serv->gs->event_workers.workers = serv->workers; serv->gs->event_workers.worker_num = serv->worker_num; serv->gs->event_workers.use_msgqueue = 0; uint32_t i; for (i = 0; i < serv->worker_num; i++) { serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers; serv->gs->event_workers.workers[i].id = i; serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER; } /* * For swoole_server->taskwait, create notify pipe and result shared memory. */ if (serv->task_worker_num > 0 && serv->worker_num > 0) { serv->task_result = (swEventData *) sw_shm_calloc(serv->worker_num, sizeof(swEventData)); if (!serv->task_result) { swWarn("malloc[serv->task_result] failed"); return SW_ERR; } serv->task_notify = (swPipe *) sw_calloc(serv->worker_num, sizeof(swPipe)); if (!serv->task_notify) { swWarn("malloc[serv->task_notify] failed"); sw_shm_free(serv->task_result); return SW_ERR; } for (i = 0; i < serv->worker_num; i++) { if (swPipeNotify_auto(&serv->task_notify[i], 1, 0)) { sw_shm_free(serv->task_result); sw_free(serv->task_notify); return SW_ERR; } } } /** * user worker process */ if (serv->user_worker_list) { i = 0; for (auto worker : *serv->user_worker_list) { // 此处可以看看 worker id的生成机制 有没有课代表解释下 要两个woker_num worker->id = serv->worker_num + serv->task_worker_num + i; i++; } } serv->running = 1; //factory start if (factory->start(factory) < 0) { return SW_ERR; } // 注册信号机制 swServer_signal_init(serv); //write PID file if (serv->pid_file) { ret = sw_snprintf(SwooleTG.buffer_stack->str, SwooleTG.buffer_stack->size, "%d", getpid()); swoole_file_put_contents(serv->pid_file, SwooleTG.buffer_stack->str, ret); } if (serv->factory_mode == SW_MODE_BASE) { ret = swReactorProcess_start(serv); } else { ret = swReactorThread_start(serv); } //failed to start if (ret < 0) { return SW_ERR; } swServer_destory(serv); //remove PID file if (serv->pid_file) { unlink(serv->pid_file); } return SW_OK; }
代码很长,我们概括为几件事情
- 启动检测
- 检查钩子 & 执行钩子
- 锁判断
- 如果
daemon
启动更改标准输出 - 初始化函数和初始化基本的信息(pid、时间、woker的内存等)
- 创建管道、共享内存
- 创建信号处理机制
- 创建 & 启动 woker、task、reactor
- 撒花完结
所以我们就单独看看8
if (serv->factory_mode == SW_MODE_BASE) { ret = swReactorProcess_start(serv); } else { ret = swReactorThread_start(serv); }
同样我们还是只看SW_MODE_BASE
int swReactorProcess_start(swServer *serv) { // 此处需要主要下 SW_MODE_BASE下是单线程模式 serv->single_thread = 1; //监听tcp if (serv->have_stream_sock == 1) { for (auto ls : *serv->listen_list) { // 过滤udp if (swSocket_is_dgram(ls->type)) { continue; } // 复用端口的处理 #ifdef HAVE_REUSEPORT if (serv->enable_reuse_port) { if (close(ls->socket->fd) < 0) { swSysWarn("close(%d) failed", ls->socket->fd); } continue; } else #endif { //监听socket if (swPort_listen(ls) < 0) { return SW_ERR; } } } } swProcessPool *pool = &serv->gs->event_workers; if (swProcessPool_create(pool, serv->worker_num, 0, SW_IPC_UNIXSOCK) < 0) { return SW_ERR; } swProcessPool_set_max_request(pool, serv->max_request, serv->max_request_grace); /** * store to swProcessPool object */ serv->gs->event_workers.ptr = serv; serv->gs->event_workers.max_wait_time = serv->max_wait_time; serv->gs->event_workers.use_msgqueue = 0; serv->gs->event_workers.main_loop = swReactorProcess_loop; serv->gs->event_workers.onWorkerNotFound = swManager_wait_other_worker; uint32_t i; for (i = 0; i < serv->worker_num; i++) { serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers; serv->gs->event_workers.workers[i].id = i; serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER; } //single worker if (swServer_is_single(serv)) { return swReactorProcess_loop(&serv->gs->event_workers, &serv->gs->event_workers.workers[0]); } for (i = 0; i < serv->worker_num; i++) { if (swServer_worker_create(serv, &serv->gs->event_workers.workers[i]) < 0) { return SW_ERR; } } //task workers if (serv->task_worker_num > 0) { if (swServer_create_task_workers(serv) < 0) { return SW_ERR; } swTaskWorker_init(serv); if (swProcessPool_start(&serv->gs->task_workers) < 0) { return SW_ERR; } } /** * create user worker process */ if (serv->user_worker_list) { serv->user_workers = (swWorker *) sw_malloc(serv->user_worker_num * sizeof(swWorker)); if (serv->user_workers == NULL) { swSysWarn("gmalloc[server->user_workers] failed"); return SW_ERR; } for (auto worker : *serv->user_worker_list) { /** * store the pipe object */ if (worker->pipe_object) { swServer_store_pipe_fd(serv, worker->pipe_object); } swManager_spawn_user_worker(serv, worker); } } /** * manager process is the same as the master process */ SwooleG.pid = serv->gs->manager_pid = getpid(); SwooleG.process_type = SW_PROCESS_MANAGER; /** * manager process can not use signalfd */ SwooleG.use_signalfd = 0; swProcessPool_start(&serv->gs->event_workers); swServer_signal_init(serv); if (serv->onStart) { swWarn("The onStart event with SWOOLE_BASE is deprecated"); serv->onStart(serv); } if (serv->onManagerStart) { serv->onManagerStart(serv); } swProcessPool_wait(&serv->gs->event_workers); swProcessPool_shutdown(&serv->gs->event_workers); swManager_kill_user_workers(serv); if (serv->onManagerStop) { serv->onManagerStop(serv); } return SW_OK; }
继续总结该函做的事情
- 监听tcp 端口 & 端口复用
- 创建woker进程
- 创建task进程
- 信号处理
- 等待wokers进程的结束
- wokers进程技术的后shuntdown处理
- onManagerStop的处理
- 撒花结束
关键server的启动代码完了,肯定也是雨里雾里的,别问我,我也是,我们其实想看reactor
在swoole
的server服务承担什么角色 我们看这些干嘛
其实只有看到完启动的代码才能知道一些我们想要的 如果根据epoll
的模型核心就在notify
,我们可以回过头再看下总结下
1.关于reactor的线程创建
int swReactor_create(swReactor *reactor, int max_event) { int ret; bzero(reactor, sizeof(swReactor)); #ifdef HAVE_EPOLL ret = swReactorEpoll_create(reactor, max_event); #elif defined(HAVE_KQUEUE) ret = swReactorKqueue_create(reactor, max_event); #elif defined(HAVE_POLL) ret = swReactorPoll_create(reactor, max_event); #else ret = swReactorSelect_create(reactor); #endif reactor->running = 1; reactor->onFinish = reactor_finish; reactor->onTimeout = reactor_timeout; reactor->is_empty = swReactor_empty; reactor->can_exit = SwooleG.reactor_can_exit; reactor->write = swReactor_write; reactor->close = swReactor_close; reactor->defer = defer_task_add; reactor->defer_tasks = nullptr; reactor->default_write_handler = swReactor_onWrite; Socket::init_reactor(reactor); System::init_reactor(reactor); swClient_init_reactor(reactor); if (SwooleG.hooks[SW_GLOBAL_HOOK_ON_REACTOR_CREATE]) { swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, reactor); } return ret; }
2.关于tcp的函数
serv->send = swServer_tcp_send; serv->sendwait = swServer_tcp_sendwait; serv->sendfile = swServer_tcp_sendfile; serv->close = swServer_tcp_close; serv->notify = swServer_tcp_notify; serv->feedback = swServer_tcp_feedback;
3.关于swServer_tcp_notify
/** * use in master process */ static int swServer_tcp_notify(swServer *serv, swConnection *conn, int event) { swDataHead notify_event = {}; notify_event.type = event; notify_event.reactor_id = conn->reactor_id; notify_event.fd = conn->fd; notify_event.server_fd = conn->server_fd; return serv->factory.notify(&serv->factory, ¬ify_event); }
4.关于swFactory_create
int swFactory_create(swFactory *factory) { factory->dispatch = swFactory_dispatch; factory->finish = swFactory_finish; factory->start = swFactory_start; factory->shutdown = swFactory_shutdown; factory->end = swFactory_end; factory->notify = swFactory_notify; factory->free = swFactory_free; return SW_OK; }
5.关于notify
/** * only stream fd */ static int swFactory_notify(swFactory *factory, swDataHead *info) { swServer *serv = (swServer *) factory->ptr; swConnection *conn = swServer_connection_get(serv, info->fd); if (conn == NULL || conn->active == 0) { swWarn("dispatch[type=%d] failed, connection#%d is not active", info->type, info->fd); return SW_ERR; } //server active close, discard data. if (conn->closed) { swWarn("dispatch[type=%d] failed, connection#%d is closed by server", info->type, info->fd); return SW_OK; } //converted fd to session_id info->fd = conn->session_id; info->server_fd = conn->server_fd; info->flags = SW_EVENT_DATA_NORMAL; return swWorker_onTask(factory, (swEventData *) info); }
6.关于onReceive
、onTask
、onFinish
可以开始记笔记了
Reactor
1.reactor是线程形态,可以单线程也可以多线程 取决woker数量
2.负责维护客户端TCP
连接、处理网络IO
、处理协议、收发数据
3.不执行任何PHP代码
Worker
1.worker是进程形态,可以单进程也可以多进程
2.接受由Reactor
线程投递的请求数据包,并执行PHP
回调函数处理数据
3.生成响应数据并发给Reactor
线程,由Reactor
线程发送给TCP
客户端
总结就是再swoole
里 reactor
就是nginx
, worker
就是php-fpm
,不同的有了协程后woker可以是异步
的,那么理论上承载的并发是无上限的。(有人开喷了:你当文件句柄打开限制是吃素的吗!)
写给最后
swoole
的源码的分析的文章可能要先缓缓,因为越看发现越吃力,还是因为自己底子不够好。
BUT 立的flag还是要完成。接下来可能先从redis
入手,因为书籍比较全面(copy更加的顺畅丝滑)。
看完上述给个大家一个小小的问题
写过swoole
的小伙伴都知道 为什么mysql
、redis
连接需要在onStart
的时候进行处理?
这篇关于【SWOOLE系列】谈谈reactor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23Vue新手入门教程:从零开始学习Vue框架
- 2024-11-23如何集成Ant Design Vue的图标
- 2024-11-23如何集成Ant Design Vue图标
- 2024-11-23使用vue CLI快速搭建Vue项目教程
- 2024-11-23Vue CLI多环境配置简单教程
- 2024-11-23Vue3入门教程:轻松搭建你的第一个Vue3应用
- 2024-11-23Vue3+Vite快速上手指南
- 2024-11-23Vue3阿里系UI组件入门指南
- 2024-11-23Vue3的阿里系UI组件入门指南
- 2024-11-23Vue3公共组件入门教程