root/trunk/server/memcached.c @ 624

Revision 624, 82.9 kB (checked in by plindner, 2 years ago)

Fix for Unix Domain sockets on FreeBSD
FreeBSD's sendmsg() requires msg_name in msghdr structure
to be NULL if not used, setting msg_namelen to 0 isn't enough.
Patch from Maxim Dounin <mdounin@…>

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(const bool is_udp);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97void pre_gdb(void);
98static void conn_free(conn *c);
99
100/** exported globals **/
101struct stats stats;
102struct settings settings;
103
104/** file scope variables **/
105static item **todelete = NULL;
106static int delcurr;
107static int deltotal;
108static conn *listen_conn;
109static struct event_base *main_base;
110
111#define TRANSMIT_COMPLETE   0
112#define TRANSMIT_INCOMPLETE 1
113#define TRANSMIT_SOFT_ERROR 2
114#define TRANSMIT_HARD_ERROR 3
115
116static int *buckets = 0; /* bucket->generation array for a managed instance */
117
118#define REALTIME_MAXDELTA 60*60*24*30
119/*
120 * given time value that's either unix time or delta from current unix time, return
121 * unix time. Use the fact that delta can't exceed one month (and real time value can't
122 * be that low).
123 */
124static rel_time_t realtime(const time_t exptime) {
125    /* no. of seconds in 30 days - largest possible delta exptime */
126
127    if (exptime == 0) return 0; /* 0 means never expire */
128
129    if (exptime > REALTIME_MAXDELTA) {
130        /* if item expiration is at/before the server started, give it an
131           expiration time of 1 second after the server started.
132           (because 0 means don't expire).  without this, we'd
133           underflow and wrap around to some large value way in the
134           future, effectively making items expiring in the past
135           really expiring never */
136        if (exptime <= stats.started)
137            return (rel_time_t)1;
138        return (rel_time_t)(exptime - stats.started);
139    } else {
140        return (rel_time_t)(exptime + current_time);
141    }
142}
143
144static void stats_init(void) {
145    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
146    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
147    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
148
149    /* make the time we started always be 2 seconds before we really
150       did, so time(0) - time.started is never zero.  if so, things
151       like 'settings.oldest_live' which act as booleans as well as
152       values are now false in boolean context... */
153    stats.started = time(0) - 2;
154    stats_prefix_init();
155}
156
157static void stats_reset(void) {
158    STATS_LOCK();
159    stats.total_items = stats.total_conns = 0;
160    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
161    stats.bytes_read = stats.bytes_written = 0;
162    stats_prefix_clear();
163    STATS_UNLOCK();
164}
165
166static void settings_init(void) {
167    settings.port = 11211;
168    settings.udpport = 0;
169    settings.interf.s_addr = htonl(INADDR_ANY);
170    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
171    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
172    settings.verbose = 0;
173    settings.oldest_live = 0;
174    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
175    settings.socketpath = NULL;       /* by default, not using a unix socket */
176    settings.managed = false;
177    settings.factor = 1.25;
178    settings.chunk_size = 48;         /* space for a modest key and value */
179#ifdef USE_THREADS
180    settings.num_threads = 4;
181#else
182    settings.num_threads = 1;
183#endif
184    settings.prefix_delimiter = ':';
185    settings.detail_enabled = 0;
186}
187
188/* returns true if a deleted item's delete-locked-time is over, and it
189   should be removed from the namespace */
190static bool item_delete_lock_over (item *it) {
191    assert(it->it_flags & ITEM_DELETED);
192    return (current_time >= it->exptime);
193}
194
195/*
196 * Adds a message header to a connection.
197 *
198 * Returns 0 on success, -1 on out-of-memory.
199 */
200static int add_msghdr(conn *c)
201{
202    struct msghdr *msg;
203
204    assert(c != NULL);
205
206    if (c->msgsize == c->msgused) {
207        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
208        if (! msg)
209            return -1;
210        c->msglist = msg;
211        c->msgsize *= 2;
212    }
213
214    msg = c->msglist + c->msgused;
215
216    /* this wipes msg_iovlen, msg_control, msg_controllen, and
217       msg_flags, the last 3 of which aren't defined on solaris: */
218    memset(msg, 0, sizeof(struct msghdr));
219
220    msg->msg_iov = &c->iov[c->iovused];
221
222    if (c->request_addr_size > 0) {
223        msg->msg_name = &c->request_addr;
224        msg->msg_namelen = c->request_addr_size;
225    }
226
227    c->msgbytes = 0;
228    c->msgused++;
229
230    if (c->udp) {
231        /* Leave room for the UDP header, which we'll fill in later. */
232        return add_iov(c, NULL, UDP_HEADER_SIZE);
233    }
234
235    return 0;
236}
237
238
239/*
240 * Free list management for connections.
241 */
242
243static conn **freeconns;
244static int freetotal;
245static int freecurr;
246
247
248static void conn_init(void) {
249    freetotal = 200;
250    freecurr = 0;
251    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
252        perror("malloc()");
253    }
254    return;
255}
256
257/*
258 * Returns a connection from the freelist, if any. Should call this using
259 * conn_from_freelist() for thread safety.
260 */
261conn *do_conn_from_freelist() {
262    conn *c;
263
264    if (freecurr > 0) {
265        c = freeconns[--freecurr];
266    } else {
267        c = NULL;
268    }
269
270    return c;
271}
272
273/*
274 * Adds a connection to the freelist. 0 = success. Should call this using
275 * conn_add_to_freelist() for thread safety.
276 */
277bool do_conn_add_to_freelist(conn *c) {
278    if (freecurr < freetotal) {
279        freeconns[freecurr++] = c;
280        return false;
281    } else {
282        /* try to enlarge free connections array */
283        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
284        if (new_freeconns) {
285            freetotal *= 2;
286            freeconns = new_freeconns;
287            freeconns[freecurr++] = c;
288            return false;
289        }
290    }
291    return true;
292}
293
294conn *conn_new(const int sfd, const int init_state, const int event_flags,
295                const int read_buffer_size, const bool is_udp, struct event_base *base) {
296    conn *c = conn_from_freelist();
297
298    if (NULL == c) {
299        if (!(c = (conn *)malloc(sizeof(conn)))) {
300            perror("malloc()");
301            return NULL;
302        }
303        c->rbuf = c->wbuf = 0;
304        c->ilist = 0;
305        c->iov = 0;
306        c->msglist = 0;
307        c->hdrbuf = 0;
308
309        c->rsize = read_buffer_size;
310        c->wsize = DATA_BUFFER_SIZE;
311        c->isize = ITEM_LIST_INITIAL;
312        c->iovsize = IOV_LIST_INITIAL;
313        c->msgsize = MSG_LIST_INITIAL;
314        c->hdrsize = 0;
315
316        c->rbuf = (char *)malloc((size_t)c->rsize);
317        c->wbuf = (char *)malloc((size_t)c->wsize);
318        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
319        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
320        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
321
322        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
323                c->msglist == 0) {
324            if (c->rbuf != 0) free(c->rbuf);
325            if (c->wbuf != 0) free(c->wbuf);
326            if (c->ilist !=0) free(c->ilist);
327            if (c->iov != 0) free(c->iov);
328            if (c->msglist != 0) free(c->msglist);
329            free(c);
330            perror("malloc()");
331            return NULL;
332        }
333
334        STATS_LOCK();
335        stats.conn_structs++;
336        STATS_UNLOCK();
337    }
338
339    if (settings.verbose > 1) {
340        if (init_state == conn_listening)
341            fprintf(stderr, "<%d server listening\n", sfd);
342        else if (is_udp)
343            fprintf(stderr, "<%d server listening (udp)\n", sfd);
344        else
345            fprintf(stderr, "<%d new client connection\n", sfd);
346    }
347
348    c->sfd = sfd;
349    c->udp = is_udp;
350    c->state = init_state;
351    c->rlbytes = 0;
352    c->rbytes = c->wbytes = 0;
353    c->wcurr = c->wbuf;
354    c->rcurr = c->rbuf;
355    c->ritem = 0;
356    c->icurr = c->ilist;
357    c->ileft = 0;
358    c->iovused = 0;
359    c->msgcurr = 0;
360    c->msgused = 0;
361
362    c->write_and_go = conn_read;
363    c->write_and_free = 0;
364    c->item = 0;
365    c->bucket = -1;
366    c->gen = 0;
367
368    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
369    event_base_set(base, &c->event);
370    c->ev_flags = event_flags;
371
372    if (event_add(&c->event, 0) == -1) {
373        if (conn_add_to_freelist(c)) {
374            conn_free(c);
375        }
376        return NULL;
377    }
378
379    STATS_LOCK();
380    stats.curr_conns++;
381    stats.total_conns++;
382    STATS_UNLOCK();
383
384    return c;
385}
386
387static void conn_cleanup(conn *c) {
388    assert(c != NULL);
389
390    if (c->item) {
391        item_remove(c->item);
392        c->item = 0;
393    }
394
395    if (c->ileft != 0) {
396        for (; c->ileft > 0; c->ileft--,c->icurr++) {
397            item_remove(*(c->icurr));
398        }
399    }
400
401    if (c->write_and_free) {
402        free(c->write_and_free);
403        c->write_and_free = 0;
404    }
405}
406
407/*
408 * Frees a connection.
409 */
410void conn_free(conn *c) {
411    if (c) {
412        if (c->hdrbuf)
413            free(c->hdrbuf);
414        if (c->msglist)
415            free(c->msglist);
416        if (c->rbuf)
417            free(c->rbuf);
418        if (c->wbuf)
419            free(c->wbuf);
420        if (c->ilist)
421            free(c->ilist);
422        if (c->iov)
423            free(c->iov);
424        free(c);
425    }
426}
427
428static void conn_close(conn *c) {
429    assert(c != NULL);
430
431    /* delete the event, the socket and the conn */
432    event_del(&c->event);
433
434    if (settings.verbose > 1)
435        fprintf(stderr, "<%d connection closed.\n", c->sfd);
436
437    close(c->sfd);
438    accept_new_conns(true);
439    conn_cleanup(c);
440
441    /* if the connection has big buffers, just free it */
442    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
443        conn_free(c);
444    }
445
446    STATS_LOCK();
447    stats.curr_conns--;
448    STATS_UNLOCK();
449
450    return;
451}
452
453
454/*
455 * Shrinks a connection's buffers if they're too big.  This prevents
456 * periodic large "get" requests from permanently chewing lots of server
457 * memory.
458 *
459 * This should only be called in between requests since it can wipe output
460 * buffers!
461 */
462static void conn_shrink(conn *c) {
463    assert(c != NULL);
464
465    if (c->udp)
466        return;
467
468    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
469        char *newbuf;
470
471        if (c->rcurr != c->rbuf)
472            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
473
474        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
475
476        if (newbuf) {
477            c->rbuf = newbuf;
478            c->rsize = DATA_BUFFER_SIZE;
479        }
480        /* TODO check other branch... */
481        c->rcurr = c->rbuf;
482    }
483
484    if (c->isize > ITEM_LIST_HIGHWAT) {
485        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
486        if (newbuf) {
487            c->ilist = newbuf;
488            c->isize = ITEM_LIST_INITIAL;
489        }
490    /* TODO check error condition? */
491    }
492
493    if (c->msgsize > MSG_LIST_HIGHWAT) {
494        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
495        if (newbuf) {
496            c->msglist = newbuf;
497            c->msgsize = MSG_LIST_INITIAL;
498        }
499    /* TODO check error condition? */
500    }
501
502    if (c->iovsize > IOV_LIST_HIGHWAT) {
503        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
504        if (newbuf) {
505            c->iov = newbuf;
506            c->iovsize = IOV_LIST_INITIAL;
507        }
508    /* TODO check return value */
509    }
510}
511
512/*
513 * Sets a connection's current state in the state machine. Any special
514 * processing that needs to happen on certain state transitions can
515 * happen here.
516 */
517static void conn_set_state(conn *c, int state) {
518    assert(c != NULL);
519
520    if (state != c->state) {
521        if (state == conn_read) {
522            conn_shrink(c);
523            assoc_move_next_bucket();
524        }
525        c->state = state;
526    }
527}
528
529
530/*
531 * Ensures that there is room for another struct iovec in a connection's
532 * iov list.
533 *
534 * Returns 0 on success, -1 on out-of-memory.
535 */
536static int ensure_iov_space(conn *c) {
537    assert(c != NULL);
538
539    if (c->iovused >= c->iovsize) {
540        int i, iovnum;
541        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
542                                (c->iovsize * 2) * sizeof(struct iovec));
543        if (! new_iov)
544            return -1;
545        c->iov = new_iov;
546        c->iovsize *= 2;
547
548        /* Point all the msghdr structures at the new list. */
549        for (i = 0, iovnum = 0; i < c->msgused; i++) {
550            c->msglist[i].msg_iov = &c->iov[iovnum];
551            iovnum += c->msglist[i].msg_iovlen;
552        }
553    }
554
555    return 0;
556}
557
558
559/*
560 * Adds data to the list of pending data that will be written out to a
561 * connection.
562 *
563 * Returns 0 on success, -1 on out-of-memory.
564 */
565
566static int add_iov(conn *c, const void *buf, int len) {
567    struct msghdr *m;
568    int leftover;
569    bool limit_to_mtu;
570
571    assert(c != NULL);
572
573    do {
574        m = &c->msglist[c->msgused - 1];
575
576        /*
577         * Limit UDP packets, and the first payloads of TCP replies, to
578         * UDP_MAX_PAYLOAD_SIZE bytes.
579         */
580        limit_to_mtu = c->udp || (1 == c->msgused);
581
582        /* We may need to start a new msghdr if this one is full. */
583        if (m->msg_iovlen == IOV_MAX ||
584            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
585            add_msghdr(c);
586            m = &c->msglist[c->msgused - 1];
587        }
588
589        if (ensure_iov_space(c) != 0)
590            return -1;
591
592        /* If the fragment is too big to fit in the datagram, split it up */
593        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
594            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
595            len -= leftover;
596        } else {
597            leftover = 0;
598        }
599
600        m = &c->msglist[c->msgused - 1];
601        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
602        m->msg_iov[m->msg_iovlen].iov_len = len;
603
604        c->msgbytes += len;
605        c->iovused++;
606        m->msg_iovlen++;
607
608        buf = ((char *)buf) + len;
609        len = leftover;
610    } while (leftover > 0);
611
612    return 0;
613}
614
615
616/*
617 * Constructs a set of UDP headers and attaches them to the outgoing messages.
618 */
619static int build_udp_headers(conn *c) {
620    int i;
621    unsigned char *hdr;
622
623    assert(c != NULL);
624
625    if (c->msgused > c->hdrsize) {
626        void *new_hdrbuf;
627        if (c->hdrbuf)
628            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
629        else
630            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
631        if (! new_hdrbuf)
632            return -1;
633        c->hdrbuf = (unsigned char *)new_hdrbuf;
634        c->hdrsize = c->msgused * 2;
635    }
636
637    hdr = c->hdrbuf;
638    for (i = 0; i < c->msgused; i++) {
639        c->msglist[i].msg_iov[0].iov_base = hdr;
640        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
641        *hdr++ = c->request_id / 256;
642        *hdr++ = c->request_id % 256;
643        *hdr++ = i / 256;
644        *hdr++ = i % 256;
645        *hdr++ = c->msgused / 256;
646        *hdr++ = c->msgused % 256;
647        *hdr++ = 0;
648        *hdr++ = 0;
649        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
650    }
651
652    return 0;
653}
654
655
656static void out_string(conn *c, const char *str) {
657    size_t len;
658
659    assert(c != NULL);
660
661    if (settings.verbose > 1)
662        fprintf(stderr, ">%d %s\n", c->sfd, str);
663
664    len = strlen(str);
665    if ((len + 2) > c->wsize) {
666        /* ought to be always enough. just fail for simplicity */
667        str = "SERVER_ERROR output line too long";
668        len = strlen(str);
669    }
670
671    memcpy(c->wbuf, str, len);
672    memcpy(c->wbuf + len, "\r\n", 3);
673    c->wbytes = len + 2;
674    c->wcurr = c->wbuf;
675
676    conn_set_state(c, conn_write);
677    c->write_and_go = conn_read;
678    return;
679}
680
681/*
682 * we get here after reading the value in set/add/replace commands. The command
683 * has been stored in c->item_comm, and the item is ready in c->item.
684 */
685
686static void complete_nread(conn *c) {
687    assert(c != NULL);
688
689    item *it = c->item;
690    int comm = c->item_comm;
691
692    STATS_LOCK();
693    stats.set_cmds++;
694    STATS_UNLOCK();
695
696    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
697        out_string(c, "CLIENT_ERROR bad data chunk");
698    } else {
699        if (store_item(it, comm)) {
700            out_string(c, "STORED");
701        } else {
702            out_string(c, "NOT_STORED");
703        }
704    }
705
706    item_remove(c->item);       /* release the c->item reference */
707    c->item = 0;
708}
709
710/*
711 * Stores an item in the cache according to the semantics of one of the set
712 * commands. In threaded mode, this is protected by the cache lock.
713 *
714 * Returns true if the item was stored.
715 */
716int do_store_item(item *it, int comm) {
717    char *key = ITEM_key(it);
718    bool delete_locked = false;
719    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
720    int stored = 0;
721
722    if (old_it != NULL && comm == NREAD_ADD) {
723        /* add only adds a nonexistent item, but promote to head of LRU */
724        do_item_update(old_it);
725    } else if (!old_it && comm == NREAD_REPLACE) {
726        /* replace only replaces an existing value; don't store */
727    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
728        /* replace and add can't override delete locks; don't store */
729    } else {
730        /* "set" commands can override the delete lock
731           window... in which case we have to find the old hidden item
732           that's in the namespace/LRU but wasn't returned by
733           item_get.... because we need to replace it */
734        if (delete_locked)
735            old_it = do_item_get_nocheck(key, it->nkey);
736
737        if (old_it != NULL)
738            do_item_replace(old_it, it);
739        else
740            do_item_link(it);
741
742        stored = 1;
743    }
744
745    if (old_it)
746        do_item_remove(old_it);         /* release our reference */
747    return stored;
748}
749
750typedef struct token_s {
751    char *value;
752    size_t length;
753} token_t;
754
755#define COMMAND_TOKEN 0
756#define SUBCOMMAND_TOKEN 1
757#define KEY_TOKEN 1
758#define KEY_MAX_LENGTH 250
759
760#define MAX_TOKENS 6
761
762/*
763 * Tokenize the command string by replacing whitespace with '\0' and update
764 * the token array tokens with pointer to start of each token and length.
765 * Returns total number of tokens.  The last valid token is the terminal
766 * token (value points to the first unprocessed character of the string and
767 * length zero).
768 *
769 * Usage example:
770 *
771 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
772 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
773 *          ...
774 *      }
775 *      ncommand = tokens[ix].value - command;
776 *      command  = tokens[ix].value;
777 *   }
778 */
779static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
780    char *s, *e;
781    size_t ntokens = 0;
782
783    assert(command != NULL && tokens != NULL && max_tokens > 1);
784
785    for (s = e = command; ntokens < max_tokens - 1; ++e) {
786        if (*e == ' ') {
787            if (s != e) {
788                tokens[ntokens].value = s;
789                tokens[ntokens].length = e - s;
790                ntokens++;
791                *e = '\0';
792            }
793            s = e + 1;
794        }
795        else if (*e == '\0') {
796            if (s != e) {
797                tokens[ntokens].value = s;
798                tokens[ntokens].length = e - s;
799                ntokens++;
800            }
801
802            break; /* string end */
803        }
804    }
805
806    /*
807     * If we scanned the whole string, the terminal value pointer is null,
808     * otherwise it is the first unprocessed character.
809     */
810    tokens[ntokens].value =  *e == '\0' ? NULL : e;
811    tokens[ntokens].length = 0;
812    ntokens++;
813
814    return ntokens;
815}
816
817/* set up a connection to write a buffer then free it, used for stats */
818static void write_and_free(conn *c, char *buf, int bytes) {
819    if (buf) {
820        c->write_and_free = buf;
821        c->wcurr = buf;
822        c->wbytes = bytes;
823        conn_set_state(c, conn_write);
824        c->write_and_go = conn_read;
825    } else {
826        out_string(c, "SERVER_ERROR out of memory");
827    }
828}
829
830inline static void process_stats_detail(conn *c, const char *command) {
831    assert(c != NULL);
832
833    if (strcmp(command, "on") == 0) {
834        settings.detail_enabled = 1;
835        out_string(c, "OK");
836    }
837    else if (strcmp(command, "off") == 0) {
838        settings.detail_enabled = 0;
839        out_string(c, "OK");
840    }
841    else if (strcmp(command, "dump") == 0) {
842        int len;
843        char *stats = stats_prefix_dump(&len);
844        write_and_free(c, stats, len);
845    }
846    else {
847        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
848    }
849}
850
851static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
852    rel_time_t now = current_time;
853    char *command;
854    char *subcommand;
855
856    assert(c != NULL);
857
858    if(ntokens < 2) {
859        out_string(c, "CLIENT_ERROR bad command line");
860        return;
861    }
862
863    command = tokens[COMMAND_TOKEN].value;
864
865    if (ntokens == 2 && strcmp(command, "stats") == 0) {
866        char temp[1024];
867        pid_t pid = getpid();
868        char *pos = temp;
869
870#ifndef WIN32
871        struct rusage usage;
872        getrusage(RUSAGE_SELF, &usage);
873#endif /* !WIN32 */
874
875        STATS_LOCK();
876        pos += sprintf(pos, "STAT pid %u\r\n", pid);
877        pos += sprintf(pos, "STAT uptime %u\r\n", now);
878        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
879        pos += sprintf(pos, "STAT version " VERSION "\r\n");
880        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
881#ifndef WIN32
882        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
883        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
884#endif /* !WIN32 */
885        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
886        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
887        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
888        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
889        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
890        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
891        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
892        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
893        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
894        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
895        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
896        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
897        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
898        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
899        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
900        pos += sprintf(pos, "END");
901        STATS_UNLOCK();
902        out_string(c, temp);
903        return;
904    }
905
906    subcommand = tokens[SUBCOMMAND_TOKEN].value;
907
908    if (strcmp(subcommand, "reset") == 0) {
909        stats_reset();
910        out_string(c, "RESET");
911        return;
912    }
913
914#ifdef HAVE_MALLOC_H
915#ifdef HAVE_STRUCT_MALLINFO
916    if (strcmp(subcommand, "malloc") == 0) {
917        char temp[512];
918        struct mallinfo info;
919        char *pos = temp;
920
921        info = mallinfo();
922        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
923        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
924        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
925        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
926        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
927        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
928        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
929        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
930        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
931        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
932        out_string(c, temp);
933        return;
934    }
935#endif /* HAVE_STRUCT_MALLINFO */
936#endif /* HAVE_MALLOC_H */
937
938#if !defined(WIN32) || !defined(__APPLE__)
939    if (strcmp(subcommand, "maps") == 0) {
940        char *wbuf;
941        int wsize = 8192; /* should be enough */
942        int fd;
943        int res;
944
945        if ((wbuf = (char *)malloc(wsize)) == NULL) {
946            out_string(c, "SERVER_ERROR out of memory");
947            return;
948        }
949
950        fd = open("/proc/self/maps", O_RDONLY);
951        if (fd == -1) {
952            out_string(c, "SERVER_ERROR cannot open the maps file");
953            free(wbuf);
954            return;
955        }
956
957        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
958        if (res == wsize - 6) {
959            out_string(c, "SERVER_ERROR buffer overflow");
960            free(wbuf); close(fd);
961            return;
962        }
963        if (res == 0 || res == -1) {
964            out_string(c, "SERVER_ERROR can't read the maps file");
965            free(wbuf); close(fd);
966            return;
967        }
968        memcpy(wbuf + res, "END\r\n", 5);
969        write_and_free(c, wbuf, res + 5);
970        close(fd);
971        return;
972    }
973#endif
974
975    if (strcmp(subcommand, "cachedump") == 0) {
976
977        char *buf;
978        unsigned int bytes, id, limit = 0;
979
980        if(ntokens < 5) {
981            out_string(c, "CLIENT_ERROR bad command line");
982            return;
983        }
984
985        id = strtoul(tokens[2].value, NULL, 10);
986        limit = strtoul(tokens[3].value, NULL, 10);
987
988        if(errno == ERANGE) {
989            out_string(c, "CLIENT_ERROR bad command line format");
990            return;
991        }
992
993        buf = item_cachedump(id, limit, &bytes);
994        write_and_free(c, buf, bytes);
995        return;
996    }
997
998    if (strcmp(subcommand, "slabs") == 0) {
999        int bytes = 0;
1000        char *buf = slabs_stats(&bytes);
1001        write_and_free(c, buf, bytes);
1002        return;
1003    }
1004
1005    if (strcmp(subcommand, "items") == 0) {
1006        int bytes = 0;
1007        char *buf = item_stats(&bytes);
1008        write_and_free(c, buf, bytes);
1009        return;
1010    }
1011
1012    if (strcmp(subcommand, "detail") == 0) {
1013        if (ntokens < 4)
1014            process_stats_detail(c, "");  /* outputs the error message */
1015        else
1016            process_stats_detail(c, tokens[2].value);
1017        return;
1018    }
1019
1020    if (strcmp(subcommand, "sizes") == 0) {
1021        int bytes = 0;
1022        char *buf = item_stats_sizes(&bytes);
1023        write_and_free(c, buf, bytes);
1024        return;
1025    }
1026
1027    out_string(c, "ERROR");
1028}
1029
1030/* ntokens is overwritten here... shrug.. */
1031static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_key_ptr) {
1032    char *key;
1033    size_t nkey;
1034    int i = 0;
1035    item *it;
1036    token_t *key_token = &tokens[KEY_TOKEN];
1037    char suffix[255];
1038    uint32_t in_memory_ptr;
1039    assert(c != NULL);
1040
1041    if (settings.managed) {
1042        int bucket = c->bucket;
1043        if (bucket == -1) {
1044            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1045            return;
1046        }
1047        c->bucket = -1;
1048        if (buckets[bucket] != c->gen) {
1049            out_string(c, "ERROR_NOT_OWNER");
1050            return;
1051        }
1052    }
1053
1054    do {
1055        while(key_token->length != 0) {
1056
1057            key = key_token->value;
1058            nkey = key_token->length;
1059
1060            if(nkey > KEY_MAX_LENGTH) {
1061                out_string(c, "CLIENT_ERROR bad command line format");
1062                return;
1063            }
1064
1065            STATS_LOCK();
1066            stats.get_cmds++;
1067            STATS_UNLOCK();
1068            it = item_get(key, nkey);
1069            if (settings.detail_enabled) {
1070                stats_prefix_record_get(key, NULL != it);
1071            }
1072            if (it) {
1073                if (i >= c->isize) {
1074                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1075                    if (new_list) {
1076                        c->isize *= 2;
1077                        c->ilist = new_list;
1078                    } else break;
1079                }
1080
1081                /*
1082                 * Construct the response. Each hit adds three elements to the
1083                 * outgoing data list:
1084                 *   "VALUE "
1085                 *   key
1086                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1087                 */
1088
1089                if(return_key_ptr == true)
1090                {
1091                  in_memory_ptr = (uint32_t)item_get(key, nkey);
1092                  sprintf(suffix," %d %d %lu\r\n", atoi(ITEM_suffix(it) + 1), it->nbytes - 2, in_memory_ptr);
1093                  if (add_iov(c, "VALUE ", 6) != 0 ||
1094                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1095                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1096                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1097                      {
1098                          break;
1099                      }
1100                }
1101                else
1102                {
1103                  if (add_iov(c, "VALUE ", 6) != 0 ||
1104                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1105                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1106                      {
1107                          break;
1108                      }
1109                }
1110
1111
1112                if (settings.verbose > 1)
1113                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1114
1115                /* item_get() has incremented it->refcount for us */
1116                STATS_LOCK();
1117                stats.get_hits++;
1118                STATS_UNLOCK();
1119                item_update(it);
1120                *(c->ilist + i) = it;
1121                i++;
1122
1123            } else {
1124                STATS_LOCK();
1125                stats.get_misses++;
1126                STATS_UNLOCK();
1127            }
1128
1129            key_token++;
1130        }
1131
1132        /*
1133         * If the command string hasn't been fully processed, get the next set
1134         * of tokens.
1135         */
1136        if(key_token->value != NULL) {
1137            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1138            key_token = tokens;
1139        }
1140
1141    } while(key_token->value != NULL);
1142
1143    c->icurr = c->ilist;
1144    c->ileft = i;
1145
1146    if (settings.verbose > 1)
1147        fprintf(stderr, ">%d END\n", c->sfd);
1148    add_iov(c, "END\r\n", 5);
1149
1150    if (c->udp && build_udp_headers(c) != 0) {
1151        out_string(c, "SERVER_ERROR out of memory");
1152    }
1153    else {
1154        conn_set_state(c, conn_mwrite);
1155        c->msgcurr = 0;
1156    }
1157    return;
1158}
1159
1160static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1161    char *key;
1162    size_t nkey;
1163    int flags;
1164    time_t exptime;
1165    int vlen, old_vlen;
1166    uint32_t req_memory_ptr, in_memory_ptr;
1167    item *it, *old_it;
1168
1169    assert(c != NULL);
1170
1171    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1172        out_string(c, "CLIENT_ERROR bad command line format");
1173        return;
1174    }
1175
1176    key = tokens[KEY_TOKEN].value;
1177    nkey = tokens[KEY_TOKEN].length;
1178
1179    flags = strtoul(tokens[2].value, NULL, 10);
1180    exptime = strtol(tokens[3].value, NULL, 10);
1181    vlen = strtol(tokens[4].value, NULL, 10);
1182
1183    // does cas value exist?
1184    if(tokens[5].value)
1185    {
1186      req_memory_ptr = strtoull(tokens[5].value, NULL, 10);
1187    }
1188
1189    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1190        out_string(c, "CLIENT_ERROR bad command line format");
1191        return;
1192    }
1193
1194    if (settings.detail_enabled) {
1195        stats_prefix_record_set(key);
1196    }
1197
1198    if (settings.managed) {
1199        int bucket = c->bucket;
1200        if (bucket == -1) {
1201            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1202            return;
1203        }
1204        c->bucket = -1;
1205        if (buckets[bucket] != c->gen) {
1206            out_string(c, "ERROR_NOT_OWNER");
1207            return;
1208        }
1209    }
1210
1211    /* Check if append -- if yes, search for previous entry, and allocate memory for both */
1212    if( comm == NREAD_APPEND ){
1213       old_it = assoc_find(key,nkey);
1214
1215       if( old_it && (old_it->nbytes)>2 ){ // previous must be more than \r\n
1216          old_vlen = old_it->nbytes - 2;
1217          vlen += old_vlen;                // append the length of old data
1218       } else {
1219          comm = NREAD_REPLACE;            // no old entry: treat as replace
1220       }
1221    }
1222
1223    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1224
1225    /* HANDLE_CAS VALIDATION */
1226    if (handle_cas == true)
1227    {
1228      item *itmp=item_get(key, it->nkey);
1229      /* Release the reference */
1230      if(itmp) {
1231        item_remove(itmp);
1232      }
1233      in_memory_ptr = (uint32_t)itmp;
1234      if(in_memory_ptr == req_memory_ptr)
1235      {
1236        // validates allow the set
1237      }
1238      else if(in_memory_ptr)
1239      {
1240        out_string(c, "EXISTS");
1241
1242        /* swallow the data line */
1243        c->write_and_go = conn_swallow;
1244        c->sbytes = vlen + 2;
1245        return;
1246      }
1247      else
1248      {
1249        out_string(c, "NOT FOUND");
1250        /* swallow the data line */
1251        c->write_and_go = conn_swallow;
1252        c->sbytes = vlen + 2;
1253        return;
1254      }
1255    }
1256
1257    if (it == 0) {
1258        if (! item_size_ok(nkey, flags, vlen + 2))
1259            out_string(c, "SERVER_ERROR object too large for cache");
1260        else
1261            out_string(c, "SERVER_ERROR out of memory");
1262        /* swallow the data line */
1263        c->write_and_go = conn_swallow;
1264        c->sbytes = vlen + 2;
1265        return;
1266    }
1267
1268    c->item = it;
1269    c->ritem = ITEM_data(it);
1270    c->rlbytes = it->nbytes;
1271
1272
1273    /* If append, prepend old data before new - adjust item, rlbytes variables too
1274     * Now that data has been merged, treat simply as a replace command
1275     */
1276    if (comm == NREAD_APPEND ){
1277       memcpy( c->ritem, ITEM_data(old_it), old_vlen );
1278       c->ritem += old_vlen;
1279       c->rlbytes -= old_vlen;
1280       comm = NREAD_REPLACE;
1281    }
1282
1283    c->item_comm = comm;
1284    conn_set_state(c, conn_nread);
1285}
1286
1287static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1288    char temp[sizeof("18446744073709551615")];
1289    item *it;
1290    int64_t delta;
1291    char *key;
1292    size_t nkey;
1293
1294    assert(c != NULL);
1295
1296    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1297        out_string(c, "CLIENT_ERROR bad command line format");
1298        return;
1299    }
1300
1301    key = tokens[KEY_TOKEN].value;
1302    nkey = tokens[KEY_TOKEN].length;
1303
1304    if (settings.managed) {
1305        int bucket = c->bucket;
1306        if (bucket == -1) {
1307            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1308            return;
1309        }
1310        c->bucket = -1;
1311        if (buckets[bucket] != c->gen) {
1312            out_string(c, "ERROR_NOT_OWNER");
1313            return;
1314        }
1315    }
1316
1317    delta = strtoll(tokens[2].value, NULL, 10);
1318
1319    if(errno == ERANGE) {
1320        out_string(c, "CLIENT_ERROR bad command line format");
1321        return;
1322    }
1323
1324    it = item_get(key, nkey);
1325    if (!it) {
1326        out_string(c, "NOT_FOUND");
1327        return;
1328    }
1329
1330    out_string(c, add_delta(it, incr, delta, temp));
1331    item_remove(it);         /* release our reference */
1332}
1333
1334/*
1335 * adds a delta value to a numeric item.
1336 *
1337 * it    item to adjust
1338 * incr  true to increment value, false to decrement
1339 * delta amount to adjust value by
1340 * buf   buffer for response string
1341 *
1342 * returns a response string to send back to the client.
1343 */
1344char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
1345    char *ptr;
1346    int64_t value;
1347    int res;
1348
1349    ptr = ITEM_data(it);
1350    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1351
1352    value = strtoull(ptr, NULL, 10);
1353
1354    if(errno == ERANGE) {
1355        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1356    }
1357
1358    if (incr)
1359        value += delta;
1360    else {
1361        if (delta >= value) value = 0;
1362        else value -= delta;
1363    }
1364    sprintf(buf, "%llu", value);
1365    res = strlen(buf);
1366    if (res + 2 > it->nbytes) { /* need to realloc */
1367        item *new_it;
1368        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1369        if (new_it == 0) {
1370            return "SERVER_ERROR out of memory";
1371        }
1372        memcpy(ITEM_data(new_it), buf, res);
1373        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1374        do_item_replace(it, new_it);
1375        do_item_remove(new_it);       /* release our reference */
1376    } else { /* replace in-place */
1377        memcpy(ITEM_data(it), buf, res);
1378        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1379    }
1380
1381    return buf;
1382}
1383
1384static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1385    char *key;
1386    size_t nkey;
1387    item *it;
1388    time_t exptime = 0;
1389
1390    assert(c != NULL);
1391
1392    if (settings.managed) {
1393        int bucket = c->bucket;
1394        if (bucket == -1) {
1395            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1396            return;
1397        }
1398        c->bucket = -1;
1399        if (buckets[bucket] != c->gen) {
1400            out_string(c, "ERROR_NOT_OWNER");
1401            return;
1402        }
1403    }
1404
1405    key = tokens[KEY_TOKEN].value;
1406    nkey = tokens[KEY_TOKEN].length;
1407
1408    if(nkey > KEY_MAX_LENGTH) {
1409        out_string(c, "CLIENT_ERROR bad command line format");
1410        return;
1411    }
1412
1413    if(ntokens == 4) {
1414        exptime = strtol(tokens[2].value, NULL, 10);
1415
1416        if(errno == ERANGE) {
1417            out_string(c, "CLIENT_ERROR bad command line format");
1418            return;
1419        }
1420    }
1421
1422    if (settings.detail_enabled) {
1423        stats_prefix_record_delete(key);
1424    }
1425
1426    it = item_get(key, nkey);
1427    if (it) {
1428        if (exptime == 0) {
1429            item_unlink(it);
1430            item_remove(it);      /* release our reference */
1431            out_string(c, "DELETED");
1432        } else {
1433            /* our reference will be transfered to the delete queue */
1434            out_string(c, defer_delete(it, exptime));
1435        }
1436    } else {
1437        out_string(c, "NOT_FOUND");
1438    }
1439}
1440
1441/*
1442 * Adds an item to the deferred-delete list so it can be reaped later.
1443 *
1444 * Returns the result to send to the client.
1445 */
1446char *do_defer_delete(item *it, time_t exptime)
1447{
1448    if (delcurr >= deltotal) {
1449        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1450        if (new_delete) {
1451            todelete = new_delete;
1452            deltotal *= 2;
1453        } else {
1454            /*
1455             * can't delete it immediately, user wants a delay,
1456             * but we ran out of memory for the delete queue
1457             */
1458            item_remove(it);    /* release reference */
1459            return "SERVER_ERROR out of memory";
1460        }
1461    }
1462
1463    /* use its expiration time as its deletion time now */
1464    it->exptime = realtime(exptime);
1465    it->it_flags |= ITEM_DELETED;
1466    todelete[delcurr++] = it;
1467
1468    return "DELETED";
1469}
1470
1471static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1472    unsigned int level;
1473
1474    assert(c != NULL);
1475
1476    level = strtoul(tokens[1].value, NULL, 10);
1477    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1478    out_string(c, "OK");
1479    return;
1480}
1481
1482static void process_command(conn *c, char *command) {
1483
1484    token_t tokens[MAX_TOKENS];
1485    size_t ntokens;
1486    int comm;
1487
1488    assert(c != NULL);
1489
1490    if (settings.verbose > 1)
1491        fprintf(stderr, "<%d %s\n", c->sfd, command);
1492
1493    /*
1494     * for commands set/add/replace, we build an item and read the data
1495     * directly into it, then continue in nread_complete().
1496     */
1497
1498    c->msgcurr = 0;
1499    c->msgused = 0;
1500    c->iovused = 0;
1501    if (add_msghdr(c) != 0) {
1502        out_string(c, "SERVER_ERROR out of memory");
1503        return;
1504    }
1505
1506    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1507    if (ntokens >= 3 &&
1508        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1509         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1510
1511        process_get_command(c, tokens, ntokens, false);
1512
1513    } else if (ntokens == 6 &&
1514               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1515                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1516                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
1517                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
1518
1519        process_update_command(c, tokens, ntokens, comm, false);
1520
1521    } else if (ntokens == 6 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0)) {
1522
1523        process_update_command(c, tokens, ntokens, comm, true);
1524
1525    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1526
1527        process_arithmetic_command(c, tokens, ntokens, 1);
1528
1529    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
1530
1531        process_get_command(c, tokens, ntokens, true);
1532
1533    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1534
1535        process_arithmetic_command(c, tokens, ntokens, 0);
1536
1537    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1538
1539        process_delete_command(c, tokens, ntokens);
1540
1541    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1542        unsigned int bucket, gen;
1543        if (!settings.managed) {
1544            out_string(c, "CLIENT_ERROR not a managed instance");
1545            return;
1546        }
1547
1548        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1549            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1550                out_string(c, "CLIENT_ERROR bucket number out of range");
1551                return;
1552            }
1553            buckets[bucket] = gen;
1554            out_string(c, "OWNED");
1555            return;
1556        } else {
1557            out_string(c, "CLIENT_ERROR bad format");
1558            return;
1559        }
1560
1561    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1562
1563        int bucket;
1564        if (!settings.managed) {
1565            out_string(c, "CLIENT_ERROR not a managed instance");
1566            return;
1567        }
1568        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1569            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1570                out_string(c, "CLIENT_ERROR bucket number out of range");
1571                return;
1572            }
1573            buckets[bucket] = 0;
1574            out_string(c, "DISOWNED");
1575            return;
1576        } else {
1577            out_string(c, "CLIENT_ERROR bad format");
1578            return;
1579        }
1580
1581    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1582        int bucket, gen;
1583        if (!settings.managed) {
1584            out_string(c, "CLIENT_ERROR not a managed instance");
1585            return;
1586        }
1587        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1588            /* we never write anything back, even if input's wrong */
1589            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1590                /* do nothing, bad input */
1591            } else {
1592                c->bucket = bucket;
1593                c->gen = gen;
1594            }
1595            conn_set_state(c, conn_read);
1596            return;
1597        } else {
1598            out_string(c, "CLIENT_ERROR bad format");
1599            return;
1600        }
1601
1602    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1603
1604        process_stat(c, tokens, ntokens);
1605
1606    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1607        time_t exptime = 0;
1608        set_current_time();
1609
1610        if(ntokens == 2) {
1611            settings.oldest_live = current_time - 1;
1612            item_flush_expired();
1613            out_string(c, "OK");
1614            return;
1615        }
1616
1617        exptime = strtol(tokens[1].value, NULL, 10);
1618        if(errno == ERANGE) {
1619            out_string(c, "CLIENT_ERROR bad command line format");
1620            return;
1621        }
1622
1623        settings.oldest_live = realtime(exptime) - 1;
1624        item_flush_expired();
1625        out_string(c, "OK");
1626        return;
1627
1628    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1629
1630        out_string(c, "VERSION " VERSION);
1631
1632    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1633
1634        conn_set_state(c, conn_closing);
1635
1636    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1637                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1638#ifdef ALLOW_SLABS_REASSIGN
1639
1640        int src, dst, rv;
1641
1642        src = strtol(tokens[2].value, NULL, 10);
1643        dst  = strtol(tokens[3].value, NULL, 10);
1644
1645        if(errno == ERANGE) {
1646            out_string(c, "CLIENT_ERROR bad command line format");
1647            return;
1648        }
1649
1650        rv = slabs_reassign(src, dst);
1651        if (rv == 1) {
1652            out_string(c, "DONE");
1653            return;
1654        }
1655        if (rv == 0) {
1656            out_string(c, "CANT");
1657            return;
1658        }
1659        if (rv == -1) {
1660            out_string(c, "BUSY");
1661            return;
1662        }
1663#else
1664        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1665#endif
1666    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1667        process_verbosity_command(c, tokens, ntokens);
1668    } else {
1669        out_string(c, "ERROR");
1670    }
1671    return;
1672}
1673
1674/*
1675 * if we have a complete line in the buffer, process it.
1676 */
1677static int try_read_command(conn *c) {
1678    char *el, *cont;
1679
1680    assert(c != NULL);
1681    assert(c->rcurr <= (c->rbuf + c->rsize));
1682
1683    if (c->rbytes == 0)
1684        return 0;
1685    el = memchr(c->rcurr, '\n', c->rbytes);
1686    if (!el)
1687        return 0;
1688    cont = el + 1;
1689    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1690        el--;
1691    }
1692    *el = '\0';
1693
1694    assert(cont <= (c->rcurr + c->rbytes));
1695
1696    process_command(c, c->rcurr);
1697
1698    c->rbytes -= (cont - c->rcurr);
1699    c->rcurr = cont;
1700
1701    assert(c->rcurr <= (c->rbuf + c->rsize));
1702
1703    return 1;
1704}
1705
1706/*
1707 * read a UDP request.
1708 * return 0 if there's nothing to read.
1709 */
1710static int try_read_udp(conn *c) {
1711    int res;
1712
1713    assert(c != NULL);
1714
1715    c->request_addr_size = sizeof(c->request_addr);
1716    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1717                   0, &c->request_addr, &c->request_addr_size);
1718    if (res > 8) {
1719        unsigned char *buf = (unsigned char *)c->rbuf;
1720        STATS_LOCK();
1721        stats.bytes_read += res;
1722        STATS_UNLOCK();
1723
1724        /* Beginning of UDP packet is the request ID; save it. */
1725        c->request_id = buf[0] * 256 + buf[1];
1726
1727        /* If this is a multi-packet request, drop it. */
1728        if (buf[4] != 0 || buf[5] != 1) {
1729            out_string(c, "SERVER_ERROR multi-packet request not supported");
1730            return 0;
1731        }
1732
1733        /* Don't care about any of the rest of the header. */
1734        res -= 8;
1735        memmove(c->rbuf, c->rbuf + 8, res);
1736
1737        c->rbytes += res;
1738        c->rcurr = c->rbuf;
1739        return 1;
1740    }
1741    return 0;
1742}
1743
1744/*
1745 * read from network as much as we can, handle buffer overflow and connection
1746 * close.
1747 * before reading, move the remaining incomplete fragment of a command
1748 * (if any) to the beginning of the buffer.
1749 * return 0 if there's nothing to read on the first read.
1750 */
1751static int try_read_network(conn *c) {
1752    int gotdata = 0;
1753    int res;
1754
1755    assert(c != NULL);
1756
1757    if (c->rcurr != c->rbuf) {
1758        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1759            memmove(c->rbuf, c->rcurr, c->rbytes);
1760        c->rcurr = c->rbuf;
1761    }
1762
1763    while (1) {
1764        if (c->rbytes >= c->rsize) {
1765            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1766            if (!new_rbuf) {
1767                if (settings.verbose > 0)
1768                    fprintf(stderr, "Couldn't realloc input buffer\n");
1769                c->rbytes = 0; /* ignore what we read */
1770                out_string(c, "SERVER_ERROR out of memory");
1771                c->write_and_go = conn_closing;
1772                return 1;
1773            }
1774            c->rcurr = c->rbuf = new_rbuf;
1775            c->rsize *= 2;
1776        }
1777
1778        /* unix socket mode doesn't need this, so zeroed out.  but why
1779         * is this done for every command?  presumably for UDP
1780         * mode.  */
1781        if (!settings.socketpath) {
1782            c->request_addr_size = sizeof(c->request_addr);
1783        } else {
1784            c->request_addr_size = 0;
1785        }
1786
1787        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1788        if (res > 0) {
1789            STATS_LOCK();
1790            stats.bytes_read += res;
1791            STATS_UNLOCK();
1792            gotdata = 1;
1793            c->rbytes += res;
1794            continue;
1795        }
1796        if (res == 0) {
1797            /* connection closed */
1798            conn_set_state(c, conn_closing);
1799            return 1;
1800        }
1801        if (res == -1) {
1802            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1803            else return 0;
1804        }
1805    }
1806    return gotdata;
1807}
1808
1809static bool update_event(conn *c, const int new_flags) {
1810    assert(c != NULL);
1811
1812    struct event_base *base = c->event.ev_base;
1813    if (c->ev_flags == new_flags)
1814        return true;
1815    if (event_del(&c->event) == -1) return false;
1816    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1817    event_base_set(base, &c->event);
1818    c->ev_flags = new_flags;
1819    if (event_add(&c->event, 0) == -1) return false;
1820    return true;
1821}
1822
1823/*
1824 * Sets whether we are listening for new connections or not.
1825 */
1826void accept_new_conns(const bool do_accept) {
1827    if (! is_listen_thread())
1828        return;
1829    if (do_accept) {
1830        update_event(listen_conn, EV_READ | EV_PERSIST);
1831        if (listen(listen_conn->sfd, 1024) != 0) {
1832            perror("listen");
1833        }
1834    }
1835    else {
1836        update_event(listen_conn, 0);
1837        if (listen(listen_conn->sfd, 0) != 0) {
1838            perror("listen");
1839        }
1840    }
1841}
1842
1843
1844/*
1845 * Transmit the next chunk of data from our list of msgbuf structures.
1846 *
1847 * Returns:
1848 *   TRANSMIT_COMPLETE   All done writing.
1849 *   TRANSMIT_INCOMPLETE More data remaining to write.
1850 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
1851 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1852 */
1853static int transmit(conn *c) {
1854    assert(c != NULL);
1855
1856    if (c->msgcurr < c->msgused &&
1857            c->msglist[c->msgcurr].msg_iovlen == 0) {
1858        /* Finished writing the current msg; advance to the next. */
1859        c->msgcurr++;
1860    }
1861    if (c->msgcurr < c->msgused) {
1862        ssize_t res;
1863        struct msghdr *m = &c->msglist[c->msgcurr];
1864
1865        res = sendmsg(c->sfd, m, 0);
1866        if (res > 0) {
1867            STATS_LOCK();
1868            stats.bytes_written += res;
1869            STATS_UNLOCK();
1870
1871            /* We've written some of the data. Remove the completed
1872               iovec entries from the list of pending writes. */
1873            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1874                res -= m->msg_iov->iov_len;
1875                m->msg_iovlen--;
1876                m->msg_iov++;
1877            }
1878
1879            /* Might have written just part of the last iovec entry;
1880               adjust it so the next write will do the rest. */
1881            if (res > 0) {
1882                m->msg_iov->iov_base += res;
1883                m->msg_iov->iov_len -= res;
1884            }
1885            return TRANSMIT_INCOMPLETE;
1886        }
1887        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1888            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1889                if (settings.verbose > 0)
1890                    fprintf(stderr, "Couldn't update event\n");
1891                conn_set_state(c, conn_closing);
1892                return TRANSMIT_HARD_ERROR;
1893            }
1894            return TRANSMIT_SOFT_ERROR;
1895        }
1896        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1897           we have a real error, on which we close the connection */
1898        if (settings.verbose > 0)
1899            perror("Failed to write, and not due to blocking");
1900
1901        if (c->udp)
1902            conn_set_state(c, conn_read);
1903        else
1904            conn_set_state(c, conn_closing);
1905        return TRANSMIT_HARD_ERROR;
1906    } else {
1907        return TRANSMIT_COMPLETE;
1908    }
1909}
1910
1911static void drive_machine(conn *c) {
1912    bool stop = false;
1913    int sfd, flags = 1;
1914    socklen_t addrlen;
1915    struct sockaddr addr;
1916    int res;
1917
1918    assert(c != NULL);
1919
1920    while (!stop) {
1921
1922        switch(c->state) {
1923        case conn_listening:
1924            addrlen = sizeof(addr);
1925            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
1926                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1927                    /* these are transient, so don't log anything */
1928                    stop = true;
1929                } else if (errno == EMFILE) {
1930                    if (settings.verbose > 0)
1931                        fprintf(stderr, "Too many open connections\n");
1932                    accept_new_conns(false);
1933                    stop = true;
1934                } else {
1935                    perror("accept()");
1936                    stop = true;
1937                }
1938                break;
1939            }
1940            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
1941                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
1942                perror("setting O_NONBLOCK");
1943                close(sfd);
1944                break;
1945            }
1946            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
1947                                     DATA_BUFFER_SIZE, false);
1948            break;
1949
1950        case conn_read:
1951            if (try_read_command(c) != 0) {
1952                continue;
1953            }
1954            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
1955                continue;
1956            }
1957            /* we have no command line and no data to read from network */
1958            if (!update_event(c, EV_READ | EV_PERSIST)) {
1959                if (settings.verbose > 0)
1960                    fprintf(stderr, "Couldn't update event\n");
1961                conn_set_state(c, conn_closing);
1962                break;
1963            }
1964            stop = true;
1965            break;
1966
1967        case conn_nread:
1968            /* we are reading rlbytes into ritem; */
1969            if (c->rlbytes == 0) {
1970                complete_nread(c);
1971                break;
1972            }
1973            /* first check if we have leftovers in the conn_read buffer */
1974            if (c->rbytes > 0) {
1975                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
1976                memcpy(c->ritem, c->rcurr, tocopy);
1977                c->ritem += tocopy;
1978                c->rlbytes -= tocopy;
1979                c->rcurr += tocopy;
1980                c->rbytes -= tocopy;
1981                break;
1982            }
1983
1984            /*  now try reading from the socket */
1985            res = read(c->sfd, c->ritem, c->rlbytes);
1986            if (res > 0) {
1987                STATS_LOCK();
1988                stats.bytes_read += res;
1989                STATS_UNLOCK();
1990                c->ritem += res;
1991                c->rlbytes -= res;
1992                break;
1993            }
1994            if (res == 0) { /* end of stream */
1995                conn_set_state(c, conn_closing);
1996                break;
1997            }
1998            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1999                if (!update_event(c, EV_READ | EV_PERSIST)) {
2000                    if (settings.verbose > 0)
2001                        fprintf(stderr, "Couldn't update event\n");
2002                    conn_set_state(c, conn_closing);
2003                    break;
2004                }
2005                stop = true;
2006                break;
2007            }
2008            /* otherwise we have a real error, on which we close the connection */
2009            if (settings.verbose > 0)
2010                fprintf(stderr, "Failed to read, and not due to blocking\n");
2011            conn_set_state(c, conn_closing);
2012            break;
2013
2014        case conn_swallow:
2015            /* we are reading sbytes and throwing them away */
2016            if (c->sbytes == 0) {
2017                conn_set_state(c, conn_read);
2018                break;
2019            }
2020
2021            /* first check if we have leftovers in the conn_read buffer */
2022            if (c->rbytes > 0) {
2023                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2024                c->sbytes -= tocopy;
2025                c->rcurr += tocopy;
2026                c->rbytes -= tocopy;
2027                break;
2028            }
2029
2030            /*  now try reading from the socket */
2031            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2032            if (res > 0) {
2033                STATS_LOCK();
2034                stats.bytes_read += res;
2035                STATS_UNLOCK();
2036                c->sbytes -= res;
2037                break;
2038            }
2039            if (res == 0) { /* end of stream */
2040                conn_set_state(c, conn_closing);
2041                break;
2042            }
2043            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2044                if (!update_event(c, EV_READ | EV_PERSIST)) {
2045                    if (settings.verbose > 0)
2046                        fprintf(stderr, "Couldn't update event\n");
2047                    conn_set_state(c, conn_closing);
2048                    break;
2049                }
2050                stop = true;
2051                break;
2052            }
2053            /* otherwise we have a real error, on which we close the connection */
2054            if (settings.verbose > 0)
2055                fprintf(stderr, "Failed to read, and not due to blocking\n");
2056            conn_set_state(c, conn_closing);
2057            break;
2058
2059        case conn_write:
2060            /*
2061             * We want to write out a simple response. If we haven't already,
2062             * assemble it into a msgbuf list (this will be a single-entry
2063             * list for TCP or a two-entry list for UDP).
2064             */
2065            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
2066                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2067                    (c->udp && build_udp_headers(c) != 0)) {
2068                    if (settings.verbose > 0)
2069                        fprintf(stderr, "Couldn't build response\n");
2070                    conn_set_state(c, conn_closing);
2071                    break;
2072                }
2073            }
2074
2075            /* fall through... */
2076
2077        case conn_mwrite:
2078            switch (transmit(c)) {
2079            case TRANSMIT_COMPLETE:
2080                if (c->state == conn_mwrite) {
2081                    while (c->ileft > 0) {
2082                        item *it = *(c->icurr);
2083                        assert((it->it_flags & ITEM_SLABBED) == 0);
2084                        item_remove(it);
2085                        c->icurr++;
2086                        c->ileft--;
2087                    }
2088                    conn_set_state(c, conn_read);
2089                } else if (c->state == conn_write) {
2090                    if (c->write_and_free) {
2091                        free(c->write_and_free);
2092                        c->write_and_free = 0;
2093                    }
2094                    conn_set_state(c, c->write_and_go);
2095                } else {
2096                    if (settings.verbose > 0)
2097                        fprintf(stderr, "Unexpected state %d\n", c->state);
2098                    conn_set_state(c, conn_closing);
2099                }
2100                break;
2101
2102            case TRANSMIT_INCOMPLETE:
2103            case TRANSMIT_HARD_ERROR:
2104                break;                   /* Continue in state machine. */
2105
2106            case TRANSMIT_SOFT_ERROR:
2107                stop = true;
2108                break;
2109            }
2110            break;
2111
2112        case conn_closing:
2113            if (c->udp)
2114                conn_cleanup(c);
2115            else
2116                conn_close(c);
2117            stop = true;
2118            break;
2119        }
2120    }
2121
2122    return;
2123}
2124
2125void event_handler(const int fd, const short which, void *arg) {
2126    conn *c;
2127
2128    c = (conn *)arg;
2129    assert(c != NULL);
2130
2131    c->which = which;
2132
2133    /* sanity */
2134    if (fd != c->sfd) {
2135        if (settings.verbose > 0)
2136            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2137        conn_close(c);
2138        return;
2139    }
2140
2141    drive_machine(c);
2142
2143    /* wait for next event */
2144    return;
2145}
2146
2147static int new_socket(const bool is_udp) {
2148    int sfd;
2149    int flags;
2150
2151    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
2152        perror("socket()");
2153        return -1;
2154    }
2155
2156    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2157        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2158        perror("setting O_NONBLOCK");
2159        close(sfd);
2160        return -1;
2161    }
2162    return sfd;
2163}
2164
2165
2166/*
2167 * Sets a socket's send buffer size to the maximum allowed by the system.
2168 */
2169static void maximize_sndbuf(const int sfd) {
2170    socklen_t intsize = sizeof(int);
2171    int last_good = 0;
2172    int min, max, avg;
2173    int old_size;
2174
2175    /* Start with the default size. */
2176    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2177        if (settings.verbose > 0)
2178            perror("getsockopt(SO_SNDBUF)");
2179        return;
2180    }
2181
2182    /* Binary-search for the real maximum. */
2183    min = old_size;
2184    max = MAX_SENDBUF_SIZE;
2185
2186    while (min <= max) {
2187        avg = ((unsigned int)(min + max)) / 2;
2188        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2189            last_good = avg;
2190            min = avg + 1;
2191        } else {
2192            max = avg - 1;
2193        }
2194    }
2195
2196    if (settings.verbose > 1)
2197        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2198}
2199
2200
2201static int server_socket(const int port, const bool is_udp) {
2202    int sfd;
2203    struct linger ling = {0, 0};
2204    struct sockaddr_in addr;
2205    int flags =1;
2206
2207    if ((sfd = new_socket(is_udp)) == -1) {
2208        return -1;
2209    }
2210
2211    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2212    if (is_udp) {
2213        maximize_sndbuf(sfd);
2214    } else {
2215        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2216        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2217        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2218    }
2219
2220    /*
2221     * the memset call clears nonstandard fields in some impementations
2222     * that otherwise mess things up.
2223     */
2224    memset(&addr, 0, sizeof(addr));
2225
2226    addr.sin_family = AF_INET;
2227    addr.sin_port = htons(port);
2228    addr.sin_addr = settings.interf;
2229    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2230        perror("bind()");
2231        close(sfd);
2232        return -1;
2233    }
2234    if (!is_udp && listen(sfd, 1024) == -1) {
2235        perror("listen()");
2236        close(sfd);
2237        return -1;
2238    }
2239    return sfd;
2240}
2241
2242static int new_socket_unix(void) {
2243    int sfd;
2244    int flags;
2245
2246    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2247        perror("socket()");
2248        return -1;
2249    }
2250
2251    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2252        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2253        perror("setting O_NONBLOCK");
2254        close(sfd);
2255        return -1;
2256    }
2257    return sfd;
2258}
2259
2260static int server_socket_unix(const char *path) {
2261    int sfd;
2262    struct linger ling = {0, 0};
2263    struct sockaddr_un addr;
2264    struct stat tstat;
2265    int flags =1;
2266
2267    if (!path) {
2268        return -1;
2269    }
2270
2271    if ((sfd = new_socket_unix()) == -1) {
2272        return -1;
2273    }
2274
2275    /*
2276     * Clean up a previous socket file if we left it around
2277     */
2278    if (lstat(path, &tstat) == 0) {
2279        if (S_ISSOCK(tstat.st_mode))
2280            unlink(path);
2281    }
2282
2283    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2284    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2285    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2286
2287    /*
2288     * the memset call clears nonstandard fields in some impementations
2289     * that otherwise mess things up.
2290     */
2291    memset(&addr, 0, sizeof(addr));
2292
2293    addr.sun_family = AF_UNIX;
2294    strcpy(addr.sun_path, path);
2295    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2296        perror("bind()");
2297        close(sfd);
2298        return -1;
2299    }
2300    if (listen(sfd, 1024) == -1) {
2301        perror("listen()");
2302        close(sfd);
2303        return -1;
2304    }
2305    return sfd;
2306}
2307
2308/* listening socket */
2309static int l_socket = 0;
2310
2311/* udp socket */
2312static int u_socket = -1;
2313
2314/* invoke right before gdb is called, on assert */
2315void pre_gdb(void) {
2316    int i;
2317    if (l_socket > -1) close(l_socket);
2318    if (u_socket > -1) close(u_socket);
2319    for (i = 3; i <= 500; i++) close(i); /* so lame */
2320    kill(getpid(), SIGABRT);
2321}
2322
2323/*
2324 * We keep the current time of day in a global variable that's updated by a
2325 * timer event. This saves us a bunch of time() system calls (we really only
2326 * need to get the time once a second, whereas there can be tens of thousands
2327 * of requests a second) and allows us to use server-start-relative timestamps
2328 * rather than absolute UNIX timestamps, a space savings on systems where
2329 * sizeof(time_t) > sizeof(unsigned int).
2330 */
2331volatile rel_time_t current_time;
2332static struct event clockevent;
2333
2334/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2335static void set_current_time(void) {
2336    current_time = (rel_time_t) (time(0) - stats.started);
2337}
2338
2339static void clock_handler(const int fd, const short which, void *arg) {
2340    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2341    static bool initialized = false;
2342
2343    if (initialized) {
2344        /* only delete the event if it's actually there. */
2345        evtimer_del(&clockevent);
2346    } else {
2347        initialized = true;
2348    }
2349
2350    evtimer_set(&clockevent, clock_handler, 0);
2351    event_base_set(main_base, &clockevent);
2352    evtimer_add(&clockevent, &t);
2353
2354    set_current_time();
2355}
2356
2357static struct event deleteevent;
2358
2359static void delete_handler(const int fd, const short which, void *arg) {
2360    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2361    static bool initialized = false;
2362
2363    if (initialized) {
2364        /* some versions of libevent don't like deleting events that don't exist,
2365           so only delete once we know this event has been added. */
2366        evtimer_del(&deleteevent);
2367    } else {
2368        initialized = true;
2369    }
2370
2371    evtimer_set(&deleteevent, delete_handler, 0);
2372    event_base_set(main_base, &deleteevent);
2373    evtimer_add(&deleteevent, &t);
2374    run_deferred_deletes();
2375}
2376
2377/* Call run_deferred_deletes instead of this. */
2378void do_run_deferred_deletes(void)
2379{
2380    int i, j = 0;
2381
2382    for (i = 0; i < delcurr; i++) {
2383        item *it = todelete[i];
2384        if (item_delete_lock_over(it)) {
2385            assert(it->refcount > 0);
2386            it->it_flags &= ~ITEM_DELETED;
2387            do_item_unlink(it);
2388            do_item_remove(it);
2389        } else {
2390            todelete[j++] = it;
2391        }
2392    }
2393    delcurr = j;
2394}
2395
2396static void usage(void) {
2397    printf(PACKAGE " " VERSION "\n");
2398    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2399           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2400           "-s <file>     unix socket path to listen on (disables network support)\n"
2401           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2402           "-d            run as a daemon\n"
2403           "-r            maximize core file limit\n"
2404           "-u <username> assume identity of <username> (only when run as root)\n"
2405           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2406           "-M            return error on memory exhausted (rather than removing items)\n"
2407           "-c <num>      max simultaneous connections, default is 1024\n"
2408           "-k            lock down all paged memory\n"
2409           "-v            verbose (print errors/warnings while in event loop)\n"
2410           "-vv           very verbose (also print client commands/reponses)\n"
2411           "-h            print this help and exit\n"
2412           "-i            print memcached and libevent license\n"
2413           "-b            run a managed instanced (mnemonic: buckets)\n"
2414           "-P <file>     save PID in <file>, only used with -d option\n"
2415           "-f <factor>   chunk size growth factor, default 1.25\n"
2416           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
2417#ifdef USE_THREADS
2418    printf("-t <num>      number of threads to use, default 4\n");
2419#endif
2420    return;
2421}
2422
2423static void usage_license(void) {
2424    printf(PACKAGE " " VERSION "\n\n");
2425    printf(
2426    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2427    "All rights reserved.\n"
2428    "\n"
2429    "Redistribution and use in source and binary forms, with or without\n"
2430    "modification, are permitted provided that the following conditions are\n"
2431    "met:\n"
2432    "\n"
2433    "    * Redistributions of source code must retain the above copyright\n"
2434    "notice, this list of conditions and the following disclaimer.\n"
2435    "\n"
2436    "    * Redistributions in binary form must reproduce the above\n"
2437    "copyright notice, this list of conditions and the following disclaimer\n"
2438    "in the documentation and/or other materials provided with the\n"
2439    "distribution.\n"
2440    "\n"
2441    "    * Neither the name of the Danga Interactive nor the names of its\n"
2442    "contributors may be used to endorse or promote products derived from\n"
2443    "this software without specific prior written permission.\n"
2444    "\n"
2445    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2446    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2447    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2448    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2449    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2450    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2451    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2452    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2453    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2454    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2455    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2456    "\n"
2457    "\n"
2458    "This product includes software developed by Niels Provos.\n"
2459    "\n"
2460    "[ libevent ]\n"
2461    "\n"
2462    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2463    "All rights reserved.\n"
2464    "\n"
2465    "Redistribution and use in source and binary forms, with or without\n"
2466    "modification, are permitted provided that the following conditions\n"
2467    "are met:\n"
2468    "1. Redistributions of source code must retain the above copyright\n"
2469    "   notice, this list of conditions and the following disclaimer.\n"
2470    "2. Redistributions in binary form must reproduce the above copyright\n"
2471    "   notice, this list of conditions and the following disclaimer in the\n"
2472    "   documentation and/or other materials provided with the distribution.\n"
2473    "3. All advertising materials mentioning features or use of this software\n"
2474    "   must display the following acknowledgement:\n"
2475    "      This product includes software developed by Niels Provos.\n"
2476    "4. The name of the author may not be used to endorse or promote products\n"
2477    "   derived from this software without specific prior written permission.\n"
2478    "\n"
2479    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2480    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2481    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2482    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2483    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2484    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2485    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2486    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2487    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2488    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2489    );
2490
2491    return;
2492}
2493
2494static void save_pid(const pid_t pid, const char *pid_file) {
2495    FILE *fp;
2496    if (pid_file == NULL)
2497        return;
2498
2499    if ((fp = fopen(pid_file, "w")) == NULL) {
2500        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2501        return;
2502    }
2503
2504    fprintf(fp,"%ld\n", (long)pid);
2505    if (fclose(fp) == -1) {
2506        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2507        return;
2508    }
2509}
2510
2511static void remove_pidfile(const char *pid_file) {
2512  if (pid_file == NULL)
2513      return;
2514
2515  if (unlink(pid_file) != 0) {
2516      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2517  }
2518
2519}
2520
2521
2522static void sig_handler(const int sig) {
2523    printf("SIGINT handled.\n");
2524    exit(EXIT_SUCCESS);
2525}
2526
2527int main (int argc, char **argv) {
2528    int c;
2529    struct in_addr addr;
2530    bool lock_memory = false;
2531    bool daemonize = false;
2532    int maxcore = 0;
2533    char *username = NULL;
2534    char *pid_file = NULL;
2535    struct passwd *pw;
2536    struct sigaction sa;
2537    struct rlimit rlim;
2538
2539    /* handle SIGINT */
2540    signal(SIGINT, sig_handler);
2541
2542    /* init settings */
2543    settings_init();
2544
2545    /* set stderr non-buffering (for running under, say, daemontools) */
2546    setbuf(stderr, NULL);
2547
2548    /* process arguments */
2549    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2550        switch (c) {
2551        case 'U':
2552            settings.udpport = atoi(optarg);
2553            break;
2554        case 'b':
2555            settings.managed = true;
2556            break;
2557        case 'p':
2558            settings.port = atoi(optarg);
2559            break;
2560        case 's':
2561            settings.socketpath = optarg;
2562            break;
2563        case 'm':
2564            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2565            break;
2566        case 'M':
2567            settings.evict_to_free = 0;
2568            break;
2569        case 'c':
2570            settings.maxconns = atoi(optarg);
2571            break;
2572        case 'h':
2573            usage();
2574            exit(EXIT_SUCCESS);
2575        case 'i':
2576            usage_license();
2577            exit(EXIT_SUCCESS);
2578        case 'k':
2579            lock_memory = true;
2580            break;
2581        case 'v':
2582            settings.verbose++;
2583            break;
2584        case 'l':
2585            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2586                fprintf(stderr, "Illegal address: %s\n", optarg);
2587                return 1;
2588            } else {
2589                settings.interf = addr;
2590            }
2591            break;
2592        case 'd':
2593            daemonize = true;
2594            break;
2595        case 'r':
2596            maxcore = 1;
2597            break;
2598        case 'u':
2599            username = optarg;
2600            break;
2601        case 'P':
2602            pid_file = optarg;
2603            break;
2604        case 'f':
2605            settings.factor = atof(optarg);
2606            if (settings.factor <= 1.0) {
2607                fprintf(stderr, "Factor must be greater than 1\n");
2608                return 1;
2609            }
2610            break;
2611        case 'n':
2612            settings.chunk_size = atoi(optarg);
2613            if (settings.chunk_size == 0) {
2614                fprintf(stderr, "Chunk size must be greater than 0\n");
2615                return 1;
2616            }
2617            break;
2618        case 't':
2619            settings.num_threads = atoi(optarg);
2620            if (settings.num_threads == 0) {
2621                fprintf(stderr, "Number of threads must be greater than 0\n");
2622                return 1;
2623            }
2624            break;
2625        case 'D':
2626            if (! optarg || ! optarg[0]) {
2627                fprintf(stderr, "No delimiter specified\n");
2628                return 1;
2629            }
2630            settings.prefix_delimiter = optarg[0];
2631            settings.detail_enabled = 1;
2632            break;
2633        default:
2634            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2635            return 1;
2636        }
2637    }
2638
2639    if (maxcore != 0) {
2640        struct rlimit rlim_new;
2641        /*
2642         * First try raising to infinity; if that fails, try bringing
2643         * the soft limit to the hard.
2644         */
2645        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2646            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2647            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2648                /* failed. try raising just to the old max */
2649                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2650                (void)setrlimit(RLIMIT_CORE, &rlim_new);
2651            }
2652        }
2653        /*
2654         * getrlimit again to see what we ended up with. Only fail if
2655         * the soft limit ends up 0, because then no core files will be
2656         * created at all.
2657         */
2658
2659        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2660            fprintf(stderr, "failed to ensure corefile creation\n");
2661            exit(EXIT_FAILURE);
2662        }
2663    }
2664
2665    /*
2666     * If needed, increase rlimits to allow as many connections
2667     * as needed.
2668     */
2669
2670    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2671        fprintf(stderr, "failed to getrlimit number of files\n");
2672        exit(EXIT_FAILURE);
2673    } else {
2674        int maxfiles = settings.maxconns;
2675        if (rlim.rlim_cur < maxfiles)
2676            rlim.rlim_cur = maxfiles + 3;
2677        if (rlim.rlim_max < rlim.rlim_cur)
2678            rlim.rlim_max = rlim.rlim_cur;
2679        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2680            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2681            exit(EXIT_FAILURE);
2682        }
2683    }
2684
2685    /*
2686     * initialization order: first create the listening sockets
2687     * (may need root on low ports), then drop root if needed,
2688     * then daemonise if needed, then init libevent (in some cases
2689     * descriptors created by libevent wouldn't survive forking).
2690     */
2691
2692    /* create the listening socket and bind it */
2693    if (settings.socketpath == NULL) {
2694        l_socket = server_socket(settings.port, 0);
2695        if (l_socket == -1) {
2696            fprintf(stderr, "failed to listen\n");
2697            exit(EXIT_FAILURE);
2698        }
2699    }
2700
2701    if (settings.udpport > 0 && settings.socketpath == NULL) {
2702        /* create the UDP listening socket and bind it */
2703        u_socket = server_socket(settings.udpport, 1);
2704        if (u_socket == -1) {
2705            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2706            exit(EXIT_FAILURE);
2707        }
2708    }
2709
2710    /* lose root privileges if we have them */
2711    if (getuid() == 0 || geteuid() == 0) {
2712        if (username == 0 || *username == '\0') {
2713            fprintf(stderr, "can't run as root without the -u switch\n");
2714            return 1;
2715        }
2716        if ((pw = getpwnam(username)) == 0) {
2717            fprintf(stderr, "can't find the user %s to switch to\n", username);
2718            return 1;
2719        }
2720        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2721            fprintf(stderr, "failed to assume identity of user %s\n", username);
2722            return 1;
2723        }
2724    }
2725
2726    /* create unix mode sockets after dropping privileges */
2727    if (settings.socketpath != NULL) {
2728        l_socket = server_socket_unix(settings.socketpath);
2729        if (l_socket == -1) {
2730            fprintf(stderr, "failed to listen\n");
2731            exit(EXIT_FAILURE);
2732        }
2733    }
2734
2735    /* daemonize if requested */
2736    /* if we want to ensure our ability to dump core, don't chdir to / */
2737    if (daemonize) {
2738        int res;
2739        res = daemon(maxcore, settings.verbose);
2740        if (res == -1) {
2741            fprintf(stderr, "failed to daemon() in order to daemonize\n");
2742            return 1;
2743        }
2744    }
2745
2746
2747    /* initialize main thread libevent instance */
2748    main_base = event_init();
2749
2750    /* initialize other stuff */
2751    item_init();
2752    stats_init();
2753    assoc_init();
2754    conn_init();
2755    slabs_init(settings.maxbytes, settings.factor);
2756
2757    /* managed instance? alloc and zero a bucket array */
2758    if (settings.managed) {
2759        buckets = malloc(sizeof(int) * MAX_BUCKETS);
2760        if (buckets == 0) {
2761            fprintf(stderr, "failed to allocate the bucket array");
2762            exit(EXIT_FAILURE);
2763        }
2764        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2765    }
2766
2767    /* lock paged memory if needed */
2768    if (lock_memory) {
2769#ifdef HAVE_MLOCKALL
2770        mlockall(MCL_CURRENT | MCL_FUTURE);
2771#else
2772        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
2773#endif
2774    }
2775
2776    /*
2777     * ignore SIGPIPE signals; we can use errno==EPIPE if we
2778     * need that information
2779     */
2780    sa.sa_handler = SIG_IGN;
2781    sa.sa_flags = 0;
2782    if (sigemptyset(&sa.sa_mask) == -1 ||
2783        sigaction(SIGPIPE, &sa, 0) == -1) {
2784        perror("failed to ignore SIGPIPE; sigaction");
2785        exit(EXIT_FAILURE);
2786    }
2787    /* create the initial listening connection */
2788    if (!(listen_conn = conn_new(l_socket, conn_listening,
2789                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
2790        fprintf(stderr, "failed to create listening connection");
2791        exit(EXIT_FAILURE);
2792    }
2793    /* start up worker threads if MT mode */
2794    thread_init(settings.num_threads, main_base);
2795    /* save the PID in if we're a daemon, do this after thread_init due to
2796       a file descriptor handling bug somewhere in libevent */
2797    if (daemonize)
2798        save_pid(getpid(), pid_file);
2799    /* initialise clock event */
2800    clock_handler(0, 0, 0);
2801    /* initialise deletion array and timer event */
2802    deltotal = 200;
2803    delcurr = 0;
2804    if ((todelete = malloc(sizeof(item *) * deltotal)) == NULL) {
2805        perror("failed to allocate memory for deletion array");
2806        exit(EXIT_FAILURE);
2807    }
2808    delete_handler(0, 0, 0); /* sets up the event */
2809    /* create the initial listening udp connection, monitored on all threads */
2810    if (u_socket > -1) {
2811        for (c = 0; c < settings.num_threads; c++) {
2812            /* this is guaranteed to hit all threads because we round-robin */
2813            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2814                              UDP_READ_BUFFER_SIZE, 1);
2815        }
2816    }
2817    /* enter the event loop */
2818    event_base_loop(main_base, 0);
2819    /* remove the PID file if we're a daemon */
2820    if (daemonize)
2821        remove_pidfile(pid_file);
2822    return 0;
2823}
Note: See TracBrowser for help on using the browser.