【Redis】集群命令处理

2022/5/27 2:20:06

本文主要是介绍【Redis】集群命令处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

集群请求命令处理

在Redis的命令处理函数processCommand(server.c)中有对集群节点的处理,满足以下条件时进入集群节点处理逻辑中:

  1. 启用了集群模式,通过server.cluster_enabled判断
  2. 发送命令的节点不是主节点
  3. 收到的命令中包含了key参数或者命令是EXEC,EXEC命令与MULTI结合使用,用于执行事务

条件三的判断条件有些绕,!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0意味着命令中没有key参数,c->cmd->proc != execCommand表示当前命令不是EXEC,然后对(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)整体做了取反操作,那么看以下两种情况:

  • 如果命令中带有Key,那么!cmdHasMovableKeys(c->cmd)就已返回false,又因为对整体做了取反操作,所以条件成立,意味着收到命令中带有Key时需要执行重定向处理
  • 如果收到的命令是EXEC,c->cmd->proc != execCommand返回false,对整体取反变成true,所以条件也成立,意味着收到EXEC命令的时候执行重定向处理
int processCommand(client *c) {
  
    // 省略...
  
    /* 如果启用了集群且发送命令的节点不是主节点,并且收到的命令中包含了key参数或者命令是EXEC时 */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(!cmdHasMovableKeys(c->cmd) && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        // 查询节点
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            // 重定向
            clusterRedirectClient(c,n,hashslot,error_code);
            c->cmd->rejected_calls++;
            return C_OK;
        }
    }
  
    // 省略...
  
    return C_OK;
}


/* 如果参数中有key将会返回1 */
static int cmdHasMovableKeys(struct redisCommand *cmd) {
    return (cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
            cmd->flags & CMD_MODULE_GETKEYS;
}

MULTI命令的处理

上面说到,如果是EXEC命令时,也会进入到集群节点处理逻辑,EXEC命令一般与MULTI结合使用,用于执行事务。比如以下例子中,使用MULTI开启事务,执行对a账户增1,b账户减1的操作,可以看到返回结果为QUEUED,命令被缓存起来,直到执行EXEC命令,Redis才开始提交命令:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> INCR a:account
QUEUED
127.0.0.1:6379> DECR b:account
QUEUED
127.0.0.1:6379> EXEC
1) (integer) 1
2) (integer) -1

由于集群也需要对EXEC命令处理,所以先看一下MULTI命令的处理逻辑,MULTI命令对应的执行函数为multiCommand,可以看到它在处理的时候为客户端设置了CLIENT_MULTI标记

void multiCommand(client *c) {
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 设置CLIENT_MULTI标记
    c->flags |= CLIENT_MULTI;

    addReply(c,shared.ok);
}

在Redis的命令处理函数中可以找到对CLIENT_MULTI的处理逻辑,如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET,将调用queueMultiCommand函数,对命令进行缓存:

int processCommand(client *c) {
    
    // 省略...
  
    /* 处理MULTI命令 */
    /* 如果客户端标记中有CLIENT_MULTI,并且当前命令不是EXEC、DISCARD、MULTI、WATCH和RESET */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c); // 加入到multi队列中,先将命令缓存
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

MULTI命令结构体定义

在客户端结构体定义中,可以看到使用了multiState缓存MULTI命令:

// 客户端
typedef struct client {
    // ...
    multiState mstate;      /* 存储MULTI/EXEC命令的结构体 */
    // ...
}

multiState

MULTI命令对应的结构体为multiStatemultiState中使用了multiCmd结构体来缓存具体的命令:

typedef struct multiState {
    multiCmd *commands;     /* MULTI命令数组 */
    int count;              /* 缓存的命令个数 */
    int cmd_flags;          /* 命令标记 */
    int cmd_inv_flags;      /* 与cmd_flags一致 */
} multiState;

/* multi命令 */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd; /* 命令 */
} multiCmd;

MULTI命令的缓存

queueMultiCommand

对MULTI命令缓存的处理在queueMultiCommand函数中,它在multi.c文件中定义:

  1. multiCmd加入到缓存数组c->mstate.commands中,对命令进行缓存
  2. 将当前命令的内容设置到multiCmd
/* 将当前命令加入到MULTI命令中 */
void queueMultiCommand(client *c) {
    // MULTI命令
    multiCmd *mc;
    int j;
    if (c->flags & CLIENT_DIRTY_EXEC)
        return;
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 到加入MULTI数组中
    mc = c->mstate.commands+c->mstate.count;
    // 设置命令
    mc->cmd = c->cmd;
    // 设置参数
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    // 缓存的命令数加1
    c->mstate.count++;
    // 设置客户端标记
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}

查询节点

getNodeByQuery

getNodeByQuery函数用于根据KEY查询数据所在的节点,处理逻辑如下:

  1. 如果是EXEC命令,从客户端获取multiState,multiState中缓存了MULTI命令,如果不是MULTI命令,而是单个命令,同样使用multiState来存放命令,之后就可以统一使用multiState来获取请求中的命令

  2. 根据命令的个数进行遍历,处理每一个命令

    (1)从命令中获取key的个数,处理每一个key

    (2)查询每一个key所在的slot

    (3)如果处理的是第一个key,根据所属slot获取所在的节点,记为n,有以下三种情况:

    ​ 情况一:未获取到节点(有可能节点已下线但是还未更新状态),记录错误信息为CLUSTER_REDIR_DOWN_UNBOUND,表示key未绑定到slot,返回NULL

    ​ 情况二:可以查找到节点,并且是当前节点自己,但是key所属slot正在做数据迁出操作(从当前节点迁出),此时将migrating_slot置为1

    ​ 情况三:可以查找到节点,并且不是当前节点自己,但是key所属slot正在迁入到当前节点,此时将importing_slot置为1

    (4)如果处理的不是第一个key,判断当前key所属的slot是否与第一个key的slot一致:

    ​ 情况一:如果不一致,表示不同的key所属的slot不同,将error_code置为CLUSTER_REDIR_CROSS_SLOT,返回NULL

    ​ 情况二:如果一致,将multiple_keys置为1,表示请求中有多个命令

    (5)根据migrating_slotimporting_slot的值判断key所属slot是否正在迁出或者迁入,迁出意味着key对应的数据正在从当前节点迁出到其他节点,迁入意味着key对应的数据正在迁入到当前节点,由于数据未迁移完毕,所以这两种情况都需要检查key是否在当前节点的数据库中,如果不在意味着当前节点没有该key的数据,需要记录缺失的KEY的数量,missing_keys增1

  3. 根据第二步查询后的结果,进行如下处理:

    • 未查找到节点,也就是n为空,返回当前节点自己

    • 当前节点不处于正常状态(CLUSTER_OK)

      (1)如果未开启allow_reads_when_down(在节点下线时允许读),error_code置为CLUSTER_REDIR_DOWN_STATE,并返回NULL

      (2)当前命令中有写标记,error_code置为CLUSTER_REDIR_DOWN_RO_STATE,并返回NULL

      (3)非以上两种情况,表示开启了allow_reads_when_down,并且是读操作,所以当前节点依旧可以处理请求,继续往下执行

    • 如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点

    • 如果key所在slot数据正在从当前节点迁出,并且当前节点数据库中有缺失的key,error_code置为CLUSTER_REDIR_ASK并返回迁出到的那个节点

    • 如果key所在slot正在迁入到当前节点,并且当前命令是ASK ,此时如果请求中有多个KEY并且当前节点存在缺失的KEY,表示有些key不在当前节点,error_code置为CLUSTER_REDIR_UNSTABLE返回NULL,否则返回当前节点即可

    • 如果客户端有只读标记、 当前命令不是写命令、当前节点是从节点并且它的主节点是根据key所属slot查找到的节点,返回当前节点,因为从节点数据是从master节点同步的,而master节点正是要查找的节点,从节点也可以处理读请求

    • 如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED,表示数据已经移动到其他节点,此时返回key所属slot对应的实际节点

clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    // 集群节点
    clusterNode *n = NULL;
    // 记录命令中的第一个KEY
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
  
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* 如果是EXEC命令 */
    if (cmd->proc == execCommand) {
        /* 校验是否有CLIENT_MULTI标记 */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        // 获取multiState
        ms = &c->mstate;
    } else {
        /* 如果不是MULTI命令,而是单个命令,同样使用multiState来存储命令 */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1; // 命令个数设置为1
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd; // 设置命令
    }

    /* 根据命令的个数进行遍历,处理每一个命令 */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd; // 获取命令
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        getKeysResult result = GETKEYS_RESULT_INIT;
        // 从命令中获取key的个数
        numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
        keyindex = result.keys;
        // 遍历每一个key
        for (j = 0; j < numkeys; j++) {
            // 获取key
            robj *thiskey = margv[keyindex[j]];
            // 查询key所在的slot
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));
            // 如果是第一个key
            if (firstkey == NULL) {
                /* 将第一个key记录在firstkey */
                firstkey = thiskey;
                // 记录slot
                slot = thisslot;
                // 根据slot获取集群节点
                n = server.cluster->slots[slot];

                /* 如果未获取到节点(有可能节点已下线),记录错误信息,返回NULL */
                if (n == NULL) {
                    getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* 如果根据slot查到的节点是当前节点自己,并且slot正在做数据迁出操作 */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1; // migrating_slot置为1,标记正在做数据迁出操作
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    // 如果key所属的slot正在做数据迁入操作,importing_slot置为1
                    importing_slot = 1;
                }
            } else { 
                /* 如果不是第一个key*/
                if (!equalStringObjects(firstkey,thiskey)) {
                    // 如果和第一个key的slot不一致,error_code置为CLUSTER_REDIR_CROSS_SLOT
                    if (slot != thisslot) {
                    
                        getKeysFreeResult(&result);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;    /* 不同的key所属不同的slot */
                        return NULL;
                    } else {
                        /* 标记请求中有多个KEY */
                        multiple_keys = 1;
                    }
                }
            }

            /* 如果slot正在迁入或者迁出,检查key是否在当前节点的db中,如果不在记录缺失的KEY的数量 */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(&result);
    }

    /* 如果未查到,返回当前节点自己 */
    if (n == NULL) return myself;

    /* 如果当前节点的状态不是CLUSTER_OK状态,节点可能处于异常状态,只有在开启了allow_reads_when_down(在节点下线时允许读)并且当前命令是读操作才继续往下处理,否则记录错误信息返回NULL */
    if (server.cluster->state != CLUSTER_OK) {
         // 如果设置了节点下线时不允许读
        if (!server.cluster_allow_reads_when_down) {
            /* 记录错误信息,返回NULL */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (cmd->flags & CMD_WRITE) { // 如果命令中有写标记
            /* The cluster is configured to allow read only commands */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is not a write command */
        }
    }

    /* 更新hashslot */
    if (hashslot) *hashslot = slot;

    /* 如果数据正在迁出或者正在迁入,并且当前命令是MIGRATE数据迁移的命令,返回当前节点 */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* 如果key所在slot数据正在迁出,并且当前节点数据库中有缺失的key*/
    if (migrating_slot && missing_keys) {
        // error_code设置为CLUSTER_REDIR_ASK
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        // 返回迁出到的那个节点
        return server.cluster->migrating_slots_to[slot];
    }

    /* 如果key所在slot正在做数据迁入,并且当前命令是ASK */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        // 如果请求中有多个KEY并且有当前节点数据库中有缺失的key
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            // 返回当前节点
            return myself;
        }
    }

    /* 是否是写命令 */
    int is_write_command = (c->cmd->flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    // 如果客户端有只读标记、当前命令不是写命令,当前节点是从节点并且它的主节点是根据key所属slot查找到节点
    if (c->flags & CLIENT_READONLY &&
        !is_write_command &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        // 返回当前节点即可
        return myself;
    }

    /* 如果查询到的节点不是当前节点,将error_code置为CLUSTER_REDIR_MOVED,返回key所属slot对应的实际节点 */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}

根据Key从DB中查询Value

redisDb

Redis数据库对应的结构体定义为redisDb,里面有个字典类型的对象,存储键值对数据:

typedef struct redisDb {
    dict *dict;                 /* 存储的键值对数据 */
    // 省略...
} redisDb;

lookupKeyRead

lookupKeyRead函数用于从redisDb中根据key查找数据,最终是调用lookupKey函数完成的,根据Key从字典中查找并返回value:

robj *lookupKeyRead(redisDb *db, robj *key) {
    // 调用lookupKeyReadWithFlags查找
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
    expireIfNeeded(db,key);
    // 调用lookupKey函数查找
    return lookupKey(db,key,flags);
}

robj *lookupKey(redisDb *db, robj *key, int flags) {
    // 根据KEY从字典中进行查找
    dictEntry *de = dictFind(db->dict,key->ptr);
    // 如果不为空
    if (de) {
        robj *val = dictGetVal(de);
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }
        // 返回value
        return val;
    } else {
        return NULL;
    }
}

集群重定向

clusterRedirectClient

clusterRedirectClient用于集群重定向处理,在getNodeByQuery函数中,根据查询节点的情况对error_code设置了不同的值,在clusterRedirectClient函数中可以看到对error_code的判断,根据error_code的不同,向客户端响应不同的内容:

  1. 如果error_code是CLUSTER_REDIR_CROSS_SLOT,表示请求中有多个KEY,但是KEY所属slot不在同一个slot中
  2. 如果error_code是CLUSTER_REDIR_UNSTABLE,表示请求中有多个KEY并且在一个slot,但是数据可能正在迁入或迁出的过程中,节点中有缺失的KEY,slot处于一个不稳定的状态
  3. 如果error_code是CLUSTER_REDIR_DOWN_STATE,表示节点处于下线状态
  4. 如果error_code是CLUSTER_REDIR_DOWN_RO_STATE,表示节点处于下线状态,只接收读命令
  5. 如果error_code是CLUSTER_REDIR_DOWN_UNBOUND,标识key未绑定到节点,也就是根据key所属slot未查询到节点
  6. 如果error_code是CLUSTER_REDIR_MOVED或者CLUSTER_REDIR_ASK
    • CLUSTER_REDIR_MOVED表示key所属slot已从当前节点迁出,此时向客户端响应MOVED命令并将迁出后slot以及所在节点ip和端口返回
    • CLUSTER_REDIR_ASK表示key所属slot正在从当前节点迁出的过程中,请求中的key有可能一部分还未迁出,一部分已经迁出完毕,此时向客户端返回ASK命令,并将slot以及迁出到的目标节点的ip和端口返回
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        // 如果是CLUSTER_REDIR_CROSS_SLOT,向客户端回复key不在同一个slot中
        addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
    } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
        /* 请求中有多个key并且在一个slot,但是数据可能正在迁入或迁出的过程中,slot并不稳定 */
        addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        // 节点处于下线状态
        addReplyError(c,"-CLUSTERDOWN The cluster is down");
    } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
        // 节点已经下线只接收读命令
        addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
    } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        // 如果是CLUSTER_REDIR_DOWN_UNBOUND,表示根据key所属slot未查询到节点
        addReplyError(c,"-CLUSTERDOWN Hash slot not served");
    } else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    { 
        /* 如果是MOVED或者ASK,需要进行请求重定向处理,向客户端返回ASK或者MOVED命令,并将目标节点的ip和端口返回 */
        int use_pport = (server.tls_cluster &&
                         c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
        int port = use_pport && n->pport ? n->pport : n->port;
        // 返回响应,包括ASK或者MOVED命令、slot信息、目标节点的ip端口
        addReplyErrorSds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot, n->ip, port));
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
}

参考

极客时间 - Redis源码剖析与实战(蒋德钧)

Redis版本:redis-6.2.5



这篇关于【Redis】集群命令处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程