root/trunk/server/memcached.c @ 590

Revision 590, 80.0 kB (checked in by sgrimm, 2 years ago)

Fix the "stats items" command; the powers-of-N chunk size change broke it

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(const bool is_udp);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92
93/* time handling */
94static void set_current_time(void);  /* update the global variable holding
95                              global 32-bit seconds-since-start time
96                              (to avoid 64 bit time_t) */
97
98void pre_gdb(void);
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = 0;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.port = 11211;
169    settings.udpport = 0;
170    settings.interf.s_addr = htonl(INADDR_ANY);
171    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
172    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
173    settings.verbose = 0;
174    settings.oldest_live = 0;
175    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
176    settings.socketpath = NULL;       /* by default, not using a unix socket */
177    settings.managed = false;
178    settings.factor = 1.25;
179    settings.chunk_size = 48;         /* space for a modest key and value */
180#ifdef USE_THREADS
181    settings.num_threads = 4;
182#else
183    settings.num_threads = 1;
184#endif
185    settings.prefix_delimiter = ':';
186    settings.detail_enabled = 0;
187}
188
189/* returns true if a deleted item's delete-locked-time is over, and it
190   should be removed from the namespace */
191static bool item_delete_lock_over (item *it) {
192    assert(it->it_flags & ITEM_DELETED);
193    return (current_time >= it->exptime);
194}
195
196/*
197 * Adds a message header to a connection.
198 *
199 * Returns 0 on success, -1 on out-of-memory.
200 */
201static int add_msghdr(conn *c)
202{
203    struct msghdr *msg;
204
205    assert(c != NULL);
206
207    if (c->msgsize == c->msgused) {
208        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
209        if (! msg)
210            return -1;
211        c->msglist = msg;
212        c->msgsize *= 2;
213    }
214
215    msg = c->msglist + c->msgused;
216
217    /* this wipes msg_iovlen, msg_control, msg_controllen, and
218       msg_flags, the last 3 of which aren't defined on solaris: */
219    memset(msg, 0, sizeof(struct msghdr));
220
221    msg->msg_iov = &c->iov[c->iovused];
222    msg->msg_name = &c->request_addr;
223    msg->msg_namelen = c->request_addr_size;
224
225    c->msgbytes = 0;
226    c->msgused++;
227
228    if (c->udp) {
229        /* Leave room for the UDP header, which we'll fill in later. */
230        return add_iov(c, NULL, UDP_HEADER_SIZE);
231    }
232
233    return 0;
234}
235
236
237/*
238 * Free list management for connections.
239 */
240
241static conn **freeconns;
242static int freetotal;
243static int freecurr;
244
245
246static void conn_init(void) {
247    freetotal = 200;
248    freecurr = 0;
249    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
250        perror("malloc()");
251    }
252    return;
253}
254
255/*
256 * Returns a connection from the freelist, if any. Should call this using
257 * conn_from_freelist() for thread safety.
258 */
259conn *do_conn_from_freelist() {
260    conn *c;
261
262    if (freecurr > 0) {
263        c = freeconns[--freecurr];
264    } else {
265        c = NULL;
266    }
267
268    return c;
269}
270
271/*
272 * Adds a connection to the freelist. 0 = success. Should call this using
273 * conn_add_to_freelist() for thread safety.
274 */
275int do_conn_add_to_freelist(conn *c) {
276    if (freecurr < freetotal) {
277        freeconns[freecurr++] = c;
278        return 0;
279    } else {
280        /* try to enlarge free connections array */
281        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
282        if (new_freeconns) {
283            freetotal *= 2;
284            freeconns = new_freeconns;
285            freeconns[freecurr++] = c;
286            return 0;
287        }
288    }
289    return 1;
290}
291
292conn *conn_new(const int sfd, const int init_state, const int event_flags,
293                const int read_buffer_size, const bool is_udp, struct event_base *base) {
294    conn *c = conn_from_freelist();
295
296    if (NULL == c) {
297        if (!(c = (conn *)malloc(sizeof(conn)))) {
298            perror("malloc()");
299            return NULL;
300        }
301        c->rbuf = c->wbuf = 0;
302        c->ilist = 0;
303        c->iov = 0;
304        c->msglist = 0;
305        c->hdrbuf = 0;
306
307        c->rsize = read_buffer_size;
308        c->wsize = DATA_BUFFER_SIZE;
309        c->isize = ITEM_LIST_INITIAL;
310        c->iovsize = IOV_LIST_INITIAL;
311        c->msgsize = MSG_LIST_INITIAL;
312        c->hdrsize = 0;
313
314        c->rbuf = (char *)malloc((size_t)c->rsize);
315        c->wbuf = (char *)malloc((size_t)c->wsize);
316        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
317        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
318        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
319
320        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
321                c->msglist == 0) {
322            if (c->rbuf != 0) free(c->rbuf);
323            if (c->wbuf != 0) free(c->wbuf);
324            if (c->ilist !=0) free(c->ilist);
325            if (c->iov != 0) free(c->iov);
326            if (c->msglist != 0) free(c->msglist);
327            free(c);
328            perror("malloc()");
329            return NULL;
330        }
331
332        STATS_LOCK();
333        stats.conn_structs++;
334        STATS_UNLOCK();
335    }
336
337    if (settings.verbose > 1) {
338        if (init_state == conn_listening)
339            fprintf(stderr, "<%d server listening\n", sfd);
340        else if (is_udp)
341            fprintf(stderr, "<%d server listening (udp)\n", sfd);
342        else
343            fprintf(stderr, "<%d new client connection\n", sfd);
344    }
345
346    c->sfd = sfd;
347    c->udp = is_udp;
348    c->state = init_state;
349    c->rlbytes = 0;
350    c->rbytes = c->wbytes = 0;
351    c->wcurr = c->wbuf;
352    c->rcurr = c->rbuf;
353    c->ritem = 0;
354    c->icurr = c->ilist;
355    c->ileft = 0;
356    c->iovused = 0;
357    c->msgcurr = 0;
358    c->msgused = 0;
359
360    c->write_and_go = conn_read;
361    c->write_and_free = 0;
362    c->item = 0;
363    c->bucket = -1;
364    c->gen = 0;
365
366    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
367    event_base_set(base, &c->event);
368    c->ev_flags = event_flags;
369
370    if (event_add(&c->event, 0) == -1) {
371        if (conn_add_to_freelist(c)) {
372            conn_free(c);
373        }
374        return NULL;
375    }
376
377    STATS_LOCK();
378    stats.curr_conns++;
379    stats.total_conns++;
380    STATS_UNLOCK();
381
382    return c;
383}
384
385static void conn_cleanup(conn *c) {
386    assert(c != NULL);
387
388    if (c->item) {
389        item_remove(c->item);
390        c->item = 0;
391    }
392
393    if (c->ileft != 0) {
394        for (; c->ileft > 0; c->ileft--,c->icurr++) {
395            item_remove(*(c->icurr));
396        }
397    }
398
399    if (c->write_and_free) {
400        free(c->write_and_free);
401        c->write_and_free = 0;
402    }
403}
404
405/*
406 * Frees a connection.
407 */
408void conn_free(conn *c) {
409    if (c) {
410        if (c->hdrbuf)
411            free(c->hdrbuf);
412        if (c->msglist)
413            free(c->msglist);
414        if (c->rbuf)
415            free(c->rbuf);
416        if (c->wbuf)
417            free(c->wbuf);
418        if (c->ilist)
419            free(c->ilist);
420        if (c->iov)
421            free(c->iov);
422        free(c);
423    }
424}
425
426static void conn_close(conn *c) {
427    assert(c != NULL);
428
429    /* delete the event, the socket and the conn */
430    event_del(&c->event);
431
432    if (settings.verbose > 1)
433        fprintf(stderr, "<%d connection closed.\n", c->sfd);
434
435    close(c->sfd);
436    accept_new_conns(true);
437    conn_cleanup(c);
438
439    /* if the connection has big buffers, just free it */
440    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
441        conn_free(c);
442    }
443
444    STATS_LOCK();
445    stats.curr_conns--;
446    STATS_UNLOCK();
447
448    return;
449}
450
451
452/*
453 * Shrinks a connection's buffers if they're too big.  This prevents
454 * periodic large "get" requests from permanently chewing lots of server
455 * memory.
456 *
457 * This should only be called in between requests since it can wipe output
458 * buffers!
459 */
460static void conn_shrink(conn *c) {
461    assert(c != NULL);
462
463    if (c->udp)
464        return;
465
466    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
467        char *newbuf;
468
469        if (c->rcurr != c->rbuf)
470            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
471
472        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
473
474        if (newbuf) {
475            c->rbuf = newbuf;
476            c->rsize = DATA_BUFFER_SIZE;
477        }
478        /* TODO check other branch... */
479        c->rcurr = c->rbuf;
480    }
481
482    if (c->isize > ITEM_LIST_HIGHWAT) {
483        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
484        if (newbuf) {
485            c->ilist = newbuf;
486            c->isize = ITEM_LIST_INITIAL;
487        }
488    /* TODO check error condition? */
489    }
490
491    if (c->msgsize > MSG_LIST_HIGHWAT) {
492        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
493        if (newbuf) {
494            c->msglist = newbuf;
495            c->msgsize = MSG_LIST_INITIAL;
496        }
497    /* TODO check error condition? */
498    }
499
500    if (c->iovsize > IOV_LIST_HIGHWAT) {
501        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
502        if (newbuf) {
503            c->iov = newbuf;
504            c->iovsize = IOV_LIST_INITIAL;
505        }
506    /* TODO check return value */
507    }
508}
509
510/*
511 * Sets a connection's current state in the state machine. Any special
512 * processing that needs to happen on certain state transitions can
513 * happen here.
514 */
515static void conn_set_state(conn *c, int state) {
516    assert(c != NULL);
517
518    if (state != c->state) {
519        if (state == conn_read) {
520            conn_shrink(c);
521            assoc_move_next_bucket();
522        }
523        c->state = state;
524    }
525}
526
527
528/*
529 * Ensures that there is room for another struct iovec in a connection's
530 * iov list.
531 *
532 * Returns 0 on success, -1 on out-of-memory.
533 */
534static int ensure_iov_space(conn *c) {
535    assert(c != NULL);
536
537    if (c->iovused >= c->iovsize) {
538        int i, iovnum;
539        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
540                                (c->iovsize * 2) * sizeof(struct iovec));
541        if (! new_iov)
542            return -1;
543        c->iov = new_iov;
544        c->iovsize *= 2;
545
546        /* Point all the msghdr structures at the new list. */
547        for (i = 0, iovnum = 0; i < c->msgused; i++) {
548            c->msglist[i].msg_iov = &c->iov[iovnum];
549            iovnum += c->msglist[i].msg_iovlen;
550        }
551    }
552
553    return 0;
554}
555
556
557/*
558 * Adds data to the list of pending data that will be written out to a
559 * connection.
560 *
561 * Returns 0 on success, -1 on out-of-memory.
562 */
563
564static int add_iov(conn *c, const void *buf, int len) {
565    struct msghdr *m;
566    int leftover;
567    bool limit_to_mtu;
568
569    assert(c != NULL);
570
571    do {
572        m = &c->msglist[c->msgused - 1];
573
574        /*
575         * Limit UDP packets, and the first payloads of TCP replies, to
576         * UDP_MAX_PAYLOAD_SIZE bytes.
577         */
578        limit_to_mtu = c->udp || (1 == c->msgused);
579
580        /* We may need to start a new msghdr if this one is full. */
581        if (m->msg_iovlen == IOV_MAX ||
582            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
583            add_msghdr(c);
584            m = &c->msglist[c->msgused - 1];
585        }
586
587        if (ensure_iov_space(c) != 0)
588            return -1;
589
590        /* If the fragment is too big to fit in the datagram, split it up */
591        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
592            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
593            len -= leftover;
594        } else {
595            leftover = 0;
596        }
597
598        m = &c->msglist[c->msgused - 1];
599        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
600        m->msg_iov[m->msg_iovlen].iov_len = len;
601
602        c->msgbytes += len;
603        c->iovused++;
604        m->msg_iovlen++;
605
606        buf = ((char *)buf) + len;
607        len = leftover;
608    } while (leftover > 0);
609
610    return 0;
611}
612
613
614/*
615 * Constructs a set of UDP headers and attaches them to the outgoing messages.
616 */
617static int build_udp_headers(conn *c) {
618    int i;
619    unsigned char *hdr;
620
621    assert(c != NULL);
622
623    if (c->msgused > c->hdrsize) {
624        void *new_hdrbuf;
625        if (c->hdrbuf)
626            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
627        else
628            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
629        if (! new_hdrbuf)
630            return -1;
631        c->hdrbuf = (unsigned char *)new_hdrbuf;
632        c->hdrsize = c->msgused * 2;
633    }
634
635    hdr = c->hdrbuf;
636    for (i = 0; i < c->msgused; i++) {
637        c->msglist[i].msg_iov[0].iov_base = hdr;
638        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
639        *hdr++ = c->request_id / 256;
640        *hdr++ = c->request_id % 256;
641        *hdr++ = i / 256;
642        *hdr++ = i % 256;
643        *hdr++ = c->msgused / 256;
644        *hdr++ = c->msgused % 256;
645        *hdr++ = 0;
646        *hdr++ = 0;
647        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
648    }
649
650    return 0;
651}
652
653
654static void out_string(conn *c, const char *str) {
655    size_t len;
656
657    assert(c != NULL);
658
659    if (settings.verbose > 1)
660        fprintf(stderr, ">%d %s\n", c->sfd, str);
661
662    len = strlen(str);
663    if ((len + 2) > c->wsize) {
664        /* ought to be always enough. just fail for simplicity */
665        str = "SERVER_ERROR output line too long";
666        len = strlen(str);
667    }
668
669    memcpy(c->wbuf, str, len);
670    memcpy(c->wbuf + len, "\r\n", 3);
671    c->wbytes = len + 2;
672    c->wcurr = c->wbuf;
673
674    conn_set_state(c, conn_write);
675    c->write_and_go = conn_read;
676    return;
677}
678
679/*
680 * we get here after reading the value in set/add/replace commands. The command
681 * has been stored in c->item_comm, and the item is ready in c->item.
682 */
683
684static void complete_nread(conn *c) {
685    assert(c != NULL);
686
687    item *it = c->item;
688    int comm = c->item_comm;
689
690    STATS_LOCK();
691    stats.set_cmds++;
692    STATS_UNLOCK();
693
694    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
695        out_string(c, "CLIENT_ERROR bad data chunk");
696    } else {
697        if (store_item(it, comm)) {
698            out_string(c, "STORED");
699        } else {
700            out_string(c, "NOT_STORED");
701        }
702    }
703
704    item_remove(c->item);       /* release the c->item reference */
705    c->item = 0;
706}
707
708/*
709 * Stores an item in the cache according to the semantics of one of the set
710 * commands. In threaded mode, this is protected by the cache lock.
711 *
712 * Returns true if the item was stored.
713 */
714int do_store_item(item *it, int comm) {
715    char *key = ITEM_key(it);
716    bool delete_locked = false;
717    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
718    int stored = 0;
719
720    if (old_it != NULL && comm == NREAD_ADD) {
721        /* add only adds a nonexistent item, but promote to head of LRU */
722        do_item_update(old_it);
723    } else if (!old_it && comm == NREAD_REPLACE) {
724        /* replace only replaces an existing value; don't store */
725    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
726        /* replace and add can't override delete locks; don't store */
727    } else {
728        /* "set" commands can override the delete lock
729           window... in which case we have to find the old hidden item
730           that's in the namespace/LRU but wasn't returned by
731           item_get.... because we need to replace it */
732        if (delete_locked)
733            old_it = do_item_get_nocheck(key, it->nkey);
734
735        if (old_it != NULL)
736            do_item_replace(old_it, it);
737        else
738            do_item_link(it);
739
740        stored = 1;
741    }
742
743    if (old_it)
744        do_item_remove(old_it);         /* release our reference */
745    return stored;
746}
747
748typedef struct token_s {
749    char *value;
750    size_t length;
751} token_t;
752
753#define COMMAND_TOKEN 0
754#define SUBCOMMAND_TOKEN 1
755#define KEY_TOKEN 1
756#define KEY_MAX_LENGTH 250
757
758#define MAX_TOKENS 6
759
760/*
761 * Tokenize the command string by replacing whitespace with '\0' and update
762 * the token array tokens with pointer to start of each token and length.
763 * Returns total number of tokens.  The last valid token is the terminal
764 * token (value points to the first unprocessed character of the string and
765 * length zero).
766 *
767 * Usage example:
768 *
769 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
770 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
771 *          ...
772 *      }
773 *      ncommand = tokens[ix].value - command;
774 *      command  = tokens[ix].value;
775 *   }
776 */
777static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
778    char *s, *e;
779    size_t ntokens = 0;
780
781    assert(command != NULL && tokens != NULL && max_tokens > 1);
782
783    for (s = e = command; ntokens < max_tokens - 1; ++e) {
784        if (*e == ' ') {
785            if (s != e) {
786                tokens[ntokens].value = s;
787                tokens[ntokens].length = e - s;
788                ntokens++;
789                *e = '\0';
790            }
791            s = e + 1;
792        }
793        else if (*e == '\0') {
794            if (s != e) {
795                tokens[ntokens].value = s;
796                tokens[ntokens].length = e - s;
797                ntokens++;
798            }
799
800            break; /* string end */
801        }
802    }
803
804    /*
805     * If we scanned the whole string, the terminal value pointer is null,
806     * otherwise it is the first unprocessed character.
807     */
808    tokens[ntokens].value =  *e == '\0' ? NULL : e;
809    tokens[ntokens].length = 0;
810    ntokens++;
811
812    return ntokens;
813}
814
815/* set up a connection to write a buffer then free it, used for stats */
816static void write_and_free(conn *c, char *buf, int bytes) {
817    if (buf) {
818        c->write_and_free = buf;
819        c->wcurr = buf;
820        c->wbytes = bytes;
821        conn_set_state(c, conn_write);
822        c->write_and_go = conn_read;
823    } else {
824        out_string(c, "SERVER_ERROR out of memory");
825    }
826}
827
828inline static void process_stats_detail(conn *c, const char *command) {
829    assert(c != NULL);
830
831    if (strcmp(command, "on") == 0) {
832        settings.detail_enabled = 1;
833        out_string(c, "OK");
834    }
835    else if (strcmp(command, "off") == 0) {
836        settings.detail_enabled = 0;
837        out_string(c, "OK");
838    }
839    else if (strcmp(command, "dump") == 0) {
840        int len;
841        char *stats = stats_prefix_dump(&len);
842        write_and_free(c, stats, len);
843    }
844    else {
845        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
846    }
847}
848
849static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
850    rel_time_t now = current_time;
851    char *command;
852    char *subcommand;
853
854    assert(c != NULL);
855
856    if(ntokens < 2) {
857        out_string(c, "CLIENT_ERROR bad command line");
858        return;
859    }
860
861    command = tokens[COMMAND_TOKEN].value;
862
863    if (ntokens == 2 && strcmp(command, "stats") == 0) {
864        char temp[1024];
865        pid_t pid = getpid();
866        char *pos = temp;
867
868#ifndef WIN32
869        struct rusage usage;
870        getrusage(RUSAGE_SELF, &usage);
871#endif /* !WIN32 */
872
873        STATS_LOCK();
874        pos += sprintf(pos, "STAT pid %u\r\n", pid);
875        pos += sprintf(pos, "STAT uptime %u\r\n", now);
876        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
877        pos += sprintf(pos, "STAT version " VERSION "\r\n");
878        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
879#ifndef WIN32
880        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
881        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
882#endif /* !WIN32 */
883        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
884        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
885        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
886        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
887        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
888        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
889        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
890        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
891        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
892        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
893        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
894        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
895        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
896        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
897        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
898        pos += sprintf(pos, "END");
899        STATS_UNLOCK();
900        out_string(c, temp);
901        return;
902    }
903
904    subcommand = tokens[SUBCOMMAND_TOKEN].value;
905
906    if (strcmp(subcommand, "reset") == 0) {
907        stats_reset();
908        out_string(c, "RESET");
909        return;
910    }
911
912#ifdef HAVE_MALLOC_H
913#ifdef HAVE_STRUCT_MALLINFO
914    if (strcmp(subcommand, "malloc") == 0) {
915        char temp[512];
916        struct mallinfo info;
917        char *pos = temp;
918
919        info = mallinfo();
920        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
921        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
922        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
923        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
924        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
925        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
926        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
927        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
928        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
929        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
930        out_string(c, temp);
931        return;
932    }
933#endif /* HAVE_STRUCT_MALLINFO */
934#endif /* HAVE_MALLOC_H */
935
936#if !defined(WIN32) || !defined(__APPLE__)
937    if (strcmp(subcommand, "maps") == 0) {
938        char *wbuf;
939        int wsize = 8192; /* should be enough */
940        int fd;
941        int res;
942
943        if ((wbuf = (char *)malloc(wsize)) == NULL) {
944            out_string(c, "SERVER_ERROR out of memory");
945            return;
946        }
947
948        fd = open("/proc/self/maps", O_RDONLY);
949        if (fd == -1) {
950            out_string(c, "SERVER_ERROR cannot open the maps file");
951            free(wbuf);
952            return;
953        }
954
955        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
956        if (res == wsize - 6) {
957            out_string(c, "SERVER_ERROR buffer overflow");
958            free(wbuf); close(fd);
959            return;
960        }
961        if (res == 0 || res == -1) {
962            out_string(c, "SERVER_ERROR can't read the maps file");
963            free(wbuf); close(fd);
964            return;
965        }
966        memcpy(wbuf + res, "END\r\n", 5);
967        write_and_free(c, wbuf, res + 5);
968        close(fd);
969        return;
970    }
971#endif
972
973    if (strcmp(subcommand, "cachedump") == 0) {
974
975        char *buf;
976        unsigned int bytes, id, limit = 0;
977
978        if(ntokens < 5) {
979            out_string(c, "CLIENT_ERROR bad command line");
980            return;
981        }
982
983        id = strtoul(tokens[2].value, NULL, 10);
984        limit = strtoul(tokens[3].value, NULL, 10);
985
986        if(errno == ERANGE) {
987            out_string(c, "CLIENT_ERROR bad command line format");
988            return;
989        }
990
991        buf = item_cachedump(id, limit, &bytes);
992        write_and_free(c, buf, bytes);
993        return;
994    }
995
996    if (strcmp(subcommand, "slabs") == 0) {
997        int bytes = 0;
998        char *buf = slabs_stats(&bytes);
999        write_and_free(c, buf, bytes);
1000        return;
1001    }
1002
1003    if (strcmp(subcommand, "items") == 0) {
1004        int bytes = 0;
1005        char *buf = item_stats(&bytes);
1006        write_and_free(c, buf, bytes);
1007        return;
1008    }
1009
1010    if (strcmp(subcommand, "detail") == 0) {
1011        if (ntokens < 4)
1012            process_stats_detail(c, "");  /* outputs the error message */
1013        else
1014            process_stats_detail(c, tokens[2].value);
1015        return;
1016    }
1017
1018    if (strcmp(subcommand, "sizes") == 0) {
1019        int bytes = 0;
1020        char *buf = item_stats_sizes(&bytes);
1021        write_and_free(c, buf, bytes);
1022        return;
1023    }
1024
1025    out_string(c, "ERROR");
1026}
1027
1028/* ntokens is overwritten here... shrug.. */
1029static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) {
1030    char *key;
1031    size_t nkey;
1032    int i = 0;
1033    item *it;
1034    token_t *key_token = &tokens[KEY_TOKEN];
1035
1036    assert(c != NULL);
1037
1038    if (settings.managed) {
1039        int bucket = c->bucket;
1040        if (bucket == -1) {
1041            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1042            return;
1043        }
1044        c->bucket = -1;
1045        if (buckets[bucket] != c->gen) {
1046            out_string(c, "ERROR_NOT_OWNER");
1047            return;
1048        }
1049    }
1050
1051    do {
1052        while(key_token->length != 0) {
1053
1054            key = key_token->value;
1055            nkey = key_token->length;
1056
1057            if(nkey > KEY_MAX_LENGTH) {
1058                out_string(c, "CLIENT_ERROR bad command line format");
1059                return;
1060            }
1061
1062            STATS_LOCK();
1063            stats.get_cmds++;
1064            STATS_UNLOCK();
1065            it = item_get(key, nkey);
1066            if (settings.detail_enabled) {
1067                stats_prefix_record_get(key, NULL != it);
1068            }
1069            if (it) {
1070                if (i >= c->isize) {
1071                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1072                    if (new_list) {
1073                        c->isize *= 2;
1074                        c->ilist = new_list;
1075                    } else break;
1076                }
1077
1078                /*
1079                 * Construct the response. Each hit adds three elements to the
1080                 * outgoing data list:
1081                 *   "VALUE "
1082                 *   key
1083                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1084                 */
1085                if (add_iov(c, "VALUE ", 6) != 0 ||
1086                    add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1087                    add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1088                    {
1089                        break;
1090                    }
1091                if (settings.verbose > 1)
1092                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1093
1094                /* item_get() has incremented it->refcount for us */
1095                STATS_LOCK();
1096                stats.get_hits++;
1097                STATS_UNLOCK();
1098                item_update(it);
1099                *(c->ilist + i) = it;
1100                i++;
1101
1102            } else {
1103                STATS_LOCK();
1104                stats.get_misses++;
1105                STATS_UNLOCK();
1106            }
1107
1108            key_token++;
1109        }
1110
1111        /*
1112         * If the command string hasn't been fully processed, get the next set
1113         * of tokens.
1114         */
1115        if(key_token->value != NULL) {
1116            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1117            key_token = tokens;
1118        }
1119
1120    } while(key_token->value != NULL);
1121
1122    c->icurr = c->ilist;
1123    c->ileft = i;
1124
1125    if (settings.verbose > 1)
1126        fprintf(stderr, ">%d END\n", c->sfd);
1127    add_iov(c, "END\r\n", 5);
1128
1129    if (c->udp && build_udp_headers(c) != 0) {
1130        out_string(c, "SERVER_ERROR out of memory");
1131    }
1132    else {
1133        conn_set_state(c, conn_mwrite);
1134        c->msgcurr = 0;
1135    }
1136    return;
1137}
1138
1139static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm) {
1140    char *key;
1141    size_t nkey;
1142    int flags;
1143    time_t exptime;
1144    int vlen;
1145    item *it;
1146
1147    assert(c != NULL);
1148
1149    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1150        out_string(c, "CLIENT_ERROR bad command line format");
1151        return;
1152    }
1153
1154    key = tokens[KEY_TOKEN].value;
1155    nkey = tokens[KEY_TOKEN].length;
1156
1157    flags = strtoul(tokens[2].value, NULL, 10);
1158    exptime = strtol(tokens[3].value, NULL, 10);
1159    vlen = strtol(tokens[4].value, NULL, 10);
1160
1161    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1162        out_string(c, "CLIENT_ERROR bad command line format");
1163        return;
1164    }
1165
1166    if (settings.detail_enabled) {
1167        stats_prefix_record_set(key);
1168    }
1169
1170    if (settings.managed) {
1171        int bucket = c->bucket;
1172        if (bucket == -1) {
1173            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1174            return;
1175        }
1176        c->bucket = -1;
1177        if (buckets[bucket] != c->gen) {
1178            out_string(c, "ERROR_NOT_OWNER");
1179            return;
1180        }
1181    }
1182
1183    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1184
1185    if (it == 0) {
1186        if (! item_size_ok(nkey, flags, vlen + 2))
1187            out_string(c, "SERVER_ERROR object too large for cache");
1188        else
1189            out_string(c, "SERVER_ERROR out of memory");
1190        /* swallow the data line */
1191        c->write_and_go = conn_swallow;
1192        c->sbytes = vlen + 2;
1193        return;
1194    }
1195
1196    c->item_comm = comm;
1197    c->item = it;
1198    c->ritem = ITEM_data(it);
1199    c->rlbytes = it->nbytes;
1200    conn_set_state(c, conn_nread);
1201}
1202
1203static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
1204    char temp[32];
1205    item *it;
1206    unsigned int delta;
1207    char *key;
1208    size_t nkey;
1209
1210    assert(c != NULL);
1211
1212    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1213        out_string(c, "CLIENT_ERROR bad command line format");
1214        return;
1215    }
1216
1217    key = tokens[KEY_TOKEN].value;
1218    nkey = tokens[KEY_TOKEN].length;
1219
1220    if (settings.managed) {
1221        int bucket = c->bucket;
1222        if (bucket == -1) {
1223            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1224            return;
1225        }
1226        c->bucket = -1;
1227        if (buckets[bucket] != c->gen) {
1228            out_string(c, "ERROR_NOT_OWNER");
1229            return;
1230        }
1231    }
1232
1233    delta = strtoul(tokens[2].value, NULL, 10);
1234
1235    if(errno == ERANGE) {
1236        out_string(c, "CLIENT_ERROR bad command line format");
1237        return;
1238    }
1239
1240    it = item_get(key, nkey);
1241    if (!it) {
1242        out_string(c, "NOT_FOUND");
1243        return;
1244    }
1245
1246    out_string(c, add_delta(it, incr, delta, temp));
1247    item_remove(it);         /* release our reference */
1248}
1249
1250/*
1251 * adds a delta value to a numeric item.
1252 *
1253 * it    item to adjust
1254 * incr  true to increment value, false to decrement
1255 * delta amount to adjust value by
1256 * buf   buffer for response string
1257 *
1258 * returns a response string to send back to the client.
1259 */
1260char *do_add_delta(item *it, const int incr, unsigned int delta, char *buf) {
1261    char *ptr;
1262    unsigned int value;
1263    int res;
1264
1265    ptr = ITEM_data(it);
1266    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1267
1268    value = strtol(ptr, NULL, 10);
1269
1270    if(errno == ERANGE) {
1271        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1272    }
1273
1274    if (incr != 0)
1275        value += delta;
1276    else {
1277        if (delta >= value) value = 0;
1278        else value -= delta;
1279    }
1280    snprintf(buf, 32, "%u", value);
1281    res = strlen(buf);
1282    if (res + 2 > it->nbytes) { /* need to realloc */
1283        item *new_it;
1284        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1285        if (new_it == 0) {
1286            return "SERVER_ERROR out of memory";
1287        }
1288        memcpy(ITEM_data(new_it), buf, res);
1289        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1290        do_item_replace(it, new_it);
1291        do_item_remove(new_it);       /* release our reference */
1292    } else { /* replace in-place */
1293        memcpy(ITEM_data(it), buf, res);
1294        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1295    }
1296
1297    return buf;
1298}
1299
1300static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1301    char *key;
1302    size_t nkey;
1303    item *it;
1304    time_t exptime = 0;
1305
1306    assert(c != NULL);
1307
1308    if (settings.managed) {
1309        int bucket = c->bucket;
1310        if (bucket == -1) {
1311            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1312            return;
1313        }
1314        c->bucket = -1;
1315        if (buckets[bucket] != c->gen) {
1316            out_string(c, "ERROR_NOT_OWNER");
1317            return;
1318        }
1319    }
1320
1321    key = tokens[KEY_TOKEN].value;
1322    nkey = tokens[KEY_TOKEN].length;
1323
1324    if(nkey > KEY_MAX_LENGTH) {
1325        out_string(c, "CLIENT_ERROR bad command line format");
1326        return;
1327    }
1328
1329    if(ntokens == 4) {
1330        exptime = strtol(tokens[2].value, NULL, 10);
1331
1332        if(errno == ERANGE) {
1333            out_string(c, "CLIENT_ERROR bad command line format");
1334            return;
1335        }
1336    }
1337
1338    if (settings.detail_enabled) {
1339        stats_prefix_record_delete(key);
1340    }
1341
1342    it = item_get(key, nkey);
1343    if (it) {
1344        if (exptime == 0) {
1345            item_unlink(it);
1346            item_remove(it);      /* release our reference */
1347            out_string(c, "DELETED");
1348        } else {
1349            /* our reference will be transfered to the delete queue */
1350            out_string(c, defer_delete(it, exptime));
1351        }
1352    } else {
1353        out_string(c, "NOT_FOUND");
1354    }
1355}
1356
1357/*
1358 * Adds an item to the deferred-delete list so it can be reaped later.
1359 *
1360 * Returns the result to send to the client.
1361 */
1362char *do_defer_delete(item *it, time_t exptime)
1363{
1364    if (delcurr >= deltotal) {
1365        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1366        if (new_delete) {
1367            todelete = new_delete;
1368            deltotal *= 2;
1369        } else {
1370            /*
1371             * can't delete it immediately, user wants a delay,
1372             * but we ran out of memory for the delete queue
1373             */
1374            item_remove(it);    /* release reference */
1375            return "SERVER_ERROR out of memory";
1376        }
1377    }
1378
1379    /* use its expiration time as its deletion time now */
1380    it->exptime = realtime(exptime);
1381    it->it_flags |= ITEM_DELETED;
1382    todelete[delcurr++] = it;
1383
1384    return "DELETED";
1385}
1386
1387static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1388    unsigned int level;
1389
1390    assert(c != NULL);
1391
1392    level = strtoul(tokens[1].value, NULL, 10);
1393    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1394    out_string(c, "OK");
1395    return;
1396}
1397
1398static void process_command(conn *c, char *command) {
1399
1400    token_t tokens[MAX_TOKENS];
1401    size_t ntokens;
1402    int comm;
1403
1404    assert(c != NULL);
1405
1406    if (settings.verbose > 1)
1407        fprintf(stderr, "<%d %s\n", c->sfd, command);
1408
1409    /*
1410     * for commands set/add/replace, we build an item and read the data
1411     * directly into it, then continue in nread_complete().
1412     */
1413
1414    c->msgcurr = 0;
1415    c->msgused = 0;
1416    c->iovused = 0;
1417    if (add_msghdr(c) != 0) {
1418        out_string(c, "SERVER_ERROR out of memory");
1419        return;
1420    }
1421
1422    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1423
1424    if (ntokens >= 3 &&
1425        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1426         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1427
1428        process_get_command(c, tokens, ntokens);
1429
1430    } else if (ntokens == 6 &&
1431               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1432                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1433                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
1434
1435        process_update_command(c, tokens, ntokens, comm);
1436
1437    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1438
1439        process_arithmetic_command(c, tokens, ntokens, 1);
1440
1441    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1442
1443        process_arithmetic_command(c, tokens, ntokens, 0);
1444
1445    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1446
1447        process_delete_command(c, tokens, ntokens);
1448
1449    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1450        unsigned int bucket, gen;
1451        if (!settings.managed) {
1452            out_string(c, "CLIENT_ERROR not a managed instance");
1453            return;
1454        }
1455
1456        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1457            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1458                out_string(c, "CLIENT_ERROR bucket number out of range");
1459                return;
1460            }
1461            buckets[bucket] = gen;
1462            out_string(c, "OWNED");
1463            return;
1464        } else {
1465            out_string(c, "CLIENT_ERROR bad format");
1466            return;
1467        }
1468
1469    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1470
1471        int bucket;
1472        if (!settings.managed) {
1473            out_string(c, "CLIENT_ERROR not a managed instance");
1474            return;
1475        }
1476        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1477            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1478                out_string(c, "CLIENT_ERROR bucket number out of range");
1479                return;
1480            }
1481            buckets[bucket] = 0;
1482            out_string(c, "DISOWNED");
1483            return;
1484        } else {
1485            out_string(c, "CLIENT_ERROR bad format");
1486            return;
1487        }
1488
1489    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1490        int bucket, gen;
1491        if (!settings.managed) {
1492            out_string(c, "CLIENT_ERROR not a managed instance");
1493            return;
1494        }
1495        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1496            /* we never write anything back, even if input's wrong */
1497            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1498                /* do nothing, bad input */
1499            } else {
1500                c->bucket = bucket;
1501                c->gen = gen;
1502            }
1503            conn_set_state(c, conn_read);
1504            return;
1505        } else {
1506            out_string(c, "CLIENT_ERROR bad format");
1507            return;
1508        }
1509
1510    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1511
1512        process_stat(c, tokens, ntokens);
1513
1514    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1515        time_t exptime = 0;
1516        set_current_time();
1517
1518        if(ntokens == 2) {
1519            settings.oldest_live = current_time - 1;
1520            item_flush_expired();
1521            out_string(c, "OK");
1522            return;
1523        }
1524
1525        exptime = strtol(tokens[1].value, NULL, 10);
1526        if(errno == ERANGE) {
1527            out_string(c, "CLIENT_ERROR bad command line format");
1528            return;
1529        }
1530
1531        settings.oldest_live = realtime(exptime) - 1;
1532        item_flush_expired();
1533        out_string(c, "OK");
1534        return;
1535
1536    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1537
1538        out_string(c, "VERSION " VERSION);
1539
1540    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1541
1542        conn_set_state(c, conn_closing);
1543
1544    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1545                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1546#ifdef ALLOW_SLABS_REASSIGN
1547
1548        int src, dst, rv;
1549
1550        src = strtol(tokens[2].value, NULL, 10);
1551        dst  = strtol(tokens[3].value, NULL, 10);
1552
1553        if(errno == ERANGE) {
1554            out_string(c, "CLIENT_ERROR bad command line format");
1555            return;
1556        }
1557
1558        rv = slabs_reassign(src, dst);
1559        if (rv == 1) {
1560            out_string(c, "DONE");
1561            return;
1562        }
1563        if (rv == 0) {
1564            out_string(c, "CANT");
1565            return;
1566        }
1567        if (rv == -1) {
1568            out_string(c, "BUSY");
1569            return;
1570        }
1571#else
1572        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1573#endif
1574    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1575        process_verbosity_command(c, tokens, ntokens);
1576    } else {
1577        out_string(c, "ERROR");
1578    }
1579    return;
1580}
1581
1582/*
1583 * if we have a complete line in the buffer, process it.
1584 */
1585static int try_read_command(conn *c) {
1586    char *el, *cont;
1587
1588    assert(c != NULL);
1589    assert(c->rcurr <= (c->rbuf + c->rsize));
1590
1591    if (c->rbytes == 0)
1592        return 0;
1593    el = memchr(c->rcurr, '\n', c->rbytes);
1594    if (!el)
1595        return 0;
1596    cont = el + 1;
1597    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1598        el--;
1599    }
1600    *el = '\0';
1601
1602    assert(cont <= (c->rcurr + c->rbytes));
1603
1604    process_command(c, c->rcurr);
1605
1606    c->rbytes -= (cont - c->rcurr);
1607    c->rcurr = cont;
1608
1609    assert(c->rcurr <= (c->rbuf + c->rsize));
1610
1611    return 1;
1612}
1613
1614/*
1615 * read a UDP request.
1616 * return 0 if there's nothing to read.
1617 */
1618static int try_read_udp(conn *c) {
1619    int res;
1620
1621    assert(c != NULL);
1622
1623    c->request_addr_size = sizeof(c->request_addr);
1624    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1625                   0, &c->request_addr, &c->request_addr_size);
1626    if (res > 8) {
1627        unsigned char *buf = (unsigned char *)c->rbuf;
1628        STATS_LOCK();
1629        stats.bytes_read += res;
1630        STATS_UNLOCK();
1631
1632        /* Beginning of UDP packet is the request ID; save it. */
1633        c->request_id = buf[0] * 256 + buf[1];
1634
1635        /* If this is a multi-packet request, drop it. */
1636        if (buf[4] != 0 || buf[5] != 1) {
1637            out_string(c, "SERVER_ERROR multi-packet request not supported");
1638            return 0;
1639        }
1640
1641        /* Don't care about any of the rest of the header. */
1642        res -= 8;
1643        memmove(c->rbuf, c->rbuf + 8, res);
1644
1645        c->rbytes += res;
1646        c->rcurr = c->rbuf;
1647        return 1;
1648    }
1649    return 0;
1650}
1651
1652/*
1653 * read from network as much as we can, handle buffer overflow and connection
1654 * close.
1655 * before reading, move the remaining incomplete fragment of a command
1656 * (if any) to the beginning of the buffer.
1657 * return 0 if there's nothing to read on the first read.
1658 */
1659static int try_read_network(conn *c) {
1660    int gotdata = 0;
1661    int res;
1662
1663    assert(c != NULL);
1664
1665    if (c->rcurr != c->rbuf) {
1666        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1667            memmove(c->rbuf, c->rcurr, c->rbytes);
1668        c->rcurr = c->rbuf;
1669    }
1670
1671    while (1) {
1672        if (c->rbytes >= c->rsize) {
1673            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1674            if (!new_rbuf) {
1675                if (settings.verbose > 0)
1676                    fprintf(stderr, "Couldn't realloc input buffer\n");
1677                c->rbytes = 0; /* ignore what we read */
1678                out_string(c, "SERVER_ERROR out of memory");
1679                c->write_and_go = conn_closing;
1680                return 1;
1681            }
1682            c->rcurr = c->rbuf = new_rbuf;
1683            c->rsize *= 2;
1684        }
1685
1686        /* unix socket mode doesn't need this, so zeroed out.  but why
1687         * is this done for every command?  presumably for UDP
1688         * mode.  */
1689        if (!settings.socketpath) {
1690            c->request_addr_size = sizeof(c->request_addr);
1691        } else {
1692            c->request_addr_size = 0;
1693        }
1694
1695        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1696        if (res > 0) {
1697            STATS_LOCK();
1698            stats.bytes_read += res;
1699            STATS_UNLOCK();
1700            gotdata = 1;
1701            c->rbytes += res;
1702            continue;
1703        }
1704        if (res == 0) {
1705            /* connection closed */
1706            conn_set_state(c, conn_closing);
1707            return 1;
1708        }
1709        if (res == -1) {
1710            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1711            else return 0;
1712        }
1713    }
1714    return gotdata;
1715}
1716
1717static bool update_event(conn *c, const int new_flags) {
1718    assert(c != NULL);
1719
1720    struct event_base *base = c->event.ev_base;
1721    if (c->ev_flags == new_flags)
1722        return true;
1723    if (event_del(&c->event) == -1) return false;
1724    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1725    event_base_set(base, &c->event);
1726    c->ev_flags = new_flags;
1727    if (event_add(&c->event, 0) == -1) return false;
1728    return true;
1729}
1730
1731/*
1732 * Sets whether we are listening for new connections or not.
1733 */
1734void accept_new_conns(const bool do_accept) {
1735    if (! is_listen_thread())
1736        return;
1737    if (do_accept) {
1738        update_event(listen_conn, EV_READ | EV_PERSIST);
1739        if (listen(listen_conn->sfd, 1024) != 0) {
1740            perror("listen");
1741        }
1742    }
1743    else {
1744        update_event(listen_conn, 0);
1745        if (listen(listen_conn->sfd, 0) != 0) {
1746            perror("listen");
1747        }
1748    }
1749}
1750
1751
1752/*
1753 * Transmit the next chunk of data from our list of msgbuf structures.
1754 *
1755 * Returns:
1756 *   TRANSMIT_COMPLETE   All done writing.
1757 *   TRANSMIT_INCOMPLETE More data remaining to write.
1758 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
1759 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1760 */
1761static int transmit(conn *c) {
1762    assert(c != NULL);
1763
1764    if (c->msgcurr < c->msgused &&
1765            c->msglist[c->msgcurr].msg_iovlen == 0) {
1766        /* Finished writing the current msg; advance to the next. */
1767        c->msgcurr++;
1768    }
1769    if (c->msgcurr < c->msgused) {
1770        ssize_t res;
1771        struct msghdr *m = &c->msglist[c->msgcurr];
1772
1773        res = sendmsg(c->sfd, m, 0);
1774        if (res > 0) {
1775            STATS_LOCK();
1776            stats.bytes_written += res;
1777            STATS_UNLOCK();
1778
1779            /* We've written some of the data. Remove the completed
1780               iovec entries from the list of pending writes. */
1781            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1782                res -= m->msg_iov->iov_len;
1783                m->msg_iovlen--;
1784                m->msg_iov++;
1785            }
1786
1787            /* Might have written just part of the last iovec entry;
1788               adjust it so the next write will do the rest. */
1789            if (res > 0) {
1790                m->msg_iov->iov_base += res;
1791                m->msg_iov->iov_len -= res;
1792            }
1793            return TRANSMIT_INCOMPLETE;
1794        }
1795        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1796            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1797                if (settings.verbose > 0)
1798                    fprintf(stderr, "Couldn't update event\n");
1799                conn_set_state(c, conn_closing);
1800                return TRANSMIT_HARD_ERROR;
1801            }
1802            return TRANSMIT_SOFT_ERROR;
1803        }
1804        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1805           we have a real error, on which we close the connection */
1806        if (settings.verbose > 0)
1807            perror("Failed to write, and not due to blocking");
1808
1809        if (c->udp)
1810            conn_set_state(c, conn_read);
1811        else
1812            conn_set_state(c, conn_closing);
1813        return TRANSMIT_HARD_ERROR;
1814    } else {
1815        return TRANSMIT_COMPLETE;
1816    }
1817}
1818
1819static void drive_machine(conn *c) {
1820    bool stop = false;
1821    int sfd, flags = 1;
1822    socklen_t addrlen;
1823    struct sockaddr addr;
1824    int res;
1825
1826    assert(c != NULL);
1827
1828    while (!stop) {
1829
1830        switch(c->state) {
1831        case conn_listening:
1832            addrlen = sizeof(addr);
1833            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
1834                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1835                    /* these are transient, so don't log anything */
1836                    stop = true;
1837                } else if (errno == EMFILE) {
1838                    if (settings.verbose > 0)
1839                        fprintf(stderr, "Too many open connections\n");
1840                    accept_new_conns(false);
1841                    stop = true;
1842                } else {
1843                    perror("accept()");
1844                    stop = true;
1845                }
1846                break;
1847            }
1848            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
1849                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
1850                perror("setting O_NONBLOCK");
1851                close(sfd);
1852                break;
1853            }
1854            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
1855                                     DATA_BUFFER_SIZE, false);
1856            break;
1857
1858        case conn_read:
1859            if (try_read_command(c) != 0) {
1860                continue;
1861            }
1862            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
1863                continue;
1864            }
1865            /* we have no command line and no data to read from network */
1866            if (!update_event(c, EV_READ | EV_PERSIST)) {
1867                if (settings.verbose > 0)
1868                    fprintf(stderr, "Couldn't update event\n");
1869                conn_set_state(c, conn_closing);
1870                break;
1871            }
1872            stop = true;
1873            break;
1874
1875        case conn_nread:
1876            /* we are reading rlbytes into ritem; */
1877            if (c->rlbytes == 0) {
1878                complete_nread(c);
1879                break;
1880            }
1881            /* first check if we have leftovers in the conn_read buffer */
1882            if (c->rbytes > 0) {
1883                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
1884                memcpy(c->ritem, c->rcurr, tocopy);
1885                c->ritem += tocopy;
1886                c->rlbytes -= tocopy;
1887                c->rcurr += tocopy;
1888                c->rbytes -= tocopy;
1889                break;
1890            }
1891
1892            /*  now try reading from the socket */
1893            res = read(c->sfd, c->ritem, c->rlbytes);
1894            if (res > 0) {
1895                STATS_LOCK();
1896                stats.bytes_read += res;
1897                STATS_UNLOCK();
1898                c->ritem += res;
1899                c->rlbytes -= res;
1900                break;
1901            }
1902            if (res == 0) { /* end of stream */
1903                conn_set_state(c, conn_closing);
1904                break;
1905            }
1906            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1907                if (!update_event(c, EV_READ | EV_PERSIST)) {
1908                    if (settings.verbose > 0)
1909                        fprintf(stderr, "Couldn't update event\n");
1910                    conn_set_state(c, conn_closing);
1911                    break;
1912                }
1913                stop = true;
1914                break;
1915            }
1916            /* otherwise we have a real error, on which we close the connection */
1917            if (settings.verbose > 0)
1918                fprintf(stderr, "Failed to read, and not due to blocking\n");
1919            conn_set_state(c, conn_closing);
1920            break;
1921
1922        case conn_swallow:
1923            /* we are reading sbytes and throwing them away */
1924            if (c->sbytes == 0) {
1925                conn_set_state(c, conn_read);
1926                break;
1927            }
1928
1929            /* first check if we have leftovers in the conn_read buffer */
1930            if (c->rbytes > 0) {
1931                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
1932                c->sbytes -= tocopy;
1933                c->rcurr += tocopy;
1934                c->rbytes -= tocopy;
1935                break;
1936            }
1937
1938            /*  now try reading from the socket */
1939            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
1940            if (res > 0) {
1941                STATS_LOCK();
1942                stats.bytes_read += res;
1943                STATS_UNLOCK();
1944                c->sbytes -= res;
1945                break;
1946            }
1947            if (res == 0) { /* end of stream */
1948                conn_set_state(c, conn_closing);
1949                break;
1950            }
1951            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1952                if (!update_event(c, EV_READ | EV_PERSIST)) {
1953                    if (settings.verbose > 0)
1954                        fprintf(stderr, "Couldn't update event\n");
1955                    conn_set_state(c, conn_closing);
1956                    break;
1957                }
1958                stop = true;
1959                break;
1960            }
1961            /* otherwise we have a real error, on which we close the connection */
1962            if (settings.verbose > 0)
1963                fprintf(stderr, "Failed to read, and not due to blocking\n");
1964            conn_set_state(c, conn_closing);
1965            break;
1966
1967        case conn_write:
1968            /*
1969             * We want to write out a simple response. If we haven't already,
1970             * assemble it into a msgbuf list (this will be a single-entry
1971             * list for TCP or a two-entry list for UDP).
1972             */
1973            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
1974                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
1975                    (c->udp && build_udp_headers(c) != 0)) {
1976                    if (settings.verbose > 0)
1977                        fprintf(stderr, "Couldn't build response\n");
1978                    conn_set_state(c, conn_closing);
1979                    break;
1980                }
1981            }
1982
1983            /* fall through... */
1984
1985        case conn_mwrite:
1986            switch (transmit(c)) {
1987            case TRANSMIT_COMPLETE:
1988                if (c->state == conn_mwrite) {
1989                    while (c->ileft > 0) {
1990                        item *it = *(c->icurr);
1991                        assert((it->it_flags & ITEM_SLABBED) == 0);
1992                        item_remove(it);
1993                        c->icurr++;
1994                        c->ileft--;
1995                    }
1996                    conn_set_state(c, conn_read);
1997                } else if (c->state == conn_write) {
1998                    if (c->write_and_free) {
1999                        free(c->write_and_free);
2000                        c->write_and_free = 0;
2001                    }
2002                    conn_set_state(c, c->write_and_go);
2003                } else {
2004                    if (settings.verbose > 0)
2005                        fprintf(stderr, "Unexpected state %d\n", c->state);
2006                    conn_set_state(c, conn_closing);
2007                }
2008                break;
2009
2010            case TRANSMIT_INCOMPLETE:
2011            case TRANSMIT_HARD_ERROR:
2012                break;                   /* Continue in state machine. */
2013
2014            case TRANSMIT_SOFT_ERROR:
2015                stop = true;
2016                break;
2017            }
2018            break;
2019
2020        case conn_closing:
2021            if (c->udp)
2022                conn_cleanup(c);
2023            else
2024                conn_close(c);
2025            stop = true;
2026            break;
2027        }
2028    }
2029
2030    return;
2031}
2032
2033void event_handler(const int fd, const short which, void *arg) {
2034    conn *c;
2035
2036    c = (conn *)arg;
2037    assert(c != NULL);
2038
2039    c->which = which;
2040
2041    /* sanity */
2042    if (fd != c->sfd) {
2043        if (settings.verbose > 0)
2044            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2045        conn_close(c);
2046        return;
2047    }
2048
2049    drive_machine(c);
2050
2051    /* wait for next event */
2052    return;
2053}
2054
2055static int new_socket(const bool is_udp) {
2056    int sfd;
2057    int flags;
2058
2059    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
2060        perror("socket()");
2061        return -1;
2062    }
2063
2064    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2065        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2066        perror("setting O_NONBLOCK");
2067        close(sfd);
2068        return -1;
2069    }
2070    return sfd;
2071}
2072
2073
2074/*
2075 * Sets a socket's send buffer size to the maximum allowed by the system.
2076 */
2077static void maximize_sndbuf(const int sfd) {
2078    socklen_t intsize = sizeof(int);
2079    int last_good = 0;
2080    int min, max, avg;
2081    int old_size;
2082
2083    /* Start with the default size. */
2084    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2085        if (settings.verbose > 0)
2086            perror("getsockopt(SO_SNDBUF)");
2087        return;
2088    }
2089
2090    /* Binary-search for the real maximum. */
2091    min = old_size;
2092    max = MAX_SENDBUF_SIZE;
2093
2094    while (min <= max) {
2095        avg = ((unsigned int)(min + max)) / 2;
2096        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2097            last_good = avg;
2098            min = avg + 1;
2099        } else {
2100            max = avg - 1;
2101        }
2102    }
2103
2104    if (settings.verbose > 1)
2105        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2106}
2107
2108
2109static int server_socket(const int port, const bool is_udp) {
2110    int sfd;
2111    struct linger ling = {0, 0};
2112    struct sockaddr_in addr;
2113    int flags =1;
2114
2115    if ((sfd = new_socket(is_udp)) == -1) {
2116        return -1;
2117    }
2118
2119    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2120    if (is_udp) {
2121        maximize_sndbuf(sfd);
2122    } else {
2123        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2124        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2125        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2126    }
2127
2128    /*
2129     * the memset call clears nonstandard fields in some impementations
2130     * that otherwise mess things up.
2131     */
2132    memset(&addr, 0, sizeof(addr));
2133
2134    addr.sin_family = AF_INET;
2135    addr.sin_port = htons(port);
2136    addr.sin_addr = settings.interf;
2137    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2138        perror("bind()");
2139        close(sfd);
2140        return -1;
2141    }
2142    if (!is_udp && listen(sfd, 1024) == -1) {
2143        perror("listen()");
2144        close(sfd);
2145        return -1;
2146    }
2147    return sfd;
2148}
2149
2150static int new_socket_unix(void) {
2151    int sfd;
2152    int flags;
2153
2154    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2155        perror("socket()");
2156        return -1;
2157    }
2158
2159    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2160        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2161        perror("setting O_NONBLOCK");
2162        close(sfd);
2163        return -1;
2164    }
2165    return sfd;
2166}
2167
2168static int server_socket_unix(const char *path) {
2169    int sfd;
2170    struct linger ling = {0, 0};
2171    struct sockaddr_un addr;
2172    struct stat tstat;
2173    int flags =1;
2174
2175    if (!path) {
2176        return -1;
2177    }
2178
2179    if ((sfd = new_socket_unix()) == -1) {
2180        return -1;
2181    }
2182
2183    /*
2184     * Clean up a previous socket file if we left it around
2185     */
2186    if (lstat(path, &tstat) == 0) {
2187        if (S_ISSOCK(tstat.st_mode))
2188            unlink(path);
2189    }
2190
2191    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2192    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2193    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2194
2195    /*
2196     * the memset call clears nonstandard fields in some impementations
2197     * that otherwise mess things up.
2198     */
2199    memset(&addr, 0, sizeof(addr));
2200
2201    addr.sun_family = AF_UNIX;
2202    strcpy(addr.sun_path, path);
2203    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2204        perror("bind()");
2205        close(sfd);
2206        return -1;
2207    }
2208    if (listen(sfd, 1024) == -1) {
2209        perror("listen()");
2210        close(sfd);
2211        return -1;
2212    }
2213    return sfd;
2214}
2215
2216/* listening socket */
2217static int l_socket = 0;
2218
2219/* udp socket */
2220static int u_socket = -1;
2221
2222/* invoke right before gdb is called, on assert */
2223void pre_gdb(void) {
2224    int i;
2225    if (l_socket > -1) close(l_socket);
2226    if (u_socket > -1) close(u_socket);
2227    for (i = 3; i <= 500; i++) close(i); /* so lame */
2228    kill(getpid(), SIGABRT);
2229}
2230
2231/*
2232 * We keep the current time of day in a global variable that's updated by a
2233 * timer event. This saves us a bunch of time() system calls (we really only
2234 * need to get the time once a second, whereas there can be tens of thousands
2235 * of requests a second) and allows us to use server-start-relative timestamps
2236 * rather than absolute UNIX timestamps, a space savings on systems where
2237 * sizeof(time_t) > sizeof(unsigned int).
2238 */
2239volatile rel_time_t current_time;
2240static struct event clockevent;
2241
2242/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2243static void set_current_time(void) {
2244    current_time = (rel_time_t) (time(0) - stats.started);
2245}
2246
2247static void clock_handler(const int fd, const short which, void *arg) {
2248    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2249    static bool initialized = false;
2250
2251    if (initialized) {
2252        /* only delete the event if it's actually there. */
2253        evtimer_del(&clockevent);
2254    } else {
2255        initialized = true;
2256    }
2257
2258    evtimer_set(&clockevent, clock_handler, 0);
2259    event_base_set(main_base, &clockevent);
2260    evtimer_add(&clockevent, &t);
2261
2262    set_current_time();
2263}
2264
2265static struct event deleteevent;
2266
2267static void delete_handler(const int fd, const short which, void *arg) {
2268    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2269    static bool initialized = false;
2270
2271    if (initialized) {
2272        /* some versions of libevent don't like deleting events that don't exist,
2273           so only delete once we know this event has been added. */
2274        evtimer_del(&deleteevent);
2275    } else {
2276        initialized = true;
2277    }
2278
2279    evtimer_set(&deleteevent, delete_handler, 0);
2280    event_base_set(main_base, &deleteevent);
2281    evtimer_add(&deleteevent, &t);
2282    run_deferred_deletes();
2283}
2284
2285/* Call run_deferred_deletes instead of this. */
2286void do_run_deferred_deletes(void)
2287{
2288    int i, j = 0;
2289
2290    for (i = 0; i < delcurr; i++) {
2291        item *it = todelete[i];
2292        if (item_delete_lock_over(it)) {
2293            assert(it->refcount > 0);
2294            it->it_flags &= ~ITEM_DELETED;
2295            do_item_unlink(it);
2296            do_item_remove(it);
2297        } else {
2298            todelete[j++] = it;
2299        }
2300    }
2301    delcurr = j;
2302}
2303
2304static void usage(void) {
2305    printf(PACKAGE " " VERSION "\n");
2306    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2307           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2308           "-s <file>     unix socket path to listen on (disables network support)\n"
2309           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2310           "-d            run as a daemon\n"
2311           "-r            maximize core file limit\n"
2312           "-u <username> assume identity of <username> (only when run as root)\n"
2313           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2314           "-M            return error on memory exhausted (rather than removing items)\n"
2315           "-c <num>      max simultaneous connections, default is 1024\n"
2316           "-k            lock down all paged memory\n"
2317           "-v            verbose (print errors/warnings while in event loop)\n"
2318           "-vv           very verbose (also print client commands/reponses)\n"
2319           "-h            print this help and exit\n"
2320           "-i            print memcached and libevent license\n"
2321           "-b            run a managed instanced (mnemonic: buckets)\n"
2322           "-P <file>     save PID in <file>, only used with -d option\n"
2323           "-f <factor>   chunk size growth factor, default 1.25\n"
2324           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
2325#ifdef USE_THREADS
2326    printf("-t <num>      number of threads to use, default 4\n");
2327#endif
2328    return;
2329}
2330
2331static void usage_license(void) {
2332    printf(PACKAGE " " VERSION "\n\n");
2333    printf(
2334    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2335    "All rights reserved.\n"
2336    "\n"
2337    "Redistribution and use in source and binary forms, with or without\n"
2338    "modification, are permitted provided that the following conditions are\n"
2339    "met:\n"
2340    "\n"
2341    "    * Redistributions of source code must retain the above copyright\n"
2342    "notice, this list of conditions and the following disclaimer.\n"
2343    "\n"
2344    "    * Redistributions in binary form must reproduce the above\n"
2345    "copyright notice, this list of conditions and the following disclaimer\n"
2346    "in the documentation and/or other materials provided with the\n"
2347    "distribution.\n"
2348    "\n"
2349    "    * Neither the name of the Danga Interactive nor the names of its\n"
2350    "contributors may be used to endorse or promote products derived from\n"
2351    "this software without specific prior written permission.\n"
2352    "\n"
2353    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2354    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2355    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2356    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2357    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2358    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2359    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2360    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2361    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2362    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2363    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2364    "\n"
2365    "\n"
2366    "This product includes software developed by Niels Provos.\n"
2367    "\n"
2368    "[ libevent ]\n"
2369    "\n"
2370    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2371    "All rights reserved.\n"
2372    "\n"
2373    "Redistribution and use in source and binary forms, with or without\n"
2374    "modification, are permitted provided that the following conditions\n"
2375    "are met:\n"
2376    "1. Redistributions of source code must retain the above copyright\n"
2377    "   notice, this list of conditions and the following disclaimer.\n"
2378    "2. Redistributions in binary form must reproduce the above copyright\n"
2379    "   notice, this list of conditions and the following disclaimer in the\n"
2380    "   documentation and/or other materials provided with the distribution.\n"
2381    "3. All advertising materials mentioning features or use of this software\n"
2382    "   must display the following acknowledgement:\n"
2383    "      This product includes software developed by Niels Provos.\n"
2384    "4. The name of the author may not be used to endorse or promote products\n"
2385    "   derived from this software without specific prior written permission.\n"
2386    "\n"
2387    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2388    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2389    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2390    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2391    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2392    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2393    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2394    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2395    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2396    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2397    );
2398
2399    return;
2400}
2401
2402static void save_pid(const pid_t pid, const char *pid_file) {
2403    FILE *fp;
2404    if (pid_file == NULL)
2405        return;
2406
2407    if ((fp = fopen(pid_file, "w")) == NULL) {
2408        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2409        return;
2410    }
2411
2412    fprintf(fp,"%ld\n", (long)pid);
2413    if (fclose(fp) == -1) {
2414        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2415        return;
2416    }
2417}
2418
2419static void remove_pidfile(const char *pid_file) {
2420  if (pid_file == NULL)
2421      return;
2422
2423  if (unlink(pid_file) != 0) {
2424      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2425  }
2426
2427}
2428
2429
2430static void sig_handler(const int sig) {
2431    printf("SIGINT handled.\n");
2432    exit(EXIT_SUCCESS);
2433}
2434
2435int main (int argc, char **argv) {
2436    int c;
2437    struct in_addr addr;
2438    bool lock_memory = false;
2439    bool daemonize = false;
2440    int maxcore = 0;
2441    char *username = NULL;
2442    char *pid_file = NULL;
2443    struct passwd *pw;
2444    struct sigaction sa;
2445    struct rlimit rlim;
2446
2447    /* handle SIGINT */
2448    signal(SIGINT, sig_handler);
2449
2450    /* init settings */
2451    settings_init();
2452
2453    /* set stderr non-buffering (for running under, say, daemontools) */
2454    setbuf(stderr, NULL);
2455
2456    /* process arguments */
2457    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2458        switch (c) {
2459        case 'U':
2460            settings.udpport = atoi(optarg);
2461            break;
2462        case 'b':
2463            settings.managed = true;
2464            break;
2465        case 'p':
2466            settings.port = atoi(optarg);
2467            break;
2468        case 's':
2469            settings.socketpath = optarg;
2470            break;
2471        case 'm':
2472            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2473            break;
2474        case 'M':
2475            settings.evict_to_free = 0;
2476            break;
2477        case 'c':
2478            settings.maxconns = atoi(optarg);
2479            break;
2480        case 'h':
2481            usage();
2482            exit(EXIT_SUCCESS);
2483        case 'i':
2484            usage_license();
2485            exit(EXIT_SUCCESS);
2486        case 'k':
2487            lock_memory = true;
2488            break;
2489        case 'v':
2490            settings.verbose++;
2491            break;
2492        case 'l':
2493            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2494                fprintf(stderr, "Illegal address: %s\n", optarg);
2495                return 1;
2496            } else {
2497                settings.interf = addr;
2498            }
2499            break;
2500        case 'd':
2501            daemonize = true;
2502            break;
2503        case 'r':
2504            maxcore = 1;
2505            break;
2506        case 'u':
2507            username = optarg;
2508            break;
2509        case 'P':
2510            pid_file = optarg;
2511            break;
2512        case 'f':
2513            settings.factor = atof(optarg);
2514            if (settings.factor <= 1.0) {
2515                fprintf(stderr, "Factor must be greater than 1\n");
2516                return 1;
2517            }
2518            break;
2519        case 'n':
2520            settings.chunk_size = atoi(optarg);
2521            if (settings.chunk_size == 0) {
2522                fprintf(stderr, "Chunk size must be greater than 0\n");
2523                return 1;
2524            }
2525            break;
2526        case 't':
2527            settings.num_threads = atoi(optarg);
2528            if (settings.num_threads == 0) {
2529                fprintf(stderr, "Number of threads must be greater than 0\n");
2530                return 1;
2531            }
2532            break;
2533        case 'D':
2534            if (! optarg || ! optarg[0]) {
2535                fprintf(stderr, "No delimiter specified\n");
2536                return 1;
2537            }
2538            settings.prefix_delimiter = optarg[0];
2539            settings.detail_enabled = 1;
2540            break;
2541        default:
2542            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2543            return 1;
2544        }
2545    }
2546
2547    if (maxcore != 0) {
2548        struct rlimit rlim_new;
2549        /*
2550         * First try raising to infinity; if that fails, try bringing
2551         * the soft limit to the hard.
2552         */
2553        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2554            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2555            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2556                /* failed. try raising just to the old max */
2557                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2558                (void)setrlimit(RLIMIT_CORE, &rlim_new);
2559            }
2560        }
2561        /*
2562         * getrlimit again to see what we ended up with. Only fail if
2563         * the soft limit ends up 0, because then no core files will be
2564         * created at all.
2565         */
2566
2567        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2568            fprintf(stderr, "failed to ensure corefile creation\n");
2569            exit(EXIT_FAILURE);
2570        }
2571    }
2572
2573    /*
2574     * If needed, increase rlimits to allow as many connections
2575     * as needed.
2576     */
2577
2578    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2579        fprintf(stderr, "failed to getrlimit number of files\n");
2580        exit(EXIT_FAILURE);
2581    } else {
2582        int maxfiles = settings.maxconns;
2583        if (rlim.rlim_cur < maxfiles)
2584            rlim.rlim_cur = maxfiles + 3;
2585        if (rlim.rlim_max < rlim.rlim_cur)
2586            rlim.rlim_max = rlim.rlim_cur;
2587        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2588            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2589            exit(EXIT_FAILURE);
2590        }
2591    }
2592
2593    /*
2594     * initialization order: first create the listening sockets
2595     * (may need root on low ports), then drop root if needed,
2596     * then daemonise if needed, then init libevent (in some cases
2597     * descriptors created by libevent wouldn't survive forking).
2598     */
2599
2600    /* create the listening socket and bind it */
2601    if (settings.socketpath == NULL) {
2602        l_socket = server_socket(settings.port, 0);
2603        if (l_socket == -1) {
2604            fprintf(stderr, "failed to listen\n");
2605            exit(EXIT_FAILURE);
2606        }
2607    }
2608
2609    if (settings.udpport > 0 && settings.socketpath == NULL) {
2610        /* create the UDP listening socket and bind it */
2611        u_socket = server_socket(settings.udpport, 1);
2612        if (u_socket == -1) {
2613            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2614            exit(EXIT_FAILURE);
2615        }
2616    }
2617
2618    /* lose root privileges if we have them */
2619    if (getuid() == 0 || geteuid() == 0) {
2620        if (username == 0 || *username == '\0') {
2621            fprintf(stderr, "can't run as root without the -u switch\n");
2622            return 1;
2623        }
2624        if ((pw = getpwnam(username)) == 0) {
2625            fprintf(stderr, "can't find the user %s to switch to\n", username);
2626            return 1;
2627        }
2628        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2629            fprintf(stderr, "failed to assume identity of user %s\n", username);
2630            return 1;
2631        }
2632    }
2633
2634    /* create unix mode sockets after dropping privileges */
2635    if (settings.socketpath != NULL) {
2636        l_socket = server_socket_unix(settings.socketpath);
2637        if (l_socket == -1) {
2638            fprintf(stderr, "failed to listen\n");
2639            exit(EXIT_FAILURE);
2640        }
2641    }
2642
2643    /* daemonize if requested */
2644    /* if we want to ensure our ability to dump core, don't chdir to / */
2645    if (daemonize) {
2646        int res;
2647        res = daemon(maxcore, settings.verbose);
2648        if (res == -1) {
2649            fprintf(stderr, "failed to daemon() in order to daemonize\n");
2650            return 1;
2651        }
2652    }
2653
2654    /* initialize main thread libevent instance */
2655    main_base = event_init();
2656
2657    /* initialize other stuff */
2658    item_init();
2659    stats_init();
2660    assoc_init();
2661    conn_init();
2662    slabs_init(settings.maxbytes, settings.factor);
2663
2664    /* managed instance? alloc and zero a bucket array */
2665    if (settings.managed) {
2666        buckets = malloc(sizeof(int) * MAX_BUCKETS);
2667        if (buckets == 0) {
2668            fprintf(stderr, "failed to allocate the bucket array");
2669            exit(EXIT_FAILURE);
2670        }
2671        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2672    }
2673
2674    /* lock paged memory if needed */
2675    if (lock_memory) {
2676#ifdef HAVE_MLOCKALL
2677        mlockall(MCL_CURRENT | MCL_FUTURE);
2678#else
2679        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
2680#endif
2681    }
2682
2683    /*
2684     * ignore SIGPIPE signals; we can use errno==EPIPE if we
2685     * need that information
2686     */
2687    sa.sa_handler = SIG_IGN;
2688    sa.sa_flags = 0;
2689    if (sigemptyset(&sa.sa_mask) == -1 ||
2690        sigaction(SIGPIPE, &sa, 0) == -1) {
2691        perror("failed to ignore SIGPIPE; sigaction");
2692        exit(EXIT_FAILURE);
2693    }
2694    /* create the initial listening connection */
2695    if (!(listen_conn = conn_new(l_socket, conn_listening,
2696                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
2697        fprintf(stderr, "failed to create listening connection");
2698        exit(EXIT_FAILURE);
2699    }
2700    /* save the PID in if we're a daemon */
2701    if (daemonize)
2702        save_pid(getpid(), pid_file);
2703    /* start up worker threads if MT mode */
2704    thread_init(settings.num_threads, main_base);
2705    /* initialise clock event */
2706    clock_handler(0, 0, 0);
2707    /* initialise deletion array and timer event */
2708    deltotal = 200;
2709    delcurr = 0;
2710    if ((todelete = malloc(sizeof(item *) * deltotal)) == NULL) {
2711        perror("failed to allocate memory for deletion array");
2712        exit(EXIT_FAILURE);
2713    }
2714    delete_handler(0, 0, 0); /* sets up the event */
2715    /* create the initial listening udp connection, monitored on all threads */
2716    if (u_socket > -1) {
2717        for (c = 0; c < settings.num_threads; c++) {
2718            /* this is guaranteed to hit all threads because we round-robin */
2719            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2720                              UDP_READ_BUFFER_SIZE, 1);
2721        }
2722    }
2723    /* enter the event loop */
2724    event_base_loop(main_base, 0);
2725    /* remove the PID file if we're a daemon */
2726    if (daemonize)
2727        remove_pidfile(pid_file);
2728    return 0;
2729}
Note: See TracBrowser for help on using the browser.