root/trunk/server/memcached.c @ 615

Revision 615, 82.0 kB (checked in by plindner, 2 years ago)

Incorporate "cas" operation developed by Dustin Sallings
<dustin@…> and implemented by Chris Goffinet
<goffinet@…>. This change allows you to do
atomic changes to an existing key.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(const bool is_udp);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97void pre_gdb(void);
98static void conn_free(conn *c);
99
100/** exported globals **/
101struct stats stats;
102struct settings settings;
103
104/** file scope variables **/
105static item **todelete = NULL;
106static int delcurr;
107static int deltotal;
108static conn *listen_conn;
109static struct event_base *main_base;
110
111#define TRANSMIT_COMPLETE   0
112#define TRANSMIT_INCOMPLETE 1
113#define TRANSMIT_SOFT_ERROR 2
114#define TRANSMIT_HARD_ERROR 3
115
116static int *buckets = 0; /* bucket->generation array for a managed instance */
117
118#define REALTIME_MAXDELTA 60*60*24*30
119/*
120 * given time value that's either unix time or delta from current unix time, return
121 * unix time. Use the fact that delta can't exceed one month (and real time value can't
122 * be that low).
123 */
124static rel_time_t realtime(const time_t exptime) {
125    /* no. of seconds in 30 days - largest possible delta exptime */
126
127    if (exptime == 0) return 0; /* 0 means never expire */
128
129    if (exptime > REALTIME_MAXDELTA) {
130        /* if item expiration is at/before the server started, give it an
131           expiration time of 1 second after the server started.
132           (because 0 means don't expire).  without this, we'd
133           underflow and wrap around to some large value way in the
134           future, effectively making items expiring in the past
135           really expiring never */
136        if (exptime <= stats.started)
137            return (rel_time_t)1;
138        return (rel_time_t)(exptime - stats.started);
139    } else {
140        return (rel_time_t)(exptime + current_time);
141    }
142}
143
144static void stats_init(void) {
145    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
146    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
147    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
148
149    /* make the time we started always be 2 seconds before we really
150       did, so time(0) - time.started is never zero.  if so, things
151       like 'settings.oldest_live' which act as booleans as well as
152       values are now false in boolean context... */
153    stats.started = time(0) - 2;
154    stats_prefix_init();
155}
156
157static void stats_reset(void) {
158    STATS_LOCK();
159    stats.total_items = stats.total_conns = 0;
160    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
161    stats.bytes_read = stats.bytes_written = 0;
162    stats_prefix_clear();
163    STATS_UNLOCK();
164}
165
166static void settings_init(void) {
167    settings.port = 11211;
168    settings.udpport = 0;
169    settings.interf.s_addr = htonl(INADDR_ANY);
170    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
171    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
172    settings.verbose = 0;
173    settings.oldest_live = 0;
174    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
175    settings.socketpath = NULL;       /* by default, not using a unix socket */
176    settings.managed = false;
177    settings.factor = 1.25;
178    settings.chunk_size = 48;         /* space for a modest key and value */
179#ifdef USE_THREADS
180    settings.num_threads = 4;
181#else
182    settings.num_threads = 1;
183#endif
184    settings.prefix_delimiter = ':';
185    settings.detail_enabled = 0;
186}
187
188/* returns true if a deleted item's delete-locked-time is over, and it
189   should be removed from the namespace */
190static bool item_delete_lock_over (item *it) {
191    assert(it->it_flags & ITEM_DELETED);
192    return (current_time >= it->exptime);
193}
194
195/*
196 * Adds a message header to a connection.
197 *
198 * Returns 0 on success, -1 on out-of-memory.
199 */
200static int add_msghdr(conn *c)
201{
202    struct msghdr *msg;
203
204    assert(c != NULL);
205
206    if (c->msgsize == c->msgused) {
207        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
208        if (! msg)
209            return -1;
210        c->msglist = msg;
211        c->msgsize *= 2;
212    }
213
214    msg = c->msglist + c->msgused;
215
216    /* this wipes msg_iovlen, msg_control, msg_controllen, and
217       msg_flags, the last 3 of which aren't defined on solaris: */
218    memset(msg, 0, sizeof(struct msghdr));
219
220    msg->msg_iov = &c->iov[c->iovused];
221    msg->msg_name = &c->request_addr;
222    msg->msg_namelen = c->request_addr_size;
223
224    c->msgbytes = 0;
225    c->msgused++;
226
227    if (c->udp) {
228        /* Leave room for the UDP header, which we'll fill in later. */
229        return add_iov(c, NULL, UDP_HEADER_SIZE);
230    }
231
232    return 0;
233}
234
235
236/*
237 * Free list management for connections.
238 */
239
240static conn **freeconns;
241static int freetotal;
242static int freecurr;
243
244
245static void conn_init(void) {
246    freetotal = 200;
247    freecurr = 0;
248    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
249        perror("malloc()");
250    }
251    return;
252}
253
254/*
255 * Returns a connection from the freelist, if any. Should call this using
256 * conn_from_freelist() for thread safety.
257 */
258conn *do_conn_from_freelist() {
259    conn *c;
260
261    if (freecurr > 0) {
262        c = freeconns[--freecurr];
263    } else {
264        c = NULL;
265    }
266
267    return c;
268}
269
270/*
271 * Adds a connection to the freelist. 0 = success. Should call this using
272 * conn_add_to_freelist() for thread safety.
273 */
274bool do_conn_add_to_freelist(conn *c) {
275    if (freecurr < freetotal) {
276        freeconns[freecurr++] = c;
277        return false;
278    } else {
279        /* try to enlarge free connections array */
280        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
281        if (new_freeconns) {
282            freetotal *= 2;
283            freeconns = new_freeconns;
284            freeconns[freecurr++] = c;
285            return false;
286        }
287    }
288    return true;
289}
290
291conn *conn_new(const int sfd, const int init_state, const int event_flags,
292                const int read_buffer_size, const bool is_udp, struct event_base *base) {
293    conn *c = conn_from_freelist();
294
295    if (NULL == c) {
296        if (!(c = (conn *)malloc(sizeof(conn)))) {
297            perror("malloc()");
298            return NULL;
299        }
300        c->rbuf = c->wbuf = 0;
301        c->ilist = 0;
302        c->iov = 0;
303        c->msglist = 0;
304        c->hdrbuf = 0;
305
306        c->rsize = read_buffer_size;
307        c->wsize = DATA_BUFFER_SIZE;
308        c->isize = ITEM_LIST_INITIAL;
309        c->iovsize = IOV_LIST_INITIAL;
310        c->msgsize = MSG_LIST_INITIAL;
311        c->hdrsize = 0;
312
313        c->rbuf = (char *)malloc((size_t)c->rsize);
314        c->wbuf = (char *)malloc((size_t)c->wsize);
315        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
316        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
317        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
318
319        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
320                c->msglist == 0) {
321            if (c->rbuf != 0) free(c->rbuf);
322            if (c->wbuf != 0) free(c->wbuf);
323            if (c->ilist !=0) free(c->ilist);
324            if (c->iov != 0) free(c->iov);
325            if (c->msglist != 0) free(c->msglist);
326            free(c);
327            perror("malloc()");
328            return NULL;
329        }
330
331        STATS_LOCK();
332        stats.conn_structs++;
333        STATS_UNLOCK();
334    }
335
336    if (settings.verbose > 1) {
337        if (init_state == conn_listening)
338            fprintf(stderr, "<%d server listening\n", sfd);
339        else if (is_udp)
340            fprintf(stderr, "<%d server listening (udp)\n", sfd);
341        else
342            fprintf(stderr, "<%d new client connection\n", sfd);
343    }
344
345    c->sfd = sfd;
346    c->udp = is_udp;
347    c->state = init_state;
348    c->rlbytes = 0;
349    c->rbytes = c->wbytes = 0;
350    c->wcurr = c->wbuf;
351    c->rcurr = c->rbuf;
352    c->ritem = 0;
353    c->icurr = c->ilist;
354    c->ileft = 0;
355    c->iovused = 0;
356    c->msgcurr = 0;
357    c->msgused = 0;
358
359    c->write_and_go = conn_read;
360    c->write_and_free = 0;
361    c->item = 0;
362    c->bucket = -1;
363    c->gen = 0;
364
365    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
366    event_base_set(base, &c->event);
367    c->ev_flags = event_flags;
368
369    if (event_add(&c->event, 0) == -1) {
370        if (conn_add_to_freelist(c)) {
371            conn_free(c);
372        }
373        return NULL;
374    }
375
376    STATS_LOCK();
377    stats.curr_conns++;
378    stats.total_conns++;
379    STATS_UNLOCK();
380
381    return c;
382}
383
384static void conn_cleanup(conn *c) {
385    assert(c != NULL);
386
387    if (c->item) {
388        item_remove(c->item);
389        c->item = 0;
390    }
391
392    if (c->ileft != 0) {
393        for (; c->ileft > 0; c->ileft--,c->icurr++) {
394            item_remove(*(c->icurr));
395        }
396    }
397
398    if (c->write_and_free) {
399        free(c->write_and_free);
400        c->write_and_free = 0;
401    }
402}
403
404/*
405 * Frees a connection.
406 */
407void conn_free(conn *c) {
408    if (c) {
409        if (c->hdrbuf)
410            free(c->hdrbuf);
411        if (c->msglist)
412            free(c->msglist);
413        if (c->rbuf)
414            free(c->rbuf);
415        if (c->wbuf)
416            free(c->wbuf);
417        if (c->ilist)
418            free(c->ilist);
419        if (c->iov)
420            free(c->iov);
421        free(c);
422    }
423}
424
425static void conn_close(conn *c) {
426    assert(c != NULL);
427
428    /* delete the event, the socket and the conn */
429    event_del(&c->event);
430
431    if (settings.verbose > 1)
432        fprintf(stderr, "<%d connection closed.\n", c->sfd);
433
434    close(c->sfd);
435    accept_new_conns(true);
436    conn_cleanup(c);
437
438    /* if the connection has big buffers, just free it */
439    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
440        conn_free(c);
441    }
442
443    STATS_LOCK();
444    stats.curr_conns--;
445    STATS_UNLOCK();
446
447    return;
448}
449
450
451/*
452 * Shrinks a connection's buffers if they're too big.  This prevents
453 * periodic large "get" requests from permanently chewing lots of server
454 * memory.
455 *
456 * This should only be called in between requests since it can wipe output
457 * buffers!
458 */
459static void conn_shrink(conn *c) {
460    assert(c != NULL);
461
462    if (c->udp)
463        return;
464
465    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
466        char *newbuf;
467
468        if (c->rcurr != c->rbuf)
469            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
470
471        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
472
473        if (newbuf) {
474            c->rbuf = newbuf;
475            c->rsize = DATA_BUFFER_SIZE;
476        }
477        /* TODO check other branch... */
478        c->rcurr = c->rbuf;
479    }
480
481    if (c->isize > ITEM_LIST_HIGHWAT) {
482        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
483        if (newbuf) {
484            c->ilist = newbuf;
485            c->isize = ITEM_LIST_INITIAL;
486        }
487    /* TODO check error condition? */
488    }
489
490    if (c->msgsize > MSG_LIST_HIGHWAT) {
491        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
492        if (newbuf) {
493            c->msglist = newbuf;
494            c->msgsize = MSG_LIST_INITIAL;
495        }
496    /* TODO check error condition? */
497    }
498
499    if (c->iovsize > IOV_LIST_HIGHWAT) {
500        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
501        if (newbuf) {
502            c->iov = newbuf;
503            c->iovsize = IOV_LIST_INITIAL;
504        }
505    /* TODO check return value */
506    }
507}
508
509/*
510 * Sets a connection's current state in the state machine. Any special
511 * processing that needs to happen on certain state transitions can
512 * happen here.
513 */
514static void conn_set_state(conn *c, int state) {
515    assert(c != NULL);
516
517    if (state != c->state) {
518        if (state == conn_read) {
519            conn_shrink(c);
520            assoc_move_next_bucket();
521        }
522        c->state = state;
523    }
524}
525
526
527/*
528 * Ensures that there is room for another struct iovec in a connection's
529 * iov list.
530 *
531 * Returns 0 on success, -1 on out-of-memory.
532 */
533static int ensure_iov_space(conn *c) {
534    assert(c != NULL);
535
536    if (c->iovused >= c->iovsize) {
537        int i, iovnum;
538        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
539                                (c->iovsize * 2) * sizeof(struct iovec));
540        if (! new_iov)
541            return -1;
542        c->iov = new_iov;
543        c->iovsize *= 2;
544
545        /* Point all the msghdr structures at the new list. */
546        for (i = 0, iovnum = 0; i < c->msgused; i++) {
547            c->msglist[i].msg_iov = &c->iov[iovnum];
548            iovnum += c->msglist[i].msg_iovlen;
549        }
550    }
551
552    return 0;
553}
554
555
556/*
557 * Adds data to the list of pending data that will be written out to a
558 * connection.
559 *
560 * Returns 0 on success, -1 on out-of-memory.
561 */
562
563static int add_iov(conn *c, const void *buf, int len) {
564    struct msghdr *m;
565    int leftover;
566    bool limit_to_mtu;
567
568    assert(c != NULL);
569
570    do {
571        m = &c->msglist[c->msgused - 1];
572
573        /*
574         * Limit UDP packets, and the first payloads of TCP replies, to
575         * UDP_MAX_PAYLOAD_SIZE bytes.
576         */
577        limit_to_mtu = c->udp || (1 == c->msgused);
578
579        /* We may need to start a new msghdr if this one is full. */
580        if (m->msg_iovlen == IOV_MAX ||
581            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
582            add_msghdr(c);
583            m = &c->msglist[c->msgused - 1];
584        }
585
586        if (ensure_iov_space(c) != 0)
587            return -1;
588
589        /* If the fragment is too big to fit in the datagram, split it up */
590        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
591            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
592            len -= leftover;
593        } else {
594            leftover = 0;
595        }
596
597        m = &c->msglist[c->msgused - 1];
598        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
599        m->msg_iov[m->msg_iovlen].iov_len = len;
600
601        c->msgbytes += len;
602        c->iovused++;
603        m->msg_iovlen++;
604
605        buf = ((char *)buf) + len;
606        len = leftover;
607    } while (leftover > 0);
608
609    return 0;
610}
611
612
613/*
614 * Constructs a set of UDP headers and attaches them to the outgoing messages.
615 */
616static int build_udp_headers(conn *c) {
617    int i;
618    unsigned char *hdr;
619
620    assert(c != NULL);
621
622    if (c->msgused > c->hdrsize) {
623        void *new_hdrbuf;
624        if (c->hdrbuf)
625            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
626        else
627            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
628        if (! new_hdrbuf)
629            return -1;
630        c->hdrbuf = (unsigned char *)new_hdrbuf;
631        c->hdrsize = c->msgused * 2;
632    }
633
634    hdr = c->hdrbuf;
635    for (i = 0; i < c->msgused; i++) {
636        c->msglist[i].msg_iov[0].iov_base = hdr;
637        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
638        *hdr++ = c->request_id / 256;
639        *hdr++ = c->request_id % 256;
640        *hdr++ = i / 256;
641        *hdr++ = i % 256;
642        *hdr++ = c->msgused / 256;
643        *hdr++ = c->msgused % 256;
644        *hdr++ = 0;
645        *hdr++ = 0;
646        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
647    }
648
649    return 0;
650}
651
652
653static void out_string(conn *c, const char *str) {
654    size_t len;
655
656    assert(c != NULL);
657
658    if (settings.verbose > 1)
659        fprintf(stderr, ">%d %s\n", c->sfd, str);
660
661    len = strlen(str);
662    if ((len + 2) > c->wsize) {
663        /* ought to be always enough. just fail for simplicity */
664        str = "SERVER_ERROR output line too long";
665        len = strlen(str);
666    }
667
668    memcpy(c->wbuf, str, len);
669    memcpy(c->wbuf + len, "\r\n", 3);
670    c->wbytes = len + 2;
671    c->wcurr = c->wbuf;
672
673    conn_set_state(c, conn_write);
674    c->write_and_go = conn_read;
675    return;
676}
677
678/*
679 * we get here after reading the value in set/add/replace commands. The command
680 * has been stored in c->item_comm, and the item is ready in c->item.
681 */
682
683static void complete_nread(conn *c) {
684    assert(c != NULL);
685
686    item *it = c->item;
687    int comm = c->item_comm;
688
689    STATS_LOCK();
690    stats.set_cmds++;
691    STATS_UNLOCK();
692
693    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
694        out_string(c, "CLIENT_ERROR bad data chunk");
695    } else {
696        if (store_item(it, comm)) {
697            out_string(c, "STORED");
698        } else {
699            out_string(c, "NOT_STORED");
700        }
701    }
702
703    item_remove(c->item);       /* release the c->item reference */
704    c->item = 0;
705}
706
707/*
708 * Stores an item in the cache according to the semantics of one of the set
709 * commands. In threaded mode, this is protected by the cache lock.
710 *
711 * Returns true if the item was stored.
712 */
713int do_store_item(item *it, int comm) {
714    char *key = ITEM_key(it);
715    bool delete_locked = false;
716    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
717    int stored = 0;
718
719    if (old_it != NULL && comm == NREAD_ADD) {
720        /* add only adds a nonexistent item, but promote to head of LRU */
721        do_item_update(old_it);
722    } else if (!old_it && comm == NREAD_REPLACE) {
723        /* replace only replaces an existing value; don't store */
724    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
725        /* replace and add can't override delete locks; don't store */
726    } else {
727        /* "set" commands can override the delete lock
728           window... in which case we have to find the old hidden item
729           that's in the namespace/LRU but wasn't returned by
730           item_get.... because we need to replace it */
731        if (delete_locked)
732            old_it = do_item_get_nocheck(key, it->nkey);
733
734        if (old_it != NULL)
735            do_item_replace(old_it, it);
736        else
737            do_item_link(it);
738
739        stored = 1;
740    }
741
742    if (old_it)
743        do_item_remove(old_it);         /* release our reference */
744    return stored;
745}
746
747typedef struct token_s {
748    char *value;
749    size_t length;
750} token_t;
751
752#define COMMAND_TOKEN 0
753#define SUBCOMMAND_TOKEN 1
754#define KEY_TOKEN 1
755#define KEY_MAX_LENGTH 250
756
757#define MAX_TOKENS 6
758
759/*
760 * Tokenize the command string by replacing whitespace with '\0' and update
761 * the token array tokens with pointer to start of each token and length.
762 * Returns total number of tokens.  The last valid token is the terminal
763 * token (value points to the first unprocessed character of the string and
764 * length zero).
765 *
766 * Usage example:
767 *
768 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
769 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
770 *          ...
771 *      }
772 *      ncommand = tokens[ix].value - command;
773 *      command  = tokens[ix].value;
774 *   }
775 */
776static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
777    char *s, *e;
778    size_t ntokens = 0;
779
780    assert(command != NULL && tokens != NULL && max_tokens > 1);
781
782    for (s = e = command; ntokens < max_tokens - 1; ++e) {
783        if (*e == ' ') {
784            if (s != e) {
785                tokens[ntokens].value = s;
786                tokens[ntokens].length = e - s;
787                ntokens++;
788                *e = '\0';
789            }
790            s = e + 1;
791        }
792        else if (*e == '\0') {
793            if (s != e) {
794                tokens[ntokens].value = s;
795                tokens[ntokens].length = e - s;
796                ntokens++;
797            }
798
799            break; /* string end */
800        }
801    }
802
803    /*
804     * If we scanned the whole string, the terminal value pointer is null,
805     * otherwise it is the first unprocessed character.
806     */
807    tokens[ntokens].value =  *e == '\0' ? NULL : e;
808    tokens[ntokens].length = 0;
809    ntokens++;
810
811    return ntokens;
812}
813
814/* set up a connection to write a buffer then free it, used for stats */
815static void write_and_free(conn *c, char *buf, int bytes) {
816    if (buf) {
817        c->write_and_free = buf;
818        c->wcurr = buf;
819        c->wbytes = bytes;
820        conn_set_state(c, conn_write);
821        c->write_and_go = conn_read;
822    } else {
823        out_string(c, "SERVER_ERROR out of memory");
824    }
825}
826
827inline static void process_stats_detail(conn *c, const char *command) {
828    assert(c != NULL);
829
830    if (strcmp(command, "on") == 0) {
831        settings.detail_enabled = 1;
832        out_string(c, "OK");
833    }
834    else if (strcmp(command, "off") == 0) {
835        settings.detail_enabled = 0;
836        out_string(c, "OK");
837    }
838    else if (strcmp(command, "dump") == 0) {
839        int len;
840        char *stats = stats_prefix_dump(&len);
841        write_and_free(c, stats, len);
842    }
843    else {
844        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
845    }
846}
847
848static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
849    rel_time_t now = current_time;
850    char *command;
851    char *subcommand;
852
853    assert(c != NULL);
854
855    if(ntokens < 2) {
856        out_string(c, "CLIENT_ERROR bad command line");
857        return;
858    }
859
860    command = tokens[COMMAND_TOKEN].value;
861
862    if (ntokens == 2 && strcmp(command, "stats") == 0) {
863        char temp[1024];
864        pid_t pid = getpid();
865        char *pos = temp;
866
867#ifndef WIN32
868        struct rusage usage;
869        getrusage(RUSAGE_SELF, &usage);
870#endif /* !WIN32 */
871
872        STATS_LOCK();
873        pos += sprintf(pos, "STAT pid %u\r\n", pid);
874        pos += sprintf(pos, "STAT uptime %u\r\n", now);
875        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
876        pos += sprintf(pos, "STAT version " VERSION "\r\n");
877        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
878#ifndef WIN32
879        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
880        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
881#endif /* !WIN32 */
882        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
883        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
884        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
885        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
886        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
887        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
888        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
889        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
890        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
891        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
892        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
893        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
894        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
895        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
896        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
897        pos += sprintf(pos, "END");
898        STATS_UNLOCK();
899        out_string(c, temp);
900        return;
901    }
902
903    subcommand = tokens[SUBCOMMAND_TOKEN].value;
904
905    if (strcmp(subcommand, "reset") == 0) {
906        stats_reset();
907        out_string(c, "RESET");
908        return;
909    }
910
911#ifdef HAVE_MALLOC_H
912#ifdef HAVE_STRUCT_MALLINFO
913    if (strcmp(subcommand, "malloc") == 0) {
914        char temp[512];
915        struct mallinfo info;
916        char *pos = temp;
917
918        info = mallinfo();
919        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
920        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
921        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
922        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
923        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
924        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
925        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
926        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
927        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
928        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
929        out_string(c, temp);
930        return;
931    }
932#endif /* HAVE_STRUCT_MALLINFO */
933#endif /* HAVE_MALLOC_H */
934
935#if !defined(WIN32) || !defined(__APPLE__)
936    if (strcmp(subcommand, "maps") == 0) {
937        char *wbuf;
938        int wsize = 8192; /* should be enough */
939        int fd;
940        int res;
941
942        if ((wbuf = (char *)malloc(wsize)) == NULL) {
943            out_string(c, "SERVER_ERROR out of memory");
944            return;
945        }
946
947        fd = open("/proc/self/maps", O_RDONLY);
948        if (fd == -1) {
949            out_string(c, "SERVER_ERROR cannot open the maps file");
950            free(wbuf);
951            return;
952        }
953
954        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
955        if (res == wsize - 6) {
956            out_string(c, "SERVER_ERROR buffer overflow");
957            free(wbuf); close(fd);
958            return;
959        }
960        if (res == 0 || res == -1) {
961            out_string(c, "SERVER_ERROR can't read the maps file");
962            free(wbuf); close(fd);
963            return;
964        }
965        memcpy(wbuf + res, "END\r\n", 5);
966        write_and_free(c, wbuf, res + 5);
967        close(fd);
968        return;
969    }
970#endif
971
972    if (strcmp(subcommand, "cachedump") == 0) {
973
974        char *buf;
975        unsigned int bytes, id, limit = 0;
976
977        if(ntokens < 5) {
978            out_string(c, "CLIENT_ERROR bad command line");
979            return;
980        }
981
982        id = strtoul(tokens[2].value, NULL, 10);
983        limit = strtoul(tokens[3].value, NULL, 10);
984
985        if(errno == ERANGE) {
986            out_string(c, "CLIENT_ERROR bad command line format");
987            return;
988        }
989
990        buf = item_cachedump(id, limit, &bytes);
991        write_and_free(c, buf, bytes);
992        return;
993    }
994
995    if (strcmp(subcommand, "slabs") == 0) {
996        int bytes = 0;
997        char *buf = slabs_stats(&bytes);
998        write_and_free(c, buf, bytes);
999        return;
1000    }
1001
1002    if (strcmp(subcommand, "items") == 0) {
1003        int bytes = 0;
1004        char *buf = item_stats(&bytes);
1005        write_and_free(c, buf, bytes);
1006        return;
1007    }
1008
1009    if (strcmp(subcommand, "detail") == 0) {
1010        if (ntokens < 4)
1011            process_stats_detail(c, "");  /* outputs the error message */
1012        else
1013            process_stats_detail(c, tokens[2].value);
1014        return;
1015    }
1016
1017    if (strcmp(subcommand, "sizes") == 0) {
1018        int bytes = 0;
1019        char *buf = item_stats_sizes(&bytes);
1020        write_and_free(c, buf, bytes);
1021        return;
1022    }
1023
1024    out_string(c, "ERROR");
1025}
1026
1027/* ntokens is overwritten here... shrug.. */
1028static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_key_ptr) {
1029    char *key;
1030    size_t nkey;
1031    int i = 0;
1032    item *it;
1033    token_t *key_token = &tokens[KEY_TOKEN];
1034    char suffix[255];
1035    uint32_t in_memory_ptr;
1036    assert(c != NULL);
1037
1038    if (settings.managed) {
1039        int bucket = c->bucket;
1040        if (bucket == -1) {
1041            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1042            return;
1043        }
1044        c->bucket = -1;
1045        if (buckets[bucket] != c->gen) {
1046            out_string(c, "ERROR_NOT_OWNER");
1047            return;
1048        }
1049    }
1050
1051    do {
1052        while(key_token->length != 0) {
1053
1054            key = key_token->value;
1055            nkey = key_token->length;
1056
1057            if(nkey > KEY_MAX_LENGTH) {
1058                out_string(c, "CLIENT_ERROR bad command line format");
1059                return;
1060            }
1061
1062            STATS_LOCK();
1063            stats.get_cmds++;
1064            STATS_UNLOCK();
1065            it = item_get(key, nkey);
1066            if (settings.detail_enabled) {
1067                stats_prefix_record_get(key, NULL != it);
1068            }
1069            if (it) {
1070                if (i >= c->isize) {
1071                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1072                    if (new_list) {
1073                        c->isize *= 2;
1074                        c->ilist = new_list;
1075                    } else break;
1076                }
1077
1078                /*
1079                 * Construct the response. Each hit adds three elements to the
1080                 * outgoing data list:
1081                 *   "VALUE "
1082                 *   key
1083                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1084                 */
1085
1086                if(return_key_ptr == true)
1087                {
1088                  in_memory_ptr = (uint32_t)item_get(key, nkey);
1089                  sprintf(suffix," %d %d %lu\r\n", atoi(ITEM_suffix(it) + 1), it->nbytes - 2, in_memory_ptr);
1090                  if (add_iov(c, "VALUE ", 6) != 0 ||
1091                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1092                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1093                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1094                      {
1095                          break;
1096                      }
1097                }
1098                else
1099                {
1100                  if (add_iov(c, "VALUE ", 6) != 0 ||
1101                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1102                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1103                      {
1104                          break;
1105                      }
1106                }
1107
1108
1109                if (settings.verbose > 1)
1110                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1111
1112                /* item_get() has incremented it->refcount for us */
1113                STATS_LOCK();
1114                stats.get_hits++;
1115                STATS_UNLOCK();
1116                item_update(it);
1117                *(c->ilist + i) = it;
1118                i++;
1119
1120            } else {
1121                STATS_LOCK();
1122                stats.get_misses++;
1123                STATS_UNLOCK();
1124            }
1125
1126            key_token++;
1127        }
1128
1129        /*
1130         * If the command string hasn't been fully processed, get the next set
1131         * of tokens.
1132         */
1133        if(key_token->value != NULL) {
1134            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1135            key_token = tokens;
1136        }
1137
1138    } while(key_token->value != NULL);
1139
1140    c->icurr = c->ilist;
1141    c->ileft = i;
1142
1143    if (settings.verbose > 1)
1144        fprintf(stderr, ">%d END\n", c->sfd);
1145    add_iov(c, "END\r\n", 5);
1146
1147    if (c->udp && build_udp_headers(c) != 0) {
1148        out_string(c, "SERVER_ERROR out of memory");
1149    }
1150    else {
1151        conn_set_state(c, conn_mwrite);
1152        c->msgcurr = 0;
1153    }
1154    return;
1155}
1156
1157static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1158    char *key;
1159    size_t nkey;
1160    int flags;
1161    time_t exptime;
1162    int vlen;
1163    uint32_t req_memory_ptr, in_memory_ptr;
1164
1165    item *it;
1166
1167    assert(c != NULL);
1168
1169    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1170        out_string(c, "CLIENT_ERROR bad command line format");
1171        return;
1172    }
1173
1174    key = tokens[KEY_TOKEN].value;
1175    nkey = tokens[KEY_TOKEN].length;
1176
1177    flags = strtoul(tokens[2].value, NULL, 10);
1178    exptime = strtol(tokens[3].value, NULL, 10);
1179    vlen = strtol(tokens[4].value, NULL, 10);
1180
1181    // does cas value exist?
1182    if(tokens[5].value)
1183    {
1184      req_memory_ptr = strtoull(tokens[5].value, NULL, 10);
1185    }
1186
1187    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1188        out_string(c, "CLIENT_ERROR bad command line format");
1189        return;
1190    }
1191
1192    if (settings.detail_enabled) {
1193        stats_prefix_record_set(key);
1194    }
1195
1196    if (settings.managed) {
1197        int bucket = c->bucket;
1198        if (bucket == -1) {
1199            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1200            return;
1201        }
1202        c->bucket = -1;
1203        if (buckets[bucket] != c->gen) {
1204            out_string(c, "ERROR_NOT_OWNER");
1205            return;
1206        }
1207    }
1208
1209    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1210
1211    /* HANDLE_CAS VALIDATION */
1212    if(handle_cas == true)
1213    {
1214      item *itmp=item_get(key, it->nkey);
1215      /* Release the reference */
1216      if(itmp) {
1217        item_remove(itmp);
1218      }
1219      in_memory_ptr = (uint32_t)itmp;
1220      if(in_memory_ptr == req_memory_ptr)
1221      {
1222        // validates allow the set
1223      }
1224      else if(in_memory_ptr)
1225      {
1226        out_string(c, "EXISTS");
1227
1228        /* swallow the data line */
1229        c->write_and_go = conn_swallow;
1230        c->sbytes = vlen + 2;
1231        return;
1232      }
1233      else
1234      {
1235        out_string(c, "NOT FOUND");
1236        /* swallow the data line */
1237        c->write_and_go = conn_swallow;
1238        c->sbytes = vlen + 2;
1239        return;
1240      }
1241    }
1242
1243    if (it == 0) {
1244        if (! item_size_ok(nkey, flags, vlen + 2))
1245            out_string(c, "SERVER_ERROR object too large for cache");
1246        else
1247            out_string(c, "SERVER_ERROR out of memory");
1248        /* swallow the data line */
1249        c->write_and_go = conn_swallow;
1250        c->sbytes = vlen + 2;
1251        return;
1252    }
1253
1254    c->item_comm = comm;
1255    c->item = it;
1256    c->ritem = ITEM_data(it);
1257    c->rlbytes = it->nbytes;
1258    conn_set_state(c, conn_nread);
1259}
1260
1261static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
1262    char temp[32];
1263    item *it;
1264    unsigned int delta;
1265    char *key;
1266    size_t nkey;
1267
1268    assert(c != NULL);
1269
1270    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1271        out_string(c, "CLIENT_ERROR bad command line format");
1272        return;
1273    }
1274
1275    key = tokens[KEY_TOKEN].value;
1276    nkey = tokens[KEY_TOKEN].length;
1277
1278    if (settings.managed) {
1279        int bucket = c->bucket;
1280        if (bucket == -1) {
1281            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1282            return;
1283        }
1284        c->bucket = -1;
1285        if (buckets[bucket] != c->gen) {
1286            out_string(c, "ERROR_NOT_OWNER");
1287            return;
1288        }
1289    }
1290
1291    delta = strtoul(tokens[2].value, NULL, 10);
1292
1293    if(errno == ERANGE) {
1294        out_string(c, "CLIENT_ERROR bad command line format");
1295        return;
1296    }
1297
1298    it = item_get(key, nkey);
1299    if (!it) {
1300        out_string(c, "NOT_FOUND");
1301        return;
1302    }
1303
1304    out_string(c, add_delta(it, incr, delta, temp));
1305    item_remove(it);         /* release our reference */
1306}
1307
1308/*
1309 * adds a delta value to a numeric item.
1310 *
1311 * it    item to adjust
1312 * incr  true to increment value, false to decrement
1313 * delta amount to adjust value by
1314 * buf   buffer for response string
1315 *
1316 * returns a response string to send back to the client.
1317 */
1318char *do_add_delta(item *it, const int incr, const unsigned int delta, char *buf) {
1319    char *ptr;
1320    uint32_t value;
1321    int res;
1322
1323    ptr = ITEM_data(it);
1324    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1325
1326    value = strtoul(ptr, NULL, 10);
1327
1328    if(errno == ERANGE) {
1329        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1330    }
1331
1332    if (incr != 0)
1333        value += delta;
1334    else {
1335        if (delta >= value) value = 0;
1336        else value -= delta;
1337    }
1338    snprintf(buf, 32, "%u", value);
1339    res = strlen(buf);
1340    if (res + 2 > it->nbytes) { /* need to realloc */
1341        item *new_it;
1342        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1343        if (new_it == 0) {
1344            return "SERVER_ERROR out of memory";
1345        }
1346        memcpy(ITEM_data(new_it), buf, res);
1347        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1348        do_item_replace(it, new_it);
1349        do_item_remove(new_it);       /* release our reference */
1350    } else { /* replace in-place */
1351        memcpy(ITEM_data(it), buf, res);
1352        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1353    }
1354
1355    return buf;
1356}
1357
1358static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1359    char *key;
1360    size_t nkey;
1361    item *it;
1362    time_t exptime = 0;
1363
1364    assert(c != NULL);
1365
1366    if (settings.managed) {
1367        int bucket = c->bucket;
1368        if (bucket == -1) {
1369            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1370            return;
1371        }
1372        c->bucket = -1;
1373        if (buckets[bucket] != c->gen) {
1374            out_string(c, "ERROR_NOT_OWNER");
1375            return;
1376        }
1377    }
1378
1379    key = tokens[KEY_TOKEN].value;
1380    nkey = tokens[KEY_TOKEN].length;
1381
1382    if(nkey > KEY_MAX_LENGTH) {
1383        out_string(c, "CLIENT_ERROR bad command line format");
1384        return;
1385    }
1386
1387    if(ntokens == 4) {
1388        exptime = strtol(tokens[2].value, NULL, 10);
1389
1390        if(errno == ERANGE) {
1391            out_string(c, "CLIENT_ERROR bad command line format");
1392            return;
1393        }
1394    }
1395
1396    if (settings.detail_enabled) {
1397        stats_prefix_record_delete(key);
1398    }
1399
1400    it = item_get(key, nkey);
1401    if (it) {
1402        if (exptime == 0) {
1403            item_unlink(it);
1404            item_remove(it);      /* release our reference */
1405            out_string(c, "DELETED");
1406        } else {
1407            /* our reference will be transfered to the delete queue */
1408            out_string(c, defer_delete(it, exptime));
1409        }
1410    } else {
1411        out_string(c, "NOT_FOUND");
1412    }
1413}
1414
1415/*
1416 * Adds an item to the deferred-delete list so it can be reaped later.
1417 *
1418 * Returns the result to send to the client.
1419 */
1420char *do_defer_delete(item *it, time_t exptime)
1421{
1422    if (delcurr >= deltotal) {
1423        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1424        if (new_delete) {
1425            todelete = new_delete;
1426            deltotal *= 2;
1427        } else {
1428            /*
1429             * can't delete it immediately, user wants a delay,
1430             * but we ran out of memory for the delete queue
1431             */
1432            item_remove(it);    /* release reference */
1433            return "SERVER_ERROR out of memory";
1434        }
1435    }
1436
1437    /* use its expiration time as its deletion time now */
1438    it->exptime = realtime(exptime);
1439    it->it_flags |= ITEM_DELETED;
1440    todelete[delcurr++] = it;
1441
1442    return "DELETED";
1443}
1444
1445static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1446    unsigned int level;
1447
1448    assert(c != NULL);
1449
1450    level = strtoul(tokens[1].value, NULL, 10);
1451    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1452    out_string(c, "OK");
1453    return;
1454}
1455
1456static void process_command(conn *c, char *command) {
1457
1458    token_t tokens[MAX_TOKENS];
1459    size_t ntokens;
1460    int comm;
1461
1462    assert(c != NULL);
1463
1464    if (settings.verbose > 1)
1465        fprintf(stderr, "<%d %s\n", c->sfd, command);
1466
1467    /*
1468     * for commands set/add/replace, we build an item and read the data
1469     * directly into it, then continue in nread_complete().
1470     */
1471
1472    c->msgcurr = 0;
1473    c->msgused = 0;
1474    c->iovused = 0;
1475    if (add_msghdr(c) != 0) {
1476        out_string(c, "SERVER_ERROR out of memory");
1477        return;
1478    }
1479
1480    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1481    if (ntokens >= 3 &&
1482        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1483         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1484
1485        process_get_command(c, tokens, ntokens, false);
1486
1487    } else if (ntokens == 6 &&
1488               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1489                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1490                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
1491
1492        process_update_command(c, tokens, ntokens, comm, false);
1493
1494    } else if (ntokens == 6 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0)) {
1495
1496        process_update_command(c, tokens, ntokens, comm, true);
1497
1498    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1499
1500        process_arithmetic_command(c, tokens, ntokens, 1);
1501
1502    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
1503
1504        process_get_command(c, tokens, ntokens, true);
1505
1506    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1507
1508        process_arithmetic_command(c, tokens, ntokens, 0);
1509
1510    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1511
1512        process_delete_command(c, tokens, ntokens);
1513
1514    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1515        unsigned int bucket, gen;
1516        if (!settings.managed) {
1517            out_string(c, "CLIENT_ERROR not a managed instance");
1518            return;
1519        }
1520
1521        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1522            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1523                out_string(c, "CLIENT_ERROR bucket number out of range");
1524                return;
1525            }
1526            buckets[bucket] = gen;
1527            out_string(c, "OWNED");
1528            return;
1529        } else {
1530            out_string(c, "CLIENT_ERROR bad format");
1531            return;
1532        }
1533
1534    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1535
1536        int bucket;
1537        if (!settings.managed) {
1538            out_string(c, "CLIENT_ERROR not a managed instance");
1539            return;
1540        }
1541        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1542            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1543                out_string(c, "CLIENT_ERROR bucket number out of range");
1544                return;
1545            }
1546            buckets[bucket] = 0;
1547            out_string(c, "DISOWNED");
1548            return;
1549        } else {
1550            out_string(c, "CLIENT_ERROR bad format");
1551            return;
1552        }
1553
1554    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1555        int bucket, gen;
1556        if (!settings.managed) {
1557            out_string(c, "CLIENT_ERROR not a managed instance");
1558            return;
1559        }
1560        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1561            /* we never write anything back, even if input's wrong */
1562            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1563                /* do nothing, bad input */
1564            } else {
1565                c->bucket = bucket;
1566                c->gen = gen;
1567            }
1568            conn_set_state(c, conn_read);
1569            return;
1570        } else {
1571            out_string(c, "CLIENT_ERROR bad format");
1572            return;
1573        }
1574
1575    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1576
1577        process_stat(c, tokens, ntokens);
1578
1579    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1580        time_t exptime = 0;
1581        set_current_time();
1582
1583        if(ntokens == 2) {
1584            settings.oldest_live = current_time - 1;
1585            item_flush_expired();
1586            out_string(c, "OK");
1587            return;
1588        }
1589
1590        exptime = strtol(tokens[1].value, NULL, 10);
1591        if(errno == ERANGE) {
1592            out_string(c, "CLIENT_ERROR bad command line format");
1593            return;
1594        }
1595
1596        settings.oldest_live = realtime(exptime) - 1;
1597        item_flush_expired();
1598        out_string(c, "OK");
1599        return;
1600
1601    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1602
1603        out_string(c, "VERSION " VERSION);
1604
1605    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1606
1607        conn_set_state(c, conn_closing);
1608
1609    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1610                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1611#ifdef ALLOW_SLABS_REASSIGN
1612
1613        int src, dst, rv;
1614
1615        src = strtol(tokens[2].value, NULL, 10);
1616        dst  = strtol(tokens[3].value, NULL, 10);
1617
1618        if(errno == ERANGE) {
1619            out_string(c, "CLIENT_ERROR bad command line format");
1620            return;
1621        }
1622
1623        rv = slabs_reassign(src, dst);
1624        if (rv == 1) {
1625            out_string(c, "DONE");
1626            return;
1627        }
1628        if (rv == 0) {
1629            out_string(c, "CANT");
1630            return;
1631        }
1632        if (rv == -1) {
1633            out_string(c, "BUSY");
1634            return;
1635        }
1636#else
1637        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1638#endif
1639    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1640        process_verbosity_command(c, tokens, ntokens);
1641    } else {
1642        out_string(c, "ERROR");
1643    }
1644    return;
1645}
1646
1647/*
1648 * if we have a complete line in the buffer, process it.
1649 */
1650static int try_read_command(conn *c) {
1651    char *el, *cont;
1652
1653    assert(c != NULL);
1654    assert(c->rcurr <= (c->rbuf + c->rsize));
1655
1656    if (c->rbytes == 0)
1657        return 0;
1658    el = memchr(c->rcurr, '\n', c->rbytes);
1659    if (!el)
1660        return 0;
1661    cont = el + 1;
1662    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1663        el--;
1664    }
1665    *el = '\0';
1666
1667    assert(cont <= (c->rcurr + c->rbytes));
1668
1669    process_command(c, c->rcurr);
1670
1671    c->rbytes -= (cont - c->rcurr);
1672    c->rcurr = cont;
1673
1674    assert(c->rcurr <= (c->rbuf + c->rsize));
1675
1676    return 1;
1677}
1678
1679/*
1680 * read a UDP request.
1681 * return 0 if there's nothing to read.
1682 */
1683static int try_read_udp(conn *c) {
1684    int res;
1685
1686    assert(c != NULL);
1687
1688    c->request_addr_size = sizeof(c->request_addr);
1689    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1690                   0, &c->request_addr, &c->request_addr_size);
1691    if (res > 8) {
1692        unsigned char *buf = (unsigned char *)c->rbuf;
1693        STATS_LOCK();
1694        stats.bytes_read += res;
1695        STATS_UNLOCK();
1696
1697        /* Beginning of UDP packet is the request ID; save it. */
1698        c->request_id = buf[0] * 256 + buf[1];
1699
1700        /* If this is a multi-packet request, drop it. */
1701        if (buf[4] != 0 || buf[5] != 1) {
1702            out_string(c, "SERVER_ERROR multi-packet request not supported");
1703            return 0;
1704        }
1705
1706        /* Don't care about any of the rest of the header. */
1707        res -= 8;
1708        memmove(c->rbuf, c->rbuf + 8, res);
1709
1710        c->rbytes += res;
1711        c->rcurr = c->rbuf;
1712        return 1;
1713    }
1714    return 0;
1715}
1716
1717/*
1718 * read from network as much as we can, handle buffer overflow and connection
1719 * close.
1720 * before reading, move the remaining incomplete fragment of a command
1721 * (if any) to the beginning of the buffer.
1722 * return 0 if there's nothing to read on the first read.
1723 */
1724static int try_read_network(conn *c) {
1725    int gotdata = 0;
1726    int res;
1727
1728    assert(c != NULL);
1729
1730    if (c->rcurr != c->rbuf) {
1731        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1732            memmove(c->rbuf, c->rcurr, c->rbytes);
1733        c->rcurr = c->rbuf;
1734    }
1735
1736    while (1) {
1737        if (c->rbytes >= c->rsize) {
1738            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1739            if (!new_rbuf) {
1740                if (settings.verbose > 0)
1741                    fprintf(stderr, "Couldn't realloc input buffer\n");
1742                c->rbytes = 0; /* ignore what we read */
1743                out_string(c, "SERVER_ERROR out of memory");
1744                c->write_and_go = conn_closing;
1745                return 1;
1746            }
1747            c->rcurr = c->rbuf = new_rbuf;
1748            c->rsize *= 2;
1749        }
1750
1751        /* unix socket mode doesn't need this, so zeroed out.  but why
1752         * is this done for every command?  presumably for UDP
1753         * mode.  */
1754        if (!settings.socketpath) {
1755            c->request_addr_size = sizeof(c->request_addr);
1756        } else {
1757            c->request_addr_size = 0;
1758        }
1759
1760        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
1761        if (res > 0) {
1762            STATS_LOCK();
1763            stats.bytes_read += res;
1764            STATS_UNLOCK();
1765            gotdata = 1;
1766            c->rbytes += res;
1767            continue;
1768        }
1769        if (res == 0) {
1770            /* connection closed */
1771            conn_set_state(c, conn_closing);
1772            return 1;
1773        }
1774        if (res == -1) {
1775            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1776            else return 0;
1777        }
1778    }
1779    return gotdata;
1780}
1781
1782static bool update_event(conn *c, const int new_flags) {
1783    assert(c != NULL);
1784
1785    struct event_base *base = c->event.ev_base;
1786    if (c->ev_flags == new_flags)
1787        return true;
1788    if (event_del(&c->event) == -1) return false;
1789    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1790    event_base_set(base, &c->event);
1791    c->ev_flags = new_flags;
1792    if (event_add(&c->event, 0) == -1) return false;
1793    return true;
1794}
1795
1796/*
1797 * Sets whether we are listening for new connections or not.
1798 */
1799void accept_new_conns(const bool do_accept) {
1800    if (! is_listen_thread())
1801        return;
1802    if (do_accept) {
1803        update_event(listen_conn, EV_READ | EV_PERSIST);
1804        if (listen(listen_conn->sfd, 1024) != 0) {
1805            perror("listen");
1806        }
1807    }
1808    else {
1809        update_event(listen_conn, 0);
1810        if (listen(listen_conn->sfd, 0) != 0) {
1811            perror("listen");
1812        }
1813    }
1814}
1815
1816
1817/*
1818 * Transmit the next chunk of data from our list of msgbuf structures.
1819 *
1820 * Returns:
1821 *   TRANSMIT_COMPLETE   All done writing.
1822 *   TRANSMIT_INCOMPLETE More data remaining to write.
1823 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
1824 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
1825 */
1826static int transmit(conn *c) {
1827    assert(c != NULL);
1828
1829    if (c->msgcurr < c->msgused &&
1830            c->msglist[c->msgcurr].msg_iovlen == 0) {
1831        /* Finished writing the current msg; advance to the next. */
1832        c->msgcurr++;
1833    }
1834    if (c->msgcurr < c->msgused) {
1835        ssize_t res;
1836        struct msghdr *m = &c->msglist[c->msgcurr];
1837
1838        res = sendmsg(c->sfd, m, 0);
1839        if (res > 0) {
1840            STATS_LOCK();
1841            stats.bytes_written += res;
1842            STATS_UNLOCK();
1843
1844            /* We've written some of the data. Remove the completed
1845               iovec entries from the list of pending writes. */
1846            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
1847                res -= m->msg_iov->iov_len;
1848                m->msg_iovlen--;
1849                m->msg_iov++;
1850            }
1851
1852            /* Might have written just part of the last iovec entry;
1853               adjust it so the next write will do the rest. */
1854            if (res > 0) {
1855                m->msg_iov->iov_base += res;
1856                m->msg_iov->iov_len -= res;
1857            }
1858            return TRANSMIT_INCOMPLETE;
1859        }
1860        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1861            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1862                if (settings.verbose > 0)
1863                    fprintf(stderr, "Couldn't update event\n");
1864                conn_set_state(c, conn_closing);
1865                return TRANSMIT_HARD_ERROR;
1866            }
1867            return TRANSMIT_SOFT_ERROR;
1868        }
1869        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1870           we have a real error, on which we close the connection */
1871        if (settings.verbose > 0)
1872            perror("Failed to write, and not due to blocking");
1873
1874        if (c->udp)
1875            conn_set_state(c, conn_read);
1876        else
1877            conn_set_state(c, conn_closing);
1878        return TRANSMIT_HARD_ERROR;
1879    } else {
1880        return TRANSMIT_COMPLETE;
1881    }
1882}
1883
1884static void drive_machine(conn *c) {
1885    bool stop = false;
1886    int sfd, flags = 1;
1887    socklen_t addrlen;
1888    struct sockaddr addr;
1889    int res;
1890
1891    assert(c != NULL);
1892
1893    while (!stop) {
1894
1895        switch(c->state) {
1896        case conn_listening:
1897            addrlen = sizeof(addr);
1898            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
1899                if (errno == EAGAIN || errno == EWOULDBLOCK) {
1900                    /* these are transient, so don't log anything */
1901                    stop = true;
1902                } else if (errno == EMFILE) {
1903                    if (settings.verbose > 0)
1904                        fprintf(stderr, "Too many open connections\n");
1905                    accept_new_conns(false);
1906                    stop = true;
1907                } else {
1908                    perror("accept()");
1909                    stop = true;
1910                }
1911                break;
1912            }
1913            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
1914                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
1915                perror("setting O_NONBLOCK");
1916                close(sfd);
1917                break;
1918            }
1919            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
1920                                     DATA_BUFFER_SIZE, false);
1921            break;
1922
1923        case conn_read:
1924            if (try_read_command(c) != 0) {
1925                continue;
1926            }
1927            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
1928                continue;
1929            }
1930            /* we have no command line and no data to read from network */
1931            if (!update_event(c, EV_READ | EV_PERSIST)) {
1932                if (settings.verbose > 0)
1933                    fprintf(stderr, "Couldn't update event\n");
1934                conn_set_state(c, conn_closing);
1935                break;
1936            }
1937            stop = true;
1938            break;
1939
1940        case conn_nread:
1941            /* we are reading rlbytes into ritem; */
1942            if (c->rlbytes == 0) {
1943                complete_nread(c);
1944                break;
1945            }
1946            /* first check if we have leftovers in the conn_read buffer */
1947            if (c->rbytes > 0) {
1948                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
1949                memcpy(c->ritem, c->rcurr, tocopy);
1950                c->ritem += tocopy;
1951                c->rlbytes -= tocopy;
1952                c->rcurr += tocopy;
1953                c->rbytes -= tocopy;
1954                break;
1955            }
1956
1957            /*  now try reading from the socket */
1958            res = read(c->sfd, c->ritem, c->rlbytes);
1959            if (res > 0) {
1960                STATS_LOCK();
1961                stats.bytes_read += res;
1962                STATS_UNLOCK();
1963                c->ritem += res;
1964                c->rlbytes -= res;
1965                break;
1966            }
1967            if (res == 0) { /* end of stream */
1968                conn_set_state(c, conn_closing);
1969                break;
1970            }
1971            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1972                if (!update_event(c, EV_READ | EV_PERSIST)) {
1973                    if (settings.verbose > 0)
1974                        fprintf(stderr, "Couldn't update event\n");
1975                    conn_set_state(c, conn_closing);
1976                    break;
1977                }
1978                stop = true;
1979                break;
1980            }
1981            /* otherwise we have a real error, on which we close the connection */
1982            if (settings.verbose > 0)
1983                fprintf(stderr, "Failed to read, and not due to blocking\n");
1984            conn_set_state(c, conn_closing);
1985            break;
1986
1987        case conn_swallow:
1988            /* we are reading sbytes and throwing them away */
1989            if (c->sbytes == 0) {
1990                conn_set_state(c, conn_read);
1991                break;
1992            }
1993
1994            /* first check if we have leftovers in the conn_read buffer */
1995            if (c->rbytes > 0) {
1996                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
1997                c->sbytes -= tocopy;
1998                c->rcurr += tocopy;
1999                c->rbytes -= tocopy;
2000                break;
2001            }
2002
2003            /*  now try reading from the socket */
2004            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2005            if (res > 0) {
2006                STATS_LOCK();
2007                stats.bytes_read += res;
2008                STATS_UNLOCK();
2009                c->sbytes -= res;
2010                break;
2011            }
2012            if (res == 0) { /* end of stream */
2013                conn_set_state(c, conn_closing);
2014                break;
2015            }
2016            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2017                if (!update_event(c, EV_READ | EV_PERSIST)) {
2018                    if (settings.verbose > 0)
2019                        fprintf(stderr, "Couldn't update event\n");
2020                    conn_set_state(c, conn_closing);
2021                    break;
2022                }
2023                stop = true;
2024                break;
2025            }
2026            /* otherwise we have a real error, on which we close the connection */
2027            if (settings.verbose > 0)
2028                fprintf(stderr, "Failed to read, and not due to blocking\n");
2029            conn_set_state(c, conn_closing);
2030            break;
2031
2032        case conn_write:
2033            /*
2034             * We want to write out a simple response. If we haven't already,
2035             * assemble it into a msgbuf list (this will be a single-entry
2036             * list for TCP or a two-entry list for UDP).
2037             */
2038            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
2039                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2040                    (c->udp && build_udp_headers(c) != 0)) {
2041                    if (settings.verbose > 0)
2042                        fprintf(stderr, "Couldn't build response\n");
2043                    conn_set_state(c, conn_closing);
2044                    break;
2045                }
2046            }
2047
2048            /* fall through... */
2049
2050        case conn_mwrite:
2051            switch (transmit(c)) {
2052            case TRANSMIT_COMPLETE:
2053                if (c->state == conn_mwrite) {
2054                    while (c->ileft > 0) {
2055                        item *it = *(c->icurr);
2056                        assert((it->it_flags & ITEM_SLABBED) == 0);
2057                        item_remove(it);
2058                        c->icurr++;
2059                        c->ileft--;
2060                    }
2061                    conn_set_state(c, conn_read);
2062                } else if (c->state == conn_write) {
2063                    if (c->write_and_free) {
2064                        free(c->write_and_free);
2065                        c->write_and_free = 0;
2066                    }
2067                    conn_set_state(c, c->write_and_go);
2068                } else {
2069                    if (settings.verbose > 0)
2070                        fprintf(stderr, "Unexpected state %d\n", c->state);
2071                    conn_set_state(c, conn_closing);
2072                }
2073                break;
2074
2075            case TRANSMIT_INCOMPLETE:
2076            case TRANSMIT_HARD_ERROR:
2077                break;                   /* Continue in state machine. */
2078
2079            case TRANSMIT_SOFT_ERROR:
2080                stop = true;
2081                break;
2082            }
2083            break;
2084
2085        case conn_closing:
2086            if (c->udp)
2087                conn_cleanup(c);
2088            else
2089                conn_close(c);
2090            stop = true;
2091            break;
2092        }
2093    }
2094
2095    return;
2096}
2097
2098void event_handler(const int fd, const short which, void *arg) {
2099    conn *c;
2100
2101    c = (conn *)arg;
2102    assert(c != NULL);
2103
2104    c->which = which;
2105
2106    /* sanity */
2107    if (fd != c->sfd) {
2108        if (settings.verbose > 0)
2109            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2110        conn_close(c);
2111        return;
2112    }
2113
2114    drive_machine(c);
2115
2116    /* wait for next event */
2117    return;
2118}
2119
2120static int new_socket(const bool is_udp) {
2121    int sfd;
2122    int flags;
2123
2124    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
2125        perror("socket()");
2126        return -1;
2127    }
2128
2129    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2130        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2131        perror("setting O_NONBLOCK");
2132        close(sfd);
2133        return -1;
2134    }
2135    return sfd;
2136}
2137
2138
2139/*
2140 * Sets a socket's send buffer size to the maximum allowed by the system.
2141 */
2142static void maximize_sndbuf(const int sfd) {
2143    socklen_t intsize = sizeof(int);
2144    int last_good = 0;
2145    int min, max, avg;
2146    int old_size;
2147
2148    /* Start with the default size. */
2149    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2150        if (settings.verbose > 0)
2151            perror("getsockopt(SO_SNDBUF)");
2152        return;
2153    }
2154
2155    /* Binary-search for the real maximum. */
2156    min = old_size;
2157    max = MAX_SENDBUF_SIZE;
2158
2159    while (min <= max) {
2160        avg = ((unsigned int)(min + max)) / 2;
2161        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2162            last_good = avg;
2163            min = avg + 1;
2164        } else {
2165            max = avg - 1;
2166        }
2167    }
2168
2169    if (settings.verbose > 1)
2170        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2171}
2172
2173
2174static int server_socket(const int port, const bool is_udp) {
2175    int sfd;
2176    struct linger ling = {0, 0};
2177    struct sockaddr_in addr;
2178    int flags =1;
2179
2180    if ((sfd = new_socket(is_udp)) == -1) {
2181        return -1;
2182    }
2183
2184    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2185    if (is_udp) {
2186        maximize_sndbuf(sfd);
2187    } else {
2188        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2189        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2190        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2191    }
2192
2193    /*
2194     * the memset call clears nonstandard fields in some impementations
2195     * that otherwise mess things up.
2196     */
2197    memset(&addr, 0, sizeof(addr));
2198
2199    addr.sin_family = AF_INET;
2200    addr.sin_port = htons(port);
2201    addr.sin_addr = settings.interf;
2202    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2203        perror("bind()");
2204        close(sfd);
2205        return -1;
2206    }
2207    if (!is_udp && listen(sfd, 1024) == -1) {
2208        perror("listen()");
2209        close(sfd);
2210        return -1;
2211    }
2212    return sfd;
2213}
2214
2215static int new_socket_unix(void) {
2216    int sfd;
2217    int flags;
2218
2219    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2220        perror("socket()");
2221        return -1;
2222    }
2223
2224    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2225        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2226        perror("setting O_NONBLOCK");
2227        close(sfd);
2228        return -1;
2229    }
2230    return sfd;
2231}
2232
2233static int server_socket_unix(const char *path) {
2234    int sfd;
2235    struct linger ling = {0, 0};
2236    struct sockaddr_un addr;
2237    struct stat tstat;
2238    int flags =1;
2239
2240    if (!path) {
2241        return -1;
2242    }
2243
2244    if ((sfd = new_socket_unix()) == -1) {
2245        return -1;
2246    }
2247
2248    /*
2249     * Clean up a previous socket file if we left it around
2250     */
2251    if (lstat(path, &tstat) == 0) {
2252        if (S_ISSOCK(tstat.st_mode))
2253            unlink(path);
2254    }
2255
2256    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2257    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2258    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2259
2260    /*
2261     * the memset call clears nonstandard fields in some impementations
2262     * that otherwise mess things up.
2263     */
2264    memset(&addr, 0, sizeof(addr));
2265
2266    addr.sun_family = AF_UNIX;
2267    strcpy(addr.sun_path, path);
2268    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2269        perror("bind()");
2270        close(sfd);
2271        return -1;
2272    }
2273    if (listen(sfd, 1024) == -1) {
2274        perror("listen()");
2275        close(sfd);
2276        return -1;
2277    }
2278    return sfd;
2279}
2280
2281/* listening socket */
2282static int l_socket = 0;
2283
2284/* udp socket */
2285static int u_socket = -1;
2286
2287/* invoke right before gdb is called, on assert */
2288void pre_gdb(void) {
2289    int i;
2290    if (l_socket > -1) close(l_socket);
2291    if (u_socket > -1) close(u_socket);
2292    for (i = 3; i <= 500; i++) close(i); /* so lame */
2293    kill(getpid(), SIGABRT);
2294}
2295
2296/*
2297 * We keep the current time of day in a global variable that's updated by a
2298 * timer event. This saves us a bunch of time() system calls (we really only
2299 * need to get the time once a second, whereas there can be tens of thousands
2300 * of requests a second) and allows us to use server-start-relative timestamps
2301 * rather than absolute UNIX timestamps, a space savings on systems where
2302 * sizeof(time_t) > sizeof(unsigned int).
2303 */
2304volatile rel_time_t current_time;
2305static struct event clockevent;
2306
2307/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2308static void set_current_time(void) {
2309    current_time = (rel_time_t) (time(0) - stats.started);
2310}
2311
2312static void clock_handler(const int fd, const short which, void *arg) {
2313    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2314    static bool initialized = false;
2315
2316    if (initialized) {
2317        /* only delete the event if it's actually there. */
2318        evtimer_del(&clockevent);
2319    } else {
2320        initialized = true;
2321    }
2322
2323    evtimer_set(&clockevent, clock_handler, 0);
2324    event_base_set(main_base, &clockevent);
2325    evtimer_add(&clockevent, &t);
2326
2327    set_current_time();
2328}
2329
2330static struct event deleteevent;
2331
2332static void delete_handler(const int fd, const short which, void *arg) {
2333    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2334    static bool initialized = false;
2335
2336    if (initialized) {
2337        /* some versions of libevent don't like deleting events that don't exist,
2338           so only delete once we know this event has been added. */
2339        evtimer_del(&deleteevent);
2340    } else {
2341        initialized = true;
2342    }
2343
2344    evtimer_set(&deleteevent, delete_handler, 0);
2345    event_base_set(main_base, &deleteevent);
2346    evtimer_add(&deleteevent, &t);
2347    run_deferred_deletes();
2348}
2349
2350/* Call run_deferred_deletes instead of this. */
2351void do_run_deferred_deletes(void)
2352{
2353    int i, j = 0;
2354
2355    for (i = 0; i < delcurr; i++) {
2356        item *it = todelete[i];
2357        if (item_delete_lock_over(it)) {
2358            assert(it->refcount > 0);
2359            it->it_flags &= ~ITEM_DELETED;
2360            do_item_unlink(it);
2361            do_item_remove(it);
2362        } else {
2363            todelete[j++] = it;
2364        }
2365    }
2366    delcurr = j;
2367}
2368
2369static void usage(void) {
2370    printf(PACKAGE " " VERSION "\n");
2371    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2372           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2373           "-s <file>     unix socket path to listen on (disables network support)\n"
2374           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2375           "-d            run as a daemon\n"
2376           "-r            maximize core file limit\n"
2377           "-u <username> assume identity of <username> (only when run as root)\n"
2378           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2379           "-M            return error on memory exhausted (rather than removing items)\n"
2380           "-c <num>      max simultaneous connections, default is 1024\n"
2381           "-k            lock down all paged memory\n"
2382           "-v            verbose (print errors/warnings while in event loop)\n"
2383           "-vv           very verbose (also print client commands/reponses)\n"
2384           "-h            print this help and exit\n"
2385           "-i            print memcached and libevent license\n"
2386           "-b            run a managed instanced (mnemonic: buckets)\n"
2387           "-P <file>     save PID in <file>, only used with -d option\n"
2388           "-f <factor>   chunk size growth factor, default 1.25\n"
2389           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
2390#ifdef USE_THREADS
2391    printf("-t <num>      number of threads to use, default 4\n");
2392#endif
2393    return;
2394}
2395
2396static void usage_license(void) {
2397    printf(PACKAGE " " VERSION "\n\n");
2398    printf(
2399    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2400    "All rights reserved.\n"
2401    "\n"
2402    "Redistribution and use in source and binary forms, with or without\n"
2403    "modification, are permitted provided that the following conditions are\n"
2404    "met:\n"
2405    "\n"
2406    "    * Redistributions of source code must retain the above copyright\n"
2407    "notice, this list of conditions and the following disclaimer.\n"
2408    "\n"
2409    "    * Redistributions in binary form must reproduce the above\n"
2410    "copyright notice, this list of conditions and the following disclaimer\n"
2411    "in the documentation and/or other materials provided with the\n"
2412    "distribution.\n"
2413    "\n"
2414    "    * Neither the name of the Danga Interactive nor the names of its\n"
2415    "contributors may be used to endorse or promote products derived from\n"
2416    "this software without specific prior written permission.\n"
2417    "\n"
2418    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2419    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2420    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2421    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2422    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2423    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2424    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2425    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2426    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2427    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2428    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2429    "\n"
2430    "\n"
2431    "This product includes software developed by Niels Provos.\n"
2432    "\n"
2433    "[ libevent ]\n"
2434    "\n"
2435    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2436    "All rights reserved.\n"
2437    "\n"
2438    "Redistribution and use in source and binary forms, with or without\n"
2439    "modification, are permitted provided that the following conditions\n"
2440    "are met:\n"
2441    "1. Redistributions of source code must retain the above copyright\n"
2442    "   notice, this list of conditions and the following disclaimer.\n"
2443    "2. Redistributions in binary form must reproduce the above copyright\n"
2444    "   notice, this list of conditions and the following disclaimer in the\n"
2445    "   documentation and/or other materials provided with the distribution.\n"
2446    "3. All advertising materials mentioning features or use of this software\n"
2447    "   must display the following acknowledgement:\n"
2448    "      This product includes software developed by Niels Provos.\n"
2449    "4. The name of the author may not be used to endorse or promote products\n"
2450    "   derived from this software without specific prior written permission.\n"
2451    "\n"
2452    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2453    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2454    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2455    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2456    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2457    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2458    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2459    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2460    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2461    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2462    );
2463
2464    return;
2465}
2466
2467static void save_pid(const pid_t pid, const char *pid_file) {
2468    FILE *fp;
2469    if (pid_file == NULL)
2470        return;
2471
2472    if ((fp = fopen(pid_file, "w")) == NULL) {
2473        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2474        return;
2475    }
2476
2477    fprintf(fp,"%ld\n", (long)pid);
2478    if (fclose(fp) == -1) {
2479        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2480        return;
2481    }
2482}
2483
2484static void remove_pidfile(const char *pid_file) {
2485  if (pid_file == NULL)
2486      return;
2487
2488  if (unlink(pid_file) != 0) {
2489      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2490  }
2491
2492}
2493
2494
2495static void sig_handler(const int sig) {
2496    printf("SIGINT handled.\n");
2497    exit(EXIT_SUCCESS);
2498}
2499
2500int main (int argc, char **argv) {
2501    int c;
2502    struct in_addr addr;
2503    bool lock_memory = false;
2504    bool daemonize = false;
2505    int maxcore = 0;
2506    char *username = NULL;
2507    char *pid_file = NULL;
2508    struct passwd *pw;
2509    struct sigaction sa;
2510    struct rlimit rlim;
2511
2512    /* handle SIGINT */
2513    signal(SIGINT, sig_handler);
2514
2515    /* init settings */
2516    settings_init();
2517
2518    /* set stderr non-buffering (for running under, say, daemontools) */
2519    setbuf(stderr, NULL);
2520
2521    /* process arguments */
2522    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
2523        switch (c) {
2524        case 'U':
2525            settings.udpport = atoi(optarg);
2526            break;
2527        case 'b':
2528            settings.managed = true;
2529            break;
2530        case 'p':
2531            settings.port = atoi(optarg);
2532            break;
2533        case 's':
2534            settings.socketpath = optarg;
2535            break;
2536        case 'm':
2537            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2538            break;
2539        case 'M':
2540            settings.evict_to_free = 0;
2541            break;
2542        case 'c':
2543            settings.maxconns = atoi(optarg);
2544            break;
2545        case 'h':
2546            usage();
2547            exit(EXIT_SUCCESS);
2548        case 'i':
2549            usage_license();
2550            exit(EXIT_SUCCESS);
2551        case 'k':
2552            lock_memory = true;
2553            break;
2554        case 'v':
2555            settings.verbose++;
2556            break;
2557        case 'l':
2558            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
2559                fprintf(stderr, "Illegal address: %s\n", optarg);
2560                return 1;
2561            } else {
2562                settings.interf = addr;
2563            }
2564            break;
2565        case 'd':
2566            daemonize = true;
2567            break;
2568        case 'r':
2569            maxcore = 1;
2570            break;
2571        case 'u':
2572            username = optarg;
2573            break;
2574        case 'P':
2575            pid_file = optarg;
2576            break;
2577        case 'f':
2578            settings.factor = atof(optarg);
2579            if (settings.factor <= 1.0) {
2580                fprintf(stderr, "Factor must be greater than 1\n");
2581                return 1;
2582            }
2583            break;
2584        case 'n':
2585            settings.chunk_size = atoi(optarg);
2586            if (settings.chunk_size == 0) {
2587                fprintf(stderr, "Chunk size must be greater than 0\n");
2588                return 1;
2589            }
2590            break;
2591        case 't':
2592            settings.num_threads = atoi(optarg);
2593            if (settings.num_threads == 0) {
2594                fprintf(stderr, "Number of threads must be greater than 0\n");
2595                return 1;
2596            }
2597            break;
2598        case 'D':
2599            if (! optarg || ! optarg[0]) {
2600                fprintf(stderr, "No delimiter specified\n");
2601                return 1;
2602            }
2603            settings.prefix_delimiter = optarg[0];
2604            settings.detail_enabled = 1;
2605            break;
2606        default:
2607            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2608            return 1;
2609        }
2610    }
2611
2612    if (maxcore != 0) {
2613        struct rlimit rlim_new;
2614        /*
2615         * First try raising to infinity; if that fails, try bringing
2616         * the soft limit to the hard.
2617         */
2618        if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
2619            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
2620            if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
2621                /* failed. try raising just to the old max */
2622                rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
2623                (void)setrlimit(RLIMIT_CORE, &rlim_new);
2624            }
2625        }
2626        /*
2627         * getrlimit again to see what we ended up with. Only fail if
2628         * the soft limit ends up 0, because then no core files will be
2629         * created at all.
2630         */
2631
2632        if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
2633            fprintf(stderr, "failed to ensure corefile creation\n");
2634            exit(EXIT_FAILURE);
2635        }
2636    }
2637
2638    /*
2639     * If needed, increase rlimits to allow as many connections
2640     * as needed.
2641     */
2642
2643    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2644        fprintf(stderr, "failed to getrlimit number of files\n");
2645        exit(EXIT_FAILURE);
2646    } else {
2647        int maxfiles = settings.maxconns;
2648        if (rlim.rlim_cur < maxfiles)
2649            rlim.rlim_cur = maxfiles + 3;
2650        if (rlim.rlim_max < rlim.rlim_cur)
2651            rlim.rlim_max = rlim.rlim_cur;
2652        if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
2653            fprintf(stderr, "failed to set rlimit for open files. Try running as root or requesting smaller maxconns value.\n");
2654            exit(EXIT_FAILURE);
2655        }
2656    }
2657
2658    /*
2659     * initialization order: first create the listening sockets
2660     * (may need root on low ports), then drop root if needed,
2661     * then daemonise if needed, then init libevent (in some cases
2662     * descriptors created by libevent wouldn't survive forking).
2663     */
2664
2665    /* create the listening socket and bind it */
2666    if (settings.socketpath == NULL) {
2667        l_socket = server_socket(settings.port, 0);
2668        if (l_socket == -1) {
2669            fprintf(stderr, "failed to listen\n");
2670            exit(EXIT_FAILURE);
2671        }
2672    }
2673
2674    if (settings.udpport > 0 && settings.socketpath == NULL) {
2675        /* create the UDP listening socket and bind it */
2676        u_socket = server_socket(settings.udpport, 1);
2677        if (u_socket == -1) {
2678            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
2679            exit(EXIT_FAILURE);
2680        }
2681    }
2682
2683    /* lose root privileges if we have them */
2684    if (getuid() == 0 || geteuid() == 0) {
2685        if (username == 0 || *username == '\0') {
2686            fprintf(stderr, "can't run as root without the -u switch\n");
2687            return 1;
2688        }
2689        if ((pw = getpwnam(username)) == 0) {
2690            fprintf(stderr, "can't find the user %s to switch to\n", username);
2691            return 1;
2692        }
2693        if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
2694            fprintf(stderr, "failed to assume identity of user %s\n", username);
2695            return 1;
2696        }
2697    }
2698
2699    /* create unix mode sockets after dropping privileges */
2700    if (settings.socketpath != NULL) {
2701        l_socket = server_socket_unix(settings.socketpath);
2702        if (l_socket == -1) {
2703            fprintf(stderr, "failed to listen\n");
2704            exit(EXIT_FAILURE);
2705        }
2706    }
2707
2708    /* daemonize if requested */
2709    /* if we want to ensure our ability to dump core, don't chdir to / */
2710    if (daemonize) {
2711        int res;
2712        res = daemon(maxcore, settings.verbose);
2713        if (res == -1) {
2714            fprintf(stderr, "failed to daemon() in order to daemonize\n");
2715            return 1;
2716        }
2717    }
2718
2719
2720    /* initialize main thread libevent instance */
2721    main_base = event_init();
2722
2723    /* initialize other stuff */
2724    item_init();
2725    stats_init();
2726    assoc_init();
2727    conn_init();
2728    slabs_init(settings.maxbytes, settings.factor);
2729
2730    /* managed instance? alloc and zero a bucket array */
2731    if (settings.managed) {
2732        buckets = malloc(sizeof(int) * MAX_BUCKETS);
2733        if (buckets == 0) {
2734            fprintf(stderr, "failed to allocate the bucket array");
2735            exit(EXIT_FAILURE);
2736        }
2737        memset(buckets, 0, sizeof(int) * MAX_BUCKETS);
2738    }
2739
2740    /* lock paged memory if needed */
2741    if (lock_memory) {
2742#ifdef HAVE_MLOCKALL
2743        mlockall(MCL_CURRENT | MCL_FUTURE);
2744#else
2745        fprintf(stderr, "warning: mlockall() not supported on this platform.  proceeding without.\n");
2746#endif
2747    }
2748
2749    /*
2750     * ignore SIGPIPE signals; we can use errno==EPIPE if we
2751     * need that information
2752     */
2753    sa.sa_handler = SIG_IGN;
2754    sa.sa_flags = 0;
2755    if (sigemptyset(&sa.sa_mask) == -1 ||
2756        sigaction(SIGPIPE, &sa, 0) == -1) {
2757        perror("failed to ignore SIGPIPE; sigaction");
2758        exit(EXIT_FAILURE);
2759    }
2760    /* create the initial listening connection */
2761    if (!(listen_conn = conn_new(l_socket, conn_listening,
2762                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
2763        fprintf(stderr, "failed to create listening connection");
2764        exit(EXIT_FAILURE);
2765    }
2766    /* start up worker threads if MT mode */
2767    thread_init(settings.num_threads, main_base);
2768    /* save the PID in if we're a daemon, do this after thread_init due to
2769       a file descriptor handling bug somewhere in libevent */
2770    if (daemonize)
2771        save_pid(getpid(), pid_file);
2772    /* initialise clock event */
2773    clock_handler(0, 0, 0);
2774    /* initialise deletion array and timer event */
2775    deltotal = 200;
2776    delcurr = 0;
2777    if ((todelete = malloc(sizeof(item *) * deltotal)) == NULL) {
2778        perror("failed to allocate memory for deletion array");
2779        exit(EXIT_FAILURE);
2780    }
2781    delete_handler(0, 0, 0); /* sets up the event */
2782    /* create the initial listening udp connection, monitored on all threads */
2783    if (u_socket > -1) {
2784        for (c = 0; c < settings.num_threads; c++) {
2785            /* this is guaranteed to hit all threads because we round-robin */
2786            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
2787                              UDP_READ_BUFFER_SIZE, 1);
2788        }
2789    }
2790    /* enter the event loop */
2791    event_base_loop(main_base, 0);
2792    /* remove the PID file if we're a daemon */
2793    if (daemonize)
2794        remove_pidfile(pid_file);
2795    return 0;
2796}
Note: See TracBrowser for help on using the browser.