root/trunk/server/memcached.c @ 627

Revision 627, 83.7 kB (checked in by plindner, 2 years ago)

update for prepend operation, thread safe version from Maxim replacing Filipe's implementation

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(const bool is_udp);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97void pre_gdb(void);
98static void conn_free(conn *c);
99
100/** exported globals **/
101struct stats stats;
102struct settings settings;
103
104/** file scope variables **/
105static item **todelete = NULL;
106static int delcurr;
107static int deltotal;
108static conn *listen_conn;
109static struct event_base *main_base;
110
111#define TRANSMIT_COMPLETE   0
112#define TRANSMIT_INCOMPLETE 1
113#define TRANSMIT_SOFT_ERROR 2
114#define TRANSMIT_HARD_ERROR 3
115
116static int *buckets = 0; /* bucket->generation array for a managed instance */
117
118#define REALTIME_MAXDELTA 60*60*24*30
119/*
120 * given time value that's either unix time or delta from current unix time, return
121 * unix time. Use the fact that delta can't exceed one month (and real time value can't
122 * be that low).
123 */
124static rel_time_t realtime(const time_t exptime) {
125    /* no. of seconds in 30 days - largest possible delta exptime */
126
127    if (exptime == 0) return 0; /* 0 means never expire */
128
129    if (exptime > REALTIME_MAXDELTA) {
130        /* if item expiration is at/before the server started, give it an
131           expiration time of 1 second after the server started.
132           (because 0 means don't expire).  without this, we'd
133           underflow and wrap around to some large value way in the
134           future, effectively making items expiring in the past
135           really expiring never */
136        if (exptime <= stats.started)
137            return (rel_time_t)1;
138        return (rel_time_t)(exptime - stats.started);
139    } else {
140        return (rel_time_t)(exptime + current_time);
141    }
142}
143
144static void stats_init(void) {
145    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
146    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
147    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
148
149    /* make the time we started always be 2 seconds before we really
150       did, so time(0) - time.started is never zero.  if so, things
151       like 'settings.oldest_live' which act as booleans as well as
152       values are now false in boolean context... */
153    stats.started = time(0) - 2;
154    stats_prefix_init();
155}
156
157static void stats_reset(void) {
158    STATS_LOCK();
159    stats.total_items = stats.total_conns = 0;
160    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
161    stats.bytes_read = stats.bytes_written = 0;
162    stats_prefix_clear();
163    STATS_UNLOCK();
164}
165
166static void settings_init(void) {
167    settings.port = 11211;
168    settings.udpport = 0;
169    settings.interf.s_addr = htonl(INADDR_ANY);
170    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
171    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
172    settings.verbose = 0;
173    settings.oldest_live = 0;
174    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
175    settings.socketpath = NULL;       /* by default, not using a unix socket */
176    settings.managed = false;
177    settings.factor = 1.25;
178    settings.chunk_size = 48;         /* space for a modest key and value */
179#ifdef USE_THREADS
180    settings.num_threads = 4;
181#else
182    settings.num_threads = 1;
183#endif
184    settings.prefix_delimiter = ':';
185    settings.detail_enabled = 0;
186}
187
188/* returns true if a deleted item's delete-locked-time is over, and it
189   should be removed from the namespace */
190static bool item_delete_lock_over (item *it) {
191    assert(it->it_flags & ITEM_DELETED);
192    return (current_time >= it->exptime);
193}
194
195/*
196 * Adds a message header to a connection.
197 *
198 * Returns 0 on success, -1 on out-of-memory.
199 */
200static int add_msghdr(conn *c)
201{
202    struct msghdr *msg;
203
204    assert(c != NULL);
205
206    if (c->msgsize == c->msgused) {
207        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
208        if (! msg)
209            return -1;
210        c->msglist = msg;
211        c->msgsize *= 2;
212    }
213
214    msg = c->msglist + c->msgused;
215
216    /* this wipes msg_iovlen, msg_control, msg_controllen, and
217       msg_flags, the last 3 of which aren't defined on solaris: */
218    memset(msg, 0, sizeof(struct msghdr));
219
220    msg->msg_iov = &c->iov[c->iovused];
221
222    if (c->request_addr_size > 0) {
223        msg->msg_name = &c->request_addr;
224        msg->msg_namelen = c->request_addr_size;
225    }
226
227    c->msgbytes = 0;
228    c->msgused++;
229
230    if (c->udp) {
231        /* Leave room for the UDP header, which we'll fill in later. */
232        return add_iov(c, NULL, UDP_HEADER_SIZE);
233    }
234
235    return 0;
236}
237
238
239/*
240 * Free list management for connections.
241 */
242
243static conn **freeconns;
244static int freetotal;
245static int freecurr;
246
247
248static void conn_init(void) {
249    freetotal = 200;
250    freecurr = 0;
251    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
252        perror("malloc()");
253    }
254    return;
255}
256
257/*
258 * Returns a connection from the freelist, if any. Should call this using
259 * conn_from_freelist() for thread safety.
260 */
261conn *do_conn_from_freelist() {
262    conn *c;
263
264    if (freecurr > 0) {
265        c = freeconns[--freecurr];
266    } else {
267        c = NULL;
268    }
269
270    return c;
271}
272
273/*
274 * Adds a connection to the freelist. 0 = success. Should call this using
275 * conn_add_to_freelist() for thread safety.
276 */
277bool do_conn_add_to_freelist(conn *c) {
278    if (freecurr < freetotal) {
279        freeconns[freecurr++] = c;
280        return false;
281    } else {
282        /* try to enlarge free connections array */
283        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
284        if (new_freeconns) {
285            freetotal *= 2;
286            freeconns = new_freeconns;
287            freeconns[freecurr++] = c;
288            return false;
289        }
290    }
291    return true;
292}
293
294conn *conn_new(const int sfd, const int init_state, const int event_flags,
295                const int read_buffer_size, const bool is_udp, struct event_base *base) {
296    conn *c = conn_from_freelist();
297
298    if (NULL == c) {
299        if (!(c = (conn *)malloc(sizeof(conn)))) {
300            perror("malloc()");
301            return NULL;
302        }
303        c->rbuf = c->wbuf = 0;
304        c->ilist = 0;
305        c->iov = 0;
306        c->msglist = 0;
307        c->hdrbuf = 0;
308
309        c->rsize = read_buffer_size;
310        c->wsize = DATA_BUFFER_SIZE;
311        c->isize = ITEM_LIST_INITIAL;
312        c->iovsize = IOV_LIST_INITIAL;
313        c->msgsize = MSG_LIST_INITIAL;
314        c->hdrsize = 0;
315
316        c->rbuf = (char *)malloc((size_t)c->rsize);
317        c->wbuf = (char *)malloc((size_t)c->wsize);
318        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
319        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
320        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
321
322        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
323                c->msglist == 0) {
324            if (c->rbuf != 0) free(c->rbuf);
325            if (c->wbuf != 0) free(c->wbuf);
326            if (c->ilist !=0) free(c->ilist);
327            if (c->iov != 0) free(c->iov);
328            if (c->msglist != 0) free(c->msglist);
329            free(c);
330            perror("malloc()");
331            return NULL;
332        }
333
334        STATS_LOCK();
335        stats.conn_structs++;
336        STATS_UNLOCK();
337    }
338
339    if (settings.verbose > 1) {
340        if (init_state == conn_listening)
341            fprintf(stderr, "<%d server listening\n", sfd);
342        else if (is_udp)
343            fprintf(stderr, "<%d server listening (udp)\n", sfd);
344        else
345            fprintf(stderr, "<%d new client connection\n", sfd);
346    }
347
348    c->sfd = sfd;
349    c->udp = is_udp;
350    c->state = init_state;
351    c->rlbytes = 0;
352    c->rbytes = c->wbytes = 0;
353    c->wcurr = c->wbuf;
354    c->rcurr = c->rbuf;
355    c->ritem = 0;
356    c->icurr = c->ilist;
357    c->ileft = 0;
358    c->iovused = 0;
359    c->msgcurr = 0;
360    c->msgused = 0;
361
362    c->write_and_go = conn_read;
363    c->write_and_free = 0;
364    c->item = 0;
365    c->bucket = -1;
366    c->gen = 0;
367
368    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
369    event_base_set(base, &c->event);
370    c->ev_flags = event_flags;
371
372    if (event_add(&c->event, 0) == -1) {
373        if (conn_add_to_freelist(c)) {
374            conn_free(c);
375        }
376        return NULL;
377    }
378
379    STATS_LOCK();
380    stats.curr_conns++;
381    stats.total_conns++;
382    STATS_UNLOCK();
383
384    return c;
385}
386
387static void conn_cleanup(conn *c) {
388    assert(c != NULL);
389
390    if (c->item) {
391        item_remove(c->item);
392        c->item = 0;
393    }
394
395    if (c->ileft != 0) {
396        for (; c->ileft > 0; c->ileft--,c->icurr++) {
397            item_remove(*(c->icurr));
398        }
399    }
400
401    if (c->write_and_free) {
402        free(c->write_and_free);
403        c->write_and_free = 0;
404    }
405}
406
407/*
408 * Frees a connection.
409 */
410void conn_free(conn *c) {
411    if (c) {
412        if (c->hdrbuf)
413            free(c->hdrbuf);
414        if (c->msglist)
415            free(c->msglist);
416        if (c->rbuf)
417            free(c->rbuf);
418        if (c->wbuf)
419            free(c->wbuf);
420        if (c->ilist)
421            free(c->ilist);
422        if (c->iov)
423            free(c->iov);
424        free(c);
425    }
426}
427
428static void conn_close(conn *c) {
429    assert(c != NULL);
430
431    /* delete the event, the socket and the conn */
432    event_del(&c->event);
433
434    if (settings.verbose > 1)
435        fprintf(stderr, "<%d connection closed.\n", c->sfd);
436
437    close(c->sfd);
438    accept_new_conns(true);
439    conn_cleanup(c);
440
441    /* if the connection has big buffers, just free it */
442    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
443        conn_free(c);
444    }
445
446    STATS_LOCK();
447    stats.curr_conns--;
448    STATS_UNLOCK();
449
450    return;
451}
452
453
454/*
455 * Shrinks a connection's buffers if they're too big.  This prevents
456 * periodic large "get" requests from permanently chewing lots of server
457 * memory.
458 *
459 * This should only be called in between requests since it can wipe output
460 * buffers!
461 */
462static void conn_shrink(conn *c) {
463    assert(c != NULL);
464
465    if (c->udp)
466        return;
467
468    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
469        char *newbuf;
470
471        if (c->rcurr != c->rbuf)
472            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
473
474        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
475
476        if (newbuf) {
477            c->rbuf = newbuf;
478            c->rsize = DATA_BUFFER_SIZE;
479        }
480        /* TODO check other branch... */
481        c->rcurr = c->rbuf;
482    }
483
484    if (c->isize > ITEM_LIST_HIGHWAT) {
485        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
486        if (newbuf) {
487            c->ilist = newbuf;
488            c->isize = ITEM_LIST_INITIAL;
489        }
490    /* TODO check error condition? */
491    }
492
493    if (c->msgsize > MSG_LIST_HIGHWAT) {
494        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
495        if (newbuf) {
496            c->msglist = newbuf;
497            c->msgsize = MSG_LIST_INITIAL;
498        }
499    /* TODO check error condition? */
500    }
501
502    if (c->iovsize > IOV_LIST_HIGHWAT) {
503        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
504        if (newbuf) {
505            c->iov = newbuf;
506            c->iovsize = IOV_LIST_INITIAL;
507        }
508    /* TODO check return value */
509    }
510}
511
512/*
513 * Sets a connection's current state in the state machine. Any special
514 * processing that needs to happen on certain state transitions can
515 * happen here.
516 */
517static void conn_set_state(conn *c, int state) {
518    assert(c != NULL);
519
520    if (state != c->state) {
521        if (state == conn_read) {
522            conn_shrink(c);
523            assoc_move_next_bucket();
524        }
525        c->state = state;
526    }
527}
528
529
530/*
531 * Ensures that there is room for another struct iovec in a connection's
532 * iov list.
533 *
534 * Returns 0 on success, -1 on out-of-memory.
535 */
536static int ensure_iov_space(conn *c) {
537    assert(c != NULL);
538
539    if (c->iovused >= c->iovsize) {
540        int i, iovnum;
541        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
542                                (c->iovsize * 2) * sizeof(struct iovec));
543        if (! new_iov)
544            return -1;
545        c->iov = new_iov;
546        c->iovsize *= 2;
547
548        /* Point all the msghdr structures at the new list. */
549        for (i = 0, iovnum = 0; i < c->msgused; i++) {
550            c->msglist[i].msg_iov = &c->iov[iovnum];
551            iovnum += c->msglist[i].msg_iovlen;
552        }
553    }
554
555    return 0;
556}
557
558
559/*
560 * Adds data to the list of pending data that will be written out to a
561 * connection.
562 *
563 * Returns 0 on success, -1 on out-of-memory.
564 */
565
566static int add_iov(conn *c, const void *buf, int len) {
567    struct msghdr *m;
568    int leftover;
569    bool limit_to_mtu;
570
571    assert(c != NULL);
572
573    do {
574        m = &c->msglist[c->msgused - 1];
575
576        /*
577         * Limit UDP packets, and the first payloads of TCP replies, to
578         * UDP_MAX_PAYLOAD_SIZE bytes.
579         */
580        limit_to_mtu = c->udp || (1 == c->msgused);
581
582        /* We may need to start a new msghdr if this one is full. */
583        if (m->msg_iovlen == IOV_MAX ||
584            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
585            add_msghdr(c);
586            m = &c->msglist[c->msgused - 1];
587        }
588
589        if (ensure_iov_space(c) != 0)
590            return -1;
591
592        /* If the fragment is too big to fit in the datagram, split it up */
593        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
594            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
595            len -= leftover;
596        } else {
597            leftover = 0;
598        }
599
600        m = &c->msglist[c->msgused - 1];
601        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
602        m->msg_iov[m->msg_iovlen].iov_len = len;
603
604        c->msgbytes += len;
605        c->iovused++;
606        m->msg_iovlen++;
607
608        buf = ((char *)buf) + len;
609        len = leftover;
610    } while (leftover > 0);
611
612    return 0;
613}
614
615
616/*
617 * Constructs a set of UDP headers and attaches them to the outgoing messages.
618 */
619static int build_udp_headers(conn *c) {
620    int i;
621    unsigned char *hdr;
622
623    assert(c != NULL);
624
625    if (c->msgused > c->hdrsize) {
626        void *new_hdrbuf;
627        if (c->hdrbuf)
628            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
629        else
630            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
631        if (! new_hdrbuf)
632            return -1;
633        c->hdrbuf = (unsigned char *)new_hdrbuf;
634        c->hdrsize = c->msgused * 2;
635    }
636
637    hdr = c->hdrbuf;
638    for (i = 0; i < c->msgused; i++) {
639        c->msglist[i].msg_iov[0].iov_base = hdr;
640        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
641        *hdr++ = c->request_id / 256;
642        *hdr++ = c->request_id % 256;
643        *hdr++ = i / 256;
644        *hdr++ = i % 256;
645        *hdr++ = c->msgused / 256;
646        *hdr++ = c->msgused % 256;
647        *hdr++ = 0;
648        *hdr++ = 0;
649        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
650    }
651
652    return 0;
653}
654
655
656static void out_string(conn *c, const char *str) {
657    size_t len;
658
659    assert(c != NULL);
660
661    if (settings.verbose > 1)
662        fprintf(stderr, ">%d %s\n", c->sfd, str);
663
664    len = strlen(str);
665    if ((len + 2) > c->wsize) {
666        /* ought to be always enough. just fail for simplicity */
667        str = "SERVER_ERROR output line too long";
668        len = strlen(str);
669    }
670
671    memcpy(c->wbuf, str, len);
672    memcpy(c->wbuf + len, "\r\n", 3);
673    c->wbytes = len + 2;
674    c->wcurr = c->wbuf;
675
676    conn_set_state(c, conn_write);
677    c->write_and_go = conn_read;
678    return;
679}
680
681/*
682 * we get here after reading the value in set/add/replace commands. The command
683 * has been stored in c->item_comm, and the item is ready in c->item.
684 */
685
686static void complete_nread(conn *c) {
687    assert(c != NULL);
688
689    item *it = c->item;
690    int comm = c->item_comm;
691
692    STATS_LOCK();
693    stats.set_cmds++;
694    STATS_UNLOCK();
695
696    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
697        out_string(c, "CLIENT_ERROR bad data chunk");
698    } else {
699        if (store_item(it, comm)) {
700            out_string(c, "STORED");
701        } else {
702            out_string(c, "NOT_STORED");
703        }
704    }
705
706    item_remove(c->item);       /* release the c->item reference */
707    c->item = 0;
708}
709
710/*
711 * Stores an item in the cache according to the semantics of one of the set
712 * commands. In threaded mode, this is protected by the cache lock.
713 *
714 * Returns true if the item was stored.
715 */
716int do_store_item(item *it, int comm) {
717    char *key = ITEM_key(it);
718    bool delete_locked = false;
719    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
720    int stored = 0;
721
722    item *new_it = NULL;
723    int flags;
724
725    if (old_it != NULL && comm == NREAD_ADD) {
726        /* add only adds a nonexistent item, but promote to head of LRU */
727        do_item_update(old_it);
728    } else if (!old_it && (comm == NREAD_REPLACE
729        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
730    {
731        /* replace only replaces an existing value; don't store */
732    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
733        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
734    {
735        /* replace and add can't override delete locks; don't store */
736    } else {
737
738        /*
739         * Append - combine new and old record into single one. Here it's
740         * atomic and thread-safe.
741         */
742
743        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
744
745            /* we have it and old_it here - alloc memory to hold both */
746            /* flags was already lost - so recover them from ITEM_suffix(it) */
747
748            flags = (int) strtol(ITEM_suffix(it), (char **) NULL, 10);
749
750            new_it = do_item_alloc(key, it->nkey, flags, it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
751
752            if (new_it == NULL) {
753                /* SERVER_ERROR out of memory */
754                return 0;
755            }
756
757            /* copy data from it and old_it to new_it */
758
759            if (comm == NREAD_APPEND) {
760                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
761                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
762            } else {
763                /* NREAD_PREPEND */ 
764                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
765                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
766            }
767
768            it = new_it;
769        }
770
771        /* "set" commands can override the delete lock
772           window... in which case we have to find the old hidden item
773           that's in the namespace/LRU but wasn't returned by
774           item_get.... because we need to replace it */
775        if (delete_locked)
776            old_it = do_item_get_nocheck(key, it->nkey);
777
778        if (old_it != NULL)
779            do_item_replace(old_it, it);
780        else
781            do_item_link(it);
782
783        stored = 1;
784    }
785
786    if (old_it != NULL)
787        do_item_remove(old_it);         /* release our reference */
788    if (new_it != NULL)
789        do_item_remove(new_it);
790
791    return stored;
792}
793
794typedef struct token_s {
795    char *value;
796    size_t length;
797} token_t;
798
799#define COMMAND_TOKEN 0
800#define SUBCOMMAND_TOKEN 1
801#define KEY_TOKEN 1
802#define KEY_MAX_LENGTH 250
803
804#define MAX_TOKENS 6
805
806/*
807 * Tokenize the command string by replacing whitespace with '\0' and update
808 * the token array tokens with pointer to start of each token and length.
809 * Returns total number of tokens.  The last valid token is the terminal
810 * token (value points to the first unprocessed character of the string and
811 * length zero).
812 *
813 * Usage example:
814 *
815 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
816 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
817 *          ...
818 *      }
819 *      ncommand = tokens[ix].value - command;
820 *      command  = tokens[ix].value;
821 *   }
822 */
823static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
824    char *s, *e;
825    size_t ntokens = 0;
826
827    assert(command != NULL && tokens != NULL && max_tokens > 1);
828
829    for (s = e = command; ntokens < max_tokens - 1; ++e) {
830        if (*e == ' ') {
831            if (s != e) {
832                tokens[ntokens].value = s;
833                tokens[ntokens].length = e - s;
834                ntokens++;
835                *e = '\0';
836            }
837            s = e + 1;
838        }
839        else if (*e == '\0') {
840            if (s != e) {
841                tokens[ntokens].value = s;
842                tokens[ntokens].length = e - s;
843                ntokens++;
844            }
845
846            break; /* string end */
847        }
848    }
849
850    /*
851     * If we scanned the whole string, the terminal value pointer is null,
852     * otherwise it is the first unprocessed character.
853     */
854    tokens[ntokens].value =  *e == '\0' ? NULL : e;
855    tokens[ntokens].length = 0;
856    ntokens++;
857
858    return ntokens;
859}
860
861/* set up a connection to write a buffer then free it, used for stats */
862static void write_and_free(conn *c, char *buf, int bytes) {
863    if (buf) {
864        c->write_and_free = buf;
865        c->wcurr = buf;
866        c->wbytes = bytes;
867        conn_set_state(c, conn_write);
868        c->write_and_go = conn_read;
869    } else {
870        out_string(c, "SERVER_ERROR out of memory");
871    }
872}
873
874inline static void process_stats_detail(conn *c, const char *command) {
875    assert(c != NULL);
876
877    if (strcmp(command, "on") == 0) {
878        settings.detail_enabled = 1;
879        out_string(c, "OK");
880    }
881    else if (strcmp(command, "off") == 0) {
882        settings.detail_enabled = 0;
883        out_string(c, "OK");
884    }
885    else if (strcmp(command, "dump") == 0) {
886        int len;
887        char *stats = stats_prefix_dump(&len);
888        write_and_free(c, stats, len);
889    }
890    else {
891        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
892    }
893}
894
895static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
896    rel_time_t now = current_time;
897    char *command;
898    char *subcommand;
899
900    assert(c != NULL);
901
902    if(ntokens < 2) {
903        out_string(c, "CLIENT_ERROR bad command line");
904        return;
905    }
906
907    command = tokens[COMMAND_TOKEN].value;
908
909    if (ntokens == 2 && strcmp(command, "stats") == 0) {
910        char temp[1024];
911        pid_t pid = getpid();
912        char *pos = temp;
913
914#ifndef WIN32
915        struct rusage usage;
916        getrusage(RUSAGE_SELF, &usage);
917#endif /* !WIN32 */
918
919        STATS_LOCK();
920        pos += sprintf(pos, "STAT pid %u\r\n", pid);
921        pos += sprintf(pos, "STAT uptime %u\r\n", now);
922        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
923        pos += sprintf(pos, "STAT version " VERSION "\r\n");
924        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
925#ifndef WIN32
926        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
927        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
928#endif /* !WIN32 */
929        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
930        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
931        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
932        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
933        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
934        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
935        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
936        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
937        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
938        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
939        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
940        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
941        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
942        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
943        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
944        pos += sprintf(pos, "END");
945        STATS_UNLOCK();
946        out_string(c, temp);
947        return;
948    }
949
950    subcommand = tokens[SUBCOMMAND_TOKEN].value;
951
952    if (strcmp(subcommand, "reset") == 0) {
953        stats_reset();
954        out_string(c, "RESET");
955        return;
956    }
957
958#ifdef HAVE_MALLOC_H
959#ifdef HAVE_STRUCT_MALLINFO
960    if (strcmp(subcommand, "malloc") == 0) {
961        char temp[512];
962        struct mallinfo info;
963        char *pos = temp;
964
965        info = mallinfo();
966        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
967        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
968        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
969        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
970        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
971        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
972        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
973        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
974        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
975        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
976        out_string(c, temp);
977        return;
978    }
979#endif /* HAVE_STRUCT_MALLINFO */
980#endif /* HAVE_MALLOC_H */
981
982#if !defined(WIN32) || !defined(__APPLE__)
983    if (strcmp(subcommand, "maps") == 0) {
984        char *wbuf;
985        int wsize = 8192; /* should be enough */
986        int fd;
987        int res;
988
989        if ((wbuf = (char *)malloc(wsize)) == NULL) {
990            out_string(c, "SERVER_ERROR out of memory");
991            return;
992        }
993
994        fd = open("/proc/self/maps", O_RDONLY);
995        if (fd == -1) {
996            out_string(c, "SERVER_ERROR cannot open the maps file");
997            free(wbuf);
998            return;
999        }
1000
1001        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1002        if (res == wsize - 6) {
1003            out_string(c, "SERVER_ERROR buffer overflow");
1004            free(wbuf); close(fd);
1005            return;
1006        }
1007        if (res == 0 || res == -1) {
1008            out_string(c, "SERVER_ERROR can't read the maps file");
1009            free(wbuf); close(fd);
1010            return;
1011        }
1012        memcpy(wbuf + res, "END\r\n", 5);
1013        write_and_free(c, wbuf, res + 5);
1014        close(fd);
1015        return;
1016    }
1017#endif
1018
1019    if (strcmp(subcommand, "cachedump") == 0) {
1020
1021        char *buf;
1022        unsigned int bytes, id, limit = 0;
1023
1024        if(ntokens < 5) {
1025            out_string(c, "CLIENT_ERROR bad command line");
1026            return;
1027        }
1028
1029        id = strtoul(tokens[2].value, NULL, 10);
1030        limit = strtoul(tokens[3].value, NULL, 10);
1031
1032        if(errno == ERANGE) {
1033            out_string(c, "CLIENT_ERROR bad command line format");
1034            return;
1035        }
1036
1037        buf = item_cachedump(id, limit, &bytes);
1038        write_and_free(c, buf, bytes);
1039        return;
1040    }
1041
1042    if (strcmp(subcommand, "slabs") == 0) {
1043        int bytes = 0;
1044        char *buf = slabs_stats(&bytes);
1045        write_and_free(c, buf, bytes);
1046        return;
1047    }
1048
1049    if (strcmp(subcommand, "items") == 0) {
1050        int bytes = 0;
1051        char *buf = item_stats(&bytes);
1052        write_and_free(c, buf, bytes);
1053        return;
1054    }
1055
1056    if (strcmp(subcommand, "detail") == 0) {
1057        if (ntokens < 4)
1058            process_stats_detail(c, "");  /* outputs the error message */
1059        else
1060            process_stats_detail(c, tokens[2].value);
1061        return;
1062    }
1063
1064    if (strcmp(subcommand, "sizes") == 0) {
1065        int bytes = 0;
1066        char *buf = item_stats_sizes(&bytes);
1067        write_and_free(c, buf, bytes);
1068        return;
1069    }
1070
1071    out_string(c, "ERROR");
1072}
1073
1074/* ntokens is overwritten here... shrug.. */
1075static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_key_ptr) {
1076    char *key;
1077    size_t nkey;
1078    int i = 0;
1079    item *it;
1080    token_t *key_token = &tokens[KEY_TOKEN];
1081    char suffix[255];
1082    uint32_t in_memory_ptr;
1083    assert(c != NULL);
1084
1085    if (settings.managed) {
1086        int bucket = c->bucket;
1087        if (bucket == -1) {
1088            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1089            return;
1090        }
1091        c->bucket = -1;
1092        if (buckets[bucket] != c->gen) {
1093            out_string(c, "ERROR_NOT_OWNER");
1094            return;
1095        }
1096    }
1097
1098    do {
1099        while(key_token->length != 0) {
1100
1101            key = key_token->value;
1102            nkey = key_token->length;
1103
1104            if(nkey > KEY_MAX_LENGTH) {
1105                out_string(c, "CLIENT_ERROR bad command line format");
1106                return;
1107            }
1108
1109            STATS_LOCK();
1110            stats.get_cmds++;
1111            STATS_UNLOCK();
1112            it = item_get(key, nkey);
1113            if (settings.detail_enabled) {
1114                stats_prefix_record_get(key, NULL != it);
1115            }
1116            if (it) {
1117                if (i >= c->isize) {
1118                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1119                    if (new_list) {
1120                        c->isize *= 2;
1121                        c->ilist = new_list;
1122                    } else break;
1123                }
1124
1125                /*
1126                 * Construct the response. Each hit adds three elements to the
1127                 * outgoing data list:
1128                 *   "VALUE "
1129                 *   key
1130                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1131                 */
1132
1133                if(return_key_ptr == true)
1134                {
1135                  in_memory_ptr = (uint32_t)item_get(key, nkey);
1136                  sprintf(suffix," %d %d %lu\r\n", atoi(ITEM_suffix(it) + 1), it->nbytes - 2, in_memory_ptr);
1137                  if (add_iov(c, "VALUE ", 6) != 0 ||
1138                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1139                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1140                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1141                      {
1142                          break;
1143                      }
1144                }
1145                else
1146                {
1147                  if (add_iov(c, "VALUE ", 6) != 0 ||
1148                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1149                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1150                      {
1151                          break;
1152                      }
1153                }
1154
1155
1156                if (settings.verbose > 1)
1157                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1158
1159                /* item_get() has incremented it->refcount for us */
1160                STATS_LOCK();
1161                stats.get_hits++;
1162                STATS_UNLOCK();
1163                item_update(it);
1164                *(c->ilist + i) = it;
1165                i++;
1166
1167            } else {
1168                STATS_LOCK();
1169                stats.get_misses++;
1170                STATS_UNLOCK();
1171            }
1172
1173            key_token++;
1174        }
1175
1176        /*
1177         * If the command string hasn't been fully processed, get the next set
1178         * of tokens.
1179         */
1180        if(key_token->value != NULL) {
1181            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1182            key_token = tokens;
1183        }
1184
1185    } while(key_token->value != NULL);
1186
1187    c->icurr = c->ilist;
1188    c->ileft = i;
1189
1190    if (settings.verbose > 1)
1191        fprintf(stderr, ">%d END\n", c->sfd);
1192    add_iov(c, "END\r\n", 5);
1193
1194    if (c->udp && build_udp_headers(c) != 0) {
1195        out_string(c, "SERVER_ERROR out of memory");
1196    }
1197    else {
1198        conn_set_state(c, conn_mwrite);
1199        c->msgcurr = 0;
1200    }
1201    return;
1202}
1203
1204static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1205    char *key;
1206    size_t nkey;
1207    int flags;
1208    time_t exptime;
1209    int vlen, old_vlen;
1210    uint32_t req_memory_ptr, in_memory_ptr;
1211    item *it, *old_it;
1212
1213    assert(c != NULL);
1214
1215    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1216        out_string(c, "CLIENT_ERROR bad command line format");
1217        return;
1218    }
1219
1220    key = tokens[KEY_TOKEN].value;
1221    nkey = tokens[KEY_TOKEN].length;
1222
1223    flags = strtoul(tokens[2].value, NULL, 10);
1224    exptime = strtol(tokens[3].value, NULL, 10);
1225    vlen = strtol(tokens[4].value, NULL, 10);
1226
1227    // does cas value exist?
1228    if(tokens[5].value)
1229    {
1230      req_memory_ptr = strtoull(tokens[5].value, NULL, 10);
1231    }
1232
1233    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1234        out_string(c, "CLIENT_ERROR bad command line format");
1235        return;
1236    }
1237
1238    if (settings.detail_enabled) {
1239        stats_prefix_record_set(key);
1240    }
1241
1242    if (settings.managed) {
1243        int bucket = c->bucket;
1244        if (bucket == -1) {
1245            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1246            return;
1247        }
1248        c->bucket = -1;
1249        if (buckets[bucket] != c->gen) {
1250            out_string(c, "ERROR_NOT_OWNER");
1251            return;
1252        }
1253    }
1254
1255    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1256
1257    /* HANDLE_CAS VALIDATION */
1258    if (handle_cas == true)
1259    {
1260      item *itmp=item_get(key, it->nkey);
1261      /* Release the reference */
1262      if(itmp) {
1263        item_remove(itmp);
1264      }
1265      in_memory_ptr = (uint32_t)itmp;
1266      if(in_memory_ptr == req_memory_ptr)
1267      {
1268        // validates allow the set
1269      }
1270      else if(in_memory_ptr)
1271      {
1272        out_string(c, "EXISTS");
1273
1274        /* swallow the data line */
1275        c->write_and_go = conn_swallow;
1276        c->sbytes = vlen + 2;
1277        return;
1278      }
1279      else
1280      {
1281        out_string(c, "NOT FOUND");
1282        /* swallow the data line */
1283        c->write_and_go = conn_swallow;
1284        c->sbytes = vlen + 2;
1285        return;
1286      }
1287    }
1288
1289    if (it == 0) {
1290        if (! item_size_ok(nkey, flags, vlen + 2))
1291            out_string(c, "SERVER_ERROR object too large for cache");
1292        else
1293            out_string(c, "SERVER_ERROR out of memory");
1294        /* swallow the data line */
1295        c->write_and_go = conn_swallow;
1296        c->sbytes = vlen + 2;
1297        return;
1298    }
1299
1300    c->item = it;
1301    c->ritem = ITEM_data(it);
1302    c->rlbytes = it->nbytes;
1303    c->item_comm = comm;
1304    conn_set_state(c, conn_nread);
1305}
1306
1307static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1308    char temp[sizeof("18446744073709551615")];
1309    item *it;
1310    int64_t delta;
1311    char *key;
1312    size_t nkey;
1313
1314    assert(c != NULL);
1315
1316    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1317        out_string(c, "CLIENT_ERROR bad command line format");
1318        return;
1319    }
1320
1321    key = tokens[KEY_TOKEN].value;
1322    nkey = tokens[KEY_TOKEN].length;
1323
1324    if (settings.managed) {
1325        int bucket = c->bucket;
1326        if (bucket == -1) {
1327            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1328            return;
1329        }
1330        c->bucket = -1;
1331        if (buckets[bucket] != c->gen) {
1332            out_string(c, "ERROR_NOT_OWNER");
1333            return;
1334        }
1335    }
1336
1337    delta = strtoll(tokens[2].value, NULL, 10);
1338
1339    if(errno == ERANGE) {
1340        out_string(c, "CLIENT_ERROR bad command line format");
1341        return;
1342    }
1343
1344    it = item_get(key, nkey);
1345    if (!it) {
1346        out_string(c, "NOT_FOUND");
1347        return;
1348    }
1349
1350    out_string(c, add_delta(it, incr, delta, temp));
1351    item_remove(it);         /* release our reference */
1352}
1353
1354/*
1355 * adds a delta value to a numeric item.
1356 *
1357 * it    item to adjust
1358 * incr  true to increment value, false to decrement
1359 * delta amount to adjust value by
1360 * buf   buffer for response string
1361 *
1362 * returns a response string to send back to the client.
1363 */
1364char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
1365    char *ptr;
1366    int64_t value;
1367    int res;
1368
1369    ptr = ITEM_data(it);
1370    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1371
1372    value = strtoull(ptr, NULL, 10);
1373
1374    if(errno == ERANGE) {
1375        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1376    }
1377
1378    if (incr)
1379        value += delta;
1380    else {
1381        if (delta >= value) value = 0;
1382        else value -= delta;
1383    }
1384    sprintf(buf, "%llu", value);
1385    res = strlen(buf);
1386    if (res + 2 > it->nbytes) { /* need to realloc */
1387        item *new_it;
1388        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1389        if (new_it == 0) {
1390            return "SERVER_ERROR out of memory";
1391        }
1392        memcpy(ITEM_data(new_it), buf, res);
1393        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1394        do_item_replace(it, new_it);
1395        do_item_remove(new_it);       /* release our reference */
1396    } else { /* replace in-place */
1397        memcpy(ITEM_data(it), buf, res);
1398        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1399    }
1400
1401    return buf;
1402}
1403
1404static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1405    char *key;
1406    size_t nkey;
1407    item *it;
1408    time_t exptime = 0;
1409
1410    assert(c != NULL);
1411
1412    if (settings.managed) {
1413        int bucket = c->bucket;
1414        if (bucket == -1) {
1415            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1416            return;
1417        }
1418        c->bucket = -1;
1419        if (buckets[bucket] != c->gen) {
1420            out_string(c, "ERROR_NOT_OWNER");
1421            return;
1422        }
1423    }
1424
1425    key = tokens[KEY_TOKEN].value;
1426    nkey = tokens[KEY_TOKEN].length;
1427
1428    if(nkey > KEY_MAX_LENGTH) {
1429        out_string(c, "CLIENT_ERROR bad command line format");
1430        return;
1431    }
1432
1433    if(ntokens == 4) {
1434        exptime = strtol(tokens[2].value, NULL, 10);
1435
1436        if(errno == ERANGE) {
1437            out_string(c, "CLIENT_ERROR bad command line format");
1438            return;
1439        }
1440    }
1441
1442    if (settings.detail_enabled) {
1443        stats_prefix_record_delete(key);
1444    }
1445
1446    it = item_get(key, nkey);
1447    if (it) {
1448        if (exptime == 0) {
1449            item_unlink(it);
1450            item_remove(it);      /* release our reference */
1451            out_string(c, "DELETED");
1452        } else {
1453            /* our reference will be transfered to the delete queue */
1454            out_string(c, defer_delete(it, exptime));
1455        }
1456    } else {
1457        out_string(c, "NOT_FOUND");
1458    }
1459}
1460
1461/*
1462 * Adds an item to the deferred-delete list so it can be reaped later.
1463 *
1464 * Returns the result to send to the client.
1465 */
1466char *do_defer_delete(item *it, time_t exptime)
1467{
1468    if (delcurr >= deltotal) {
1469        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1470        if (new_delete) {
1471            todelete = new_delete;
1472            deltotal *= 2;
1473        } else {
1474            /*
1475             * can't delete it immediately, user wants a delay,
1476             * but we ran out of memory for the delete queue
1477             */
1478            item_remove(it);    /* release reference */
1479            return "SERVER_ERROR out of memory";
1480        }
1481    }
1482
1483    /* use its expiration time as its deletion time now */
1484    it->exptime = realtime(exptime);
1485    it->it_flags |= ITEM_DELETED;
1486    todelete[delcurr++] = it;
1487
1488    return "DELETED";
1489}
1490
1491static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1492    unsigned int level;
1493
1494    assert(c != NULL);
1495
1496    level = strtoul(tokens[1].value, NULL, 10);
1497    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1498    out_string(c, "OK");
1499    return;
1500}
1501
1502static void process_command(conn *c, char *command) {
1503
1504    token_t tokens[MAX_TOKENS];
1505    size_t ntokens;
1506    int comm;
1507
1508    assert(c != NULL);
1509
1510    if (settings.verbose > 1)
1511        fprintf(stderr, "<%d %s\n", c->sfd, command);
1512
1513    /*
1514     * for commands set/add/replace, we build an item and read the data
1515     * directly into it, then continue in nread_complete().
1516     */
1517
1518    c->msgcurr = 0;
1519    c->msgused = 0;
1520    c->iovused = 0;
1521    if (add_msghdr(c) != 0) {
1522        out_string(c, "SERVER_ERROR out of memory");
1523        return;
1524    }
1525
1526    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1527    if (ntokens >= 3 &&
1528        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1529         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1530
1531        process_get_command(c, tokens, ntokens, false);
1532
1533    } else if (ntokens == 6 &&
1534               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1535                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1536                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
1537                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
1538                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
1539
1540        process_update_command(c, tokens, ntokens, comm, false);
1541
1542    } else if (ntokens == 6 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0)) {
1543
1544        process_update_command(c, tokens, ntokens, comm, true);
1545
1546    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1547
1548        process_arithmetic_command(c, tokens, ntokens, 1);
1549
1550    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
1551
1552        process_get_command(c, tokens, ntokens, true);
1553
1554    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1555
1556        process_arithmetic_command(c, tokens, ntokens, 0);
1557
1558    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1559
1560        process_delete_command(c, tokens, ntokens);
1561
1562    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1563        unsigned int bucket, gen;
1564        if (!settings.managed) {
1565            out_string(c, "CLIENT_ERROR not a managed instance");
1566            return;
1567        }
1568
1569        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1570            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1571                out_string(c, "CLIENT_ERROR bucket number out of range");
1572                return;
1573            }
1574            buckets[bucket] = gen;
1575            out_string(c, "OWNED");
1576            return;
1577        } else {
1578            out_string(c, "CLIENT_ERROR bad format");
1579            return;
1580        }
1581
1582    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1583
1584        int bucket;
1585        if (!settings.managed) {
1586            out_string(c, "CLIENT_ERROR not a managed instance");
1587            return;
1588        }
1589        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1590            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1591                out_string(c, "CLIENT_ERROR bucket number out of range");
1592                return;
1593            }
1594            buckets[bucket] = 0;
1595            out_string(c, "DISOWNED");
1596            return;
1597        } else {
1598            out_string(c, "CLIENT_ERROR bad format");
1599            return;
1600        }
1601
1602    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1603        int bucket, gen;
1604        if (!settings.managed) {
1605            out_string(c, "CLIENT_ERROR not a managed instance");
1606            return;
1607        }
1608        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1609            /* we never write anything back, even if input's wrong */
1610            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1611                /* do nothing, bad input */
1612            } else {
1613                c->bucket = bucket;
1614                c->gen = gen;
1615            }
1616            conn_set_state(c, conn_read);
1617            return;
1618        } else {
1619            out_string(c, "CLIENT_ERROR bad format");
1620            return;
1621        }
1622
1623    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1624
1625        process_stat(c, tokens, ntokens);
1626
1627    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1628        time_t exptime = 0;
1629        set_current_time();
1630
1631        if(ntokens == 2) {
1632            settings.oldest_live = current_time - 1;
1633            item_flush_expired();
1634            out_string(c, "OK");
1635            return;
1636        }
1637
1638        exptime = strtol(tokens[1].value, NULL, 10);
1639        if(errno == ERANGE) {
1640            out_string(c, "CLIENT_ERROR bad command line format");
1641            return;
1642        }
1643
1644        settings.oldest_live = realtime(exptime) - 1;
1645        item_flush_expired();
1646        out_string(c, "OK");
1647        return;
1648
1649    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1650
1651        out_string(c, "VERSION " VERSION);
1652
1653    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1654
1655        conn_set_state(c, conn_closing);
1656
1657    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1658                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1659#ifdef ALLOW_SLABS_REASSIGN
1660
1661        int src, dst, rv;
1662
1663        src = strtol(tokens[2].value, NULL, 10);
1664        dst  = strtol(tokens[3].value, NULL, 10);
1665
1666        if(errno == ERANGE) {
1667            out_string(c, "CLIENT_ERROR bad command line format");
1668            return;
1669        }
1670
1671        rv = slabs_reassign(src, dst);
1672        if (rv == 1) {
1673            out_string(c, "DONE");
1674            return;
1675        }
1676        if (rv == 0) {
1677            out_string(c, "CANT");
1678            return;
1679        }
1680        if (rv == -1) {
1681            out_string(c, "BUSY");
1682            return;
1683        }
1684#else
1685        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1686#endif
1687    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1688        process_verbosity_command(c, tokens, ntokens);
1689    } else {
1690        out_string(c, "ERROR");
1691    }
1692    return;
1693}
1694
1695/*
1696 * if we have a complete line in the buffer, process it.
1697 */
1698static int try_read_command(conn *c) {
1699    char *el, *cont;
1700
1701    assert(c != NULL);
1702    assert(c->rcurr <= (c->rbuf + c->rsize));
1703
1704    if (c->rbytes == 0)
1705        return 0;
1706    el = memchr(c->rcurr, '\n', c->rbytes);
1707    if (!el)
1708        return 0;
1709    cont = el + 1;
1710    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1711        el--;
1712    }
1713    *el = '\0';
1714
1715    assert(cont <= (c->rcurr + c->rbytes));
1716
1717    process_command(c, c->rcurr);
1718
1719    c->rbytes -= (cont - c->rcurr);
1720    c->rcurr = cont;
1721
1722    assert(c->rcurr <= (c->rbuf + c->rsize));
1723
1724    return 1;
1725}
1726
1727/*
1728 * read a UDP request.
1729 * return 0 if there's nothing to read.
1730 */
1731static int try_read_udp(conn *c) {
1732    int res;
1733
1734    assert(c != NULL);
1735
1736    c->request_addr_size = sizeof(c->request_addr);
1737    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1738                   0, &c->request_addr, &c->request_addr_size);
1739    if (res > 8) {
1740        unsigned char *buf = (unsigned char *)c->rbuf;
1741        STATS_LOCK();
1742        stats.bytes_read += res;
1743        STATS_UNLOCK();
1744
1745        /* Beginning of UDP packet is the request ID; save it. */
1746        c->request_id = buf[0] * 256 + buf[1];
1747
1748        /* If this is a multi-packet request, drop it. */
1749        if (buf[4] != 0 || buf[5] != 1) {
1750            out_string(c, "SERVER_ERROR multi-packet request not supported");
1751            return 0;
1752        }
1753
1754        /* Don't care about any of the rest of the header. */
1755        res -= 8;
1756        memmove(c->rbuf, c->rbuf + 8, res);
1757
1758        c->rbytes += res;
1759        c->rcurr = c->rbuf;
1760        return 1;
1761    }
1762    return 0;
1763}
1764
1765/*
1766 * read from network as much as we can, handle buffer overflow and connection
1767 * close.
1768 * before reading, move the remaining incomplete fragment of a command
1769 * (if any) to the beginning of the buffer.
1770 * return 0 if there's nothing to read on the first read.
1771 */
1772static int try_read_network(conn *c) {
1773    int gotdata = 0;
1774    int res;
1775
1776    assert(c != NULL);
1777
1778    if (c->rcurr != c->rbuf) {
1779        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1780            memmove(c->rbuf, c->rcurr, c->rbytes);
1781        c->rcurr = c->rbuf;
1782    }
1783
1784    while (1) {
1785        if (c->rbytes >= c->rsize) {
1786            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1787            if (!new_rbuf) {
1788                if (settings.verbose > 0)
1789                    fprintf(stderr, "Couldn't realloc input buffer\n");
1790                c->rbytes = 0; /* ignore what we read */
1791                out_string(c, "SERVER_ERROR out of memory");
1792                c->write_and_go = conn_closing;
1793                return 1;
1794            }
1795            c->rcurr = c->rbuf = new_rbuf;
1796            c->rsize *= 2;
1797        }
1798
1799        /* unix socket mode doesn't need this, so zeroed out.  but why
1800         * is this done for every command?  presumably for UDP
1801         * mode.  */
1802        if (!settings.socketpath) {
1803            c->request_addr_size = sizeof(c->request_addr);
1804        } else {
1805            c->request_addr_size = 0;
1806        }
1807
1808        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1809        if (res > 0) {
1810            STATS_LOCK();
1811            stats.bytes_read += res;
1812            STATS_UNLOCK();
1813            gotdata = 1;
1814            c->rbytes += res;
1815            continue;
1816        }
1817        if (res == 0) {
1818            /* connection closed */
1819            conn_set_state(c, conn_closing);
1820            return 1;
1821        }
1822        if (res == -1) {
1823            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1824            else return 0;
1825        }
1826    }
1827    return gotdata;
1828}
1829
1830static bool update_event(conn *c, const int new_flags) {
1831    assert(c != NULL);
1832
1833    struct event_base *base = c->event.ev_base;
1834    if (c->ev_flags == new_flags)
1835        return true;
1836    if (event_del(&c->event) == -1) return false;
1837    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1838    event_base_set(base, &c->event);
1839    c->ev_flags = new_flags;
1840    if (event_add(&c->event, 0) == -1) return false;
1841    return true;
1842}
1843
1844/*
1845 * Sets whether we are listening for new connections or not.
1846 */
1847void accept_new_conns(const bool do_accept) {
1848    if (! is_listen_thread())
1849        return;
1850    if (do_accept) {
1851        update_event(listen_conn, EV_READ | EV_PERSIST);
1852        if (listen(listen_conn->sfd, 1024) != 0) {
1853            perror("listen");
1854        }
1855    }
1856    else {
1857        update_event(listen_conn, 0);
1858        if (listen(listen_conn->sfd, 0) != 0) {
1859            perror("listen");
1860        }
1861    }
1862}
1863
1864
1865/*
1866 * Transmit the next chunk of data from our list of msgbuf structures.
1867 *
1868 * Returns:
1869 *   TRANSMIT_COMPLETE   All done writing.
1870 *   TRANSMIT_INCOMPLETE More data remaining to write.
1871 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
1872 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1873 */
1874static int transmit(conn *c) {
1875    assert(c != NULL);
1876
1877    if (c->msgcurr < c->msgused &&
1878            c->msglist[c->msgcurr].msg_iovlen == 0) {
1879        /* Finished writing the current msg; advance to the next. */
1880        c->msgcurr++;
1881    }
1882    if (c->msgcurr < c->msgused) {
1883        ssize_t res;
1884        struct msghdr *m = &c->msglist[c->msgcurr];
1885
1886        res = sendmsg(c->sfd, m, 0);
1887        if (res > 0) {
1888            STATS_LOCK();
1889            stats.bytes_written += res;
1890            STATS_UNLOCK();
1891
1892            /* We've written some of the data. Remove the completed
1893               iovec entries from the list of pending writes. */
1894            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1895                res -= m->msg_iov->iov_len;
1896                m->msg_iovlen--;
1897                m->msg_iov++;
1898            }
1899
1900            /* Might have written just part of the last iovec entry;
1901               adjust it so the next write will do the rest. */
1902            if (res > 0) {
1903                m->msg_iov->iov_base += res;
1904                m->msg_iov->iov_len -= res;
1905            }
1906            return TRANSMIT_INCOMPLETE;
1907        }
1908        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1909            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1910                if (settings.verbose > 0)
1911                    fprintf(stderr, "Couldn't update event\n");
1912                conn_set_state(c, conn_closing);
1913                return TRANSMIT_HARD_ERROR;
1914            }
1915            return TRANSMIT_SOFT_ERROR;
1916        }
1917        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1918           we have a real error, on which we close the connection */
1919        if (settings.verbose > 0)
1920            perror("Failed to write, and not due to blocking");
1921
1922        if (c->udp)
1923            conn_set_state(c, conn_read);
1924        else
1925            conn_set_state(c, conn_closing);
1926        return TRANSMIT_HARD_ERROR;
1927    } else {
1928        return TRANSMIT_COMPLETE;
1929    }
1930}
1931
1932static void drive_machine(conn *c) {
1933    bool stop = false;
1934    int sfd, flags = 1;
1935    socklen_t addrlen;
1936    struct sockaddr addr;
1937    int res;
1938
1939    assert(c != NULL);
1940
1941    while (!stop) {
1942
1943        switch(c->state) {
1944        case conn_listening:
1945            addrlen = sizeof(addr);
1946            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
1947                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1948                    /* these are transient, so don't log anything */
1949                    stop = true;
1950                } else if (errno == EMFILE) {
1951                    if (settings.verbose > 0)
1952                        fprintf(stderr, "Too many open connections\n");
1953                    accept_new_conns(false);
1954                    stop = true;
1955                } else {
1956                    perror("accept()");
1957                    stop = true;
1958                }
1959                break;
1960            }
1961            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
1962                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
1963                perror("setting O_NONBLOCK");
1964                close(sfd);
1965                break;
1966            }
1967            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
1968                                     DATA_BUFFER_SIZE, false);
1969            break;
1970
1971        case conn_read:
1972            if (try_read_command(c) != 0) {
1973                continue;
1974            }
1975            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
1976                continue;
1977            }
1978            /* we have no command line and no data to read from network */
1979            if (!update_event(c, EV_READ | EV_PERSIST)) {
1980                if (settings.verbose > 0)
1981                    fprintf(stderr, "Couldn't update event\n");
1982                conn_set_state(c, conn_closing);
1983                break;
1984            }
1985            stop = true;
1986            break;
1987
1988        case conn_nread:
1989            /* we are reading rlbytes into ritem; */
1990            if (c->rlbytes == 0) {
1991                complete_nread(c);
1992                break;
1993            }
1994            /* first check if we have leftovers in the conn_read buffer */
1995            if (c->rbytes > 0) {
1996                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
1997                memcpy(c->ritem, c->rcurr, tocopy);
1998                c->ritem += tocopy;
1999                c->rlbytes -= tocopy;
2000                c->rcurr += tocopy;
2001                c->rbytes -= tocopy;
2002                break;
2003            }
2004
2005            /*  now try reading from the socket */
2006            res = read(c->sfd, c->ritem, c->rlbytes);
2007            if (res > 0) {
2008                STATS_LOCK();
2009                stats.bytes_read += res;
2010                STATS_UNLOCK();
2011                c->ritem += res;
2012                c->rlbytes -= res;
2013                break;
2014            }
2015            if (res == 0) { /* end of stream */
2016                conn_set_state(c, conn_closing);
2017                break;
2018            }
2019            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2020                if (!update_event(c, EV_READ | EV_PERSIST)) {
2021                    if (settings.verbose > 0)
2022                        fprintf(stderr, "Couldn't update event\n");
2023                    conn_set_state(c, conn_closing);
2024                    break;
2025                }
2026                stop = true;
2027                break;
2028            }
2029            /* otherwise we have a real error, on which we close the connection */
2030            if (settings.verbose > 0)
2031                fprintf(stderr, "Failed to read, and not due to blocking\n");
2032            conn_set_state(c, conn_closing);
2033            break;
2034
2035        case conn_swallow:
2036            /* we are reading sbytes and throwing them away */
2037            if (c->sbytes == 0) {
2038                conn_set_state(c, conn_read);
2039                break;
2040            }
2041
2042            /* first check if we have leftovers in the conn_read buffer */
2043            if (c->rbytes > 0) {
2044                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2045                c->sbytes -= tocopy;
2046                c->rcurr += tocopy;
2047                c->rbytes -= tocopy;
2048                break;
2049            }
2050
2051            /*  now try reading from the socket */
2052            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2053            if (res > 0) {
2054                STATS_LOCK();
2055                stats.bytes_read += res;
2056                STATS_UNLOCK();
2057                c->sbytes -= res;
2058                break;
2059            }
2060            if (res == 0) { /* end of stream */
2061                conn_set_state(c, conn_closing);
2062                break;
2063            }
2064            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2065                if (!update_event(c, EV_READ | EV_PERSIST)) {
2066                    if (settings.verbose > 0)
2067                        fprintf(stderr, "Couldn't update event\n");
2068                    conn_set_state(c, conn_closing);
2069                    break;
2070                }
2071                stop = true;
2072                break;
2073            }
2074            /* otherwise we have a real error, on which we close the connection */
2075            if (settings.verbose > 0)
2076                fprintf(stderr, "Failed to read, and not due to blocking\n");
2077            conn_set_state(c, conn_closing);
2078            break;
2079
2080        case conn_write:
2081            /*
2082             * We want to write out a simple response. If we haven't already,
2083             * assemble it into a msgbuf list (this will be a single-entry
2084             * list for TCP or a two-entry list for UDP).
2085             */
2086            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
2087                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2088                    (c->udp && build_udp_headers(c) != 0)) {
2089                    if (settings.verbose > 0)
2090                        fprintf(stderr, "Couldn't build response\n");
2091                    conn_set_state(c, conn_closing);
2092                    break;
2093                }
2094            }
2095
2096            /* fall through... */
2097
2098        case conn_mwrite:
2099            switch (transmit(c)) {
2100            case TRANSMIT_COMPLETE:
2101                if (c->state == conn_mwrite) {
2102                    while (c->ileft > 0) {
2103                        item *it = *(c->icurr);
2104                        assert((it->it_flags & ITEM_SLABBED) == 0);
2105                        item_remove(it);
2106                        c->icurr++;
2107                        c->ileft--;
2108                    }
2109                    conn_set_state(c, conn_read);
2110                } else if (c->state == conn_write) {
2111                    if (c->write_and_free) {
2112                        free(c->write_and_free);
2113                        c->write_and_free = 0;
2114                    }
2115                    conn_set_state(c, c->write_and_go);
2116                } else {
2117                    if (settings.verbose > 0)
2118                        fprintf(stderr, "Unexpected state %d\n", c->state);
2119                    conn_set_state(c, conn_closing);
2120                }
2121                break;
2122
2123            case TRANSMIT_INCOMPLETE:
2124            case TRANSMIT_HARD_ERROR:
2125                break;                   /* Continue in state machine. */
2126
2127            case TRANSMIT_SOFT_ERROR:
2128                stop = true;
2129                break;
2130            }
2131            break;
2132
2133        case conn_closing:
2134            if (c->udp)
2135                conn_cleanup(c);
2136            else
2137                conn_close(c);
2138            stop = true;
2139            break;
2140        }
2141    }
2142
2143    return;
2144}
2145
2146void event_handler(const int fd, const short which, void *arg) {
2147    conn *c;
2148
2149    c = (conn *)arg;
2150    assert(c != NULL);
2151
2152    c->which = which;
2153
2154    /* sanity */
2155    if (fd != c->sfd) {
2156        if (settings.verbose > 0)
2157            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2158        conn_close(c);
2159        return;
2160    }
2161
2162    drive_machine(c);
2163
2164    /* wait for next event */
2165    return;
2166}
2167
2168static int new_socket(const bool is_udp) {
2169    int sfd;
2170    int flags;
2171
2172    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
2173        perror("socket()");
2174        return -1;
2175    }
2176
2177    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2178        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2179        perror("setting O_NONBLOCK");
2180        close(sfd);
2181        return -1;
2182    }
2183    return sfd;
2184}
2185
2186
2187/*
2188 * Sets a socket's send buffer size to the maximum allowed by the system.
2189 */
2190static void maximize_sndbuf(const int sfd) {
2191    socklen_t intsize = sizeof(int);
2192    int last_good = 0;
2193    int min, max, avg;
2194    int old_size;
2195
2196    /* Start with the default size. */
2197    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2198        if (settings.verbose > 0)
2199            perror("getsockopt(SO_SNDBUF)");
2200        return;
2201    }
2202
2203    /* Binary-search for the real maximum. */
2204    min = old_size;
2205    max = MAX_SENDBUF_SIZE;
2206
2207    while (min <= max) {
2208        avg = ((unsigned int)(min + max)) / 2;
2209        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2210            last_good = avg;
2211            min = avg + 1;
2212        } else {
2213            max = avg - 1;
2214        }
2215    }
2216
2217    if (settings.verbose > 1)
2218        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2219}
2220
2221
2222static int server_socket(const int port, const bool is_udp) {
2223    int sfd;
2224    struct linger ling = {0, 0};
2225    struct sockaddr_in addr;
2226    int flags =1;
2227
2228    if ((sfd = new_socket(is_udp)) == -1) {
2229        return -1;
2230    }
2231
2232    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2233    if (is_udp) {
2234        maximize_sndbuf(sfd);
2235    } else {
2236        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2237        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2238        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2239    }
2240
2241    /*
2242     * the memset call clears nonstandard fields in some impementations
2243     * that otherwise mess things up.
2244     */
2245    memset(&addr, 0, sizeof(addr));
2246
2247    addr.sin_family = AF_INET;
2248    addr.sin_port = htons(port);
2249    addr.sin_addr = settings.interf;
2250    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2251        perror("bind()");
2252        close(sfd);
2253        return -1;
2254    }
2255    if (!is_udp && listen(sfd, 1024) == -1) {
2256        perror("listen()");
2257        close(sfd);
2258        return -1;
2259    }
2260    return sfd;
2261}
2262
2263static int new_socket_unix(void) {
2264    int sfd;
2265    int flags;
2266
2267    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2268        perror("socket()");
2269        return -1;
2270    }
2271
2272    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2273        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2274        perror("setting O_NONBLOCK");
2275        close(sfd);
2276        return -1;
2277    }
2278    return sfd;
2279}
2280
2281static int server_socket_unix(const char *path) {
2282    int sfd;
2283    struct linger ling = {0, 0};
2284    struct sockaddr_un addr;
2285    struct stat tstat;
2286    int flags =1;
2287
2288    if (!path) {
2289        return -1;
2290    }
2291
2292    if ((sfd = new_socket_unix()) == -1) {
2293        return -1;
2294    }
2295
2296    /*
2297     * Clean up a previous socket file if we left it around
2298     */
2299    if (lstat(path, &tstat) == 0) {
2300        if (S_ISSOCK(tstat.st_mode))
2301            unlink(path);
2302    }
2303
2304    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2305    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2306    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2307
2308    /*
2309     * the memset call clears nonstandard fields in some impementations
2310     * that otherwise mess things up.
2311     */
2312    memset(&addr, 0, sizeof(addr));
2313
2314    addr.sun_family = AF_UNIX;
2315    strcpy(addr.sun_path, path);
2316    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2317        perror("bind()");
2318        close(sfd);
2319        return -1;
2320    }
2321    if (listen(sfd, 1024) == -1) {
2322        perror("listen()");
2323        close(sfd);
2324        return -1;
2325    }
2326    return sfd;
2327}
2328
2329/* listening socket */
2330static int l_socket = 0;
2331
2332/* udp socket */
2333static int u_socket = -1;
2334
2335/* invoke right before gdb is called, on assert */
2336void pre_gdb(void) {
2337    int i;
2338    if (l_socket > -1) close(l_socket);
2339    if (u_socket > -1) close(u_socket);
2340    for (i = 3; i <= 500; i++) close(i); /* so lame */
2341    kill(getpid(), SIGABRT);
2342}
2343
2344/*
2345 * We keep the current time of day in a global variable that's updated by a
2346 * timer event. This saves us a bunch of time() system calls (we really only
2347 * need to get the time once a second, whereas there can be tens of thousands
2348 * of requests a second) and allows us to use server-start-relative timestamps
2349 * rather than absolute UNIX timestamps, a space savings on systems where
2350 * sizeof(time_t) > sizeof(unsigned int).
2351 */
2352volatile rel_time_t current_time;
2353static struct event clockevent;
2354
2355/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2356static void set_current_time(void) {
2357    current_time = (rel_time_t) (time(0) - stats.started);
2358}
2359
2360static void clock_handler(const int fd, const short which, void *arg) {
2361    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2362    static bool initialized = false;
2363
2364    if (initialized) {
2365        /* only delete the event if it's actually there. */
2366        evtimer_del(&clockevent);
2367    } else {
2368        initialized = true;
2369    }
2370
2371    evtimer_set(&clockevent, clock_handler, 0);
2372    event_base_set(main_base, &clockevent);
2373    evtimer_add(&clockevent, &t);
2374
2375    set_current_time();
2376}
2377
2378static struct event deleteevent;
2379
2380static void delete_handler(const int fd, const short which, void *arg) {
2381    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2382    static bool initialized = false;
2383
2384    if (initialized) {
2385        /* some versions of libevent don't like deleting events that don't exist,
2386           so only delete once we know this event has been added. */
2387        evtimer_del(&deleteevent);
2388    } else {
2389        initialized = true;
2390    }
2391
2392    evtimer_set(&deleteevent, delete_handler, 0);
2393    event_base_set(main_base, &deleteevent);
2394    evtimer_add(&deleteevent, &t);
2395    run_deferred_deletes();
2396}
2397
2398/* Call run_deferred_deletes instead of this. */
2399void do_run_deferred_deletes(void)
2400{
2401    int i, j = 0;
2402
2403    for (i = 0; i < delcurr; i++) {
2404        item *it = todelete[i];
2405        if (item_delete_lock_over(it)) {
2406            assert(it->refcount > 0);
2407            it->it_flags &= ~ITEM_DELETED;
2408            do_item_unlink(it);
2409            do_item_remove(it);
2410        } else {
2411            todelete[j++] = it;
2412        }
2413    }
2414    delcurr = j;
2415}
2416
2417static void usage(void) {
2418    printf(PACKAGE " " VERSION "\n");
2419    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2420           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2421           "-s <file>     unix socket path to listen on (disables network support)\n"
2422           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2423           "-d            run as a daemon\n"
2424           "-r            maximize core file limit\n"
2425           "-u <username> assume identity of <username> (only when run as root)\n"
2426           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2427           "-M            return error on memory exhausted (rather than removing items)\n"
2428           "-c <num>      max simultaneous connections, default is 1024\n"
2429           "-k            lock down all paged memory\n"
2430           "-v            verbose (print errors/warnings while in event loop)\n"
2431           "-vv           very verbose (also print client commands/reponses)\n"
2432           "-h            print this help and exit\n"
2433           "-i            print memcached and libevent license\n"
2434           "-b            run a managed instanced (mnemonic: buckets)\n"
2435           "-P <file>     save PID in <file>, only used with -d option\n"
2436           "-f <factor>   chunk size growth factor, default 1.25\n"
2437           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
2438#ifdef USE_THREADS
2439    printf("-t <num>      number of threads to use, default 4\n");
2440#endif
2441    return;
2442}
2443
2444static void usage_license(void) {
2445    printf(PACKAGE " " VERSION "\n\n");
2446    printf(
2447    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2448    "All rights reserved.\n"
2449    "\n"
2450    "Redistribution and use in source and binary forms, with or without\n"
2451    "modification, are permitted provided that the following conditions are\n"
2452    "met:\n"
2453    "\n"
2454    "    * Redistributions of source code must retain the above copyright\n"
2455    "notice, this list of conditions and the following disclaimer.\n"
2456    "\n"
2457    "    * Redistributions in binary form must reproduce the above\n"
2458    "copyright notice, this list of conditions and the following disclaimer\n"
2459    "in the documentation and/or other materials provided with the\n"
2460    "distribution.\n"
2461    "\n"
2462    "    * Neither the name of the Danga Interactive nor the names of its\n"
2463    "contributors may be used to endorse or promote products derived from\n"
2464    "this software without specific prior written permission.\n"
2465    "\n"
2466    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2467    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2468    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2469    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2470    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2471    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2472    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2473    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2474    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2475    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2476    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2477    "\n"
2478    "\n"
2479    "This product includes software developed by Niels Provos.\n"
2480    "\n"
2481    "[ libevent ]\n"
2482    "\n"
2483    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2484    "All rights reserved.\n"
2485    "\n"
2486    "Redistribution and use in source and binary forms, with or without\n"
2487    "modification, are permitted provided that the following conditions\n"
2488    "are met:\n"
2489    "1. Redistributions of source code must retain the above copyright\n"
2490    "   notice, this list of conditions and the following disclaimer.\n"
2491    "2. Redistributions in binary form must reproduce the above copyright\n"
2492    "   notice, this list of conditions and the following disclaimer in the\n"
2493    "   documentation and/or other materials provided with the distribution.\n"
2494    "3. All advertising materials mentioning features or use of this software\n"
2495    "   must display the following acknowledgement:\n"
2496    "      This product includes software developed by Niels Provos.\n"
2497    "4. The name of the author may not be used to endorse or promote products\n"
2498    "   derived from this software without specific prior written permission.\n"
2499    "\n"
2500    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2501    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2502    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2503    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2504    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2505    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2506    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2507    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2508    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2509    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2510    );
2511
2512    return;
2513}
2514
2515static void save_pid(const pid_t pid, const char *pid_file) {
2516    FILE *fp;
2517    if (pid_file == NULL)
2518        return;
2519
2520    if ((fp = fopen(pid_file, "w")) == NULL) {
2521        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2522        return;
2523    }
2524
2525    fprintf(fp,"%ld\n", (long)pid);
2526    if (fclose(fp) == -1) {
2527        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2528        return;
2529    }
2530}
2531
2532static void remove_pidfile(const char *pid_file) {
2533  if (pid_file == NULL)
2534      return;
2535
2536  if (unlink(pid_file) != 0) {
2537      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2538  }
2539
2540}
2541
2542
2543static void sig_handler(const int sig) {
2544    printf("SIGINT handled.\n");
2545    exit(EXIT_SUCCESS);
2546}
2547
2548int main (int argc, char **argv) {
2549    int c;
2550    struct in_addr addr;
2551    bool lock_memory = false;
2552    bool daemonize = false;
2553    int maxcore = 0;
2554    char *username = NULL;
2555    char *pid_file = NULL;
2556    struct passwd *pw;
2557    struct sigaction sa;
2558    struct rlimit rlim;
2559
2560    /* handle SIGINT */
2561    signal(SIGINT, sig_handler);
2562
2563    /* init settings */
2564    settings_init();
2565
2566    /* set stderr non-buffering (for running under, say, daemontools) */
2567    setbuf(stderr, NULL);
2568
2569    /* process arguments */
2570    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2571        switch (c) {
2572        case 'U':
2573            settings.udpport = atoi(optarg);
2574            break;
2575        case 'b':
2576            settings.managed = true;
2577            break;
2578        case 'p':
2579            settings.port = atoi(optarg);
2580            break;
2581        case 's':
2582            settings.socketpath = optarg;
2583            break;
2584        case 'm':
2585            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2586            break;
2587        case 'M':
2588            settings.evict_to_free = 0;
2589            break;
2590        case 'c':
2591            settings.maxconns = atoi(optarg);
2592            break;
2593        case 'h':
2594            usage();
2595            exit(EXIT_SUCCESS);
2596        case 'i':
2597            usage_license();
2598            exit(EXIT_SUCCESS);
2599        case 'k':
2600            lock_memory = true;
2601            break;
2602        case 'v':
2603            settings.verbose++;
2604            break;
2605        case 'l':
2606            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2607                fprintf(stderr, "Illegal address: %s\n", optarg);
2608                return 1;
2609            } else {
2610                settings.interf = addr;
2611            }
2612            break;
2613        case 'd':
2614            daemonize = true;
2615            break;
2616        case 'r':
2617            maxcore = 1;
2618            break;
2619        case 'u':
2620            username = optarg;
2621            break;
2622        case 'P':
2623            pid_file = optarg;
2624            break;
2625        case 'f':
2626            settings.factor = atof(optarg);
2627            if (settings.factor <= 1.0) {
2628                fprintf(stderr, "Factor must be greater than 1\n");
2629                return 1;
2630            }
2631            break;
2632        case 'n':
2633            settings.chunk_size = atoi(optarg);
2634            if (settings.chunk_size == 0) {
2635                fprintf(stderr, "Chunk size must be greater than 0\n");
2636                return 1;
2637            }
2638            break;
2639        case 't':
2640            settings.num_threads = atoi(optarg);
2641            if (settings.num_threads == 0) {
2642                fprintf(stderr, "Number of threads must be greater than 0\n");
2643                return 1;
2644            }
2645            break;
2646        case 'D':
2647            if (! optarg || ! optarg[0]) {
2648                fprintf(stderr, "No delimiter specified\n");
2649                return 1;
2650            }
2651            settings.prefix_delimiter = optarg[0];
2652            settings.detail_enabled = 1;
2653            break;
2654        default:
2655            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2656            return 1;
2657        }
2658    }
2659
2660    if (maxcore != 0) {
2661        struct rlimit rlim_new;
2662        /*
2663         * First try raising to infinity; if that fails, try bringing
2664         * the soft limit to the hard.
2665         */
2666        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2667            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2668            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2669                /* failed. try raising just to the old max */
2670                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2671                (void)setrlimit(RLIMIT_CORE, &rlim_new);
2672            }
2673        }
2674        /*
2675         * getrlimit again to see what we ended up with. Only fail if
2676         * the soft limit ends up 0, because then no core files will be
2677         * created at all.
2678         */
2679
2680        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2681            fprintf(stderr, "failed to ensure corefile creation\n");
2682            exit(EXIT_FAILURE);
2683        }
2684    }
2685
2686    /*
2687     * If needed, increase rlimits to allow as many connections
2688     * as needed.
2689     */
2690
2691    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2692        fprintf(stderr, "failed to getrlimit number of files\n");
2693        exit(EXIT_FAILURE);
2694    } else {
2695        int maxfiles = settings.maxconns;
2696        if (rlim.rlim_cur < maxfiles)
2697            rlim.rlim_cur = maxfiles + 3;
2698        if (rlim.rlim_max < rlim.rlim_cur)
2699            rlim.rlim_max = rlim.rlim_cur;
2700        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2701            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2702            exit(EXIT_FAILURE);
2703        }
2704    }
2705
2706    /*
2707     * initialization order: first create the listening sockets
2708     * (may need root on low ports), then drop root if needed,
2709     * then daemonise if needed, then init libevent (in some cases
2710     * descriptors created by libevent wouldn't survive forking).
2711     */
2712
2713    /* create the listening socket and bind it */
2714    if (settings.socketpath == NULL) {
2715        l_socket = server_socket(settings.port, 0);
2716        if (l_socket == -1) {
2717            fprintf(stderr, "failed to listen\n");
2718            exit(EXIT_FAILURE);
2719        }
2720    }
2721
2722    if (settings.udpport > 0 && settings.socketpath == NULL) {
2723        /* create the UDP listening socket and bind it */
2724        u_socket = server_socket(settings.udpport, 1);
2725        if (u_socket == -1) {
2726            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2727            exit(EXIT_FAILURE);
2728        }
2729    }
2730
2731    /* lose root privileges if we have them */
2732    if (getuid() == 0 || geteuid() == 0) {
2733        if (username == 0 || *username == '\0') {
2734            fprintf(stderr, "can't run as root without the -u switch\n");
2735            return 1;
2736        }
2737        if ((pw = getpwnam(username)) == 0) {
2738            fprintf(stderr, "can't find the user %s to switch to\n", username);
2739            return 1;
2740        }
2741        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2742            fprintf(stderr, "failed to assume identity of user %s\n", username);
2743            return 1;
2744        }
2745    }
2746
2747    /* create unix mode sockets after dropping privileges */
2748    if (settings.socketpath != NULL) {
2749        l_socket = server_socket_unix(settings.socketpath);
2750        if (l_socket == -1) {
2751            fprintf(stderr, "failed to listen\n");
2752            exit(EXIT_FAILURE);
2753        }
2754    }
2755
2756    /* daemonize if requested */
2757    /* if we want to ensure our ability to dump core, don't chdir to / */
2758    if (daemonize) {
2759        int res;
2760        res = daemon(maxcore, settings.verbose);
2761        if (res == -1) {
2762            fprintf(stderr, "failed to daemon() in order to daemonize\n");
2763            return 1;
2764        }
2765    }
2766
2767
2768    /* initialize main thread libevent instance */
2769    main_base = event_init();
2770
2771    /* initialize other stuff */
2772    item_init();
2773    stats_init();
2774    assoc_init();
2775    conn_init();
2776    slabs_init(settings.maxbytes, settings.factor);
2777
2778    /* managed instance? alloc and zero a bucket array */
2779    if (settings.managed) {
2780        buckets = malloc(sizeof(int) * MAX_BUCKETS);
2781        if (buckets == 0) {
2782            fprintf(stderr, "failed to allocate the bucket array");
2783            exit(EXIT_FAILURE);
2784        }
2785        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2786    }
2787
2788    /* lock paged memory if needed */
2789    if (lock_memory) {
2790#ifdef HAVE_MLOCKALL
2791        mlockall(MCL_CURRENT | MCL_FUTURE);
2792#else
2793        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
2794#endif
2795    }
2796
2797    /*
2798     * ignore SIGPIPE signals; we can use errno==EPIPE if we
2799     * need that information
2800     */
2801    sa.sa_handler = SIG_IGN;
2802    sa.sa_flags = 0;
2803    if (sigemptyset(&sa.sa_mask) == -1 ||
2804        sigaction(SIGPIPE, &sa, 0) == -1) {
2805        perror("failed to ignore SIGPIPE; sigaction");
2806        exit(EXIT_FAILURE);
2807    }
2808    /* create the initial listening connection */
2809    if (!(listen_conn = conn_new(l_socket, conn_listening,
2810                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
2811        fprintf(stderr, "failed to create listening connection");
2812        exit(EXIT_FAILURE);
2813    }
2814    /* start up worker threads if MT mode */
2815    thread_init(settings.num_threads, main_base);
2816    /* save the PID in if we're a daemon, do this after thread_init due to
2817       a file descriptor handling bug somewhere in libevent */
2818    if (daemonize)
2819        save_pid(getpid(), pid_file);
2820    /* initialise clock event */
2821    clock_handler(0, 0, 0);
2822    /* initialise deletion array and timer event */
2823    deltotal = 200;
2824    delcurr = 0;
2825    if ((todelete = malloc(sizeof(item *) * deltotal)) == NULL) {
2826        perror("failed to allocate memory for deletion array");
2827        exit(EXIT_FAILURE);
2828    }
2829    delete_handler(0, 0, 0); /* sets up the event */
2830    /* create the initial listening udp connection, monitored on all threads */
2831    if (u_socket > -1) {
2832        for (c = 0; c < settings.num_threads; c++) {
2833            /* this is guaranteed to hit all threads because we round-robin */
2834            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2835                              UDP_READ_BUFFER_SIZE, 1);
2836        }
2837    }
2838    /* enter the event loop */
2839    event_base_loop(main_base, 0);
2840    /* remove the PID file if we're a daemon */
2841    if (daemonize)
2842        remove_pidfile(pid_file);
2843    return 0;
2844}
Note: See TracBrowser for help on using the browser.