redis源码分析——1、网络框架

2021/10/28 2:09:32

本文主要是介绍redis源码分析——1、网络框架,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

我们知道redis用的epoll,但是底层的代码到底是怎样一步步起来的,本文解读redis的网络框架,一探究竟。

一、 核心数据结构

  1. **ConnectionType:**ConnectionType定义了网络连接的接口,包含read、write等操作,具体定义如下。

    /* Interface of a connection transport: a table of function pointers that a
     * concrete connection type (CT_Socket, CT_TLS) fills in with its own
     * implementation. Every operation except ae_handler takes the connection it
     * acts on as its first argument. */
    typedef struct ConnectionType {
        /* Callback with the event-loop file-event shape (el, fd, clientData,
         * mask); this is what gets registered for the connection's fd. */
        void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
        /* Start a connection to addr:port; connect_handler is the completion
         * callback. NOTE(review): source_addr is presumably the local address
         * to bind to - confirm. */
        int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
        /* Write up to data_len bytes from data to the connection. */
        int (*write)(struct connection *conn, const void *data, size_t data_len);
        /* Read up to buf_len bytes from the connection into buf. */
        int (*read)(struct connection *conn, void *buf, size_t buf_len);
        /* Close the connection. */
        void (*close)(struct connection *conn);
        /* Accept-side setup of a new connection; accept_handler is the
         * callback to run for it. */
        int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
        /* Install the write callback. NOTE(review): the meaning of `barrier`
         * is not visible here; presumably related to AE_BARRIER - confirm. */
        int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
        /* Install the read callback. */
        int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
        /* Text describing the most recent error on the connection. */
        const char *(*get_last_error)(struct connection *conn);
        /* Blocking/synchronous variants, each taking a timeout.
         * NOTE(review): the timeout unit is not shown here - confirm. */
        int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
        ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
        ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
        ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    } ConnectionType;
    

    在源码中实现了两种connection,分别是CT_Socket和CT_TLS,这两种也是redis支持的连接类型。redis也支持本地unix socket,但这归根结底也是socket,其读写和网络socket是一样的。这两种的实现分别如下:

    CT_Socket:

    /* Plain-socket connection type: binds every ConnectionType operation to its
     * connSocket* implementation. Members are listed in the same order as they
     * are declared in ConnectionType. */
    ConnectionType CT_Socket = {
        .ae_handler        = connSocketEventHandler,
        .connect           = connSocketConnect,
        .write             = connSocketWrite,
        .read              = connSocketRead,
        .close             = connSocketClose,
        .accept            = connSocketAccept,
        .set_write_handler = connSocketSetWriteHandler,
        .set_read_handler  = connSocketSetReadHandler,
        .get_last_error    = connSocketGetLastError,
        .blocking_connect  = connSocketBlockingConnect,
        .sync_write        = connSocketSyncWrite,
        .sync_read         = connSocketSyncRead,
        .sync_readline     = connSocketSyncReadLine,
    };
    

    CT_TLS:

    /* TLS connection type: binds every ConnectionType operation to its TLS
     * implementation. The definition is a declaration and therefore must be
     * terminated with a semicolon. */
    ConnectionType CT_TLS = {
        .ae_handler = tlsEventHandler,
        .accept = connTLSAccept,
        .connect = connTLSConnect,
        .blocking_connect = connTLSBlockingConnect,
        .read = connTLSRead,
        .write = connTLSWrite,
        .close = connTLSClose,
        .set_write_handler = connTLSSetWriteHandler,
        .set_read_handler = connTLSSetReadHandler,
        .get_last_error = connTLSGetLastError,
        .sync_write = connTLSSyncWrite,
        .sync_read = connTLSSyncRead,
        .sync_readline = connTLSSyncReadLine,
    };
    

    这个结构是核心,后续的各种网络操作,都是通过ConnectionType中的指针调用的。

  2. **struct connection:**该结构表示一个完整的连接,客户端的fd会被封装成一个connection。该结构与ConnectionType配合使用,可以认为ConnectionType是操作接口(除ae_handler外,所有函数的第一个参数都是connection),而struct connection是操作对象,set_read_handler和set_write_handler设置的都是struct connection中的handler。

    /* One connection: the object that the ConnectionType operations act on. */
    struct connection {
        /* Operations table for this connection (e.g. &CT_Socket). */
        ConnectionType *type;
        /* Current connection state (e.g. CONN_STATE_ERROR). */
        ConnectionState state;
        short int flags;
        /* NOTE(review): presumably a reference count - confirm. */
        short int refs;
        /* NOTE(review): presumably the errno saved from the last failed
         * operation - confirm. */
        int last_errno;
        /* Opaque pointer owned by the user of the connection; the accept path
         * stores the client here and later passes it to freeClient(). */
        void *private_data;
        /* NOTE(review): presumably the connect/accept completion callback -
         * confirm. */
        ConnectionCallbackFunc conn_handler;
        /* Callbacks installed through set_write_handler / set_read_handler. */
        ConnectionCallbackFunc write_handler;
        ConnectionCallbackFunc read_handler;
        /* Underlying file descriptor; this is what gets registered with the
         * event loop. */
        int fd;
    };
    
  3. **aeEventLoop:**redis是单线程非阻塞IO,网络事件相关的状态都封装在aeEventLoop结构中。

    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        /* NOTE(review): presumably the id assigned to the next time event -
         * confirm. */
        long long timeEventNextId;
        time_t lastTime;     /* Used to detect system clock skew */
        /* Indexed by fd; registration rejects any fd >= setsize. */
        aeFileEvent *events; /* Registered events */
        aeFiredEvent *fired; /* Fired events */
        /* NOTE(review): presumably the head of the list of time events -
         * confirm. */
        aeTimeEvent *timeEventHead;
        /* NOTE(review): presumably set to make the main loop exit - confirm. */
        int stop;
        void *apidata; /* This is used for polling API specific data */
        /* Hooks; NOTE(review): by their names, run before and after the loop
         * sleeps in the polling call - confirm. */
        aeBeforeSleepProc *beforesleep;
        aeBeforeSleepProc *aftersleep;
        int flags;
    } aeEventLoop;
    

    最核心的两个数据就是events和fired,分别代表了正在监听的fd和有事件触发的fd。

    aeFileEvent:

    /* File event structure: the per-fd registration record stored in
     * aeEventLoop.events. */
    typedef struct aeFileEvent {
        int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
        /* Callback installed when the fd is registered with AE_READABLE. */
        aeFileProc *rfileProc;
        /* Callback installed when the fd is registered with AE_WRITABLE. */
        aeFileProc *wfileProc;
        /* Opaque pointer handed back to the callbacks when they are invoked. */
        void *clientData;
    } aeFileEvent;
    

    其中rfileProc和wfileProc分别是该事件对应的读回调和写回调。

    aeFiredEvent:

    /* A fired event: one entry of aeEventLoop.fired. */
    typedef struct aeFiredEvent {
        /* File descriptor on which something happened. */
        int fd;
        /* Which events fired on fd. NOTE(review): presumably the same AE_*
         * bits used in aeFileEvent.mask - confirm. */
        int mask;
    } aeFiredEvent;
    

    aeFiredEvent的定义很简单,就一个fd,再加一个mask,用来表示该fd上触发的是可读还是可写事件。

    可以看到,在aeEventLoop中定义的events和fired都是指针,各自指向一个在外面分配的数组,数组的大小都是server.maxclients+**CONFIG_FDSET_INCR**。

二、网络框架

  1. 等待连接

    main()
    initServer()
    	server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
    	listenToPort(server.port,server.ipfd,&server.ipfd_count)
    	aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
    		// mask是AE_READABLE,所以这里只把rfileProc设置为 acceptTcpHandler,wfileProc不会被设置
    InitServerLast()
    	initThreadedIO()
    		pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
    aeMain(server.el)
    	aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    		fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是监听的fd,则走acceptTcpHandler回调
    			acceptTcpHandler()
    				anetTcpAccept()
                    connCreateAcceptedSocket()
                    	connCreateSocket()
                    		conn->type = &CT_Socket;  // 最终创建了ConnectionType为CT_Socket的一个连接
    				acceptCommonHandler()
                        createClient() // 创建一个真正的client
                    	connSetReadHandler(conn, readQueryFromClient); // 给客户端fd设置 readQueryFromClient 回调,epoll触发可读事件时调用
    							set_read_handler->connSocketSetReadHandler(conn, func)
                                    aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) // 注册fd
                        	linkClient()
                        		raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
    					connAccept(conn, clientAcceptHandler) 
                            connSocketAccept
                            	callHandler(conn, accept_handler)
                            		accept->connSocketAccept
    		fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    
  2. 读请求

    aeMain(server.el)
    	aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    		fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是客户端fd,则走 readQueryFromClient 回调
    			readQueryFromClient()
    

三、主要函数

  1. **aeCreateFileEvent:**该函数把fd加到epoll队列中,并按照mask为该fd绑定回调函数。对于redis监听的fd,传入的mask是AE_READABLE,绑定的读回调是acceptTcpHandler,用于接收客户端的连接请求。

    /* Register interest in the events given by `mask` on `fd`, with `proc` as
     * the callback and `clientData` as the opaque pointer stored alongside it.
     *
     * Returns AE_OK on success. Returns AE_ERR if fd does not fit in the event
     * table (errno is set to ERANGE) or if the polling API refuses the fd. */
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData)
    {
        aeFileEvent *slot;

        /* The events table is indexed by fd and holds setsize entries. */
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        slot = &eventLoop->events[fd];

        /* Hand the fd to the polling API (epoll in the article's setup); the
         * slot is left untouched if that fails. */
        if (aeApiAddEvent(eventLoop, fd, mask) == -1) {
            return AE_ERR;
        }

        slot->mask |= mask;
        /* Only the callbacks selected by `mask` are replaced: a registration
         * for AE_READABLE alone sets rfileProc and leaves wfileProc as it was. */
        if (mask & AE_READABLE) {
            slot->rfileProc = proc;
        }
        if (mask & AE_WRITABLE) {
            slot->wfileProc = proc;
        }
        slot->clientData = clientData;

        /* Keep track of the highest registered descriptor. */
        if (eventLoop->maxfd < fd) {
            eventLoop->maxfd = fd;
        }
        return AE_OK;
    }
    
  2. acceptTcpHandler: 用于接收客户端连接请求,其中anetTcpAccept内部是系统调用accept接口。该函数内部的acceptCommonHandler是重要操作,通过connCreateAcceptedSocket将客户端的fd转换成一个connection,在connCreateAcceptedSocket内部通过connCreateSocket创建一个connection,其类型为CT_Socket

    /* Read-event callback for a listening socket. Accepts pending connections
     * on `fd`, at most MAX_ACCEPTS_PER_CALL per invocation, and passes each
     * one, wrapped in a connection, to acceptCommonHandler(). The el, privdata
     * and mask arguments are unused. */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        char peer_ip[NET_IP_STR_LEN];
        int peer_port;
        int budget = MAX_ACCEPTS_PER_CALL;

        UNUSED(el);
        UNUSED(mask);
        UNUSED(privdata);

        while (budget-- != 0) {
            /* Per the article, anetTcpAccept() wraps accept(); on success it
             * yields the new client fd and fills in the peer address. */
            int client_fd = anetTcpAccept(server.neterr, fd, peer_ip, sizeof(peer_ip), &peer_port);

            if (client_fd != ANET_ERR) {
                serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
                acceptCommonHandler(connCreateAcceptedSocket(client_fd),0,peer_ip);
                continue;
            }

            /* Accept failed: stop for this invocation. EWOULDBLOCK is the one
             * failure that is not logged. */
            if (errno != EWOULDBLOCK) {
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            }
            return;
        }
    }
    
  3. **acceptCommonHandler:**根据connection创建一个client,并将该客户端链接到server.clients。在新建client的时候有个重要操作connSetReadHandler,这个操作绑定了客户端的读取实现。

    /* Turn a freshly accepted connection into a client.
     *
     * conn:  the accepted connection.
     * flags: extra bits OR-ed into the new client's flags.
     * ip:    unused.
     *
     * On every failure path the connection (or the client that owns it) is
     * released here; nothing is returned to the caller. */
    static void acceptCommonHandler(connection *conn, int flags, char *ip) {
        UNUSED(ip);

        /* Admission control comes first, before a client exists and before
         * connAccept() runs, so that no transport-level negotiation is even
         * started for a connection that is going to be rejected. */
        if (listLength(server.clients) >= server.maxclients) {
            char *reject_msg = "-ERR max number of clients reached\r\n";

            /* Best-effort error reply: write errors are deliberately not
             * acted upon. For TLS connections no handshake has happened yet,
             * so nothing is written and the connection simply drops. */
            if (connWrite(conn,reject_msg,strlen(reject_msg)) == -1) {
                /* Intentionally empty; the test only silences the warning. */
            }
            server.stat_rejected_conn++;
            connClose(conn);
            return;
        }

        /* Create the client for this connection. Per the article's call trace
         * this also links it into the server's client list. */
        client *new_client = createClient(conn);
        if (new_client == NULL) {
            char info_buf[100];
            serverLog(LL_WARNING,
                "Error registering fd event for the new client: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, info_buf, sizeof(info_buf)));
            connClose(conn); /* May be already closed, just ignore errors */
            return;
        }

        /* Last chance to keep flags */
        new_client->flags |= flags;

        /* Start the accept. connAccept() may either call
         * clientAcceptHandler() right away or schedule it for later, so
         * nothing else may be done after a successful call. */
        if (connAccept(conn, clientAcceptHandler) != C_ERR) return;

        /* Accept failed: log only when the connection is in the error state,
         * then free the client stored as the connection's private data. */
        char info_buf[100];
        if (connGetState(conn) == CONN_STATE_ERROR) {
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, info_buf, sizeof(info_buf)));
        }
        freeClient(connGetPrivateData(conn));
    }
    
  4. **createClient:**最主要的是connSetReadHandler,其实是调用CT_Socket的set_read_handler接口,也就是connSocketSetReadHandler函数,而读取请求的回调就是readQueryFromClient函数。

    client *createClient(connection *conn) {
        client *c = zmalloc(sizeof(client));
    
        /* passing NULL as conn it is possible to create a non connected client.
         * This is useful since all the commands needs to be executed
         * in the context of a client. When commands are executed in other
         * contexts (for instance a Lua script) we need a non connected client. */
        if (conn) {
            connNonBlock(conn);
            connEnableTcpNoDelay(conn);
            if (server.tcpkeepalive)
                connKeepAlive(conn,server.tcpkeepalive);
            connSetReadHandler(conn, readQueryFromClient);
            connSetPrivateData(conn, c);
        }
        /* ……此处省略其余代码…… */
    }
    
  5. **connSocketSetReadHandler:**该函数的目的是把客户端connection的fd注册到epoll队列中,等待可读事件触发后再回调,从而实现了非阻塞;如果传入的handler为NULL,则是把已注册的可读事件删除。

    /* Install `func` as the callback to run when the connection becomes
     * readable. Passing NULL removes the current handler and stops watching
     * the fd for readability.
     *
     * Returns C_OK on success (including when `func` is already installed),
     * or C_ERR when the event loop refuses the registration.
     */
    static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
        /* Nothing to do when the requested handler is already in place. */
        if (conn->read_handler == func) return C_OK;

        conn->read_handler = func;

        if (!func) {
            aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
            return C_OK;
        }

        /* The event loop is given the connection type's ae_handler, with the
         * connection itself as client data, rather than `func` directly. */
        int rc = aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn);
        return (rc == AE_ERR) ? C_ERR : C_OK;
    }
    

原文: http://ditanshow.com/articles/p/313.html



这篇关于redis源码分析——1、网络框架的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程