导读:

Redis作为一个CS结构的服务器,它是怎样维护客户端连接的?
本文尝试通过代码的方式,对这些内容进行梳理。

依赖:

  • 本文所有分析基于Redis7.0RC代码;
  • 只针对普通TCP连接方式通讯的客户端;
  • 系统平台centos,因此事件模型默认是epoll。

代码整理

文件事件列表

Redis服务器启动后会对相关网络端口进行监听;如果使用epoll这种IO多路复用的模型,还需要针对相关兴趣事件
进行注册事件回调操作。以下是Redis服务器执行初始化函数initServer()后注册到事件循环的相关事件列表:

事件类别 事件读写标志 事件回调方法 事件触发条件 代码文件所在位置 备注
监听TCP端口 读事件 acceptTcpHandler 客户端通过TCP端口连接到来,触发可读事件 server.c
TLS安全协议下监听TCP端口 读事件 acceptTLSHandler 开启tls通讯,客户端连接到来,触发可读事件 server.c
监听UNIX socket 读事件 acceptUnixHandler 客户端通过unix domain socket协议连接到服务端,触发可读事件 server.c
监听管道事件 读事件 modulePipeReadable 模块通过该管道唤醒事件循环,往管道写入,则触发另一侧读事件就绪 server.c
接收客户端数据 读事件 connSocketEventHandler 客户端连接成功后,发送数据到达服务端,触发该读事件就绪 networking.c 事件函数是connSocketEventHandler,但是实际读逻辑函数是readQueryFromClient

其中和本次分析相关的两个事件主要是监听TCP端口,接收客户端数据

在分析新客户端连接过程的之前,首先需要针对相关重要结构体和变量进行说明。

Redis服务器对每个客户端都通过一个结构体struct client进行管理维护,而每个网络连接
则通过结构体struct connection进行维护;每个connection结构体都有个表示连接类型的成员;而通过TCP网络连接的客户端统一定义为CT_Socket类型。

struct connection 连接结构体:
1
2
3
4
5
6
7
8
9
10
11
12
struct connection {
ConnectionType *type;//连接类型
ConnectionState state;//连接状态
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;//连接回调
ConnectionCallbackFunc write_handler;//写回调
ConnectionCallbackFunc read_handler; //读回调
int fd;
};

其中ConnectionState通过枚举变量定义:

1
2
3
4
5
6
7
8
typedef enum {
CONN_STATE_NONE = 0,
CONN_STATE_CONNECTING,
CONN_STATE_ACCEPTING,
CONN_STATE_CONNECTED,
CONN_STATE_CLOSED,
CONN_STATE_ERROR
} ConnectionState;

分别代表未连接、连接中、连接成功、连接关闭、连接错误等状态。

连接类型则是一个包含了各种函数指针的结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
int (*get_type)(struct connection *conn);
} ConnectionType;

对于通过普通TCP协议的客户端,则直接定义了一个变量CT_Socket结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,//socket读写操作的回调函数
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.get_type = connSocketGetType
};

struct client 结构体:

由于 struct client字段较多,故此文章不在粘贴代码,读者可以通过redis源代码src/server.h直接查看即可。

客户端创建流程分析

1,网络端口监听回调:

通过以上分析所知,当TCP端口有客户端连接到来,则会触发socket可读就绪事件,事件处理函数调用注册的回调函数进行事件消费处理,即新连接到来处理逻辑。即执行acceptTcpHandler函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 处理新客户端连接回调
* @param el 事件循环
* @param fd 文件描述符
* @param privdata 私有数据
* @param mask 掩码
*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;//1000个
char cip[NET_IP_STR_LEN];//客户端IP缓冲区
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);//内部封装OSapi,屏蔽了IPv4和IPv6差异
if (cfd == ANET_ERR) {//成功返回连接的socket描述符,失败则返回-1,即ANET_ERR
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);//cip客户端的网络IP地址
}
}

通过代码分析,该函数逻辑比较简单,主要代码逻辑在于anetTcpAccept,connCreateAcceptedSocketacceptCommonHandler。而加上循环
是因为可能存在多个客户端连接同时到来,因此每次事件就绪都需要同时处理多个新客户端连接,但是控制频率每次只处理1000个。

其中anetTcpAccept 函数内部调用系统API建立客户端连接,并且返回连接成功后的代表该新连接的文件描述符。(该函数内部同时支持IPv4和IPv6网络协议)。

因为Redis通过connection结构体对每一个网络连接进行封装,因此需要通过调用函数connCreateAcceptedSocket()创建一个新的connection结构体,并且初始化。相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
connection *connCreateAcceptedSocket(int fd) {
connection *conn = connCreateSocket();//分配conn内存对象,设置新建立连接的FD,设置conn->type=CT_Socket。
conn->fd = fd;
conn->state = CONN_STATE_ACCEPTING;//调用者需要用到这个状态
return conn;
}

connection *connCreateSocket() {
connection *conn = zcalloc(sizeof(connection));
conn->type = &CT_Socket;//Connect Type = CT, 连接类型
conn->fd = -1;

return conn;
}

注意,这里返回的connection对象,其描述符是刚才新连接建立的文件描述符,连接状态是连接中,最关键的是
连接类型赋值为CT_Socket,后续很多操作都通过该对象的回调函数进行处理。

acceptCommonHandler()函数代码较多,其中最为重要的代码如下:

1
2
3
4
5
6
7
8
9
10
11

/* 创建绑定该连接的一个客户端对象:client */
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}


创建一个代表当前连接客户端的结构体对象struct client。在执行createClient()函数之前,当前客户端只有一个conneciton对象,代表它的文件描述符并未注册到事件引擎,这时候如果客户端数据到来,它是无法被触发可读的。需要执行完createClient()函数以后才可以。

1
2
3
4
5
6
7
8
9

```c
if (conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);//注册读就绪事件
connSetPrivateData(conn, c);//把客户端对象指针保存在当前连接结构体的私有成员
}

函数connSetReadHandler()内部处理相关事件的注册操作,代码如下:

1
2
3
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}

这是t一个壳函数,其内部就是一样代码 conn->type->set_read_handler(conn, func)。这里的set_read_handler。就是CT_Socket成员set_read_handler,该指针指向了实际函数是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
```connSocketSetReadHandler(conn, readQueryFromClient)```。

而```connSocketSetReadHandler```代码如下:

```c
/**
* 注册一个读句柄(回调函数),当连接可读的时候,调用该回调,如果为NULL,则表示删除当前回调。
* 简单来说这个函数根据传入的func来决定设置回调还是删除回调事件。
* @param conn
* @param func
* @return
*/
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;//传入的句柄和当前的句柄一致,无需处理,返回OK

conn->read_handler = func;//用参数的句柄赋值,这时候func可能是NULL,也可能不是
if (!conn->read_handler)//这里判断赋值结果,其实就是判断新的func是否为空
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);//为空,表示删除事件
else//非空,注册读事件,回调方法使用ae_handler ,即CT_Socket引用的connSocketEventHandler()
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}

逻辑如下:判断参数func是否已经等于conn->read_handler,如果等于直接返回无需处理了;否则把参数func赋值给
read_handler。接着对赋值结果进行判断,如果回调为空,表示删除网络连接的事件,换句话说调用函数connSocketSetReadHandler如果传入的回调函数为空,则可以用来删除已注册的读事件;
非空则注册读事件,但是这里可以看出针对新连接的读数据回调并非参数的这个func,即readQueryFromClient()函数,而是ConnectionTypeae_handler,对于TCP连接来说
就是CT_Socket对象指定的connSocketEventHandler()函数。而readQueryFromClient()
是保存在conn变量的读回调当中,只有通过conn对象才能访问该函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* socket 可读、可写的时候回调该函数
* @param el
* @param fd
* @param clientData
* @param mask
*/
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
UNUSED(el);
UNUSED(fd);
connection *conn = clientData;
//如果连接状态是连接中,但是当前socket写就绪,并且设置了回调。
if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {

int conn_error = connGetSocketError(conn);//获取当前错误信息
if (conn_error) {//存在错误信息。因为连接处理中,发生错误,会往socket写入错误信息,因此会出现连接中的写就绪事件。
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;//连接状态设置为连接错误
} else {
conn->state = CONN_STATE_CONNECTED;//不存在错误,则直接变更连接状态为连接中。
}

if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);//写就绪,但是并没有指定写回调。直接删除事件

if (!callHandler(conn, conn->conn_handler)) return;//实际上是conn->conn_handler(conn);这样的调用
conn->conn_handler = NULL;
}

int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;

/* Handle normal I/O flows */
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}

这个函数内部根据客户端读写屏蔽是决定顺序先读后写,还是先写后读。具体使用的读写等回调方法,就是之前conn里面设置的相关回调函数,对于新客户端的socket数据读操作就是函数:readQueryFromClient()

以上就是一个客户端连接建立,到注册该连接的读事件回调的整个流程。

下篇将分析这个新客户端对于服务器,它是如何被维护的!