Libev 学习笔记

四 10 九月 2015 | tags: libev

简介

Libev是一个用C编写的功能齐全的高性能的轻量级事件驱动库,作为Libevent的替代者,拥有更快的速度,更小的体积。

Libev supports select, poll, the Linux-specific epoll, the BSD-specific kqueue and the Solaris-specific event port mechanisms for file descriptor events (ev_io), the Linux inotify interface (for ev_stat), Linux eventfd/signalfd (for faster and cleaner inter-thread wakeup (ev_async)/signal handling (ev_signal)) relative timers (ev_timer), absolute timers with customised rescheduling (ev_periodic), synchronous signals (ev_signal), process status change events (ev_child), and event watchers dealing with the event loop mechanism itself (ev_idle, ev_embed, ev_prepare and ev_check watchers) as well as file watchers (ev_stat) and even limited support for fork events (ev_fork).

安装Libev

sudo apt-get install libev-dev

使用Libev

Libev通过ev_loop结构来表示一个事件驱动的框架,在这个框架之下,支持几十种事件,每个事件通过ev_TYPE结构体,ev_TYEP_init,ev_TYPE_set,ev_TYPE_start,ev_TYPE_stop等API来跟这个事件框架注册事件监控器。当我们要监控的事件出现是,框架便会触发已经注册的时间监控器来处理事件。

支持的事件类型

ev_io                 // IO可读可写
ev_timer              // 相对定时器
ev_periodic           // 绝对定时器
ev_signal             // 信号处理
ev_child              // 子进程状态变化
ev_stat               // 文件属性变化
ev_idle               // event loop空闲触发事件
ev_prepare            // event loop之前事件
ev_check              // event loop之后事件
ev_embed              // 嵌入另一个后台循环
ev_fork               // fork事件
ev_cleanup            // event loop退出触发事件
ev_async              // 线程间异步事件

支持后台复用

  • select -- ev_select.c
  • poll -- ev_poll.c
  • epoll -- ev_epoll.c
  • kqueue -- ev_kqueue.c
  • port -- ev_port.c

使用基本流程

1. 初始化框架
     struct ev_loop *loop = EV_DEFAULT;
     struct ev_loop *loop = ev_default_loop(0);

     或者

    struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);
    //TODO
    ev_loop_destroy(loop);
2. 初始化监控器
    ev_TYPE w;
    ev_TYPE_init(w,cb,...);
3. 把监控器加入到框架的监控器列表里
    ev_TYPE_start(loop,w);
4. 框架开始处理事件
    ev_run(loop,0);

源码分析

关键数据结构

libev采用继承的关系来处理各种不同的监控器数据结构。基类:

# define EV_COMMON void *data;
# define EV_CB_DECLARE(type) void (*cb)(EV_P_ struct type *w, int revents);
# define EV_DECL_PRIORITY int priority;

#define EV_WATCHER(type)            \
  int active; /* private */         \
  int pending; /* private */            \
  EV_DECL_PRIORITY /* private */        \
  EV_COMMON /* rw */                \
  EV_CB_DECLARE (type) /* private */

 #define EV_WATCHER_LIST(type)          \
  EV_WATCHER (type)             \
  struct ev_watcher_list *next; /* private */

typedef struct ev_watcher
{
  EV_WATCHER (ev_watcher)
} ev_watcher;

typedef struct ev_watcher_list
{
  EV_WATCHER_LIST (ev_watcher_list)
} ev_watcher_list;

对于其他具体的监控器数据结构,都有基类派生而来,如ev_io类:

typedef struct ev_io
{
  EV_WATCHER_LIST (ev_io)

  int fd;     /* ro */
  int events; /* ro */
} ev_io;

其他类都是由类似的方法派生而来,具体可以参考源代码。

在介绍最重要的结构ev_loop之前,先解释一下EV_P,EV_P_,EV_A,EV_A_这几个宏,在代码中会经常看到这几个宏,主要是为了简化单线程模式下的函数调用的接口,在Libev的源代码的ev.h中,有:

/* support multiple event loops? */
#if EV_MULTIPLICITY
struct ev_loop;
# define EV_P  struct ev_loop *loop               /* a loop as sole parameter in a declaration */
# define EV_P_ EV_P,                              /* a loop as first of multiple parameters */
# define EV_A  loop                               /* a loop as sole argument to a function call */
# define EV_A_ EV_A,                              /* a loop as first of multiple arguments */
# define EV_DEFAULT_UC  ev_default_loop_uc_ ()    /* the default loop, if initialised, as sole arg */
# define EV_DEFAULT_UC_ EV_DEFAULT_UC,            /* the default loop as first of multiple arguments */
# define EV_DEFAULT  ev_default_loop (0)          /* the default loop as sole arg */
# define EV_DEFAULT_ EV_DEFAULT,                  /* the default loop as first of multiple arguments */
#else
# define EV_P void
# define EV_P_
# define EV_A
# define EV_A_
# define EV_DEFAULT
# define EV_DEFAULT_
# define EV_DEFAULT_UC
# define EV_DEFAULT_UC_
# undef EV_EMBED_ENABLE
#endif

如果包含EV_MULTIPLICITY, 表示支持多个ev_loop实例存在,一般来说,一个线程中有且只有一个ev_loop实例。如果整个程序是单线程,程序中可以选择使用默认的ev_loop来简化调用,即全局的ev_loop( 没有用结构),所以不需要参数。

ev_loop结构:

#if EV_MULTIPLICITY

  struct ev_loop
  {
    ev_tstamp ev_rt_now;
    #define ev_rt_now ((loop)->ev_rt_now)
    #define VAR(name,decl) decl;
      #include "ev_vars.h"
    #undef VAR
  };
  #include "ev_wrap.h"

  static struct ev_loop default_loop_struct;
  EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0; /* needs to be initialised to make it a definition despite extern */

#else

  EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
  #define VAR(name,decl) static decl;
    #include "ev_vars.h"
  #undef VAR
  static int ev_default_loop_ptr;
#endif

对于这个宏VAR,可以参考这里的解释

对于单线城来说 ,定义了全局静态变量。对于多线程来说,所有数据结构都封装在ev_loop中,并且在ev_var.h中定义,通过include来展开。并且通过ev_wrap.h来简化访问,使多线程和单线城访问方式一样。

在ev_loop中有一个重要的成员,就是anfds,它定义了需要监控的I/O的文件句柄。libev需要考虑的一个问题就是执行效率,这里采用牺牲空间换时间的办法,用fd的值作为数组的下表,来加快查找的效率。

typedef ev_watcher_list *WL;

typedef struct
{
  WL head;
  unsigned char events; /* the events watched for */
  unsigned char reify;  /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
  unsigned char emask;  /* the epoll backend stores the actual kernel mask in here */
  unsigned char unused;
#if EV_USE_EPOLL
  unsigned int egen;    /* generation counter to counter epoll bugs */
#endif
#if EV_SELECT_IS_WINSOCKET || EV_USE_IOCP
  SOCKET handle;
#endif
#if EV_USE_IOCP
  OVERLAPPED or, ow;
#endif
} ANFD;

关键函数

框架初始化: ev_default_loop和ev_loop_new都会调用loop_init:

static void noinline ecb_cold
loop_init (EV_P_ unsigned int flags) EV_THROW
{
//根据不同的参数来初始化不同的backend
      if (!(flags & EVBACKEND_MASK))
        flags |= ev_recommended_backends ();

#if EV_USE_IOCP
      if (!backend && (flags & EVBACKEND_IOCP  )) backend = iocp_init   (EV_A_ flags);
#endif
#if EV_USE_PORT
      if (!backend && (flags & EVBACKEND_PORT  )) backend = port_init   (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
      if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
      if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init  (EV_A_ flags);
#endif
#if EV_USE_POLL
      if (!backend && (flags & EVBACKEND_POLL  )) backend = poll_init   (EV_A_ flags);
#endif
#if EV_USE_SELECT
      if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
#endif
    //...
    }
}

把监视器注册到框架中,看I/O的实现,其他类似:

void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  ...  
  //把active状态改成活动状态
  ev_start (EV_A_ (W)w, 1);

  //根据fd的大小来决定来分配多少内存,同时要根据内存的分配规律来分配合适的大小,加快访问速度
  //但是没有搞明白为什么要MALLOC_ROUND- sizeof (void *) * 4 ????
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);

  //把这个监视器放入fd对应的列表
  wlist_add (&anfds[fd].head, (WL)w);
  ...
  //标记哪些fd有改动,等到下一次循环的时候把这些fd的改动写到kernel里面去
  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
  w->events &= ~EV__IOFDSET;
  ...
}

把监视器从框架删除,ev_io_start的反操作:

void noinline
ev_io_stop (EV_P_ ev_io *w) EV_THROW
{
  //如果该事件正在等待执行,则删除
  clear_pending (EV_A_ (W)w);
  ...
  //从对应的fd列表里面删除监视器
  wlist_del (&anfds[w->fd].head, (WL)w);

  //把监视器active改成0
  ev_stop (EV_A_ (W)w);

 //标记哪些fd有改动,等到下一次循环的时候把这些fd的改动写到kernel里面去
  fd_change (EV_A_ w->fd, EV_ANFD_REIFY);
  ...
}

主要循环ev_run,监视事件是否有变化,有变化,这调用通知监视器:

int
ev_run (EV_P_ int flags)
{
   ...
  do
    {

#if EV_PREPARE_ENABLE
      //如果有prepare监视器,则在执行select/poll等之前先执行prepare
      if (expect_false (preparecnt))
        {
         //触发PREPARE事件
          queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
          //执行callback
          EV_INVOKE_PENDING;
        }
#endif
      ...

     //把ev_io_start中fd_change有改动的fd同步到kernel里面去,这里会调用具体backend的函数
      fd_reify (EV_A);

     //这里很长一段代码主要是计算时间
    ...

        //调用select/poll等方法来等待事件的发生,发生之后放入队列。 这里会调用具体backend的函数
        backend_poll (EV_A_ waittime);

#if EV_CHECK_ENABLE
     //如果有注册check监视器,则等待事件发生之后调用callback
      if (expect_false (checkcnt))
        //触发CHECK事件
        queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
#endif
     //执行所有pending的callback
      EV_INVOKE_PENDING;
    }
  while (expect_true (
    activecnt
    && !loop_done
    && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
  ));
  ...
}

例子

作者例子

man libev 

或者这里可以获得 ,并有详细解释。

简单socket例子

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <ev.h>

#define PORT 9000
#define IP "127.0.0.1"

int create_socket(); 
void accept_socket_cb(struct ev_loop *loop,ev_io *w, int revents);
void recv_socket_cb(struct ev_loop *loop,ev_io *w, int revents);
void write_socket_cb(struct ev_loop *loop,ev_io *w, int revents);


int main(int argc, char *argv[])
{
    int s = create_socket();
    if(s < 0){
        return -1;
    }

    ev_io ev_io_watcher;
    struct ev_loop *loop = ev_loop_new(EVBACKEND_EPOLL);

    ev_io_init(&ev_io_watcher, accept_socket_cb,s, EV_READ);
    ev_io_start(loop,&ev_io_watcher); 
    ev_loop(loop,0);
    ev_loop_destroy(loop);

    return 0;
}


int create_socket()
{
    struct sockaddr_in addr;
    int s;
    s = socket(AF_INET, SOCK_STREAM, 0);

    if(s == -1){
        perror("create socket error \n");
        return -1;
    }

    int so_reuseaddr = 1;
    setsockopt(s,SOL_SOCKET,SO_REUSEADDR,&so_reuseaddr,sizeof(so_reuseaddr));
    bzero(&addr, sizeof(addr));
    addr.sin_family = PF_INET;
    addr.sin_port = htons(PORT);
    addr.sin_addr.s_addr = inet_addr(IP);

    if(bind(s, (struct sockaddr *) &addr, sizeof(struct sockaddr))== -1){
        perror("bind socket error \n");
        return -1;
    }


    if(listen(s,32) == -1){
        perror("listen socket error\n");
        return -1;
    }

    printf("bind %s,listen %d \n",IP,PORT);

    return s;
}


void accept_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
    int fd; 
    int s = w->fd;
    struct sockaddr_in sin;
    socklen_t addrlen = sizeof(struct sockaddr);
    do{
        fd = accept(s, (struct sockaddr *)&sin, &addrlen);
        if(fd > 0){
            break;
        }

        if(errno == EAGAIN || errno == EWOULDBLOCK){
            continue;
        }
    }while(1);

    ev_io* accept_watcher = malloc(sizeof(ev_io));

    memset(accept_watcher,0x00,sizeof(ev_io));

    ev_io_init(accept_watcher,recv_socket_cb,fd,EV_READ);
    ev_io_start(loop,accept_watcher);
}

#define MAX_BUF_LEN  1024
void recv_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
    char buf[MAX_BUF_LEN] = {0};
    int ret = 0;

    do{
        ret = recv(w->fd,buf,MAX_BUF_LEN - 1, 0);

        if(ret > 0){
            printf("recv message:\n'%s'\n",buf);
            ev_io_stop(loop,  w);
            ev_io_init(w,write_socket_cb,w->fd,EV_WRITE);
            ev_io_start(loop,w);
            return;
        }

        if(ret == 0){
            printf("remote socket closed \n");
            break;
        }

        if(errno == EAGAIN ||errno == EWOULDBLOCK){
            continue;
        }
        break;
    }while(1);

    close(w->fd);
    ev_io_stop(loop,w);
    free(w);
}


void write_socket_cb(struct ev_loop *loop,ev_io *w, int revents)
{
    char buf[MAX_BUF_LEN] = {0};

    snprintf(buf,MAX_BUF_LEN - 1, "this is test message from libev \n");

    write(w->fd,buf,strlen(buf),0);

    ev_io_stop(loop,  w);
    ev_io_init(w,recv_socket_cb,w->fd,EV_READ);
    ev_io_start(loop,w);
}

多线程例子

参考

参考

Libev

libev多线程使用例

库-libev:详解

Comments !

个人链接