root/trunk/server/memcached.c @ 608

Revision 608, 80.1 kB (checked in by plindner, 2 years ago)

Incorporate incrememnt patch from Evan Miller
<emiller@…> to define increment overflow behavior.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(const bool is_udp);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97void pre_gdb(void);
98static void conn_free(conn *c);
99
100/** exported globals **/
101struct stats stats;
102struct settings settings;
103
104/** file scope variables **/
105static item **todelete = NULL;
106static int delcurr;
107static int deltotal;
108static conn *listen_conn;
109static struct event_base *main_base;
110
111#define TRANSMIT_COMPLETE   0
112#define TRANSMIT_INCOMPLETE 1
113#define TRANSMIT_SOFT_ERROR 2
114#define TRANSMIT_HARD_ERROR 3
115
116static int *buckets = 0; /* bucket->generation array for a managed instance */
117
118#define REALTIME_MAXDELTA 60*60*24*30
119/*
120 * given time value that's either unix time or delta from current unix time, return
121 * unix time. Use the fact that delta can't exceed one month (and real time value can't
122 * be that low).
123 */
124static rel_time_t realtime(const time_t exptime) {
125    /* no. of seconds in 30 days - largest possible delta exptime */
126
127    if (exptime == 0) return 0; /* 0 means never expire */
128
129    if (exptime > REALTIME_MAXDELTA) {
130        /* if item expiration is at/before the server started, give it an
131           expiration time of 1 second after the server started.
132           (because 0 means don't expire).  without this, we'd
133           underflow and wrap around to some large value way in the
134           future, effectively making items expiring in the past
135           really expiring never */
136        if (exptime <= stats.started)
137            return (rel_time_t)1;
138        return (rel_time_t)(exptime - stats.started);
139    } else {
140        return (rel_time_t)(exptime + current_time);
141    }
142}
143
144static void stats_init(void) {
145    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
146    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
147    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
148
149    /* make the time we started always be 2 seconds before we really
150       did, so time(0) - time.started is never zero.  if so, things
151       like 'settings.oldest_live' which act as booleans as well as
152       values are now false in boolean context... */
153    stats.started = time(0) - 2;
154    stats_prefix_init();
155}
156
157static void stats_reset(void) {
158    STATS_LOCK();
159    stats.total_items = stats.total_conns = 0;
160    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
161    stats.bytes_read = stats.bytes_written = 0;
162    stats_prefix_clear();
163    STATS_UNLOCK();
164}
165
166static void settings_init(void) {
167    settings.port = 11211;
168    settings.udpport = 0;
169    settings.interf.s_addr = htonl(INADDR_ANY);
170    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
171    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
172    settings.verbose = 0;
173    settings.oldest_live = 0;
174    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
175    settings.socketpath = NULL;       /* by default, not using a unix socket */
176    settings.managed = false;
177    settings.factor = 1.25;
178    settings.chunk_size = 48;         /* space for a modest key and value */
179#ifdef USE_THREADS
180    settings.num_threads = 4;
181#else
182    settings.num_threads = 1;
183#endif
184    settings.prefix_delimiter = ':';
185    settings.detail_enabled = 0;
186}
187
188/* returns true if a deleted item's delete-locked-time is over, and it
189   should be removed from the namespace */
190static bool item_delete_lock_over (item *it) {
191    assert(it->it_flags & ITEM_DELETED);
192    return (current_time >= it->exptime);
193}
194
195/*
196 * Adds a message header to a connection.
197 *
198 * Returns 0 on success, -1 on out-of-memory.
199 */
200static int add_msghdr(conn *c)
201{
202    struct msghdr *msg;
203
204    assert(c != NULL);
205
206    if (c->msgsize == c->msgused) {
207        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
208        if (! msg)
209            return -1;
210        c->msglist = msg;
211        c->msgsize *= 2;
212    }
213
214    msg = c->msglist + c->msgused;
215
216    /* this wipes msg_iovlen, msg_control, msg_controllen, and
217       msg_flags, the last 3 of which aren't defined on solaris: */
218    memset(msg, 0, sizeof(struct msghdr));
219
220    msg->msg_iov = &c->iov[c->iovused];
221    msg->msg_name = &c->request_addr;
222    msg->msg_namelen = c->request_addr_size;
223
224    c->msgbytes = 0;
225    c->msgused++;
226
227    if (c->udp) {
228        /* Leave room for the UDP header, which we'll fill in later. */
229        return add_iov(c, NULL, UDP_HEADER_SIZE);
230    }
231
232    return 0;
233}
234
235
236/*
237 * Free list management for connections.
238 */
239
240static conn **freeconns;
241static int freetotal;
242static int freecurr;
243
244
245static void conn_init(void) {
246    freetotal = 200;
247    freecurr = 0;
248    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
249        perror("malloc()");
250    }
251    return;
252}
253
254/*
255 * Returns a connection from the freelist, if any. Should call this using
256 * conn_from_freelist() for thread safety.
257 */
258conn *do_conn_from_freelist() {
259    conn *c;
260
261    if (freecurr > 0) {
262        c = freeconns[--freecurr];
263    } else {
264        c = NULL;
265    }
266
267    return c;
268}
269
270/*
271 * Adds a connection to the freelist. 0 = success. Should call this using
272 * conn_add_to_freelist() for thread safety.
273 */
274bool do_conn_add_to_freelist(conn *c) {
275    if (freecurr < freetotal) {
276        freeconns[freecurr++] = c;
277        return false;
278    } else {
279        /* try to enlarge free connections array */
280        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
281        if (new_freeconns) {
282            freetotal *= 2;
283            freeconns = new_freeconns;
284            freeconns[freecurr++] = c;
285            return false;
286        }
287    }
288    return true;
289}
290
291conn *conn_new(const int sfd, const int init_state, const int event_flags,
292                const int read_buffer_size, const bool is_udp, struct event_base *base) {
293    conn *c = conn_from_freelist();
294
295    if (NULL == c) {
296        if (!(c = (conn *)malloc(sizeof(conn)))) {
297            perror("malloc()");
298            return NULL;
299        }
300        c->rbuf = c->wbuf = 0;
301        c->ilist = 0;
302        c->iov = 0;
303        c->msglist = 0;
304        c->hdrbuf = 0;
305
306        c->rsize = read_buffer_size;
307        c->wsize = DATA_BUFFER_SIZE;
308        c->isize = ITEM_LIST_INITIAL;
309        c->iovsize = IOV_LIST_INITIAL;
310        c->msgsize = MSG_LIST_INITIAL;
311        c->hdrsize = 0;
312
313        c->rbuf = (char *)malloc((size_t)c->rsize);
314        c->wbuf = (char *)malloc((size_t)c->wsize);
315        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
316        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
317        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
318
319        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
320                c->msglist == 0) {
321            if (c->rbuf != 0) free(c->rbuf);
322            if (c->wbuf != 0) free(c->wbuf);
323            if (c->ilist !=0) free(c->ilist);
324            if (c->iov != 0) free(c->iov);
325            if (c->msglist != 0) free(c->msglist);
326            free(c);
327            perror("malloc()");
328            return NULL;
329        }
330
331        STATS_LOCK();
332        stats.conn_structs++;
333        STATS_UNLOCK();
334    }
335
336    if (settings.verbose > 1) {
337        if (init_state == conn_listening)
338            fprintf(stderr, "<%d server listening\n", sfd);
339        else if (is_udp)
340            fprintf(stderr, "<%d server listening (udp)\n", sfd);
341        else
342            fprintf(stderr, "<%d new client connection\n", sfd);
343    }
344
345    c->sfd = sfd;
346    c->udp = is_udp;
347    c->state = init_state;
348    c->rlbytes = 0;
349    c->rbytes = c->wbytes = 0;
350    c->wcurr = c->wbuf;
351    c->rcurr = c->rbuf;
352    c->ritem = 0;
353    c->icurr = c->ilist;
354    c->ileft = 0;
355    c->iovused = 0;
356    c->msgcurr = 0;
357    c->msgused = 0;
358
359    c->write_and_go = conn_read;
360    c->write_and_free = 0;
361    c->item = 0;
362    c->bucket = -1;
363    c->gen = 0;
364
365    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
366    event_base_set(base, &c->event);
367    c->ev_flags = event_flags;
368
369    if (event_add(&c->event, 0) == -1) {
370        if (conn_add_to_freelist(c)) {
371            conn_free(c);
372        }
373        return NULL;
374    }
375
376    STATS_LOCK();
377    stats.curr_conns++;
378    stats.total_conns++;
379    STATS_UNLOCK();
380
381    return c;
382}
383
384static void conn_cleanup(conn *c) {
385    assert(c != NULL);
386
387    if (c->item) {
388        item_remove(c->item);
389        c->item = 0;
390    }
391
392    if (c->ileft != 0) {
393        for (; c->ileft > 0; c->ileft--,c->icurr++) {
394            item_remove(*(c->icurr));
395        }
396    }
397
398    if (c->write_and_free) {
399        free(c->write_and_free);
400        c->write_and_free = 0;
401    }
402}
403
404/*
405 * Frees a connection.
406 */
407void conn_free(conn *c) {
408    if (c) {
409        if (c->hdrbuf)
410            free(c->hdrbuf);
411        if (c->msglist)
412            free(c->msglist);
413        if (c->rbuf)
414            free(c->rbuf);
415        if (c->wbuf)
416            free(c->wbuf);
417        if (c->ilist)
418            free(c->ilist);
419        if (c->iov)
420            free(c->iov);
421        free(c);
422    }
423}
424
425static void conn_close(conn *c) {
426    assert(c != NULL);
427
428    /* delete the event, the socket and the conn */
429    event_del(&c->event);
430
431    if (settings.verbose > 1)
432        fprintf(stderr, "<%d connection closed.\n", c->sfd);
433
434    close(c->sfd);
435    accept_new_conns(true);
436    conn_cleanup(c);
437
438    /* if the connection has big buffers, just free it */
439    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
440        conn_free(c);
441    }
442
443    STATS_LOCK();
444    stats.curr_conns--;
445    STATS_UNLOCK();
446
447    return;
448}
449
450
451/*
452 * Shrinks a connection's buffers if they're too big.  This prevents
453 * periodic large "get" requests from permanently chewing lots of server
454 * memory.
455 *
456 * This should only be called in between requests since it can wipe output
457 * buffers!
458 */
459static void conn_shrink(conn *c) {
460    assert(c != NULL);
461
462    if (c->udp)
463        return;
464
465    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
466        char *newbuf;
467
468        if (c->rcurr != c->rbuf)
469            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
470
471        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
472
473        if (newbuf) {
474            c->rbuf = newbuf;
475            c->rsize = DATA_BUFFER_SIZE;
476        }
477        /* TODO check other branch... */
478        c->rcurr = c->rbuf;
479    }
480
481    if (c->isize > ITEM_LIST_HIGHWAT) {
482        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
483        if (newbuf) {
484            c->ilist = newbuf;
485            c->isize = ITEM_LIST_INITIAL;
486        }
487    /* TODO check error condition? */
488    }
489
490    if (c->msgsize > MSG_LIST_HIGHWAT) {
491        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
492        if (newbuf) {
493            c->msglist = newbuf;
494            c->msgsize = MSG_LIST_INITIAL;
495        }
496    /* TODO check error condition? */
497    }
498
499    if (c->iovsize > IOV_LIST_HIGHWAT) {
500        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
501        if (newbuf) {
502            c->iov = newbuf;
503            c->iovsize = IOV_LIST_INITIAL;
504        }
505    /* TODO check return value */
506    }
507}
508
509/*
510 * Sets a connection's current state in the state machine. Any special
511 * processing that needs to happen on certain state transitions can
512 * happen here.
513 */
514static void conn_set_state(conn *c, int state) {
515    assert(c != NULL);
516
517    if (state != c->state) {
518        if (state == conn_read) {
519            conn_shrink(c);
520            assoc_move_next_bucket();
521        }
522        c->state = state;
523    }
524}
525
526
527/*
528 * Ensures that there is room for another struct iovec in a connection's
529 * iov list.
530 *
531 * Returns 0 on success, -1 on out-of-memory.
532 */
533static int ensure_iov_space(conn *c) {
534    assert(c != NULL);
535
536    if (c->iovused >= c->iovsize) {
537        int i, iovnum;
538        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
539                                (c->iovsize * 2) * sizeof(struct iovec));
540        if (! new_iov)
541            return -1;
542        c->iov = new_iov;
543        c->iovsize *= 2;
544
545        /* Point all the msghdr structures at the new list. */
546        for (i = 0, iovnum = 0; i < c->msgused; i++) {
547            c->msglist[i].msg_iov = &c->iov[iovnum];
548            iovnum += c->msglist[i].msg_iovlen;
549        }
550    }
551
552    return 0;
553}
554
555
556/*
557 * Adds data to the list of pending data that will be written out to a
558 * connection.
559 *
560 * Returns 0 on success, -1 on out-of-memory.
561 */
562
563static int add_iov(conn *c, const void *buf, int len) {
564    struct msghdr *m;
565    int leftover;
566    bool limit_to_mtu;
567
568    assert(c != NULL);
569
570    do {
571        m = &c->msglist[c->msgused - 1];
572
573        /*
574         * Limit UDP packets, and the first payloads of TCP replies, to
575         * UDP_MAX_PAYLOAD_SIZE bytes.
576         */
577        limit_to_mtu = c->udp || (1 == c->msgused);
578
579        /* We may need to start a new msghdr if this one is full. */
580        if (m->msg_iovlen == IOV_MAX ||
581            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
582            add_msghdr(c);
583            m = &c->msglist[c->msgused - 1];
584        }
585
586        if (ensure_iov_space(c) != 0)
587            return -1;
588
589        /* If the fragment is too big to fit in the datagram, split it up */
590        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
591            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
592            len -= leftover;
593        } else {
594            leftover = 0;
595        }
596
597        m = &c->msglist[c->msgused - 1];
598        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
599        m->msg_iov[m->msg_iovlen].iov_len = len;
600
601        c->msgbytes += len;
602        c->iovused++;
603        m->msg_iovlen++;
604
605        buf = ((char *)buf) + len;
606        len = leftover;
607    } while (leftover > 0);
608
609    return 0;
610}
611
612
613/*
614 * Constructs a set of UDP headers and attaches them to the outgoing messages.
615 */
616static int build_udp_headers(conn *c) {
617    int i;
618    unsigned char *hdr;
619
620    assert(c != NULL);
621
622    if (c->msgused > c->hdrsize) {
623        void *new_hdrbuf;
624        if (c->hdrbuf)
625            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
626        else
627            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
628        if (! new_hdrbuf)
629            return -1;
630        c->hdrbuf = (unsigned char *)new_hdrbuf;
631        c->hdrsize = c->msgused * 2;
632    }
633
634    hdr = c->hdrbuf;
635    for (i = 0; i < c->msgused; i++) {
636        c->msglist[i].msg_iov[0].iov_base = hdr;
637        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
638        *hdr++ = c->request_id / 256;
639        *hdr++ = c->request_id % 256;
640        *hdr++ = i / 256;
641        *hdr++ = i % 256;
642        *hdr++ = c->msgused / 256;
643        *hdr++ = c->msgused % 256;
644        *hdr++ = 0;
645        *hdr++ = 0;
646        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
647    }
648
649    return 0;
650}
651
652
653static void out_string(conn *c, const char *str) {
654    size_t len;
655
656    assert(c != NULL);
657
658    if (settings.verbose > 1)
659        fprintf(stderr, ">%d %s\n", c->sfd, str);
660
661    len = strlen(str);
662    if ((len + 2) > c->wsize) {
663        /* ought to be always enough. just fail for simplicity */
664        str = "SERVER_ERROR output line too long";
665        len = strlen(str);
666    }
667
668    memcpy(c->wbuf, str, len);
669    memcpy(c->wbuf + len, "\r\n", 3);
670    c->wbytes = len + 2;
671    c->wcurr = c->wbuf;
672
673    conn_set_state(c, conn_write);
674    c->write_and_go = conn_read;
675    return;
676}
677
678/*
679 * we get here after reading the value in set/add/replace commands. The command
680 * has been stored in c->item_comm, and the item is ready in c->item.
681 */
682
683static void complete_nread(conn *c) {
684    assert(c != NULL);
685
686    item *it = c->item;
687    int comm = c->item_comm;
688
689    STATS_LOCK();
690    stats.set_cmds++;
691    STATS_UNLOCK();
692
693    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
694        out_string(c, "CLIENT_ERROR bad data chunk");
695    } else {
696        if (store_item(it, comm)) {
697            out_string(c, "STORED");
698        } else {
699            out_string(c, "NOT_STORED");
700        }
701    }
702
703    item_remove(c->item);       /* release the c->item reference */
704    c->item = 0;
705}
706
707/*
708 * Stores an item in the cache according to the semantics of one of the set
709 * commands. In threaded mode, this is protected by the cache lock.
710 *
711 * Returns true if the item was stored.
712 */
713int do_store_item(item *it, int comm) {
714    char *key = ITEM_key(it);
715    bool delete_locked = false;
716    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
717    int stored = 0;
718
719    if (old_it != NULL && comm == NREAD_ADD) {
720        /* add only adds a nonexistent item, but promote to head of LRU */
721        do_item_update(old_it);
722    } else if (!old_it && comm == NREAD_REPLACE) {
723        /* replace only replaces an existing value; don't store */
724    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
725        /* replace and add can't override delete locks; don't store */
726    } else {
727        /* "set" commands can override the delete lock
728           window... in which case we have to find the old hidden item
729           that's in the namespace/LRU but wasn't returned by
730           item_get.... because we need to replace it */
731        if (delete_locked)
732            old_it = do_item_get_nocheck(key, it->nkey);
733
734        if (old_it != NULL)
735            do_item_replace(old_it, it);
736        else
737            do_item_link(it);
738
739        stored = 1;
740    }
741
742    if (old_it)
743        do_item_remove(old_it);         /* release our reference */
744    return stored;
745}
746
747typedef struct token_s {
748    char *value;
749    size_t length;
750} token_t;
751
752#define COMMAND_TOKEN 0
753#define SUBCOMMAND_TOKEN 1
754#define KEY_TOKEN 1
755#define KEY_MAX_LENGTH 250
756
757#define MAX_TOKENS 6
758
759/*
760 * Tokenize the command string by replacing whitespace with '\0' and update
761 * the token array tokens with pointer to start of each token and length.
762 * Returns total number of tokens.  The last valid token is the terminal
763 * token (value points to the first unprocessed character of the string and
764 * length zero).
765 *
766 * Usage example:
767 *
768 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
769 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
770 *          ...
771 *      }
772 *      ncommand = tokens[ix].value - command;
773 *      command  = tokens[ix].value;
774 *   }
775 */
776static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
777    char *s, *e;
778    size_t ntokens = 0;
779
780    assert(command != NULL && tokens != NULL && max_tokens > 1);
781
782    for (s = e = command; ntokens < max_tokens - 1; ++e) {
783        if (*e == ' ') {
784            if (s != e) {
785                tokens[ntokens].value = s;
786                tokens[ntokens].length = e - s;
787                ntokens++;
788                *e = '\0';
789            }
790            s = e + 1;
791        }
792        else if (*e == '\0') {
793            if (s != e) {
794                tokens[ntokens].value = s;
795                tokens[ntokens].length = e - s;
796                ntokens++;
797            }
798
799            break; /* string end */
800        }
801    }
802
803    /*
804     * If we scanned the whole string, the terminal value pointer is null,
805     * otherwise it is the first unprocessed character.
806     */
807    tokens[ntokens].value =  *e == '\0' ? NULL : e;
808    tokens[ntokens].length = 0;
809    ntokens++;
810
811    return ntokens;
812}
813
814/* set up a connection to write a buffer then free it, used for stats */
815static void write_and_free(conn *c, char *buf, int bytes) {
816    if (buf) {
817        c->write_and_free = buf;
818        c->wcurr = buf;
819        c->wbytes = bytes;
820        conn_set_state(c, conn_write);
821        c->write_and_go = conn_read;
822    } else {
823        out_string(c, "SERVER_ERROR out of memory");
824    }
825}
826
827inline static void process_stats_detail(conn *c, const char *command) {
828    assert(c != NULL);
829
830    if (strcmp(command, "on") == 0) {
831        settings.detail_enabled = 1;
832        out_string(c, "OK");
833    }
834    else if (strcmp(command, "off") == 0) {
835        settings.detail_enabled = 0;
836        out_string(c, "OK");
837    }
838    else if (strcmp(command, "dump") == 0) {
839        int len;
840        char *stats = stats_prefix_dump(&len);
841        write_and_free(c, stats, len);
842    }
843    else {
844        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
845    }
846}
847
848static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
849    rel_time_t now = current_time;
850    char *command;
851    char *subcommand;
852
853    assert(c != NULL);
854
855    if(ntokens < 2) {
856        out_string(c, "CLIENT_ERROR bad command line");
857        return;
858    }
859
860    command = tokens[COMMAND_TOKEN].value;
861
862    if (ntokens == 2 && strcmp(command, "stats") == 0) {
863        char temp[1024];
864        pid_t pid = getpid();
865        char *pos = temp;
866
867#ifndef WIN32
868        struct rusage usage;
869        getrusage(RUSAGE_SELF, &usage);
870#endif /* !WIN32 */
871
872        STATS_LOCK();
873        pos += sprintf(pos, "STAT pid %u\r\n", pid);
874        pos += sprintf(pos, "STAT uptime %u\r\n", now);
875        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
876        pos += sprintf(pos, "STAT version " VERSION "\r\n");
877        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
878#ifndef WIN32
879        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
880        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
881#endif /* !WIN32 */
882        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
883        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
884        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
885        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
886        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
887        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
888        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
889        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
890        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
891        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
892        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
893        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
894        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
895        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
896        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
897        pos += sprintf(pos, "END");
898        STATS_UNLOCK();
899        out_string(c, temp);
900        return;
901    }
902
903    subcommand = tokens[SUBCOMMAND_TOKEN].value;
904
905    if (strcmp(subcommand, "reset") == 0) {
906        stats_reset();
907        out_string(c, "RESET");
908        return;
909    }
910
911#ifdef HAVE_MALLOC_H
912#ifdef HAVE_STRUCT_MALLINFO
913    if (strcmp(subcommand, "malloc") == 0) {
914        char temp[512];
915        struct mallinfo info;
916        char *pos = temp;
917
918        info = mallinfo();
919        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
920        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
921        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
922        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
923        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
924        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
925        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
926        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
927        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
928        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
929        out_string(c, temp);
930        return;
931    }
932#endif /* HAVE_STRUCT_MALLINFO */
933#endif /* HAVE_MALLOC_H */
934
935#if !defined(WIN32) || !defined(__APPLE__)
936    if (strcmp(subcommand, "maps") == 0) {
937        char *wbuf;
938        int wsize = 8192; /* should be enough */
939        int fd;
940        int res;
941
942        if ((wbuf = (char *)malloc(wsize)) == NULL) {
943            out_string(c, "SERVER_ERROR out of memory");
944            return;
945        }
946
947        fd = open("/proc/self/maps", O_RDONLY);
948        if (fd == -1) {
949            out_string(c, "SERVER_ERROR cannot open the maps file");
950            free(wbuf);
951            return;
952        }
953
954        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
955        if (res == wsize - 6) {
956            out_string(c, "SERVER_ERROR buffer overflow");
957            free(wbuf); close(fd);
958            return;
959        }
960        if (res == 0 || res == -1) {
961            out_string(c, "SERVER_ERROR can't read the maps file");
962            free(wbuf); close(fd);
963            return;
964        }
965        memcpy(wbuf + res, "END\r\n", 5);
966        write_and_free(c, wbuf, res + 5);
967        close(fd);
968        return;
969    }
970#endif
971
972    if (strcmp(subcommand, "cachedump") == 0) {
973
974        char *buf;
975        unsigned int bytes, id, limit = 0;
976
977        if(ntokens < 5) {
978            out_string(c, "CLIENT_ERROR bad command line");
979            return;
980        }
981
982        id = strtoul(tokens[2].value, NULL, 10);
983        limit = strtoul(tokens[3].value, NULL, 10);
984
985        if(errno == ERANGE) {
986            out_string(c, "CLIENT_ERROR bad command line format");
987            return;
988        }
989
990        buf = item_cachedump(id, limit, &bytes);
991        write_and_free(c, buf, bytes);
992        return;
993    }
994
995    if (strcmp(subcommand, "slabs") == 0) {
996        int bytes = 0;
997        char *buf = slabs_stats(&bytes);
998        write_and_free(c, buf, bytes);
999        return;
1000    }
1001
1002    if (strcmp(subcommand, "items") == 0) {
1003        int bytes = 0;
1004        char *buf = item_stats(&bytes);
1005        write_and_free(c, buf, bytes);
1006        return;
1007    }
1008
1009    if (strcmp(subcommand, "detail") == 0) {
1010        if (ntokens < 4)
1011            process_stats_detail(c, "");  /* outputs the error message */
1012        else
1013            process_stats_detail(c, tokens[2].value);
1014        return;
1015    }
1016
1017    if (strcmp(subcommand, "sizes") == 0) {
1018        int bytes = 0;
1019        char *buf = item_stats_sizes(&bytes);
1020        write_and_free(c, buf, bytes);
1021        return;
1022    }
1023
1024    out_string(c, "ERROR");
1025}
1026
1027/* ntokens is overwritten here... shrug.. */
1028static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
1029    char *key;
1030    size_t nkey;
1031    int i = 0;
1032    item *it;
1033    token_t *key_token = &tokens[KEY_TOKEN];
1034
1035    assert(c != NULL);
1036
1037    if (settings.managed) {
1038        int bucket = c->bucket;
1039        if (bucket == -1) {
1040            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1041            return;
1042        }
1043        c->bucket = -1;
1044        if (buckets[bucket] != c->gen) {
1045            out_string(c, "ERROR_NOT_OWNER");
1046            return;
1047        }
1048    }
1049
1050    do {
1051        while(key_token->length != 0) {
1052
1053            key = key_token->value;
1054            nkey = key_token->length;
1055
1056            if(nkey > KEY_MAX_LENGTH) {
1057                out_string(c, "CLIENT_ERROR bad command line format");
1058                return;
1059            }
1060
1061            STATS_LOCK();
1062            stats.get_cmds++;
1063            STATS_UNLOCK();
1064            it = item_get(key, nkey);
1065            if (settings.detail_enabled) {
1066                stats_prefix_record_get(key, NULL != it);
1067            }
1068            if (it) {
1069                if (i >= c->isize) {
1070                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1071                    if (new_list) {
1072                        c->isize *= 2;
1073                        c->ilist = new_list;
1074                    } else break;
1075                }
1076
1077                /*
1078                 * Construct the response. Each hit adds three elements to the
1079                 * outgoing data list:
1080                 *   "VALUE "
1081                 *   key
1082                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1083                 */
1084                if (add_iov(c, "VALUE ", 6) != 0 ||
1085                    add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1086                    add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1087                    {
1088                        break;
1089                    }
1090                if (settings.verbose > 1)
1091                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1092
1093                /* item_get() has incremented it->refcount for us */
1094                STATS_LOCK();
1095                stats.get_hits++;
1096                STATS_UNLOCK();
1097                item_update(it);
1098                *(c->ilist + i) = it;
1099                i++;
1100
1101            } else {
1102                STATS_LOCK();
1103                stats.get_misses++;
1104                STATS_UNLOCK();
1105            }
1106
1107            key_token++;
1108        }
1109
1110        /*
1111         * If the command string hasn't been fully processed, get the next set
1112         * of tokens.
1113         */
1114        if(key_token->value != NULL) {
1115            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1116            key_token = tokens;
1117        }
1118
1119    } while(key_token->value != NULL);
1120
1121    c->icurr = c->ilist;
1122    c->ileft = i;
1123
1124    if (settings.verbose > 1)
1125        fprintf(stderr, ">%d END\n", c->sfd);
1126    add_iov(c, "END\r\n", 5);
1127
1128    if (c->udp && build_udp_headers(c) != 0) {
1129        out_string(c, "SERVER_ERROR out of memory");
1130    }
1131    else {
1132        conn_set_state(c, conn_mwrite);
1133        c->msgcurr = 0;
1134    }
1135    return;
1136}
1137
1138static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
1139    char *key;
1140    size_t nkey;
1141    int flags;
1142    time_t exptime;
1143    int vlen;
1144    item *it;
1145
1146    assert(c != NULL);
1147
1148    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1149        out_string(c, "CLIENT_ERROR bad command line format");
1150        return;
1151    }
1152
1153    key = tokens[KEY_TOKEN].value;
1154    nkey = tokens[KEY_TOKEN].length;
1155
1156    flags = strtoul(tokens[2].value, NULL, 10);
1157    exptime = strtol(tokens[3].value, NULL, 10);
1158    vlen = strtol(tokens[4].value, NULL, 10);
1159
1160    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1161        out_string(c, "CLIENT_ERROR bad command line format");
1162        return;
1163    }
1164
1165    if (settings.detail_enabled) {
1166        stats_prefix_record_set(key);
1167    }
1168
1169    if (settings.managed) {
1170        int bucket = c->bucket;
1171        if (bucket == -1) {
1172            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1173            return;
1174        }
1175        c->bucket = -1;
1176        if (buckets[bucket] != c->gen) {
1177            out_string(c, "ERROR_NOT_OWNER");
1178            return;
1179        }
1180    }
1181
1182    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1183
1184    if (it == 0) {
1185        if (! item_size_ok(nkey, flags, vlen + 2))
1186            out_string(c, "SERVER_ERROR object too large for cache");
1187        else
1188            out_string(c, "SERVER_ERROR out of memory");
1189        /* swallow the data line */
1190        c->write_and_go = conn_swallow;
1191        c->sbytes = vlen + 2;
1192        return;
1193    }
1194
1195    c->item_comm = comm;
1196    c->item = it;
1197    c->ritem = ITEM_data(it);
1198    c->rlbytes = it->nbytes;
1199    conn_set_state(c, conn_nread);
1200}
1201
1202static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
1203    char temp[32];
1204    item *it;
1205    unsigned int delta;
1206    char *key;
1207    size_t nkey;
1208
1209    assert(c != NULL);
1210
1211    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1212        out_string(c, "CLIENT_ERROR bad command line format");
1213        return;
1214    }
1215
1216    key = tokens[KEY_TOKEN].value;
1217    nkey = tokens[KEY_TOKEN].length;
1218
1219    if (settings.managed) {
1220        int bucket = c->bucket;
1221        if (bucket == -1) {
1222            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1223            return;
1224        }
1225        c->bucket = -1;
1226        if (buckets[bucket] != c->gen) {
1227            out_string(c, "ERROR_NOT_OWNER");
1228            return;
1229        }
1230    }
1231
1232    delta = strtoul(tokens[2].value, NULL, 10);
1233
1234    if(errno == ERANGE) {
1235        out_string(c, "CLIENT_ERROR bad command line format");
1236        return;
1237    }
1238
1239    it = item_get(key, nkey);
1240    if (!it) {
1241        out_string(c, "NOT_FOUND");
1242        return;
1243    }
1244
1245    out_string(c, add_delta(it, incr, delta, temp));
1246    item_remove(it);         /* release our reference */
1247}
1248
1249/*
1250 * adds a delta value to a numeric item.
1251 *
1252 * it    item to adjust
1253 * incr  true to increment value, false to decrement
1254 * delta amount to adjust value by
1255 * buf   buffer for response string
1256 *
1257 * returns a response string to send back to the client.
1258 */
1259char *do_add_delta(item *it, const int incr, const unsigned int delta, char *buf) {
1260    char *ptr;
1261    uint32_t value;
1262    int res;
1263
1264    ptr = ITEM_data(it);
1265    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1266
1267    value = strtoul(ptr, NULL, 10);
1268
1269    if(errno == ERANGE) {
1270        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1271    }
1272
1273    if (incr != 0)
1274        value += delta;
1275    else {
1276        if (delta >= value) value = 0;
1277        else value -= delta;
1278    }
1279    snprintf(buf, 32, "%u", value);
1280    res = strlen(buf);
1281    if (res + 2 > it->nbytes) { /* need to realloc */
1282        item *new_it;
1283        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1284        if (new_it == 0) {
1285            return "SERVER_ERROR out of memory";
1286        }
1287        memcpy(ITEM_data(new_it), buf, res);
1288        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1289        do_item_replace(it, new_it);
1290        do_item_remove(new_it);       /* release our reference */
1291    } else { /* replace in-place */
1292        memcpy(ITEM_data(it), buf, res);
1293        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1294    }
1295
1296    return buf;
1297}
1298
1299static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1300    char *key;
1301    size_t nkey;
1302    item *it;
1303    time_t exptime = 0;
1304
1305    assert(c != NULL);
1306
1307    if (settings.managed) {
1308        int bucket = c->bucket;
1309        if (bucket == -1) {
1310            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1311            return;
1312        }
1313        c->bucket = -1;
1314        if (buckets[bucket] != c->gen) {
1315            out_string(c, "ERROR_NOT_OWNER");
1316            return;
1317        }
1318    }
1319
1320    key = tokens[KEY_TOKEN].value;
1321    nkey = tokens[KEY_TOKEN].length;
1322
1323    if(nkey > KEY_MAX_LENGTH) {
1324        out_string(c, "CLIENT_ERROR bad command line format");
1325        return;
1326    }
1327
1328    if(ntokens == 4) {
1329        exptime = strtol(tokens[2].value, NULL, 10);
1330
1331        if(errno == ERANGE) {
1332            out_string(c, "CLIENT_ERROR bad command line format");
1333            return;
1334        }
1335    }
1336
1337    if (settings.detail_enabled) {
1338        stats_prefix_record_delete(key);
1339    }
1340
1341    it = item_get(key, nkey);
1342    if (it) {
1343        if (exptime == 0) {
1344            item_unlink(it);
1345            item_remove(it);      /* release our reference */
1346            out_string(c, "DELETED");
1347        } else {
1348            /* our reference will be transfered to the delete queue */
1349            out_string(c, defer_delete(it, exptime));
1350        }
1351    } else {
1352        out_string(c, "NOT_FOUND");
1353    }
1354}
1355
1356/*
1357 * Adds an item to the deferred-delete list so it can be reaped later.
1358 *
1359 * Returns the result to send to the client.
1360 */
1361char *do_defer_delete(item *it, time_t exptime)
1362{
1363    if (delcurr >= deltotal) {
1364        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1365        if (new_delete) {
1366            todelete = new_delete;
1367            deltotal *= 2;
1368        } else {
1369            /*
1370             * can't delete it immediately, user wants a delay,
1371             * but we ran out of memory for the delete queue
1372             */
1373            item_remove(it);    /* release reference */
1374            return "SERVER_ERROR out of memory";
1375        }
1376    }
1377
1378    /* use its expiration time as its deletion time now */
1379    it->exptime = realtime(exptime);
1380    it->it_flags |= ITEM_DELETED;
1381    todelete[delcurr++] = it;
1382
1383    return "DELETED";
1384}
1385
1386static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1387    unsigned int level;
1388
1389    assert(c != NULL);
1390
1391    level = strtoul(tokens[1].value, NULL, 10);
1392    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1393    out_string(c, "OK");
1394    return;
1395}
1396
1397static void process_command(conn *c, char *command) {
1398
1399    token_t tokens[MAX_TOKENS];
1400    size_t ntokens;
1401    int comm;
1402
1403    assert(c != NULL);
1404
1405    if (settings.verbose > 1)
1406        fprintf(stderr, "<%d %s\n", c->sfd, command);
1407
1408    /*
1409     * for commands set/add/replace, we build an item and read the data
1410     * directly into it, then continue in nread_complete().
1411     */
1412
1413    c->msgcurr = 0;
1414    c->msgused = 0;
1415    c->iovused = 0;
1416    if (add_msghdr(c) != 0) {
1417        out_string(c, "SERVER_ERROR out of memory");
1418        return;
1419    }
1420
1421    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1422
1423    if (ntokens >= 3 &&
1424        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1425         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1426
1427        process_get_command(c, tokens, ntokens);
1428
1429    } else if (ntokens == 6 &&
1430               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1431                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1432                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
1433
1434        process_update_command(c, tokens, ntokens, comm);
1435
1436    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1437
1438        process_arithmetic_command(c, tokens, ntokens, 1);
1439
1440    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1441
1442        process_arithmetic_command(c, tokens, ntokens, 0);
1443
1444    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1445
1446        process_delete_command(c, tokens, ntokens);
1447
1448    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1449        unsigned int bucket, gen;
1450        if (!settings.managed) {
1451            out_string(c, "CLIENT_ERROR not a managed instance");
1452            return;
1453        }
1454
1455        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1456            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1457                out_string(c, "CLIENT_ERROR bucket number out of range");
1458                return;
1459            }
1460            buckets[bucket] = gen;
1461            out_string(c, "OWNED");
1462            return;
1463        } else {
1464            out_string(c, "CLIENT_ERROR bad format");
1465            return;
1466        }
1467
1468    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1469
1470        int bucket;
1471        if (!settings.managed) {
1472            out_string(c, "CLIENT_ERROR not a managed instance");
1473            return;
1474        }
1475        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1476            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1477                out_string(c, "CLIENT_ERROR bucket number out of range");
1478                return;
1479            }
1480            buckets[bucket] = 0;
1481            out_string(c, "DISOWNED");
1482            return;
1483        } else {
1484            out_string(c, "CLIENT_ERROR bad format");
1485            return;
1486        }
1487
1488    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1489        int bucket, gen;
1490        if (!settings.managed) {
1491            out_string(c, "CLIENT_ERROR not a managed instance");
1492            return;
1493        }
1494        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1495            /* we never write anything back, even if input's wrong */
1496            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1497                /* do nothing, bad input */
1498            } else {
1499                c->bucket = bucket;
1500                c->gen = gen;
1501            }
1502            conn_set_state(c, conn_read);
1503            return;
1504        } else {
1505            out_string(c, "CLIENT_ERROR bad format");
1506            return;
1507        }
1508
1509    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1510
1511        process_stat(c, tokens, ntokens);
1512
1513    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1514        time_t exptime = 0;
1515        set_current_time();
1516
1517        if(ntokens == 2) {
1518            settings.oldest_live = current_time - 1;
1519            item_flush_expired();
1520            out_string(c, "OK");
1521            return;
1522        }
1523
1524        exptime = strtol(tokens[1].value, NULL, 10);
1525        if(errno == ERANGE) {
1526            out_string(c, "CLIENT_ERROR bad command line format");
1527            return;
1528        }
1529
1530        settings.oldest_live = realtime(exptime) - 1;
1531        item_flush_expired();
1532        out_string(c, "OK");
1533        return;
1534
1535    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1536
1537        out_string(c, "VERSION " VERSION);
1538
1539    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1540
1541        conn_set_state(c, conn_closing);
1542
1543    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1544                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1545#ifdef ALLOW_SLABS_REASSIGN
1546
1547        int src, dst, rv;
1548
1549        src = strtol(tokens[2].value, NULL, 10);
1550        dst  = strtol(tokens[3].value, NULL, 10);
1551
1552        if(errno == ERANGE) {
1553            out_string(c, "CLIENT_ERROR bad command line format");
1554            return;
1555        }
1556
1557        rv = slabs_reassign(src, dst);
1558        if (rv == 1) {
1559            out_string(c, "DONE");
1560            return;
1561        }
1562        if (rv == 0) {
1563            out_string(c, "CANT");
1564            return;
1565        }
1566        if (rv == -1) {
1567            out_string(c, "BUSY");
1568            return;
1569        }
1570#else
1571        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1572#endif
1573    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1574        process_verbosity_command(c, tokens, ntokens);
1575    } else {
1576        out_string(c, "ERROR");
1577    }
1578    return;
1579}
1580
1581/*
1582 * if we have a complete line in the buffer, process it.
1583 */
1584static int try_read_command(conn *c) {
1585    char *el, *cont;
1586
1587    assert(c != NULL);
1588    assert(c->rcurr <= (c->rbuf + c->rsize));
1589
1590    if (c->rbytes == 0)
1591        return 0;
1592    el = memchr(c->rcurr, '\n', c->rbytes);
1593    if (!el)
1594        return 0;
1595    cont = el + 1;
1596    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1597        el--;
1598    }
1599    *el = '\0';
1600
1601    assert(cont <= (c->rcurr + c->rbytes));
1602
1603    process_command(c, c->rcurr);
1604
1605    c->rbytes -= (cont - c->rcurr);
1606    c->rcurr = cont;
1607
1608    assert(c->rcurr <= (c->rbuf + c->rsize));
1609
1610    return 1;
1611}
1612
1613/*
1614 * read a UDP request.
1615 * return 0 if there's nothing to read.
1616 */
1617static int try_read_udp(conn *c) {
1618    int res;
1619
1620    assert(c != NULL);
1621
1622    c->request_addr_size = sizeof(c->request_addr);
1623    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1624                   0, &c->request_addr, &c->request_addr_size);
1625    if (res > 8) {
1626        unsigned char *buf = (unsigned char *)c->rbuf;
1627        STATS_LOCK();
1628        stats.bytes_read += res;
1629        STATS_UNLOCK();
1630
1631        /* Beginning of UDP packet is the request ID; save it. */
1632        c->request_id = buf[0] * 256 + buf[1];
1633
1634        /* If this is a multi-packet request, drop it. */
1635        if (buf[4] != 0 || buf[5] != 1) {
1636            out_string(c, "SERVER_ERROR multi-packet request not supported");
1637            return 0;
1638        }
1639
1640        /* Don't care about any of the rest of the header. */
1641        res -= 8;
1642        memmove(c->rbuf, c->rbuf + 8, res);
1643
1644        c->rbytes += res;
1645        c->rcurr = c->rbuf;
1646        return 1;
1647    }
1648    return 0;
1649}
1650
1651/*
1652 * read from network as much as we can, handle buffer overflow and connection
1653 * close.
1654 * before reading, move the remaining incomplete fragment of a command
1655 * (if any) to the beginning of the buffer.
1656 * return 0 if there's nothing to read on the first read.
1657 */
1658static int try_read_network(conn *c) {
1659    int gotdata = 0;
1660    int res;
1661
1662    assert(c != NULL);
1663
1664    if (c->rcurr != c->rbuf) {
1665        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1666            memmove(c->rbuf, c->rcurr, c->rbytes);
1667        c->rcurr = c->rbuf;
1668    }
1669
1670    while (1) {
1671        if (c->rbytes >= c->rsize) {
1672            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1673            if (!new_rbuf) {
1674                if (settings.verbose > 0)
1675                    fprintf(stderr, "Couldn't realloc input buffer\n");
1676                c->rbytes = 0; /* ignore what we read */
1677                out_string(c, "SERVER_ERROR out of memory");
1678                c->write_and_go = conn_closing;
1679                return 1;
1680            }
1681            c->rcurr = c->rbuf = new_rbuf;
1682            c->rsize *= 2;
1683        }
1684
1685        /* unix socket mode doesn't need this, so zeroed out.  but why
1686         * is this done for every command?  presumably for UDP
1687         * mode.  */
1688        if (!settings.socketpath) {
1689            c->request_addr_size = sizeof(c->request_addr);
1690        } else {
1691            c->request_addr_size = 0;
1692        }
1693
1694        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1695        if (res > 0) {
1696            STATS_LOCK();
1697            stats.bytes_read += res;
1698            STATS_UNLOCK();
1699            gotdata = 1;
1700            c->rbytes += res;
1701            continue;
1702        }
1703        if (res == 0) {
1704            /* connection closed */
1705            conn_set_state(c, conn_closing);
1706            return 1;
1707        }
1708        if (res == -1) {
1709            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1710            else return 0;
1711        }
1712    }
1713    return gotdata;
1714}
1715
1716static bool update_event(conn *c, const int new_flags) {
1717    assert(c != NULL);
1718
1719    struct event_base *base = c->event.ev_base;
1720    if (c->ev_flags == new_flags)
1721        return true;
1722    if (event_del(&c->event) == -1) return false;
1723    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1724    event_base_set(base, &c->event);
1725    c->ev_flags = new_flags;
1726    if (event_add(&c->event, 0) == -1) return false;
1727    return true;
1728}
1729
1730/*
1731 * Sets whether we are listening for new connections or not.
1732 */
1733void accept_new_conns(const bool do_accept) {
1734    if (! is_listen_thread())
1735        return;
1736    if (do_accept) {
1737        update_event(listen_conn, EV_READ | EV_PERSIST);
1738        if (listen(listen_conn->sfd, 1024) != 0) {
1739            perror("listen");
1740        }
1741    }
1742    else {
1743        update_event(listen_conn, 0);
1744        if (listen(listen_conn->sfd, 0) != 0) {
1745            perror("listen");
1746        }
1747    }
1748}
1749
1750
1751/*
1752 * Transmit the next chunk of data from our list of msgbuf structures.
1753 *
1754 * Returns:
1755 *   TRANSMIT_COMPLETE   All done writing.
1756 *   TRANSMIT_INCOMPLETE More data remaining to write.
1757 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
1758 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1759 */
1760static int transmit(conn *c) {
1761    assert(c != NULL);
1762
1763    if (c->msgcurr < c->msgused &&
1764            c->msglist[c->msgcurr].msg_iovlen == 0) {
1765        /* Finished writing the current msg; advance to the next. */
1766        c->msgcurr++;
1767    }
1768    if (c->msgcurr < c->msgused) {
1769        ssize_t res;
1770        struct msghdr *m = &c->msglist[c->msgcurr];
1771
1772        res = sendmsg(c->sfd, m, 0);
1773        if (res > 0) {
1774            STATS_LOCK();
1775            stats.bytes_written += res;
1776            STATS_UNLOCK();
1777
1778            /* We've written some of the data. Remove the completed
1779               iovec entries from the list of pending writes. */
1780            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1781                res -= m->msg_iov->iov_len;
1782                m->msg_iovlen--;
1783                m->msg_iov++;
1784            }
1785
1786            /* Might have written just part of the last iovec entry;
1787               adjust it so the next write will do the rest. */
1788            if (res > 0) {
1789                m->msg_iov->iov_base += res;
1790                m->msg_iov->iov_len -= res;
1791            }
1792            return TRANSMIT_INCOMPLETE;
1793        }
1794        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1795            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1796                if (settings.verbose > 0)
1797                    fprintf(stderr, "Couldn't update event\n");
1798                conn_set_state(c, conn_closing);
1799                return TRANSMIT_HARD_ERROR;
1800            }
1801            return TRANSMIT_SOFT_ERROR;
1802        }
1803        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1804           we have a real error, on which we close the connection */
1805        if (settings.verbose > 0)
1806            perror("Failed to write, and not due to blocking");
1807
1808        if (c->udp)
1809            conn_set_state(c, conn_read);
1810        else
1811            conn_set_state(c, conn_closing);
1812        return TRANSMIT_HARD_ERROR;
1813    } else {
1814        return TRANSMIT_COMPLETE;
1815    }
1816}
1817
1818static void drive_machine(conn *c) {
1819    bool stop = false;
1820    int sfd, flags = 1;
1821    socklen_t addrlen;
1822    struct sockaddr addr;
1823    int res;
1824
1825    assert(c != NULL);
1826
1827    while (!stop) {
1828
1829        switch(c->state) {
1830        case conn_listening:
1831            addrlen = sizeof(addr);
1832            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
1833                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1834                    /* these are transient, so don't log anything */
1835                    stop = true;
1836                } else if (errno == EMFILE) {
1837                    if (settings.verbose > 0)
1838                        fprintf(stderr, "Too many open connections\n");
1839                    accept_new_conns(false);
1840                    stop = true;
1841                } else {
1842                    perror("accept()");
1843                    stop = true;
1844                }
1845                break;
1846            }
1847            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
1848                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
1849                perror("setting O_NONBLOCK");
1850                close(sfd);
1851                break;
1852            }
1853            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
1854                                     DATA_BUFFER_SIZE, false);
1855            break;
1856
1857        case conn_read:
1858            if (try_read_command(c) != 0) {
1859                continue;
1860            }
1861            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
1862                continue;
1863            }
1864            /* we have no command line and no data to read from network */
1865            if (!update_event(c, EV_READ | EV_PERSIST)) {
1866                if (settings.verbose > 0)
1867                    fprintf(stderr, "Couldn't update event\n");
1868                conn_set_state(c, conn_closing);
1869                break;
1870            }
1871            stop = true;
1872            break;
1873
1874        case conn_nread:
1875            /* we are reading rlbytes into ritem; */
1876            if (c->rlbytes == 0) {
1877                complete_nread(c);
1878                break;
1879            }
1880            /* first check if we have leftovers in the conn_read buffer */
1881            if (c->rbytes > 0) {
1882                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
1883                memcpy(c->ritem, c->rcurr, tocopy);
1884                c->ritem += tocopy;
1885                c->rlbytes -= tocopy;
1886                c->rcurr += tocopy;
1887                c->rbytes -= tocopy;
1888                break;
1889            }
1890
1891            /*  now try reading from the socket */
1892            res = read(c->sfd, c->ritem, c->rlbytes);
1893            if (res > 0) {
1894                STATS_LOCK();
1895                stats.bytes_read += res;
1896                STATS_UNLOCK();
1897                c->ritem += res;
1898                c->rlbytes -= res;
1899                break;
1900            }
1901            if (res == 0) { /* end of stream */
1902                conn_set_state(c, conn_closing);
1903                break;
1904            }
1905            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1906                if (!update_event(c, EV_READ | EV_PERSIST)) {
1907                    if (settings.verbose > 0)
1908                        fprintf(stderr, "Couldn't update event\n");
1909                    conn_set_state(c, conn_closing);
1910                    break;
1911                }
1912                stop = true;
1913                break;
1914            }
1915            /* otherwise we have a real error, on which we close the connection */
1916            if (settings.verbose > 0)
1917                fprintf(stderr, "Failed to read, and not due to blocking\n");
1918            conn_set_state(c, conn_closing);
1919            break;
1920
1921        case conn_swallow:
1922            /* we are reading sbytes and throwing them away */
1923            if (c->sbytes == 0) {
1924                conn_set_state(c, conn_read);
1925                break;
1926            }
1927
1928            /* first check if we have leftovers in the conn_read buffer */
1929            if (c->rbytes > 0) {
1930                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
1931                c->sbytes -= tocopy;
1932                c->rcurr += tocopy;
1933                c->rbytes -= tocopy;
1934                break;
1935            }
1936
1937            /*  now try reading from the socket */
1938            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
1939            if (res > 0) {
1940                STATS_LOCK();
1941                stats.bytes_read += res;
1942                STATS_UNLOCK();
1943                c->sbytes -= res;
1944                break;
1945            }
1946            if (res == 0) { /* end of stream */
1947                conn_set_state(c, conn_closing);
1948                break;
1949            }
1950            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1951                if (!update_event(c, EV_READ | EV_PERSIST)) {
1952                    if (settings.verbose > 0)
1953                        fprintf(stderr, "Couldn't update event\n");
1954                    conn_set_state(c, conn_closing);
1955                    break;
1956                }
1957                stop = true;
1958                break;
1959            }
1960            /* otherwise we have a real error, on which we close the connection */
1961            if (settings.verbose > 0)
1962                fprintf(stderr, "Failed to read, and not due to blocking\n");
1963            conn_set_state(c, conn_closing);
1964            break;
1965
1966        case conn_write:
1967            /*
1968             * We want to write out a simple response. If we haven't already,
1969             * assemble it into a msgbuf list (this will be a single-entry
1970             * list for TCP or a two-entry list for UDP).
1971             */
1972            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
1973                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
1974                    (c->udp && build_udp_headers(c) != 0)) {
1975                    if (settings.verbose > 0)
1976                        fprintf(stderr, "Couldn't build response\n");
1977                    conn_set_state(c, conn_closing);
1978                    break;
1979                }
1980            }
1981
1982            /* fall through... */
1983
1984        case conn_mwrite:
1985            switch (transmit(c)) {
1986            case TRANSMIT_COMPLETE:
1987                if (c->state == conn_mwrite) {
1988                    while (c->ileft > 0) {
1989                        item *it = *(c->icurr);
1990                        assert((it->it_flags & ITEM_SLABBED) == 0);
1991                        item_remove(it);
1992                        c->icurr++;
1993                        c->ileft--;
1994                    }
1995                    conn_set_state(c, conn_read);
1996                } else if (c->state == conn_write) {
1997                    if (c->write_and_free) {
1998                        free(c->write_and_free);
1999                        c->write_and_free = 0;
2000                    }
2001                    conn_set_state(c, c->write_and_go);
2002                } else {
2003                    if (settings.verbose > 0)
2004                        fprintf(stderr, "Unexpected state %d\n", c->state);
2005                    conn_set_state(c, conn_closing);
2006                }
2007                break;
2008
2009            case TRANSMIT_INCOMPLETE:
2010            case TRANSMIT_HARD_ERROR:
2011                break;                   /* Continue in state machine. */
2012
2013            case TRANSMIT_SOFT_ERROR:
2014                stop = true;
2015                break;
2016            }
2017            break;
2018
2019        case conn_closing:
2020            if (c->udp)
2021                conn_cleanup(c);
2022            else
2023                conn_close(c);
2024            stop = true;
2025            break;
2026        }
2027    }
2028
2029    return;
2030}
2031
2032void event_handler(const int fd, const short which, void *arg) {
2033    conn *c;
2034
2035    c = (conn *)arg;
2036    assert(c != NULL);
2037
2038    c->which = which;
2039
2040    /* sanity */
2041    if (fd != c->sfd) {
2042        if (settings.verbose > 0)
2043            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2044        conn_close(c);
2045        return;
2046    }
2047
2048    drive_machine(c);
2049
2050    /* wait for next event */
2051    return;
2052}
2053
2054static int new_socket(const bool is_udp) {
2055    int sfd;
2056    int flags;
2057
2058    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
2059        perror("socket()");
2060        return -1;
2061    }
2062
2063    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2064        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2065        perror("setting O_NONBLOCK");
2066        close(sfd);
2067        return -1;
2068    }
2069    return sfd;
2070}
2071
2072
2073/*
2074 * Sets a socket's send buffer size to the maximum allowed by the system.
2075 */
2076static void maximize_sndbuf(const int sfd) {
2077    socklen_t intsize = sizeof(int);
2078    int last_good = 0;
2079    int min, max, avg;
2080    int old_size;
2081
2082    /* Start with the default size. */
2083    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2084        if (settings.verbose > 0)
2085            perror("getsockopt(SO_SNDBUF)");
2086        return;
2087    }
2088
2089    /* Binary-search for the real maximum. */
2090    min = old_size;
2091    max = MAX_SENDBUF_SIZE;
2092
2093    while (min <= max) {
2094        avg = ((unsigned int)(min + max)) / 2;
2095        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2096            last_good = avg;
2097            min = avg + 1;
2098        } else {
2099            max = avg - 1;
2100        }
2101    }
2102
2103    if (settings.verbose > 1)
2104        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2105}
2106
2107
2108static int server_socket(const int port, const bool is_udp) {
2109    int sfd;
2110    struct linger ling = {0, 0};
2111    struct sockaddr_in addr;
2112    int flags =1;
2113
2114    if ((sfd = new_socket(is_udp)) == -1) {
2115        return -1;
2116    }
2117
2118    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2119    if (is_udp) {
2120        maximize_sndbuf(sfd);
2121    } else {
2122        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2123        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2124        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2125    }
2126
2127    /*
2128     * the memset call clears nonstandard fields in some impementations
2129     * that otherwise mess things up.
2130     */
2131    memset(&addr, 0, sizeof(addr));
2132
2133    addr.sin_family = AF_INET;
2134    addr.sin_port = htons(port);
2135    addr.sin_addr = settings.interf;
2136    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2137        perror("bind()");
2138        close(sfd);
2139        return -1;
2140    }
2141    if (!is_udp && listen(sfd, 1024) == -1) {
2142        perror("listen()");
2143        close(sfd);
2144        return -1;
2145    }
2146    return sfd;
2147}
2148
2149static int new_socket_unix(void) {
2150    int sfd;
2151    int flags;
2152
2153    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2154        perror("socket()");
2155        return -1;
2156    }
2157
2158    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2159        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2160        perror("setting O_NONBLOCK");
2161        close(sfd);
2162        return -1;
2163    }
2164    return sfd;
2165}
2166
2167static int server_socket_unix(const char *path) {
2168    int sfd;
2169    struct linger ling = {0, 0};
2170    struct sockaddr_un addr;
2171    struct stat tstat;
2172    int flags =1;
2173
2174    if (!path) {
2175        return -1;
2176    }
2177
2178    if ((sfd = new_socket_unix()) == -1) {
2179        return -1;
2180    }
2181
2182    /*
2183     * Clean up a previous socket file if we left it around
2184     */
2185    if (lstat(path, &tstat) == 0) {
2186        if (S_ISSOCK(tstat.st_mode))
2187            unlink(path);
2188    }
2189
2190    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2191    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2192    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2193
2194    /*
2195     * the memset call clears nonstandard fields in some impementations
2196     * that otherwise mess things up.
2197     */
2198    memset(&addr, 0, sizeof(addr));
2199
2200    addr.sun_family = AF_UNIX;
2201    strcpy(addr.sun_path, path);
2202    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2203        perror("bind()");
2204        close(sfd);
2205        return -1;
2206    }
2207    if (listen(sfd, 1024) == -1) {
2208        perror("listen()");
2209        close(sfd);
2210        return -1;
2211    }
2212    return sfd;
2213}
2214
2215/* listening socket */
2216static int l_socket = 0;
2217
2218/* udp socket */
2219static int u_socket = -1;
2220
2221/* invoke right before gdb is called, on assert */
2222void pre_gdb(void) {
2223    int i;
2224    if (l_socket > -1) close(l_socket);
2225    if (u_socket > -1) close(u_socket);
2226    for (i = 3; i <= 500; i++) close(i); /* so lame */
2227    kill(getpid(), SIGABRT);
2228}
2229
2230/*
2231 * We keep the current time of day in a global variable that's updated by a
2232 * timer event. This saves us a bunch of time() system calls (we really only
2233 * need to get the time once a second, whereas there can be tens of thousands
2234 * of requests a second) and allows us to use server-start-relative timestamps
2235 * rather than absolute UNIX timestamps, a space savings on systems where
2236 * sizeof(time_t) > sizeof(unsigned int).
2237 */
2238volatile rel_time_t current_time;
2239static struct event clockevent;
2240
2241/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2242static void set_current_time(void) {
2243    current_time = (rel_time_t) (time(0) - stats.started);
2244}
2245
2246static void clock_handler(const int fd, const short which, void *arg) {
2247    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2248    static bool initialized = false;
2249
2250    if (initialized) {
2251        /* only delete the event if it's actually there. */
2252        evtimer_del(&clockevent);
2253    } else {
2254        initialized = true;
2255    }
2256
2257    evtimer_set(&clockevent, clock_handler, 0);
2258    event_base_set(main_base, &clockevent);
2259    evtimer_add(&clockevent, &t);
2260
2261    set_current_time();
2262}
2263
2264static struct event deleteevent;
2265
2266static void delete_handler(const int fd, const short which, void *arg) {
2267    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2268    static bool initialized = false;
2269
2270    if (initialized) {
2271        /* some versions of libevent don't like deleting events that don't exist,
2272           so only delete once we know this event has been added. */
2273        evtimer_del(&deleteevent);
2274    } else {
2275        initialized = true;
2276    }
2277
2278    evtimer_set(&deleteevent, delete_handler, 0);
2279    event_base_set(main_base, &deleteevent);
2280    evtimer_add(&deleteevent, &t);
2281    run_deferred_deletes();
2282}
2283
2284/* Call run_deferred_deletes instead of this. */
2285void do_run_deferred_deletes(void)
2286{
2287    int i, j = 0;
2288
2289    for (i = 0; i < delcurr; i++) {
2290        item *it = todelete[i];
2291        if (item_delete_lock_over(it)) {
2292            assert(it->refcount > 0);
2293            it->it_flags &= ~ITEM_DELETED;
2294            do_item_unlink(it);
2295            do_item_remove(it);
2296        } else {
2297            todelete[j++] = it;
2298        }
2299    }
2300    delcurr = j;
2301}
2302
2303static void usage(void) {
2304    printf(PACKAGE " " VERSION "\n");
2305    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2306           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2307           "-s <file>     unix socket path to listen on (disables network support)\n"
2308           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2309           "-d            run as a daemon\n"
2310           "-r            maximize core file limit\n"
2311           "-u <username> assume identity of <username> (only when run as root)\n"
2312           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2313           "-M            return error on memory exhausted (rather than removing items)\n"
2314           "-c <num>      max simultaneous connections, default is 1024\n"
2315           "-k            lock down all paged memory\n"
2316           "-v            verbose (print errors/warnings while in event loop)\n"
2317           "-vv           very verbose (also print client commands/reponses)\n"
2318           "-h            print this help and exit\n"
2319           "-i            print memcached and libevent license\n"
2320           "-b            run a managed instanced (mnemonic: buckets)\n"
2321           "-P <file>     save PID in <file>, only used with -d option\n"
2322           "-f <factor>   chunk size growth factor, default 1.25\n"
2323           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
2324#ifdef USE_THREADS
2325    printf("-t <num>      number of threads to use, default 4\n");
2326#endif
2327    return;
2328}
2329
2330static void usage_license(void) {
2331    printf(PACKAGE " " VERSION "\n\n");
2332    printf(
2333    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2334    "All rights reserved.\n"
2335    "\n"
2336    "Redistribution and use in source and binary forms, with or without\n"
2337    "modification, are permitted provided that the following conditions are\n"
2338    "met:\n"
2339    "\n"
2340    "    * Redistributions of source code must retain the above copyright\n"
2341    "notice, this list of conditions and the following disclaimer.\n"
2342    "\n"
2343    "    * Redistributions in binary form must reproduce the above\n"
2344    "copyright notice, this list of conditions and the following disclaimer\n"
2345    "in the documentation and/or other materials provided with the\n"
2346    "distribution.\n"
2347    "\n"
2348    "    * Neither the name of the Danga Interactive nor the names of its\n"
2349    "contributors may be used to endorse or promote products derived from\n"
2350    "this software without specific prior written permission.\n"
2351    "\n"
2352    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2353    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2354    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2355    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2356    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2357    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2358    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2359    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2360    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2361    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2362    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2363    "\n"
2364    "\n"
2365    "This product includes software developed by Niels Provos.\n"
2366    "\n"
2367    "[ libevent ]\n"
2368    "\n"
2369    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2370    "All rights reserved.\n"
2371    "\n"
2372    "Redistribution and use in source and binary forms, with or without\n"
2373    "modification, are permitted provided that the following conditions\n"
2374    "are met:\n"
2375    "1. Redistributions of source code must retain the above copyright\n"
2376    "   notice, this list of conditions and the following disclaimer.\n"
2377    "2. Redistributions in binary form must reproduce the above copyright\n"
2378    "   notice, this list of conditions and the following disclaimer in the\n"
2379    "   documentation and/or other materials provided with the distribution.\n"
2380    "3. All advertising materials mentioning features or use of this software\n"
2381    "   must display the following acknowledgement:\n"
2382    "      This product includes software developed by Niels Provos.\n"
2383    "4. The name of the author may not be used to endorse or promote products\n"
2384    "   derived from this software without specific prior written permission.\n"
2385    "\n"
2386    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2387    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2388    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2389    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2390    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2391    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2392    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2393    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2394    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2395    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2396    );
2397
2398    return;
2399}
2400
2401static void save_pid(const pid_t pid, const char *pid_file) {
2402    FILE *fp;
2403    if (pid_file == NULL)
2404        return;
2405
2406    if ((fp = fopen(pid_file, "w")) == NULL) {
2407        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2408        return;
2409    }
2410
2411    fprintf(fp,"%ld\n", (long)pid);
2412    if (fclose(fp) == -1) {
2413        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2414        return;
2415    }
2416}
2417
2418static void remove_pidfile(const char *pid_file) {
2419  if (pid_file == NULL)
2420      return;
2421
2422  if (unlink(pid_file) != 0) {
2423      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2424  }
2425
2426}
2427
2428
2429static void sig_handler(const int sig) {
2430    printf("SIGINT handled.\n");
2431    exit(EXIT_SUCCESS);
2432}
2433
2434int main (int argc, char **argv) {
2435    int c;
2436    struct in_addr addr;
2437    bool lock_memory = false;
2438    bool daemonize = false;
2439    int maxcore = 0;
2440    char *username = NULL;
2441    char *pid_file = NULL;
2442    struct passwd *pw;
2443    struct sigaction sa;
2444    struct rlimit rlim;
2445
2446    /* handle SIGINT */
2447    signal(SIGINT, sig_handler);
2448
2449    /* init settings */
2450    settings_init();
2451
2452    /* set stderr non-buffering (for running under, say, daemontools) */
2453    setbuf(stderr, NULL);
2454
2455    /* process arguments */
2456    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2457        switch (c) {
2458        case 'U':
2459            settings.udpport = atoi(optarg);
2460            break;
2461        case 'b':
2462            settings.managed = true;
2463            break;
2464        case 'p':
2465            settings.port = atoi(optarg);
2466            break;
2467        case 's':
2468            settings.socketpath = optarg;
2469            break;
2470        case 'm':
2471            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2472            break;
2473        case 'M':
2474            settings.evict_to_free = 0;
2475            break;
2476        case 'c':
2477            settings.maxconns = atoi(optarg);
2478            break;
2479        case 'h':
2480            usage();
2481            exit(EXIT_SUCCESS);
2482        case 'i':
2483            usage_license();
2484            exit(EXIT_SUCCESS);
2485        case 'k':
2486            lock_memory = true;
2487            break;
2488        case 'v':
2489            settings.verbose++;
2490            break;
2491        case 'l':
2492            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2493                fprintf(stderr, "Illegal address: %s\n", optarg);
2494                return 1;
2495            } else {
2496                settings.interf = addr;
2497            }
2498            break;
2499        case 'd':
2500            daemonize = true;
2501            break;
2502        case 'r':
2503            maxcore = 1;
2504            break;
2505        case 'u':
2506            username = optarg;
2507            break;
2508        case 'P':
2509            pid_file = optarg;
2510            break;
2511        case 'f':
2512            settings.factor = atof(optarg);
2513            if (settings.factor <= 1.0) {
2514                fprintf(stderr, "Factor must be greater than 1\n");
2515                return 1;
2516            }
2517            break;
2518        case 'n':
2519            settings.chunk_size = atoi(optarg);
2520            if (settings.chunk_size == 0) {
2521                fprintf(stderr, "Chunk size must be greater than 0\n");
2522                return 1;
2523            }
2524            break;
2525        case 't':
2526            settings.num_threads = atoi(optarg);
2527            if (settings.num_threads == 0) {
2528                fprintf(stderr, "Number of threads must be greater than 0\n");
2529                return 1;
2530            }
2531            break;
2532        case 'D':
2533            if (! optarg || ! optarg[0]) {
2534                fprintf(stderr, "No delimiter specified\n");
2535                return 1;
2536            }
2537            settings.prefix_delimiter = optarg[0];
2538            settings.detail_enabled = 1;
2539            break;
2540        default:
2541            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2542            return 1;
2543        }
2544    }
2545
2546    if (maxcore != 0) {
2547        struct rlimit rlim_new;
2548        /*
2549         * First try raising to infinity; if that fails, try bringing
2550         * the soft limit to the hard.
2551         */
2552        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2553            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2554            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2555                /* failed. try raising just to the old max */
2556                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2557                (void)setrlimit(RLIMIT_CORE, &rlim_new);
2558            }
2559        }
2560        /*
2561         * getrlimit again to see what we ended up with. Only fail if
2562         * the soft limit ends up 0, because then no core files will be
2563         * created at all.
2564         */
2565
2566        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2567            fprintf(stderr, "failed to ensure corefile creation\n");
2568            exit(EXIT_FAILURE);
2569        }
2570    }
2571
2572    /*
2573     * If needed, increase rlimits to allow as many connections
2574     * as needed.
2575     */
2576
2577    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2578        fprintf(stderr, "failed to getrlimit number of files\n");
2579        exit(EXIT_FAILURE);
2580    } else {
2581        int maxfiles = settings.maxconns;
2582        if (rlim.rlim_cur < maxfiles)
2583            rlim.rlim_cur = maxfiles + 3;
2584        if (rlim.rlim_max < rlim.rlim_cur)
2585            rlim.rlim_max = rlim.rlim_cur;
2586        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2587            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2588            exit(EXIT_FAILURE);
2589        }
2590    }
2591
2592    /*
2593     * initialization order: first create the listening sockets
2594     * (may need root on low ports), then drop root if needed,
2595     * then daemonise if needed, then init libevent (in some cases
2596     * descriptors created by libevent wouldn't survive forking).
2597     */
2598
2599    /* create the listening socket and bind it */
2600    if (settings.socketpath == NULL) {
2601        l_socket = server_socket(settings.port, 0);
2602        if (l_socket == -1) {
2603            fprintf(stderr, "failed to listen\n");
2604            exit(EXIT_FAILURE);
2605        }
2606    }
2607
2608    if (settings.udpport > 0 && settings.socketpath == NULL) {
2609        /* create the UDP listening socket and bind it */
2610        u_socket = server_socket(settings.udpport, 1);
2611        if (u_socket == -1) {
2612            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2613            exit(EXIT_FAILURE);
2614        }
2615    }
2616
2617    /* lose root privileges if we have them */
2618    if (getuid() == 0 || geteuid() == 0) {
2619        if (username == 0 || *username == '\0') {
2620            fprintf(stderr, "can't run as root without the -u switch\n");
2621            return 1;
2622        }
2623        if ((pw = getpwnam(username)) == 0) {
2624            fprintf(stderr, "can't find the user %s to switch to\n", username);
2625            return 1;
2626        }
2627        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2628            fprintf(stderr, "failed to assume identity of user %s\n", username);
2629            return 1;
2630        }
2631    }
2632
2633    /* create unix mode sockets after dropping privileges */
2634    if (settings.socketpath != NULL) {
2635        l_socket = server_socket_unix(settings.socketpath);
2636        if (l_socket == -1) {
2637            fprintf(stderr, "failed to listen\n");
2638            exit(EXIT_FAILURE);
2639        }
2640    }
2641
2642    /* daemonize if requested */
2643    /* if we want to ensure our ability to dump core, don't chdir to / */
2644    if (daemonize) {
2645        int res;
2646        res = daemon(maxcore, settings.verbose);
2647        if (res == -1) {
2648            fprintf(stderr, "failed to daemon() in order to daemonize\n");
2649            return 1;
2650        }
2651    }
2652
2653
2654    /* initialize main thread libevent instance */
2655    main_base = event_init();
2656
2657    /* initialize other stuff */
2658    item_init();
2659    stats_init();
2660    assoc_init();
2661    conn_init();
2662    slabs_init(settings.maxbytes, settings.factor);
2663
2664    /* managed instance? alloc and zero a bucket array */
2665    if (settings.managed) {
2666        buckets = malloc(sizeof(int) * MAX_BUCKETS);
2667        if (buckets == 0) {
2668            fprintf(stderr, "failed to allocate the bucket array");
2669            exit(EXIT_FAILURE);
2670        }
2671        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2672    }
2673
2674    /* lock paged memory if needed */
2675    if (lock_memory) {
2676#ifdef HAVE_MLOCKALL
2677        mlockall(MCL_CURRENT | MCL_FUTURE);
2678#else
2679        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
2680#endif
2681    }
2682
2683    /*
2684     * ignore SIGPIPE signals; we can use errno==EPIPE if we
2685     * need that information
2686     */
2687    sa.sa_handler = SIG_IGN;
2688    sa.sa_flags = 0;
2689    if (sigemptyset(&sa.sa_mask) == -1 ||
2690        sigaction(SIGPIPE, &sa, 0) == -1) {
2691        perror("failed to ignore SIGPIPE; sigaction");
2692        exit(EXIT_FAILURE);
2693    }
2694    /* create the initial listening connection */
2695    if (!(listen_conn = conn_new(l_socket, conn_listening,
2696                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
2697        fprintf(stderr, "failed to create listening connection");
2698        exit(EXIT_FAILURE);
2699    }
2700    /* start up worker threads if MT mode */
2701    thread_init(settings.num_threads, main_base);
2702    /* save the PID in if we're a daemon, do this after thread_init due to
2703       a file descriptor handling bug somewhere in libevent */
2704    if (daemonize)
2705        save_pid(getpid(), pid_file);
2706    /* initialise clock event */
2707    clock_handler(0, 0, 0);
2708    /* initialise deletion array and timer event */
2709    deltotal = 200;
2710    delcurr = 0;
2711    if ((todelete = malloc(sizeof(item *) * deltotal)) == NULL) {
2712        perror("failed to allocate memory for deletion array");
2713        exit(EXIT_FAILURE);
2714    }
2715    delete_handler(0, 0, 0); /* sets up the event */
2716    /* create the initial listening udp connection, monitored on all threads */
2717    if (u_socket > -1) {
2718        for (c = 0; c < settings.num_threads; c++) {
2719            /* this is guaranteed to hit all threads because we round-robin */
2720            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2721                              UDP_READ_BUFFER_SIZE, 1);
2722        }
2723    }
2724    /* enter the event loop */
2725    event_base_loop(main_base, 0);
2726    /* remove the PID file if we're a daemon */
2727    if (daemonize)
2728        remove_pidfile(pid_file);
2729    return 0;
2730}
Note: See TracBrowser for help on using the browser.