root/trunk/server/thread.c @ 608

Revision 608, 15.2 kB (checked in by plindner, 2 years ago)

Incorporate increment patch from Evan Miller
<emiller@…> to define increment overflow behavior.

Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Thread management for memcached.
4 *
5 *  $Id$
6 */
7#include "memcached.h"
8#include <stdio.h>
9#include <errno.h>
10#include <stdlib.h>
11#include <errno.h>
12
13#ifdef HAVE_MALLOC_H
14#include <malloc.h>
15#endif
16
17#ifdef HAVE_STRING_H
18#include <string.h>
19#endif
20
21#ifdef USE_THREADS
22
23#include <pthread.h>
24
25#define ITEMS_PER_ALLOC 64
26
/*
 * One entry in a connection queue: the arguments that the receiving
 * thread hands to conn_new() for a connection dispatched to it.
 */
typedef struct conn_queue_item {
    int     sfd;                /* socket descriptor of the connection */
    int     init_state;         /* initial state given to conn_new() */
    int     event_flags;        /* event flags given to conn_new() */
    int     read_buffer_size;   /* read buffer size given to conn_new() */
    int     is_udp;             /* nonzero for a UDP socket */
    struct conn_queue_item *next;   /* next entry in the queue or freelist */
} CQ_ITEM;
37
38/* A connection queue. */
39typedef struct conn_queue CQ;
40struct conn_queue {
41    CQ_ITEM *head;
42    CQ_ITEM *tail;
43    pthread_mutex_t lock;
44    pthread_cond_t  cond;
45};
46
47/* Lock for connection freelist */
48static pthread_mutex_t conn_lock;
49
50/* Lock for cache operations (item_*, assoc_*) */
51static pthread_mutex_t cache_lock;
52
53/* Lock for slab allocator operations */
54static pthread_mutex_t slabs_lock;
55
56/* Lock for global stats */
57static pthread_mutex_t stats_lock;
58
59/* Free list of CQ_ITEM structs */
60static CQ_ITEM *cqi_freelist;
61static pthread_mutex_t cqi_freelist_lock;
62
63/*
64 * Each libevent instance has a wakeup pipe, which other threads
65 * can use to signal that they've put a new connection on its queue.
66 */
67typedef struct {
68    pthread_t thread_id;        /* unique ID of this thread */
69    struct event_base *base;    /* libevent handle this thread uses */
70    struct event notify_event;  /* listen event for notify pipe */
71    int notify_receive_fd;      /* receiving end of notify pipe */
72    int notify_send_fd;         /* sending end of notify pipe */
73    CQ  new_conn_queue;         /* queue of new connections to handle */
74} LIBEVENT_THREAD;
75
76static LIBEVENT_THREAD *threads;
77
78/*
79 * Number of threads that have finished setting themselves up.
80 */
81static int init_count = 0;
82static pthread_mutex_t init_lock;
83static pthread_cond_t init_cond;
84
85
86static void thread_libevent_process(int fd, short which, void *arg);
87
88/*
89 * Initializes a connection queue.
90 */
91static void cq_init(CQ *cq) {
92    pthread_mutex_init(&cq->lock, NULL);
93    pthread_cond_init(&cq->cond, NULL);
94    cq->head = NULL;
95    cq->tail = NULL;
96}
97
98/*
99 * Waits for work on a connection queue.
100 */
101static CQ_ITEM *cq_pop(CQ *cq) {
102    CQ_ITEM *item;
103
104    pthread_mutex_lock(&cq->lock);
105    while (NULL == cq->head)
106        pthread_cond_wait(&cq->cond, &cq->lock);
107    item = cq->head;
108    cq->head = item->next;
109    if (NULL == cq->head)
110        cq->tail = NULL;
111    pthread_mutex_unlock(&cq->lock);
112
113    return item;
114}
115
116/*
117 * Looks for an item on a connection queue, but doesn't block if there isn't
118 * one.
119 * Returns the item, or NULL if no item is available
120 */
121static CQ_ITEM *cq_peek(CQ *cq) {
122    CQ_ITEM *item;
123
124    pthread_mutex_lock(&cq->lock);
125    item = cq->head;
126    if (NULL != item) {
127        cq->head = item->next;
128        if (NULL == cq->head)
129            cq->tail = NULL;
130    }
131    pthread_mutex_unlock(&cq->lock);
132
133    return item;
134}
135
136/*
137 * Adds an item to a connection queue.
138 */
139static void cq_push(CQ *cq, CQ_ITEM *item) {
140    item->next = NULL;
141
142    pthread_mutex_lock(&cq->lock);
143    if (NULL == cq->tail)
144        cq->head = item;
145    else
146        cq->tail->next = item;
147    cq->tail = item;
148    pthread_cond_signal(&cq->cond);
149    pthread_mutex_unlock(&cq->lock);
150}
151
152/*
153 * Returns a fresh connection queue item.
154 */
155static CQ_ITEM *cqi_new() {
156    CQ_ITEM *item = NULL;
157    pthread_mutex_lock(&cqi_freelist_lock);
158    if (cqi_freelist) {
159        item = cqi_freelist;
160        cqi_freelist = item->next;
161    }
162    pthread_mutex_unlock(&cqi_freelist_lock);
163
164    if (NULL == item) {
165        int i;
166
167        /* Allocate a bunch of items at once to reduce fragmentation */
168        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
169        if (NULL == item)
170            return NULL;
171
172        /*
173         * Link together all the new items except the first one
174         * (which we'll return to the caller) for placement on
175         * the freelist.
176         */
177        for (i = 2; i < ITEMS_PER_ALLOC; i++)
178            item[i - 1].next = &item[i];
179
180        pthread_mutex_lock(&cqi_freelist_lock);
181        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
182        cqi_freelist = &item[1];
183        pthread_mutex_unlock(&cqi_freelist_lock);
184    }
185
186    return item;
187}
188
189
190/*
191 * Frees a connection queue item (adds it to the freelist.)
192 */
193static void cqi_free(CQ_ITEM *item) {
194    pthread_mutex_lock(&cqi_freelist_lock);
195    item->next = cqi_freelist;
196    cqi_freelist = item;
197    pthread_mutex_unlock(&cqi_freelist_lock);
198}
199
200
/*
 * Creates a worker thread.
 *
 * func  Entry point the new thread runs
 * arg   Opaque argument handed to func
 *
 * Does not return on failure: prints the pthread_create() error to stderr
 * and exits the process with status 1.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);
    ret = pthread_create(&thread, &attr, func, arg);

    /*
     * The attribute object is only consulted while the thread is being
     * created, so release it straight away instead of leaking it.
     */
    pthread_attr_destroy(&attr);

    if (ret != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}
217
218
219/*
220 * Pulls a conn structure from the freelist, if one is available.
221 */
222conn *mt_conn_from_freelist() {
223    conn *c;
224
225    pthread_mutex_lock(&conn_lock);
226    c = do_conn_from_freelist();
227    pthread_mutex_unlock(&conn_lock);
228
229    return c;
230}
231
232
233/*
234 * Adds a conn structure to the freelist.
235 *
236 * Returns 0 on success, 1 if the structure couldn't be added.
237 */
238bool mt_conn_add_to_freelist(conn *c) {
239    bool result;
240
241    pthread_mutex_lock(&conn_lock);
242    result = do_conn_add_to_freelist(c);
243    pthread_mutex_unlock(&conn_lock);
244
245    return result;
246}
247
248/****************************** LIBEVENT THREADS *****************************/
249
250/*
251 * Set up a thread's information.
252 */
253static void setup_thread(LIBEVENT_THREAD *me) {
254    if (! me->base) {
255        me->base = event_init();
256        if (! me->base) {
257            fprintf(stderr, "Can't allocate event base\n");
258            exit(1);
259        }
260    }
261
262    /* Listen for notifications from other threads */
263    event_set(&me->notify_event, me->notify_receive_fd,
264              EV_READ | EV_PERSIST, thread_libevent_process, me);
265    event_base_set(me->base, &me->notify_event);
266
267    if (event_add(&me->notify_event, 0) == -1) {
268        fprintf(stderr, "Can't monitor libevent notify pipe\n");
269        exit(1);
270    }
271
272    cq_init(&me->new_conn_queue);
273}
274
275
276/*
277 * Worker thread: main event loop
278 */
279static void *worker_libevent(void *arg) {
280    LIBEVENT_THREAD *me = arg;
281
282    /* Any per-thread setup can happen here; thread_init() will block until
283     * all threads have finished initializing.
284     */
285
286    pthread_mutex_lock(&init_lock);
287    init_count++;
288    pthread_cond_signal(&init_cond);
289    pthread_mutex_unlock(&init_lock);
290
291    return (void*) event_base_loop(me->base, 0);
292}
293
294
295/*
296 * Processes an incoming "handle a new connection" item. This is called when
297 * input arrives on the libevent wakeup pipe.
298 */
299static void thread_libevent_process(int fd, short which, void *arg) {
300    LIBEVENT_THREAD *me = arg;
301    CQ_ITEM *item;
302    char buf[1];
303
304    if (read(fd, buf, 1) != 1)
305        if (settings.verbose > 0)
306            fprintf(stderr, "Can't read from libevent pipe\n");
307
308    item = cq_peek(&me->new_conn_queue);
309
310    if (NULL != item) {
311        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
312                           item->read_buffer_size, item->is_udp, me->base);
313        if (c == NULL) {
314            if (item->is_udp) {
315                fprintf(stderr, "Can't listen for events on UDP socket\n");
316                exit(1);
317            } else {
318                if (settings.verbose > 0) {
319                    fprintf(stderr, "Can't listen for events on fd %d\n",
320                        item->sfd);
321                }
322                close(item->sfd);
323            }
324        }
325        cqi_free(item);
326    }
327}
328
329/* Which thread we assigned a connection to most recently. */
330static int last_thread = -1;
331
332/*
333 * Dispatches a new connection to another thread. This is only ever called
334 * from the main thread, either during initialization (for UDP) or because
335 * of an incoming connection.
336 */
337void dispatch_conn_new(int sfd, int init_state, int event_flags,
338                       int read_buffer_size, int is_udp) {
339    CQ_ITEM *item = cqi_new();
340    int thread = (last_thread + 1) % settings.num_threads;
341
342    last_thread = thread;
343
344    item->sfd = sfd;
345    item->init_state = init_state;
346    item->event_flags = event_flags;
347    item->read_buffer_size = read_buffer_size;
348    item->is_udp = is_udp;
349
350    cq_push(&threads[thread].new_conn_queue, item);
351    if (write(threads[thread].notify_send_fd, "", 1) != 1) {
352        perror("Writing to thread notify pipe");
353    }
354}
355
356/*
357 * Returns true if this is the thread that listens for new TCP connections.
358 */
359int mt_is_listen_thread() {
360    return pthread_self() == threads[0].thread_id;
361}
362
363/********************************* ITEM ACCESS *******************************/
364
365/*
366 * Walks through the list of deletes that have been deferred because the items
367 * were locked down at the tmie.
368 */
369void mt_run_deferred_deletes() {
370    pthread_mutex_lock(&cache_lock);
371    do_run_deferred_deletes();
372    pthread_mutex_unlock(&cache_lock);
373}
374
375/*
376 * Allocates a new item.
377 */
378item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
379    item *it;
380    pthread_mutex_lock(&cache_lock);
381    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
382    pthread_mutex_unlock(&cache_lock);
383    return it;
384}
385
386/*
387 * Returns an item if it hasn't been marked as expired or deleted,
388 * lazy-expiring as needed.
389 */
390item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked) {
391    item *it;
392    pthread_mutex_lock(&cache_lock);
393    it = do_item_get_notedeleted(key, nkey, delete_locked);
394    pthread_mutex_unlock(&cache_lock);
395    return it;
396}
397
398/*
399 * Links an item into the LRU and hashtable.
400 */
401int mt_item_link(item *item) {
402    int ret;
403
404    pthread_mutex_lock(&cache_lock);
405    ret = do_item_link(item);
406    pthread_mutex_unlock(&cache_lock);
407    return ret;
408}
409
410/*
411 * Decrements the reference count on an item and adds it to the freelist if
412 * needed.
413 */
414void mt_item_remove(item *item) {
415    pthread_mutex_lock(&cache_lock);
416    do_item_remove(item);
417    pthread_mutex_unlock(&cache_lock);
418}
419
420/*
421 * Replaces one item with another in the hashtable.
422 */
423int mt_item_replace(item *old, item *new) {
424    int ret;
425
426    pthread_mutex_lock(&cache_lock);
427    ret = do_item_replace(old, new);
428    pthread_mutex_unlock(&cache_lock);
429    return ret;
430}
431
432/*
433 * Unlinks an item from the LRU and hashtable.
434 */
435void mt_item_unlink(item *item) {
436    pthread_mutex_lock(&cache_lock);
437    do_item_unlink(item);
438    pthread_mutex_unlock(&cache_lock);
439}
440
441/*
442 * Moves an item to the back of the LRU queue.
443 */
444void mt_item_update(item *item) {
445    pthread_mutex_lock(&cache_lock);
446    do_item_update(item);
447    pthread_mutex_unlock(&cache_lock);
448}
449
450/*
451 * Adds an item to the deferred-delete list so it can be reaped later.
452 */
453char *mt_defer_delete(item *item, time_t exptime) {
454    char *ret;
455
456    pthread_mutex_lock(&cache_lock);
457    ret = do_defer_delete(item, exptime);
458    pthread_mutex_unlock(&cache_lock);
459    return ret;
460}
461
462/*
463 * Does arithmetic on a numeric item value.
464 */
465char *mt_add_delta(item *item, int incr, const unsigned int delta, char *buf) {
466    char *ret;
467
468    pthread_mutex_lock(&cache_lock);
469    ret = do_add_delta(item, incr, delta, buf);
470    pthread_mutex_unlock(&cache_lock);
471    return ret;
472}
473
474/*
475 * Stores an item in the cache (high level, obeys set/add/replace semantics)
476 */
477int mt_store_item(item *item, int comm) {
478    int ret;
479
480    pthread_mutex_lock(&cache_lock);
481    ret = do_store_item(item, comm);
482    pthread_mutex_unlock(&cache_lock);
483    return ret;
484}
485
486/*
487 * Flushes expired items after a flush_all call
488 */
489void mt_item_flush_expired() {
490    pthread_mutex_lock(&cache_lock);
491    do_item_flush_expired();
492    pthread_mutex_unlock(&cache_lock);
493}
494
495/*
496 * Dumps part of the cache
497 */
498char *mt_item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
499    char *ret;
500
501    pthread_mutex_lock(&cache_lock);
502    ret = do_item_cachedump(slabs_clsid, limit, bytes);
503    pthread_mutex_unlock(&cache_lock);
504    return ret;
505}
506
507/*
508 * Dumps statistics about slab classes
509 */
510char *mt_item_stats(int *bytes) {
511    char *ret;
512
513    pthread_mutex_lock(&cache_lock);
514    ret = do_item_stats(bytes);
515    pthread_mutex_unlock(&cache_lock);
516    return ret;
517}
518
519/*
520 * Dumps a list of objects of each size in 32-byte increments
521 */
522char *mt_item_stats_sizes(int *bytes) {
523    char *ret;
524
525    pthread_mutex_lock(&cache_lock);
526    ret = do_item_stats_sizes(bytes);
527    pthread_mutex_unlock(&cache_lock);
528    return ret;
529}
530
531/****************************** HASHTABLE MODULE *****************************/
532
533void mt_assoc_move_next_bucket() {
534    pthread_mutex_lock(&cache_lock);
535    do_assoc_move_next_bucket();
536    pthread_mutex_unlock(&cache_lock);
537}
538
539/******************************* SLAB ALLOCATOR ******************************/
540
541void *mt_slabs_alloc(size_t size) {
542    void *ret;
543
544    pthread_mutex_lock(&slabs_lock);
545    ret = do_slabs_alloc(size);
546    pthread_mutex_unlock(&slabs_lock);
547    return ret;
548}
549
550void mt_slabs_free(void *ptr, size_t size) {
551    pthread_mutex_lock(&slabs_lock);
552    do_slabs_free(ptr, size);
553    pthread_mutex_unlock(&slabs_lock);
554}
555
556char *mt_slabs_stats(int *buflen) {
557    char *ret;
558
559    pthread_mutex_lock(&slabs_lock);
560    ret = do_slabs_stats(buflen);
561    pthread_mutex_unlock(&slabs_lock);
562    return ret;
563}
564
#ifdef ALLOW_SLABS_REASSIGN
/*
 * Thread-safe wrapper around do_slabs_reassign(); only compiled when
 * ALLOW_SLABS_REASSIGN is defined. Runs with slabs_lock held and returns
 * do_slabs_reassign()'s result.
 */
int mt_slabs_reassign(unsigned char srcid, unsigned char dstid) {
    int status;

    pthread_mutex_lock(&slabs_lock);
    status = do_slabs_reassign(srcid, dstid);
    pthread_mutex_unlock(&slabs_lock);

    return status;
}
#endif
575
576/******************************* GLOBAL STATS ******************************/
577
578void mt_stats_lock() {
579    pthread_mutex_lock(&stats_lock);
580}
581
582void mt_stats_unlock() {
583    pthread_mutex_unlock(&stats_lock);
584}
585
586/*
587 * Initializes the thread subsystem, creating various worker threads.
588 *
589 * nthreads  Number of event handler threads to spawn
590 * main_base Event base for main thread
591 */
592void thread_init(int nthreads, struct event_base *main_base) {
593    int         i;
594
595    pthread_mutex_init(&cache_lock, NULL);
596    pthread_mutex_init(&conn_lock, NULL);
597    pthread_mutex_init(&slabs_lock, NULL);
598    pthread_mutex_init(&stats_lock, NULL);
599
600    pthread_mutex_init(&init_lock, NULL);
601    pthread_cond_init(&init_cond, NULL);
602
603    pthread_mutex_init(&cqi_freelist_lock, NULL);
604    cqi_freelist = NULL;
605
606    threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);
607    if (! threads) {
608        perror("Can't allocate thread descriptors");
609        exit(1);
610    }
611
612    threads[0].base = main_base;
613    threads[0].thread_id = pthread_self();
614
615    for (i = 0; i < nthreads; i++) {
616        int fds[2];
617        if (pipe(fds)) {
618            perror("Can't create notify pipe");
619            exit(1);
620        }
621
622        threads[i].notify_receive_fd = fds[0];
623        threads[i].notify_send_fd = fds[1];
624
625    setup_thread(&threads[i]);
626    }
627
628    /* Create threads after we've done all the libevent setup. */
629    for (i = 1; i < nthreads; i++) {
630        create_worker(worker_libevent, &threads[i]);
631    }
632
633    /* Wait for all the threads to set themselves up before returning. */
634    pthread_mutex_lock(&init_lock);
635    init_count++; /* main thread */
636    while (init_count < nthreads) {
637        pthread_cond_wait(&init_cond, &init_lock);
638    }
639    pthread_mutex_unlock(&init_lock);
640}
641
642#endif
Note: See TracBrowser for help on using the browser.