简介
Libev是一个用C编写的功能齐全的高性能的轻量级事件驱动库,作为Libevent的替代者,拥有更快的速度,更小的体积。
Libev supports select, poll, the Linux-specific epoll, the BSD-specific kqueue and the Solaris-specific event port mechanisms for file descriptor events (ev_io), the Linux inotify interface (for ev_stat), Linux eventfd/signalfd (for faster and cleaner inter-thread wakeup (ev_async)/signal handling (ev_signal)) relative timers (ev_timer), absolute timers with customised rescheduling (ev_periodic), synchronous signals (ev_signal), process status change events (ev_child), and event watchers dealing with the event loop mechanism itself (ev_idle, ev_embed, ev_prepare and ev_check watchers) as well as file watchers (ev_stat) and even limited support for fork events (ev_fork).
安装Libev
sudo apt-get install libev-dev
使用Libev
Libev通过ev_loop结构来表示一个事件驱动的框架,在这个框架之下,支持几十种事件,每个事件通过ev_TYPE结构体,ev_TYEP_init,ev_TYPE_set,ev_TYPE_start,ev_TYPE_stop等API来跟这个事件框架注册事件监控器。当我们要监控的事件出现是,框架便会触发已经注册的时间监控器来处理事件。
支持的事件类型
ev_io // IO可读可写
ev_timer // 相对定时器
ev_periodic // 绝对定时器
ev_signal // 信号处理
ev_child // 子进程状态变化
ev_stat // 文件属性变化
ev_idle // event loop空闲触发事件
ev_prepare // event loop之前事件
ev_check // event loop之后事件
ev_embed // 嵌入另一个后台循环
ev_fork // fork事件
ev_cleanup // event loop退出触发事件
ev_async // 线程间异步事件
支持后台复用
- select -- ev_select.c
- poll -- ev_poll.c
- epoll -- ev_epoll.c
- kqueue -- ev_kqueue.c
- port -- ev_port.c
使用基本流程
1. 初始化框架
struct ev_loop *loop = EV_DEFAULT;
struct ev_loop *loop = ev_default_loop(0);
或者
struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
//TODO
ev_loop_destroy(loop);
2. 初始化监控器
ev_TYPE w;
ev_TYPE_init(w,cb,...);
3. 把监控器加入到框架的监控器列表里
ev_TYPE_start(loop,w);
4. 框架开始处理事件
ev_run(loop,0);
源码分析
关键数据结构
libev采用继承的关系来处理各种不同的监控器数据结构。基类:
# define EV_COMMON void *data;
# define EV_CB_DECLARE(type) void (*cb)(EV_P_ struct type *w, int revents);
# define EV_DECL_PRIORITY int priority;
#define EV_WATCHER(type) \
int active; /* private */ \
int pending; /* private */ \
EV_DECL_PRIORITY /* private */ \
EV_COMMON /* rw */ \
EV_CB_DECLARE (type) /* private */
#define EV_WATCHER_LIST(type) \
EV_WATCHER (type) \
struct ev_watcher_list *next; /* private */
typedef struct ev_watcher
{
EV_WATCHER (ev_watcher)
} ev_watcher;
typedef struct ev_watcher_list
{
EV_WATCHER_LIST (ev_watcher_list)
} ev_watcher_list;
对于其他具体的监控器数据结构,都有基类派生而来,如ev_io类:
typedef struct ev_io
{
EV_WATCHER_LIST (ev_io)
int fd; /* ro */
int events; /* ro */
} ev_io;
其他类都是由类似的方法派生而来,具体可以参考源代码。
在介绍最重要的结构ev_loop之前,先解释一下EV_P,EV_P_,EV_A,EV_A_这几个宏,在代码中会经常看到这几个宏,主要是为了简化单线程模式下的函数调用的接口,在Libev的源代码的ev.h中,有:
/* support multiple event loops? */
#if EV_MULTIPLICITY
struct ev_loop;
# define EV_P struct ev_loop *loop /* a loop as sole parameter in a declaration */
# define EV_P_ EV_P, /* a loop as first of multiple parameters */
# define EV_A loop /* a loop as sole argument to a function call */
# define EV_A_ EV_A, /* a loop as first of multiple arguments */
# define EV_DEFAULT_UC ev_default_loop_uc_ () /* the default loop, if initialised, as sole arg */
# define EV_DEFAULT_UC_ EV_DEFAULT_UC, /* the default loop as first of multiple arguments */
# define EV_DEFAULT ev_default_loop (0) /* the default loop as sole arg */
# define EV_DEFAULT_ EV_DEFAULT, /* the default loop as first of multiple arguments */
#else
# define EV_P void
# define EV_P_
# define EV_A
# define EV_A_
# define EV_DEFAULT
# define EV_DEFAULT_
# define EV_DEFAULT_UC
# define EV_DEFAULT_UC_
# undef EV_EMBED_ENABLE
#endif
如果包含EV_MULTIPLICITY, 表示支持多个ev_loop实例存在,一般来说,一个线程中有且只有一个ev_loop实例。如果整个程序是单线程,程序中可以选择使用默认的ev_loop来简化调用,即全局的ev_loop( 没有用结构),所以不需要参数。
ev_loop结构:
#if EV_MULTIPLICITY
struct ev_loop
{
ev_tstamp ev_rt_now;
#define ev_rt_now ((loop)->ev_rt_now)
#define VAR(name,decl) decl;
#include "ev_vars.h"
#undef VAR
};
#include "ev_wrap.h"
static struct ev_loop default_loop_struct;
EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0; /* needs to be initialised to make it a definition despite extern */
#else
EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
#define VAR(name,decl) static decl;
#include "ev_vars.h"
#undef VAR
static int ev_default_loop_ptr;
#endif
对于这个宏VAR,可以参考这里的解释
对于单线城来说 ,定义了全局静态变量。对于多线程来说,所有数据结构都封装在ev_loop中,并且在ev_var.h中定义,通过include来展开。并且通过ev_wrap.h来简化访问,使多线程和单线城访问方式一样。
在ev_loop中有一个重要的成员,就是anfds,它定义了需要监控的I/O的文件句柄。libev需要考虑的一个问题就是执行效率,这里采用牺牲空间换时间的办法,用fd的值作为数组的下表,来加快查找的效率。
typedef ev_watcher_list *WL;
typedef struct
{
WL head;
unsigned char events; /* the events watched for */
unsigned char reify; /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
unsigned char emask; /* the epoll backend stores the actual kernel mask in here */
unsigned char unused;
#if EV_USE_EPOLL
unsigned int egen; /* generation counter to counter epoll bugs */
#endif
#if EV_SELECT_IS_WINSOCKET || EV_USE_IOCP
SOCKET handle;
#endif
#if EV_USE_IOCP
OVERLAPPED or, ow;
#endif
} ANFD;
关键函数
框架初始化: ev_default_loop和ev_loop_new都会调用loop_init:
static void noinline ecb_cold
loop_init (EV_P_ unsigned int flags) EV_THROW
{
//根据不同的参数来初始化不同的backend
if (!(flags & EVBACKEND_MASK))
flags |= ev_recommended_backends ();
#if EV_USE_IOCP
if (!backend && (flags & EVBACKEND_IOCP )) backend = iocp_init (EV_A_ flags);
#endif
#if EV_USE_PORT
if (!backend && (flags & EVBACKEND_PORT )) backend = port_init (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init (EV_A_ flags);
#endif
#if EV_USE_POLL
if (!backend && (flags & EVBACKEND_POLL )) backend = poll_init (EV_A_ flags);
#endif
#if EV_USE_SELECT
if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif
//...
}
}
把监视器注册到框架中,看I/O的实现,其他类似:
void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
...
//把active状态改成活动状态
ev_start (EV_A_ (W)w, 1);
//根据fd的大小来决定来分配多少内存,同时要根据内存的分配规律来分配合适的大小,加快访问速度
//但是没有搞明白为什么要MALLOC_ROUND- sizeof (void *) * 4 ????
array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
//把这个监视器放入fd对应的列表
wlist_add (&anfds[fd].head, (WL)w);
...
//标记哪些fd有改动,等到下一次循环的时候把这些fd的改动写到kernel里面去
fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
w->events &= ~EV__IOFDSET;
...
}
把监视器从框架删除,ev_io_start的反操作:
void noinline
ev_io_stop (EV_P_ ev_io *w) EV_THROW
{
//如果该事件正在等待执行,则删除
clear_pending (EV_A_ (W)w);
...
//从对应的fd列表里面删除监视器
wlist_del (&anfds[w->fd].head, (WL)w);
//把监视器active改成0
ev_stop (EV_A_ (W)w);
//标记哪些fd有改动,等到下一次循环的时候把这些fd的改动写到kernel里面去
fd_change (EV_A_ w->fd, EV_ANFD_REIFY);
...
}
主要循环ev_run,监视事件是否有变化,有变化,这调用通知监视器:
int
ev_run (EV_P_ int flags)
{
...
do
{
#if EV_PREPARE_ENABLE
//如果有prepare监视器,则在执行select/poll等之前先执行prepare
if (expect_false (preparecnt))
{
//触发PREPARE事件
queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
//执行callback
EV_INVOKE_PENDING;
}
#endif
...
//把ev_io_start中fd_change有改动的fd同步到kernel里面去,这里会调用具体backend的函数
fd_reify (EV_A);
//这里很长一段代码主要是计算时间
...
//调用select/poll等方法来等待事件的发生,发生之后放入队列。 这里会调用具体backend的函数
backend_poll (EV_A_ waittime);
#if EV_CHECK_ENABLE
//如果有注册check监视器,则等待事件发生之后调用callback
if (expect_false (checkcnt))
//触发CHECK事件
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
#endif
//执行所有pending的callback
EV_INVOKE_PENDING;
}
while (expect_true (
activecnt
&& !loop_done
&& !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
));
...
}
例子
作者例子
man libev
或者这里可以获得 ,并有详细解释。
简单socket例子
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <ev.h>
#define PORT 9000
#define IP "127.0.0.1"
int create_socket();
void accept_socket_cb(struct ev_loop *loop,ev_io *w, int revents);
void recv_socket_cb(struct ev_loop *loop,ev_io *w, int revents);
void write_socket_cb(struct ev_loop *loop,ev_io *w, int revents);
int main(int argc, char *argv[])
{
int s = create_socket();
if(s < 0){
return -1;
}
ev_io ev_io_watcher;
struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
ev_io_init(&ev_io_watcher, accept_socket_cb,s, EV_READ);
ev_io_start(loop,&ev_io_watcher);
ev_loop(loop,0);
ev_loop_destroy(loop);
return 0;
}
int create_socket()
{
struct sockaddr_in addr;
int s;
s = socket(AF_INET, SOCK_STREAM, 0);
if(s == -1){
perror("create socket error \n");
return -1;
}
int so_reuseaddr = 1;
setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&so_reuseaddr,sizeof(so_reuseaddr));
bzero(&addr, sizeof(addr));
addr.sin_family = PF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = inet_addr(IP);
if(bind(s, (struct sockaddr *) &addr, sizeof(struct sockaddr))== -1){
perror("bind socket error \n");
return -1;
}
if(listen(s,32) == -1){
perror("listen socket error\n");
return -1;
}
printf("bind %s,listen %d \n",IP,PORT);
return s;
}
void accept_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
int fd;
int s = w->fd;
struct sockaddr_in sin;
socklen_t addrlen = sizeof(struct sockaddr);
do{
fd = accept(s, (struct sockaddr *)&sin, &addrlen);
if(fd > 0){
break;
}
if(errno == EAGAIN || errno == EWOULDBLOCK){
continue;
}
}while(1);
ev_io* accept_watcher = malloc(sizeof(ev_io));
memset(accept_watcher,0x00,sizeof(ev_io));
ev_io_init(accept_watcher,recv_socket_cb,fd,EV_READ);
ev_io_start(loop,accept_watcher);
}
#define MAX_BUF_LEN 1024
void recv_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
char buf[MAX_BUF_LEN] = {0};
int ret = 0;
do{
ret = recv(w->fd,buf,MAX_BUF_LEN - 1, 0);
if(ret > 0){
printf("recv message:\n'%s'\n",buf);
ev_io_stop(loop, w);
ev_io_init(w,write_socket_cb,w->fd,EV_WRITE);
ev_io_start(loop,w);
return;
}
if(ret == 0){
printf("remote socket closed \n");
break;
}
if(errno == EAGAIN ||errno == EWOULDBLOCK){
continue;
}
break;
}while(1);
close(w->fd);
ev_io_stop(loop,w);
free(w);
}
void write_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
char buf[MAX_BUF_LEN] = {0};
snprintf(buf,MAX_BUF_LEN - 1, "this is test message from libev \n");
write(w->fd,buf,strlen(buf),0);
ev_io_stop(loop, w);
ev_io_init(w,recv_socket_cb,w->fd,EV_READ);
ev_io_start(loop,w);
}
多线程例子
请参考
Comments !