Linux下的epoll异步编程初步

最近完成了计算机动画与虚拟现实的课设,课设需要一个低延迟、及时性好的服务器。我们小组使用了Linux系统的socket模块,直接使用了C语言的接口,实现了一个非堵塞异步IO的事件服务器。

Linux套接字

Linux继承了Unix的socket体系,遵循POSIX标准。这里主要介绍IPV4域(AF_INET)下SOCK_STREAMSOCK_DGRAM两种类型的socket,他们的默认协议就是我们熟知的TCP和UDP协议。

套接字的描述

Linux遵循“一切皆文件”的设计思想,所以socket在Linux下被抽象成了一个文件。通过read/write两个文件操作的函数就可以简单进行读写,也就是收发消息。尽管如此,被用于网络通信的socket不能完全等同于文件,有些文件操作函数也不适用于socket。所以在实际使用中,很少用read/write来收发消息,转而使用send/recv/sendto/recvfrom四个函数用于消息收发。

文件描述符

Linux下通过文件描述符来描述一个文件。直观来说,文件描述符就是一个32位的正整数,它对应了Linux内核操作的一个文件。默认情况下,文件描述符0指向标准输入,1指向标准输出,2指向标准错误输出。使用read/write操作这三个文件描述符的效果类似于直接使用C语言的scanf/printf函数。

现在我们就有了socket的表示方法。linux创建一个套接字的函数是:

#include <sys/socket.h>
int socket(int domain, int type, int protocol);

1.参数domain(域)确定了通信的特性,包括地址格式。通用的有AF_INET/AF_INET6/AF_UNIX三种。  
2.参数type确定套接字的类型,进一步确定通信特征。这里使用了SOCK_STREAM(有序的、可靠的、双向的、面向连接的字节流)和`SOCK_DGRAM`(固定长度的、无连接的、不可靠的报文协议)。  
3.参数protocol通常是0,代表给定域和套接字类型的默认协议,在AF_INET通信域中,套解析类型SOCK_STREAM对应的是传输控制协议(TCP),SOCK_DGRAM对应的是用户数据报协议(UDP)。  

创建socket的范例如下:

int sock_tcp = socket(AF_INET, SOCK_STREAM, 0);  
int sock_udp = socket(AF_INET, SOCK_DGRAM, 0);  

网络地址

在IPV4协议下,确认一个主机的地址是通过IP地址和服务端口号port来进行的。在Linux的socket中,为了满足套接字函数满足不同地址格式,所有保存地址格式的结构体会被强制转换成一个通用的地址结构sockaddr

在Linux里的定义如下:
struct sockaddr{  
    sa_family_t sa_family;     /*address family */
    char sa_data[14];          /* variable-length address */
    /*...*/
};

在IPV4因特网域(AF_INET)中,套接字的地址用结构sockaddr_in表示。

struct in_addr{  
    in_addr_t s_addr;          /* IPV4 address */
};

struct sockaddr_in{  
    sa_family_t sin_family;    /* address family */
    in_port_t sin_port;        /*port number */
    struct in_addr sin_addr;   /* IPV4 address */
};

数据类型in_port_t定义成uint16_t,数据类型in_addr_t定义成uint32_t

网络字节序

我们平常是用的intel构架的计算机使用的是小端字节序,即一个数的高位在高字节,低位在低字节,而socket使用的是大端字节序,高位的地址在低字节,低位的地址在高字节。为了不起冲突,Linux提供了四个函数用于字节序的转换。

#include <arpa/inet.h>

uint32_t htonl(uint32_t hostint32);  
unit16_t htons(uint16_t hostint16);  
uint32_t ntohl(uint32_t netint32);  
uint16_t ntohs(uint16_t netint32);  
/* h表示“主机”字节序,n表示“网络”字节序,l表示“长”(4字节整数),s表示“短”(2字节整数) */

连接与非连接

网络通信的时候有建立连接和报文两种方式。连接是只双方主机通过商定一套协议,达成的一个稳定的虚拟连接。这里的代表是TCP协议,它能够通过建立的连接,保证信息的稳定有序的传输,同时会自动校验数据的完整性,通过重发机制,保证上层应用接收到的数据是完整的。缺点是会降低网络通信的效率,同时它内置的优化算法可能会在一些特殊情况下造成麻烦,比如说Nagle算法和粘包问题。而UDP是没有连接概念的,数据像邮件一样发送到目的地,不能保证对方收到信息的顺序,也不能保证对方能否收到信息。

建立连接

Linux通过connect函数建立连接:

#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr, socklen_t len);  

一般只有TCP客户端需要调用connect函数,同时被分配一个随机的端口。也只有调用了connect函数,TCP客户端才能和服务器建立连接进行通信。当然UDP也可以调用connect,调用之后每次发送数据不需要指明发送地址,同时只接受来自connect指向的主机。(注意,UDP调用connect并不是建立了一个连接)

在编写服务器的时候,我们需要一个固定的地址用于监听请求。在Linux下使用bind函数来关联一个socket和地址。

#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t len);  

此函数同时适用于TCP套接字和UDP套接字。如果绑定的IP地址为INADDR_ANY(定义的),套接字可以被绑定到所有的系统网络接口上,这意味着可以接收系统任何一个网卡的数据包。

TCP socket工作流程

TCP客户端和服务端的工作流程有很大的区别。

1.服务端工作流程为:创建套接字(socket) -> 绑定地址(bind) -> 监听端口(listen) -> 接收请求(accept) -> 进行通信(send/recv)
2.客户端工作流程为:创建套接字(socket) -> 建立连接(connect) -> 进行通信(send/recv)

UDP socket工作流程

UDP客户端和服务端的工作流程类似。

1.服务端工作流程为:创建套接字(socket) -> 绑定地址(bind) -> 进行通信(sendto/recvfrom)
2.客户端工作流程为:创建套接字(socket) -> 进行通信(sendto/recvfrom)

epoll异步IO编程

  • 如果直接依照上面工作流程来写服务端,会发现当执行accept/recv/recvfrom函数的时候,如果没有消息或者请求,程序会卡在函数执行的位置不动(send/sendto也会,不过次数比较少)。这叫做阻塞,这时的模型叫做阻塞IO模型。你可以通过设置socket参数将接口调节成非阻塞的模式,这样如果没有数据到来,函数会直接返回,并且会把errno设置为EAGAIN或者EWOULDBLOCK。这样因为网络带来的问题就不会影响到程序的运行。

  • 对于服务器来说,非阻塞socket可以实现多连接的并发处理。当如果每次都通过上述的方法来判断是否有数据可以读取的话,存在者严重的效率问题。所以早期的Unix系统以及Linux2.6以前的内核采用了两种方式来监控socket的读写,分别是poll和select。poll在客户端上相对适用,它可以监控单个socket的状态,在限定时间内,如果有数据会返回,否则一直阻塞到超过限定时间后返回。select和poll类似,不过它可以同时监控多个socket。但select有一个问题,那就是它所能同时监控的socket有上限,默认不超过2048,实际情况要比这个数字还要小。其次它采用了轮询的方式,依次对选择的socket监控状态,当连接较少的时候,会造成极大的浪费。所以Linux2.6内核以及之后的版本提供了epoll机制,通过回掉函数被动接受socket状态,避免了轮询检查,同时可以同时处理大量的连接。

  • epoll主要有五个函数和一个事件结构体:

epoll_create();  
epoll_add();  
epoll_mod();  
epoll_del();  
epoll_wait();

struct epoll_event;  

通过epollcreate创建epoll结构,然后用epolladd/epollmod/epolldel来添加/修改/删除监听请求(EPOLLIN/EPOLLOUT/EPOLLHUP等),然后调用epoll_wait来等待事件产生。 下面贴出一个实例代码,其中包含了一些socket选项的修改。

#define EPOLL_MAX 1024
#define DELETE_BUFFER(buf) {if(buf) delete[] buf; buf = NULL;}

static void set_no_blocking(int fd){  
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
}

static void set_no_delay(int fd){  
    int opt = 1;
    setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
}

static void set_reuse(int fd){  
    int opt = 1;
    setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
}

static void set_keep_alive(int fd){  
    int opt = 1;
    setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&opt,sizeof(opt));
}

static void epoll_add(int epoll_fd, int fd, uint32_t sig)  
{
    epoll_event event;
    event.data.fd = fd;
    event.events = sig;
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,fd,&event);
}
static void epoll_mod(int epoll_fd,int fd,uint32_t sig){  
    epoll_event event;
    event.data.fd = fd;
    event.events = sig;
    epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&event);
}

static void epoll_del(int epoll_fd,int fd){  
    epoll_event event;
    event.data.fd = fd;
    event.events = 0;
    epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,&event);
}

static void send_udp(int fd,const char *buf,int size,struct sockaddr_in *address, int len)  
{
    for(;;){
        int ret = sendto(fd,buf,size,0,(struct sockaddr*)address,len);
        if(ret < 0 && errno == EAGAIN)
            usleep(10);
        else
            break;
    }
}
static void send_tcp(int fd,const char *buf)  
{
    for(;;){
        int ret = send(fd,buf,TCP_BUFFER_SIZE,0);
        if(ret < 0 && errno == EAGAIN)
            usleep(10);
        else
            break;
    }
}
EpollEventLoop::EpollEventLoop()  
{
    tcp_fd = socket(AF_INET,SOCK_STREAM,0);
    assert(tcp_fd != -1);
    set_no_blocking(tcp_fd);

    udp_fd = socket(AF_INET,SOCK_DGRAM,0);
    assert(udp_fd != -1);
    set_no_blocking(udp_fd);

    epoll_fd = epoll_create(EPOLL_MAX);
    assert(epoll_fd != -1);

    set_reuse(tcp_fd);

    tcp_buf = new char[TCP_BUFFER_SIZE];
    udp_buf = new char[UDP_BUFFER_SIZE];
    assert(tcp_buf||udp_buf);

    std::cout<<"Server have started"<<std::endl;
}

EpollEventLoop::~EpollEventLoop()  
{
    DELETE_BUFFER(tcp_buf)
    DELETE_BUFFER(udp_buf)
    close(tcp_fd);
    close(udp_fd);
    close(epoll_fd);
}

void EpollEventLoop::bind_port(unsigned short port)  
{
    struct sockaddr_in address;

    memset(&address,0,sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    int ret = bind(tcp_fd,(struct sockaddr*)&address,sizeof(address));
    assert(ret != -1);

    memset(&address,0,sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    ret = bind(udp_fd,(struct sockaddr*)&address,sizeof(address));
    assert(ret != -1);

    std::cout<<"Listening port "<<port<<std::endl;
}

void EpollEventLoop::exec()  
{

    int ret = listen(tcp_fd,10);
    assert(ret != -1);
    epoll_add(epoll_fd,tcp_fd,EPOLLIN);
    epoll_add(epoll_fd,udp_fd,EPOLLIN);

    for(;;){
        int number = epoll_wait(epoll_fd,events,MAX_EVENT_NUMBER,-1);
        assert(number>=0);
        for(int i=0; i<number; i++){
            int sock_fd = events[i].data.fd;
            if(sock_fd == tcp_fd){
                struct  sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);

                int con_fd = accept(tcp_fd,(struct sockaddr*)&client_address,&client_addrlength);
                if(con_fd < 0) continue;
                bzero(tcp_buf,TCP_BUFFER_SIZE);
                sprintf(tcp_buf,"{\"token\":%d}",con_fd);
                int ret = send(con_fd,tcp_buf,TCP_BUFFER_SIZE,0);
                if(ret < 0){
                    shutdown(con_fd,SHUT_RDWR);
                }
                else{
                    set_no_blocking(con_fd);
                    set_no_delay(con_fd);
                    set_keep_alive(con_fd);
                    epoll_add(epoll_fd,con_fd,EPOLLIN);
                    data_handler.add_player(con_fd,client_address);
                    printf("[NEW] from %s %d tcp\n",inet_ntoa(client_address.sin_addr),ntohs(client_address.sin_port));
                }
            }
            else if(sock_fd == udp_fd){
                bzero(udp_buf,UDP_BUFFER_SIZE);

                struct  sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);

                int ret = recvfrom(udp_fd,udp_buf,UDP_BUFFER_SIZE,0,(struct sockaddr*)&client_address,&client_addrlength);

                if(ret > 0){
                    data_handler.handle_udp(udp_buf,&client_address);
                }
            }
            else if(events[i].events & EPOLLIN){
                bzero(tcp_buf,TCP_BUFFER_SIZE);
                int ret = recv(sock_fd,tcp_buf,TCP_BUFFER_SIZE,0);

                if(ret<=0){
                    epoll_mod(epoll_fd,sock_fd,0);
                    shutdown(sock_fd,SHUT_RDWR);
                }

                if(ret > 0){
                    data_handler.handle_tcp(sock_fd,tcp_buf);
                }
            }
            else if(events[i].events & EPOLLHUP){
                epoll_del(epoll_fd,sock_fd);
                data_handler.del_player(sock_fd);
                std::cout<<"delete "<<sock_fd<<std::endl;
            }
        }
    }
}

黄文杰

Read more posts by this author.

Subscribe to CG-HHU

Get the latest posts delivered right to your inbox.

or subscribe via RSS with Feedly!