이 블로그 검색

2012년 12월 11일 화요일

node.js 의 thread pool(libuv) 부분


node.js 에서의 입출력 동작은 멀티쓰레드(thread pool)로 구현된다.
소스 코드에서 이 부분을 정리해본다. 다음과 같이 파일을 비동기로 읽어오는 스크립트가 있을때,
함수 호출을 따라가 보면 다음과 같다.

var fs = require('fs');
fs.readFile('./bigFile.txt', 'utf8', function(err, data) {
    console.log(data);
});
console.log('exiting...');

사용자의 스크립트에서 호출한 fs.readFile 함수는 /lib/fs.js 에 존재하며
내부적으로 다음 함수 호출을 한다.



binding.read(fd, buffer, offset, length, position, wrapper);

node_file.cc 를 보면 javascript 에서 호출한 read 함수는 다음과 같이 정의 되었기 때문에

NODE_SET_METHOD(target, "read", Read);

결국 C++ 소스의 Read 함수를 호출하게 된다 (이전 글 참조). Read함수 정의는 다음과 같다.

Read (src/node_file.cc)


static Handle<Value> Read(const Arguments& args) {
  HandleScope scope;
  ...
  int fd = args[0]->Int32Value();

  Local<Value> cb;
  ...
  pos = GET_OFFSET(args[4]);

  buf = buffer_data + off;

  cb = args[5];

  if (cb->IsFunction()) {
    ASYNC_CALL(read, cb, fd, buf, len, pos);
  } else {
    SYNC_CALL(read, 0, fd, buf, len, pos)
    Local<Integer> bytesRead = Integer::New(SYNC_RESULT);
    return scope.Close(bytesRead);
  }
}

비동기 호출을 하는경우, 콜백이 존재하고 이때 ASYNC_CALL 매크로로 처리된다.

ASYNC_CALL  (src/node_file.cc)


#define ASYNC_CALL(func, callback, ...)                           \
  FSReqWrap* req_wrap = new FSReqWrap(#func);                     \
  int r = uv_fs_##func(uv_default_loop(), &req_wrap->req_,        \
      __VA_ARGS__, After);                                        \
  req_wrap->object_->Set(oncomplete_sym, callback);               \
  req_wrap->Dispatched();                                         \
  if (r < 0) {                                                    \
    uv_fs_t* req = &req_wrap->req_;                               \
    req->result = r;                                              \
    req->path = NULL;                                             \
    req->errorno = uv_last_error(uv_default_loop()).code;         \
    After(req);                                                   \
  }                                                               \
  return scope.Close(req_wrap->object_);

이 매크로 내부에서 read 요청은 libuv 함수 uv_fs_read호출로 변경된다.

uv_fs_read  (deps/uv/src/unix/fs.c)


int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               void* buf,
               size_t len,
               int64_t off,
               uv_fs_cb cb) {
  INIT(READ);
  req->file = file;
  req->buf = buf;
  req->len = len;
  req->off = off;
  POST;
}

먼저 INIT처리, 이후 POST 처리를 하고 있다.

INIT (deps/uv/src/unix/fs.c)


#define INIT(type)                                                            \
  do {                                                                        \
    uv__req_init((loop), (req), UV_FS);                                       \
    (req)->fs_type = UV_FS_ ## type;                                          \
    (req)->errorno = 0;                                                       \
    (req)->result = 0;                                                        \
    (req)->ptr = NULL;                                                        \
    (req)->loop = loop;                                                       \
    (req)->path = NULL;                                                       \
    (req)->new_path = NULL;                                                   \
    (req)->cb = (cb);                                                         \
  }                                                                           \
  while (0)

POST (deps/uv/src/unix/fs.c)


#define POST                                                                  \
  do {                                                                        \
    if ((cb) != NULL) {                                                       \
      uv__work_submit((loop), &(req)->work_req, uv__fs_work, uv__fs_done);    \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&(req)->work_req);                                          \
      uv__fs_done(&(req)->work_req);                                          \
      return (req)->result;                                                   \
    }                                                                         \
  }                                                                           \
  while (0)

여기서 thread pool 로 처리가 되기 위한 uv__work_submit 함수 호출을 볼수 있다.

uv__work_submit  (deps/uv/src/unix/threadpool.c)


이 함수는 초기화 작업이 한번만 수행되도록  pthread_once 를 호출하였다. 이 초기화 작업에 쓰레드 풀을 생성하는것이 포함된다.

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w)) {
  pthread_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}

init_once  (deps/uv/src/unix/threadpool.c)


다음처럼  threads 배열의 크기만큼 쓰레드 풀을 생성하고 있다.
linux/mac 의 경우 쓰레드풀을 위해 고정된 쓰레드수가 4로 지정되어 있다.
windows 경우는 OS 차원의 쓰레드풀을 활용하므로(QueueUserWorkItem 호출)
이런 고정 갯수는 존재하지 않는다.

static pthread_t threads[4];
...
static void init_once(void) {
  unsigned int i;

  ngx_queue_init(&wq);

  for (i = 0; i < ARRAY_SIZE(threads); i++)
    if (pthread_create(threads + i, NULL, worker, NULL))
      abort();

  initialized = 1;
}

post (deps/uv/src/unix/threadpool.c)


여기서는 큐에 작업을 넣고, 조건변수에 신호를 보낸다.

static void post(ngx_queue_t* q) {
  pthread_mutex_lock(&mutex);
  ngx_queue_insert_tail(&wq, q);
  pthread_cond_signal(&cond);
  pthread_mutex_unlock(&mutex);
}

한편, 이 thread pool에서 작업을 처리하는 쓰레드 함수는 다음과 같다.
큐에 작업이 삽입될때 까지 조건변수를 기다리고 신호를 받으면 작업큐에서 가져와서 처리한다.

static void* worker(void* arg) {
  struct uv__work* w;
  ngx_queue_t* q;

  (void) arg;

  for (;;) {
    if (pthread_mutex_lock(&mutex))
      abort();

    while (ngx_queue_empty(&wq))
      if (pthread_cond_wait(&cond, &mutex))
        abort();

    q = ngx_queue_head(&wq);

    if (q == &exit_message)
      pthread_cond_signal(&cond);
    else
      ngx_queue_remove(q);

    if (pthread_mutex_unlock(&mutex))
      abort();

    if (q == &exit_message)
      break;

    w = ngx_queue_data(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    ngx_queue_insert_tail(&w->loop->wq, &w->wq);
    uv_mutex_unlock(&w->loop->wq_mutex);
    uv_async_send(&w->loop->wq_async);
  }

  return NULL;
}

ngx_queue_insert_tail  (deps/uv/include/uv-private/ngx-queue.h)


큐에 작업을 넣는 부분은 다음과 같이 구현된다.  리스트를 이용해서 큐를 구현하였다.

#define ngx_queue_insert_tail(h, x)                                           \
  do {                                                                        \
    (x)->prev = (h)->prev;                                                    \
    (x)->prev->next = x;                                                      \
    (x)->next = h;                                                            \
    (h)->prev = x;                                                            \
  }                                                                           \
  while (0)

소스를 따라가면 누구나 알수 있는 내용이지만, 한번 간단히 요약해 보았다.

댓글 없음:

댓글 쓰기