root/branches/binary/server/memcached.c @ 755

Revision 755, 112.2 kB (checked in by dsallings, 21 months ago)

Use the bin substate enum as well

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63/*
64 * forward declarations
65 */
66static void drive_machine(conn *c);
67static int new_socket(struct addrinfo *ai);
68static int server_socket(const int port, enum protocol prot);
69static int try_read_command(conn *c);
70static int try_read_network(conn *c);
71static int try_read_udp(conn *c);
72static void conn_set_state(conn *c, enum conn_states state);
73
74/* stats */
75static void stats_reset(void);
76static void stats_init(void);
77
78/* defaults */
79static void settings_init(void);
80
81/* event handling, network IO */
82static void event_handler(const int fd, const short which, void *arg);
83static void conn_close(conn *c);
84static void conn_init(void);
85static void accept_new_conns(const bool do_accept);
86static bool update_event(conn *c, const int new_flags);
87static void complete_nread(conn *c);
88static void process_command(conn *c, char *command);
89static int transmit(conn *c);
90static int ensure_iov_space(conn *c);
91static int add_iov(conn *c, const void *buf, int len);
92static int add_msghdr(conn *c);
93
94/* time handling */
95static void set_current_time(void);  /* update the global variable holding
96                              global 32-bit seconds-since-start time
97                              (to avoid 64 bit time_t) */
98
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = NULL;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn = NULL;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.access=0700;
169    settings.port = 11211;
170    settings.udpport = 0;
171    /* By default this string should be NULL for getaddrinfo() */
172    settings.inter = NULL;
173    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
174    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
175    settings.verbose = 0;
176    settings.oldest_live = 0;
177    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
178    settings.socketpath = NULL;       /* by default, not using a unix socket */
179    settings.managed = false;
180    settings.factor = 1.25;
181    settings.chunk_size = 48;         /* space for a modest key and value */
182#ifdef USE_THREADS
183    settings.num_threads = 4;
184#else
185    settings.num_threads = 1;
186#endif
187    settings.prefix_delimiter = ':';
188    settings.detail_enabled = 0;
189}
190
191/* returns true if a deleted item's delete-locked-time is over, and it
192   should be removed from the namespace */
193static bool item_delete_lock_over (item *it) {
194    assert(it->it_flags & ITEM_DELETED);
195    return (current_time >= it->exptime);
196}
197
198/*
199 * Adds a message header to a connection.
200 *
201 * Returns 0 on success, -1 on out-of-memory.
202 */
203static int add_msghdr(conn *c)
204{
205    struct msghdr *msg;
206
207    assert(c != NULL);
208
209    if (c->msgsize == c->msgused) {
210        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
211        if (! msg)
212            return -1;
213        c->msglist = msg;
214        c->msgsize *= 2;
215    }
216
217    msg = c->msglist + c->msgused;
218
219    /* this wipes msg_iovlen, msg_control, msg_controllen, and
220       msg_flags, the last 3 of which aren't defined on solaris: */
221    memset(msg, 0, sizeof(struct msghdr));
222
223    msg->msg_iov = &c->iov[c->iovused];
224
225    if (c->request_addr_size > 0) {
226        msg->msg_name = &c->request_addr;
227        msg->msg_namelen = c->request_addr_size;
228    }
229
230    c->msgbytes = 0;
231    c->msgused++;
232
233    if (IS_UDP(c->protocol)) {
234        /* Leave room for the UDP header, which we'll fill in later. */
235        return add_iov(c, NULL, UDP_HEADER_SIZE);
236    }
237
238    return 0;
239}
240
241
242/*
243 * Free list management for connections.
244 */
245
246static conn **freeconns;
247static int freetotal;
248static int freecurr;
249
250
251static void conn_init(void) {
252    freetotal = 200;
253    freecurr = 0;
254    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
255        fprintf(stderr, "malloc()\n");
256    }
257    return;
258}
259
260/*
261 * Returns a connection from the freelist, if any. Should call this using
262 * conn_from_freelist() for thread safety.
263 */
264conn *do_conn_from_freelist() {
265    conn *c;
266
267    if (freecurr > 0) {
268        c = freeconns[--freecurr];
269    } else {
270        c = NULL;
271    }
272
273    return c;
274}
275
276/*
277 * Adds a connection to the freelist. 0 = success. Should call this using
278 * conn_add_to_freelist() for thread safety.
279 */
280bool do_conn_add_to_freelist(conn *c) {
281    if (freecurr < freetotal) {
282        freeconns[freecurr++] = c;
283        return false;
284    } else {
285        /* try to enlarge free connections array */
286        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
287        if (new_freeconns) {
288            freetotal *= 2;
289            freeconns = new_freeconns;
290            freeconns[freecurr++] = c;
291            return false;
292        }
293    }
294    return true;
295}
296
297static char *prot_text(enum protocol prot) {
298    char *rv="unknown";
299    switch(prot) {
300        case ascii_prot:
301            rv="ascii";
302            break;
303        case binary_prot:
304            rv="binary";
305            break;
306        case ascii_udp_prot:
307            rv="ascii-udp";
308            break;
309        case negotiating_prot:
310            rv="auto-negotiate";
311            break;
312    }
313    return rv;
314}
315
316conn *conn_new(const int sfd, enum conn_states init_state,
317                const int event_flags,
318                const int read_buffer_size, enum protocol prot,
319                struct event_base *base) {
320    conn *c = conn_from_freelist();
321
322    if (NULL == c) {
323        if (!(c = (conn *)malloc(sizeof(conn)))) {
324            fprintf(stderr, "malloc()\n");
325            return NULL;
326        }
327        c->rbuf = c->wbuf = 0;
328        c->ilist = 0;
329        c->suffixlist = 0;
330        c->iov = 0;
331        c->msglist = 0;
332        c->hdrbuf = 0;
333
334        c->rsize = read_buffer_size;
335        c->wsize = DATA_BUFFER_SIZE;
336        c->isize = ITEM_LIST_INITIAL;
337        c->suffixsize = SUFFIX_LIST_INITIAL;
338        c->iovsize = IOV_LIST_INITIAL;
339        c->msgsize = MSG_LIST_INITIAL;
340        c->hdrsize = 0;
341
342        c->rbuf = (char *)malloc((size_t)c->rsize);
343        c->wbuf = (char *)malloc((size_t)c->wsize);
344        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
345        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
346        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
347        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
348
349        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
350                c->msglist == 0 || c->suffixlist == 0) {
351            if (c->rbuf != 0) free(c->rbuf);
352            if (c->wbuf != 0) free(c->wbuf);
353            if (c->ilist !=0) free(c->ilist);
354            if (c->suffixlist != 0) free(c->suffixlist);
355            if (c->iov != 0) free(c->iov);
356            if (c->msglist != 0) free(c->msglist);
357            free(c);
358            fprintf(stderr, "malloc()\n");
359            return NULL;
360        }
361
362        STATS_LOCK();
363        stats.conn_structs++;
364        STATS_UNLOCK();
365    }
366
367    /* unix socket mode doesn't need this, so zeroed out.  but why
368     * is this done for every command?  presumably for UDP
369     * mode.  */
370    if (!settings.socketpath) {
371        c->request_addr_size = sizeof(c->request_addr);
372    } else {
373        c->request_addr_size = 0;
374    }
375
376    if (settings.verbose > 1) {
377        if (init_state == conn_listening) {
378            fprintf(stderr, "<%d server listening (%s)\n", sfd,
379                prot_text(prot));
380        } else if (IS_UDP(prot)) {
381            fprintf(stderr, "<%d server listening (udp)\n", sfd);
382        } else if (prot == binary_prot) {
383            fprintf(stderr, "<%d new binary client connection\n", sfd);
384        } else if (prot == ascii_prot) {
385            fprintf(stderr, "<%d new ascii client connection\n", sfd);
386        } else if (prot == negotiating_prot) {
387            fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd);
388        } else {
389            fprintf(stderr, "<%d new unknown (%d) client connection\n",
390                sfd, prot);
391            abort();
392        }
393    }
394
395    c->sfd = sfd;
396    c->protocol = prot;
397    c->state = init_state;
398    c->rlbytes = 0;
399    c->cmd = -1;
400    c->rbytes = c->wbytes = 0;
401    c->wcurr = c->wbuf;
402    c->rcurr = c->rbuf;
403    c->ritem = 0;
404    c->icurr = c->ilist;
405    c->suffixcurr = c->suffixlist;
406    c->ileft = 0;
407    c->suffixleft = 0;
408    c->iovused = 0;
409    c->msgcurr = 0;
410    c->msgused = 0;
411
412    c->write_and_go = init_state;
413    c->write_and_free = 0;
414    c->item = 0;
415    c->bucket = -1;
416    c->gen = 0;
417
418    c->noreply = false;
419
420    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
421    event_base_set(base, &c->event);
422    c->ev_flags = event_flags;
423
424    if (event_add(&c->event, 0) == -1) {
425        if (conn_add_to_freelist(c)) {
426            conn_free(c);
427        }
428        perror("event_add");
429        return NULL;
430    }
431
432    STATS_LOCK();
433    stats.curr_conns++;
434    stats.total_conns++;
435    STATS_UNLOCK();
436
437    return c;
438}
439
440static void conn_cleanup(conn *c) {
441    assert(c != NULL);
442
443    if (c->item) {
444        item_remove(c->item);
445        c->item = 0;
446    }
447
448    if (c->ileft != 0) {
449        for (; c->ileft > 0; c->ileft--,c->icurr++) {
450            item_remove(*(c->icurr));
451        }
452    }
453
454    if (c->suffixleft != 0) {
455        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
456            if(suffix_add_to_freelist(*(c->suffixcurr))) {
457                free(*(c->suffixcurr));
458            }
459        }
460    }
461
462    if (c->write_and_free) {
463        free(c->write_and_free);
464        c->write_and_free = 0;
465    }
466}
467
468/*
469 * Frees a connection.
470 */
471void conn_free(conn *c) {
472    if (c) {
473        if (c->hdrbuf)
474            free(c->hdrbuf);
475        if (c->msglist)
476            free(c->msglist);
477        if (c->rbuf)
478            free(c->rbuf);
479        if (c->wbuf)
480            free(c->wbuf);
481        if (c->ilist)
482            free(c->ilist);
483        if (c->suffixlist)
484            free(c->suffixlist);
485        if (c->iov)
486            free(c->iov);
487        free(c);
488    }
489}
490
491static void conn_close(conn *c) {
492    assert(c != NULL);
493
494    /* delete the event, the socket and the conn */
495    event_del(&c->event);
496
497    if (settings.verbose > 1)
498        fprintf(stderr, "<%d connection closed.\n", c->sfd);
499
500    close(c->sfd);
501    accept_new_conns(true);
502    conn_cleanup(c);
503
504    /* if the connection has big buffers, just free it */
505    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
506        conn_free(c);
507    }
508
509    STATS_LOCK();
510    stats.curr_conns--;
511    STATS_UNLOCK();
512
513    return;
514}
515
516static enum conn_states get_init_state(conn *c) {
517    int rv=0;
518    assert(c != NULL);
519
520    switch(c->protocol) {
521        case binary_prot:
522            rv=conn_bin_init;
523            break;
524        case negotiating_prot:
525            rv=conn_negotiate;
526            break;
527        default:
528            rv=conn_read;
529    }
530    return rv;
531}
532
533/* Set the given connection to its initial state.  The initial state will vary
534 * base don protocol type. */
535static void conn_set_init_state(conn *c) {
536    assert(c != NULL);
537
538    conn_set_state(c, get_init_state(c));
539}
540
541/*
542 * Shrinks a connection's buffers if they're too big.  This prevents
543 * periodic large "get" requests from permanently chewing lots of server
544 * memory.
545 *
546 * This should only be called in between requests since it can wipe output
547 * buffers!
548 */
549static void conn_shrink(conn *c) {
550    assert(c != NULL);
551
552    if (IS_UDP(c->protocol))
553        return;
554
555    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
556        char *newbuf;
557
558        if (c->rcurr != c->rbuf)
559            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
560
561        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
562
563        if (newbuf) {
564            c->rbuf = newbuf;
565            c->rsize = DATA_BUFFER_SIZE;
566        }
567        /* TODO check other branch... */
568        c->rcurr = c->rbuf;
569    }
570
571    if (c->isize > ITEM_LIST_HIGHWAT) {
572        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
573        if (newbuf) {
574            c->ilist = newbuf;
575            c->isize = ITEM_LIST_INITIAL;
576        }
577    /* TODO check error condition? */
578    }
579
580    if (c->msgsize > MSG_LIST_HIGHWAT) {
581        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
582        if (newbuf) {
583            c->msglist = newbuf;
584            c->msgsize = MSG_LIST_INITIAL;
585        }
586    /* TODO check error condition? */
587    }
588
589    if (c->iovsize > IOV_LIST_HIGHWAT) {
590        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
591        if (newbuf) {
592            c->iov = newbuf;
593            c->iovsize = IOV_LIST_INITIAL;
594        }
595    /* TODO check return value */
596    }
597}
598
599/*
600 * Sets a connection's current state in the state machine. Any special
601 * processing that needs to happen on certain state transitions can
602 * happen here.
603 */
604static void conn_set_state(conn *c, enum conn_states state) {
605    assert(c != NULL);
606
607    if (state != c->state) {
608        if (state == conn_read) {
609            conn_shrink(c);
610            assoc_move_next_bucket();
611        }
612        c->state = state;
613    }
614}
615
616/*
617 * Free list management for suffix buffers.
618 */
619
620static char **freesuffix;
621static int freesuffixtotal;
622static int freesuffixcurr;
623
624static void suffix_init(void) {
625    freesuffixtotal = 500;
626    freesuffixcurr  = 0;
627
628    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
629    if (freesuffix == NULL) {
630        fprintf(stderr, "malloc()\n");
631    }
632    return;
633}
634
635/*
636 * Returns a suffix buffer from the freelist, if any. Should call this using
637 * suffix_from_freelist() for thread safety.
638 */
639char *do_suffix_from_freelist() {
640    char *s;
641
642    if (freesuffixcurr > 0) {
643        s = freesuffix[--freesuffixcurr];
644    } else {
645        /* If malloc fails, let the logic fall through without spamming
646         * STDERR on the server. */
647        s = malloc( SUFFIX_SIZE );
648    }
649
650    return s;
651}
652
653/*
654 * Adds a connection to the freelist. 0 = success. Should call this using
655 * conn_add_to_freelist() for thread safety.
656 */
657bool do_suffix_add_to_freelist(char *s) {
658    if (freesuffixcurr < freesuffixtotal) {
659        freesuffix[freesuffixcurr++] = s;
660        return false;
661    } else {
662        /* try to enlarge free connections array */
663        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
664        if (new_freesuffix) {
665            freesuffixtotal *= 2;
666            freesuffix = new_freesuffix;
667            freesuffix[freesuffixcurr++] = s;
668            return false;
669        }
670    }
671    return true;
672}
673
674/*
675 * Ensures that there is room for another struct iovec in a connection's
676 * iov list.
677 *
678 * Returns 0 on success, -1 on out-of-memory.
679 */
680static int ensure_iov_space(conn *c) {
681    assert(c != NULL);
682
683    if (c->iovused >= c->iovsize) {
684        int i, iovnum;
685        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
686                                (c->iovsize * 2) * sizeof(struct iovec));
687        if (! new_iov)
688            return -1;
689        c->iov = new_iov;
690        c->iovsize *= 2;
691
692        /* Point all the msghdr structures at the new list. */
693        for (i = 0, iovnum = 0; i < c->msgused; i++) {
694            c->msglist[i].msg_iov = &c->iov[iovnum];
695            iovnum += c->msglist[i].msg_iovlen;
696        }
697    }
698
699    return 0;
700}
701
702
703/*
704 * Adds data to the list of pending data that will be written out to a
705 * connection.
706 *
707 * Returns 0 on success, -1 on out-of-memory.
708 */
709
710static int add_iov(conn *c, const void *buf, int len) {
711    struct msghdr *m;
712    int leftover;
713    bool limit_to_mtu;
714
715    assert(c != NULL);
716
717    do {
718        m = &c->msglist[c->msgused - 1];
719
720        /*
721         * Limit UDP packets, and the first payloads of TCP replies, to
722         * UDP_MAX_PAYLOAD_SIZE bytes.
723         */
724        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
725
726        /* We may need to start a new msghdr if this one is full. */
727        if (m->msg_iovlen == IOV_MAX ||
728            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
729            add_msghdr(c);
730            m = &c->msglist[c->msgused - 1];
731        }
732
733        if (ensure_iov_space(c) != 0)
734            return -1;
735
736        /* If the fragment is too big to fit in the datagram, split it up */
737        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
738            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
739            len -= leftover;
740        } else {
741            leftover = 0;
742        }
743
744        m = &c->msglist[c->msgused - 1];
745        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
746        m->msg_iov[m->msg_iovlen].iov_len = len;
747
748        c->msgbytes += len;
749        c->iovused++;
750        m->msg_iovlen++;
751
752        buf = ((char *)buf) + len;
753        len = leftover;
754    } while (leftover > 0);
755
756    return 0;
757}
758
759
760/*
761 * Constructs a set of UDP headers and attaches them to the outgoing messages.
762 */
763static int build_udp_headers(conn *c) {
764    int i;
765    unsigned char *hdr;
766
767    assert(c != NULL);
768
769    if (c->msgused > c->hdrsize) {
770        void *new_hdrbuf;
771        if (c->hdrbuf)
772            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
773        else
774            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
775        if (! new_hdrbuf)
776            return -1;
777        c->hdrbuf = (unsigned char *)new_hdrbuf;
778        c->hdrsize = c->msgused * 2;
779    }
780
781    hdr = c->hdrbuf;
782    for (i = 0; i < c->msgused; i++) {
783        c->msglist[i].msg_iov[0].iov_base = hdr;
784        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
785        *hdr++ = c->request_id / 256;
786        *hdr++ = c->request_id % 256;
787        *hdr++ = i / 256;
788        *hdr++ = i % 256;
789        *hdr++ = c->msgused / 256;
790        *hdr++ = c->msgused % 256;
791        *hdr++ = 0;
792        *hdr++ = 0;
793        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
794    }
795
796    return 0;
797}
798
799
800static void out_string(conn *c, const char *str) {
801    size_t len;
802
803    assert(c != NULL);
804
805    if (c->noreply) {
806        if (settings.verbose > 1)
807            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
808        c->noreply = false;
809        conn_set_state(c, conn_read);
810        return;
811    }
812
813    if (settings.verbose > 1)
814        fprintf(stderr, ">%d %s\n", c->sfd, str);
815
816    len = strlen(str);
817    if ((len + 2) > c->wsize) {
818        /* ought to be always enough. just fail for simplicity */
819        str = "SERVER_ERROR output line too long";
820        len = strlen(str);
821    }
822
823    memcpy(c->wbuf, str, len);
824    memcpy(c->wbuf + len, "\r\n", 3);
825    c->wbytes = len + 2;
826    c->wcurr = c->wbuf;
827
828    conn_set_state(c, conn_write);
829    c->write_and_go = get_init_state(c);
830    return;
831}
832
833/*
834 * we get here after reading the value in set/add/replace commands. The command
835 * has been stored in c->item_comm, and the item is ready in c->item.
836 */
837static void complete_nread_ascii(conn *c) {
838    assert(c != NULL);
839
840    item *it = c->item;
841    int comm = c->item_comm;
842    int ret;
843
844    STATS_LOCK();
845    stats.set_cmds++;
846    STATS_UNLOCK();
847
848    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
849        out_string(c, "CLIENT_ERROR bad data chunk");
850    } else {
851      ret = store_item(it, comm);
852      if (ret == 1)
853          out_string(c, "STORED");
854      else if(ret == 2)
855          out_string(c, "EXISTS");
856      else if(ret == 3)
857          out_string(c, "NOT_FOUND");
858      else
859          out_string(c, "NOT_STORED");
860    }
861
862    item_remove(c->item);       /* release the c->item reference */
863    c->item = 0;
864}
865
866static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
867    int i=0;
868    uint32_t res_header[BIN_PKT_HDR_WORDS];
869
870    assert(c);
871    assert(body_len >= 0);
872
873    c->msgcurr = 0;
874    c->msgused = 0;
875    c->iovused = 0;
876    if (add_msghdr(c) != 0) {
877        /* XXX:  out_string is inappropriate here */
878        out_string(c, "SERVER_ERROR out of memory");
879        return;
880    }
881
882    res_header[0] = BIN_RES_MAGIC << 24;
883    res_header[0] |= ((0xff & c->cmd) << 16);
884    res_header[0] |= err & 0xffff;
885
886    res_header[1] = hdr_len << 24;
887    /* TODO:  Support datatype */
888    res_header[2] = body_len;
889    res_header[3] = c->opaque;
890
891    if(settings.verbose > 1) {
892        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
893            res_header[0], res_header[1], res_header[2], res_header[3]);
894    }
895
896    for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
897        res_header[i] = htonl(res_header[i]);
898    }
899
900    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
901    memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH);
902    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
903}
904
905static void write_bin_error(conn *c, int err, int swallow) {
906    char *errstr="Unknown error";
907    switch(err) {
908        case ERR_UNKNOWN_CMD:
909            errstr="Unknown command";
910            break;
911        case ERR_NOT_FOUND:
912            errstr="Not found";
913            break;
914        case ERR_INVALID_ARGUMENTS:
915            errstr="Invalid arguments";
916            break;
917        case ERR_EXISTS:
918            errstr="Data exists for key.";
919            break;
920        case ERR_TOO_LARGE:
921            errstr="Too large.";
922            break;
923        case ERR_NOT_STORED:
924            errstr="Not stored.";
925            break;
926        default:
927            errstr="UNHANDLED ERROR";
928            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
929    }
930    if(settings.verbose > 0) {
931        fprintf(stderr, "Writing an error:  %s\n", errstr);
932    }
933    add_bin_header(c, err, 0, strlen(errstr));
934    add_iov(c, errstr, strlen(errstr));
935
936    conn_set_state(c, conn_mwrite);
937    if(swallow > 0) {
938        c->sbytes=swallow;
939        c->write_and_go = conn_swallow;
940    } else {
941        c->write_and_go = conn_bin_init;
942    }
943}
944
945/* Form and send a response to a command over the binary protocol */
946static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
947    add_bin_header(c, 0, hlen, dlen);
948    if(dlen > 0) {
949        add_iov(c, d, dlen);
950    }
951    conn_set_state(c, conn_mwrite);
952    c->write_and_go = conn_bin_init;
953}
954
955/* Byte swap a 64-bit number */
956static int64_t swap64(int64_t in) {
957#ifdef ENDIAN_LITTLE
958    /* Little endian, flip the bytes around until someone makes a faster/better
959    * way to do this. */
960    int64_t rv=0;
961    int i=0;
962     for(i=0; i<8; i++) {
963        rv = (rv << 8) | (in & 0xff);
964        in >>= 8;
965     }
966    return rv;
967#else
968    /* big-endian machines don't need byte swapping */
969    return in;
970#endif
971}
972
973static void complete_incr_bin(conn *c) {
974    item *it;
975    int64_t delta;
976    uint64_t initial;
977    int32_t exptime;
978    char *key;
979    size_t nkey;
980    int i;
981    uint64_t *responseBuf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
982
983    assert(c != NULL);
984
985    key=c->rbuf + BIN_INCR_HDR_LEN;
986    nkey=c->keylen;
987    key[nkey]=0x00;
988
989    delta = swap64(*((int64_t*)(c->rbuf)));
990    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
991    exptime = ntohl(*((int*)(c->rbuf + 16)));
992
993    if(settings.verbose) {
994        fprintf(stderr, "incr ");
995        for(i=0; i<nkey; i++) {
996            fprintf(stderr, "%c", key[i]);
997        }
998        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
999    }
1000
1001    /* XXX:  Not sure what to do with these yet
1002    if (settings.managed) {
1003        int bucket = c->bucket;
1004        if (bucket == -1) {
1005            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1006            return;
1007        }
1008        c->bucket = -1;
1009        if (buckets[bucket] != c->gen) {
1010            out_string(c, "ERROR_NOT_OWNER");
1011            return;
1012        }
1013    }
1014    */
1015
1016    it = item_get(key, nkey);
1017    if (it) {
1018        /* Weird magic in add_delta forces me to pad here */
1019        char tmpbuf[INCR_MAX_STORAGE_LEN];
1020        uint64_t l=0;
1021        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1022        tmpbuf[INCR_MAX_STORAGE_LEN]=0x00;
1023        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1024        *responseBuf=swap64(strtoull(tmpbuf, NULL, 10));
1025
1026        write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1027        item_remove(it);         /* release our reference */
1028    } else {
1029        if(exptime >= 0) {
1030            /* Save some room for the response */
1031            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1032            *responseBuf=swap64(initial);
1033            it = item_alloc(key, nkey, 0, realtime(exptime),
1034                INCR_MAX_STORAGE_LEN);
1035            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1036
1037            if(store_item(it, NREAD_SET)) {
1038                write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN,
1039                    INCR_RES_LEN);
1040            } else {
1041                write_bin_error(c, ERR_NOT_STORED, 0);
1042            }
1043            item_remove(it);         /* release our reference */
1044        } else {
1045            write_bin_error(c, ERR_NOT_FOUND, 0);
1046        }
1047    }
1048}
1049
1050static void complete_update_bin(conn *c) {
1051    int eno=-1, ret=0;
1052    assert(c != NULL);
1053
1054    item *it = c->item;
1055
1056    STATS_LOCK();
1057    stats.set_cmds++;
1058    STATS_UNLOCK();
1059
1060    /* We don't actually receive the trailing two characters in the bin
1061     * protocol, so we're going to just set them here */
1062    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1063    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1064
1065    switch (store_item(it, c->item_comm)) {
1066        case 1:
1067            /* Stored */
1068            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1069            break;
1070        case 2:
1071            write_bin_error(c, ERR_EXISTS, 0);
1072            break;
1073        case 3:
1074            write_bin_error(c, ERR_NOT_FOUND, 0);
1075            break;
1076        default:
1077            if(c->item_comm == NREAD_ADD) {
1078                eno=ERR_EXISTS;
1079            } else if(c->item_comm == NREAD_REPLACE) {
1080                eno=ERR_NOT_FOUND;
1081            } else {
1082                eno=ERR_NOT_STORED;
1083            }
1084            write_bin_error(c, eno, 0);
1085    }
1086
1087    item_remove(c->item);       /* release the c->item reference */
1088    c->item = 0;
1089}
1090
1091static void process_bin_get(conn *c) {
1092    item *it;
1093
1094    it = item_get(c->rbuf, c->keylen);
1095    if (it) {
1096        int *flags;
1097        uint64_t* identifier;
1098
1099        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1100
1101        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place
1102        this is int in far enough to cover the header */
1103        flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1104        *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10));
1105
1106        /* the length has two unnecessary bytes, and then we write four more */
1107        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1108        /* Flags */
1109        add_iov(c, flags, 4);
1110        identifier=(uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4);
1111        *identifier=swap64((uint32_t)it->cas_id);
1112        add_iov(c, identifier, 8);
1113        /* bytes minus the CRLF */
1114        add_iov(c, ITEM_data(it), it->nbytes - 2);
1115        conn_set_state(c, conn_mwrite);
1116    } else {
1117        if(c->cmd == CMD_GETQ) {
1118            conn_set_state(c, conn_bin_init);
1119        } else {
1120            write_bin_error(c, ERR_NOT_FOUND, 0);
1121        }
1122    }
1123}
1124
1125static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1126    assert(c);
1127    c->substate = next_substate;
1128    c->rlbytes = c->keylen + extra;
1129    assert(c->rsize >= c->rlbytes);
1130    c->ritem = c->rbuf;
1131    conn_set_state(c, conn_nread);
1132}
1133
1134static void dispatch_bin_command(conn *c) {
1135    time_t exptime = 0;
1136    switch(c->cmd) {
1137        case CMD_VERSION:
1138            write_bin_response(c, VERSION, 0, strlen(VERSION));
1139            break;
1140        case CMD_FLUSH:
1141            set_current_time();
1142
1143            settings.oldest_live = current_time - 1;
1144            item_flush_expired();
1145            write_bin_response(c, NULL, 0, 0);
1146            break;
1147        case CMD_NOOP:
1148            write_bin_response(c, NULL, 0, 0);
1149            break;
1150        case CMD_SET:
1151            /* Fallthrough */
1152        case CMD_ADD:
1153            /* Fallthrough */
1154        case CMD_REPLACE:
1155            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1156            break;
1157        case CMD_GETQ:
1158        case CMD_GET:
1159            bin_read_key(c, bin_reading_get_key, 0);
1160            break;
1161        case CMD_DELETE:
1162            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1163            break;
1164        case CMD_INCR:
1165        case CMD_DECR:
1166            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1167            break;
1168        default:
1169            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1170    }
1171}
1172
1173static void process_bin_update(conn *c) {
1174    char *key;
1175    int nkey;
1176    int vlen;
1177    int flags;
1178    int exptime;
1179    item *it;
1180    int comm;
1181    int hdrlen=BIN_SET_HDR_LEN;
1182
1183    assert(c != NULL);
1184
1185    key=c->rbuf + hdrlen;
1186    nkey=c->keylen;
1187    key[nkey]=0x00;
1188
1189    flags = ntohl(*((int*)(c->rbuf)));
1190    exptime = ntohl(*((int*)(c->rbuf + 4)));
1191    vlen = c->bin_header[2] - (nkey + hdrlen);
1192
1193    if(settings.verbose > 1) {
1194        fprintf(stderr, "Value len is %d\n", vlen);
1195    }
1196
1197    if (settings.detail_enabled) {
1198        stats_prefix_record_set(key);
1199    }
1200
1201    /* Not sure what to do with this.
1202    if (settings.managed) {
1203        int bucket = c->bucket;
1204        if (bucket == -1) {
1205            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1206            return;
1207        }
1208        c->bucket = -1;
1209        if (buckets[bucket] != c->gen) {
1210            out_string(c, "ERROR_NOT_OWNER");
1211            return;
1212        }
1213    }
1214    */
1215
1216    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1217
1218    if (it == 0) {
1219        if (! item_size_ok(nkey, flags, vlen + 2)) {
1220            write_bin_error(c, ERR_TOO_LARGE, vlen);
1221        } else {
1222            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1223        }
1224        /* swallow the data line */
1225        c->write_and_go = conn_swallow;
1226        return;
1227    }
1228
1229    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
1230
1231    switch(c->cmd) {
1232        case CMD_ADD:
1233            c->item_comm = NREAD_ADD;
1234            break;
1235        case CMD_SET:
1236            c->item_comm = NREAD_SET;
1237            break;
1238        case CMD_REPLACE:
1239            c->item_comm = NREAD_REPLACE;
1240            break;
1241        default:
1242            assert(0);
1243    }
1244
1245    if(it->cas_id != 0) {
1246        c->item_comm = NREAD_CAS;
1247    }
1248
1249    c->item = it;
1250    c->ritem = ITEM_data(it);
1251    c->rlbytes = vlen;
1252    conn_set_state(c, conn_nread);
1253    c->substate = bin_read_set_value;
1254}
1255
1256static void process_bin_delete(conn *c) {
1257    char *key;
1258    size_t nkey;
1259    item *it;
1260    time_t exptime = 0;
1261
1262    assert(c != NULL);
1263
1264    /* XXX:  I don't know what to do with this yet
1265    if (settings.managed) {
1266        int bucket = c->bucket;
1267        if (bucket == -1) {
1268            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1269            return;
1270        }
1271        c->bucket = -1;
1272        if (buckets[bucket] != c->gen) {
1273            out_string(c, "ERROR_NOT_OWNER");
1274            return;
1275        }
1276    }
1277    */
1278
1279    exptime = ntohl(*((int*)(c->rbuf)));
1280    key = c->rbuf + 4;
1281    nkey = c->keylen;
1282    key[nkey]=0x00;
1283
1284    if(settings.verbose) {
1285        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1286    }
1287
1288    if (settings.detail_enabled) {
1289        stats_prefix_record_delete(key);
1290    }
1291
1292    it = item_get(key, nkey);
1293    if (it) {
1294        if (exptime == 0) {
1295            item_unlink(it);
1296            item_remove(it);      /* release our reference */
1297            write_bin_response(c, NULL, 0, 0);
1298        } else {
1299            /* XXX:  This is really lame, but defer_delete returns a string */
1300            char *res=defer_delete(it, exptime);
1301            if(res[0] == 'D') {
1302                write_bin_response(c, NULL, 0, 0);
1303            } else {
1304                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1305            }
1306        }
1307    } else {
1308        write_bin_error(c, ERR_NOT_FOUND, 0);
1309    }
1310}
1311
1312static void complete_nread_binary(conn *c) {
1313    assert(c != NULL);
1314
1315    if(c->cmd < 0) {
1316        /* No command defined.  Figure out what they're trying to say. */
1317        int i=0;
1318        /* I did a bit of hard-coding around the packet sizes */
1319        assert(BIN_PKT_HDR_WORDS == 3);
1320        for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
1321            c->bin_header[i] = ntohl(c->bin_header[i]);
1322        }
1323        if(settings.verbose) {
1324            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1325                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1326                c->bin_header[3]);
1327        }
1328        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1329            if(settings.verbose) {
1330                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1331            }
1332            conn_set_state(c, conn_closing);
1333            return;
1334        }
1335   
1336        c->msgcurr = 0;
1337        c->msgused = 0;
1338        c->iovused = 0;
1339        if (add_msghdr(c) != 0) {
1340            out_string(c, "SERVER_ERROR out of memory");
1341            return;
1342        }
1343   
1344        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1345        c->keylen = c->bin_header[0] & 0xffff;
1346        c->opaque = c->bin_header[3];
1347        if(settings.verbose > 1) {
1348            fprintf(stderr,
1349                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1350                c->opaque, c->keylen, c->bin_header[2]);
1351        }
1352        dispatch_bin_command(c);
1353    } else {
1354        switch(c->substate) {
1355            case bin_reading_set_header:
1356                process_bin_update(c);
1357                break;
1358            case bin_read_set_value:
1359                complete_update_bin(c);
1360                break;
1361            case bin_reading_get_key:
1362                process_bin_get(c);
1363                break;
1364            case bin_reading_del_header:
1365                process_bin_delete(c);
1366                break;
1367            case bin_reading_incr_header:
1368                complete_incr_bin(c);
1369                break;
1370            default:
1371                fprintf(stderr, "Not handling substate %d\n", c->substate);
1372                assert(0);
1373        }
1374    }
1375}
1376
1377static void reinit_bin_connection(conn *c) {
1378    if (settings.verbose > 1)
1379        fprintf(stderr, "*** Reinitializing binary connection.\n");
1380    c->rlbytes = MIN_BIN_PKT_LENGTH;
1381    c->write_and_go = conn_bin_init;
1382    c->cmd = -1;
1383    c->substate = bin_no_state;
1384    c->rbytes = c->wbytes = 0;
1385    c->ritem = (char*)c->bin_header;
1386    c->rcurr = c->rbuf;
1387    c->wcurr = c->wbuf;
1388    conn_shrink(c);
1389    conn_set_state(c, conn_nread);
1390}
1391
1392/* These do the initial protocol switch.  At this point, we should've read
1393 * exactly one byte, and must treat that byte as the beginning of a command. */
1394static void setup_bin_protocol(conn *c) {
1395    char *loc = (char*)c->bin_header;
1396    if (settings.verbose > 1)
1397        fprintf(stderr, "Negotiated protocol as binary.\n");
1398
1399    c->protocol = binary_prot;
1400    reinit_bin_connection(c);
1401    /* Emulate a read of the first byte */
1402    c->ritem[0] = c->rbuf[0];
1403    c->ritem++;
1404    c->rlbytes--;
1405}
1406
1407static void setup_ascii_protocol(conn *c) {
1408    if (settings.verbose > 1)
1409        fprintf(stderr, "Negotiated protocol as ascii.\n");
1410    c->protocol = ascii_prot;
1411
1412    /* We've already got the first letter of the command, so pretend like we
1413     * Did a single byte read from try_read_command */
1414    c->rcurr=c->rbuf;
1415    c->rbytes=1;
1416    conn_set_state(c, conn_read);
1417}
1418
1419static void complete_nread(conn *c) {
1420    assert(c != NULL);
1421
1422    if(c->protocol == ascii_prot) {
1423        complete_nread_ascii(c);
1424    } else if(c->protocol == binary_prot) {
1425        complete_nread_binary(c);
1426    } else if(c->protocol == negotiating_prot) {
1427        /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */
1428        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)
1429            setup_bin_protocol(c);
1430        else
1431            setup_ascii_protocol(c);
1432    } else {
1433        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1434    }
1435}
1436
1437/*
1438 * Stores an item in the cache according to the semantics of one of the set
1439 * commands. In threaded mode, this is protected by the cache lock.
1440 *
1441 * Returns true if the item was stored.
1442 */
1443int do_store_item(item *it, int comm) {
1444    char *key = ITEM_key(it);
1445    bool delete_locked = false;
1446    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1447    int stored = 0;
1448
1449    item *new_it = NULL;
1450    int flags;
1451
1452    if (old_it != NULL && comm == NREAD_ADD) {
1453        /* add only adds a nonexistent item, but promote to head of LRU */
1454        do_item_update(old_it);
1455    } else if (!old_it && (comm == NREAD_REPLACE
1456        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1457    {
1458        /* replace only replaces an existing value; don't store */
1459    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1460        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1461    {
1462        /* replace and add can't override delete locks; don't store */
1463    } else if (comm == NREAD_CAS) {
1464        /* validate cas operation */
1465        if (delete_locked)
1466            old_it = do_item_get_nocheck(key, it->nkey);
1467
1468        if(old_it == NULL) {
1469          // LRU expired
1470          stored = 3;
1471        }
1472        else if(it->cas_id == old_it->cas_id) {
1473          // cas validates
1474          do_item_replace(old_it, it);
1475          stored = 1;
1476        } else {
1477          if(settings.verbose > 1) {
1478            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1479                old_it->cas_id, it->cas_id);
1480          }
1481          stored = 2;
1482        }
1483    } else {
1484        /*
1485         * Append - combine new and old record into single one. Here it's
1486         * atomic and thread-safe.
1487         */
1488
1489        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1490
1491            /* we have it and old_it here - alloc memory to hold both */
1492            /* flags was already lost - so recover them from ITEM_suffix(it) */
1493
1494            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1495
1496            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1497
1498            if (new_it == NULL) {
1499                /* SERVER_ERROR out of memory */
1500                return 0;
1501            }
1502
1503            /* copy data from it and old_it to new_it */
1504
1505            if (comm == NREAD_APPEND) {
1506                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1507                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1508            } else {
1509                /* NREAD_PREPEND */
1510                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1511                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1512            }
1513
1514            it = new_it;
1515        }
1516
1517        /* "set" commands can override the delete lock
1518           window... in which case we have to find the old hidden item
1519           that's in the namespace/LRU but wasn't returned by
1520           item_get.... because we need to replace it */
1521        if (delete_locked)
1522            old_it = do_item_get_nocheck(key, it->nkey);
1523
1524        if (old_it != NULL)
1525            do_item_replace(old_it, it);
1526        else
1527            do_item_link(it);
1528
1529        stored = 1;
1530    }
1531
1532    if (old_it != NULL)
1533        do_item_remove(old_it);         /* release our reference */
1534    if (new_it != NULL)
1535        do_item_remove(new_it);
1536
1537    return stored;
1538}
1539
1540typedef struct token_s {
1541    char *value;
1542    size_t length;
1543} token_t;
1544
1545#define COMMAND_TOKEN 0
1546#define SUBCOMMAND_TOKEN 1
1547#define KEY_TOKEN 1
1548#define KEY_MAX_LENGTH 250
1549
1550#define MAX_TOKENS 8
1551
1552/*
1553 * Tokenize the command string by replacing whitespace with '\0' and update
1554 * the token array tokens with pointer to start of each token and length.
1555 * Returns total number of tokens.  The last valid token is the terminal
1556 * token (value points to the first unprocessed character of the string and
1557 * length zero).
1558 *
1559 * Usage example:
1560 *
1561 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1562 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1563 *          ...
1564 *      }
1565 *      ncommand = tokens[ix].value - command;
1566 *      command  = tokens[ix].value;
1567 *   }
1568 */
1569static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1570    char *s, *e;
1571    size_t ntokens = 0;
1572
1573    assert(command != NULL && tokens != NULL && max_tokens > 1);
1574
1575    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1576        if (*e == ' ') {
1577            if (s != e) {
1578                tokens[ntokens].value = s;
1579                tokens[ntokens].length = e - s;
1580                ntokens++;
1581                *e = '\0';
1582            }
1583            s = e + 1;
1584        }
1585        else if (*e == '\0') {
1586            if (s != e) {
1587                tokens[ntokens].value = s;
1588                tokens[ntokens].length = e - s;
1589                ntokens++;
1590            }
1591
1592            break; /* string end */
1593        }
1594    }
1595
1596    /*
1597     * If we scanned the whole string, the terminal value pointer is null,
1598     * otherwise it is the first unprocessed character.
1599     */
1600    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1601    tokens[ntokens].length = 0;
1602    ntokens++;
1603
1604    return ntokens;
1605}
1606
1607/* set up a connection to write a buffer then free it, used for stats */
1608static void write_and_free(conn *c, char *buf, int bytes) {
1609    if (buf) {
1610        c->write_and_free = buf;
1611        c->wcurr = buf;
1612        c->wbytes = bytes;
1613        conn_set_state(c, conn_write);
1614        c->write_and_go = get_init_state(c);
1615    } else {
1616        out_string(c, "SERVER_ERROR out of memory writing stats");
1617    }
1618}
1619
1620static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1621{
1622    int noreply_index = ntokens - 2;
1623
1624    /*
1625      NOTE: this function is not the first place where we are going to
1626      send the reply.  We could send it instead from process_command()
1627      if the request line has wrong number of tokens.  However parsing
1628      malformed line for "noreply" option is not reliable anyway, so
1629      it can't be helped.
1630    */
1631    if (tokens[noreply_index].value
1632        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1633        c->noreply = true;
1634    }
1635}
1636
1637inline static void process_stats_detail(conn *c, const char *command) {
1638    assert(c != NULL);
1639
1640    if (strcmp(command, "on") == 0) {
1641        settings.detail_enabled = 1;
1642        out_string(c, "OK");
1643    }
1644    else if (strcmp(command, "off") == 0) {
1645        settings.detail_enabled = 0;
1646        out_string(c, "OK");
1647    }
1648    else if (strcmp(command, "dump") == 0) {
1649        int len;
1650        char *stats = stats_prefix_dump(&len);
1651        write_and_free(c, stats, len);
1652    }
1653    else {
1654        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1655    }
1656}
1657
1658static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1659    rel_time_t now = current_time;
1660    char *command;
1661    char *subcommand;
1662
1663    assert(c != NULL);
1664
1665    if(ntokens < 2) {
1666        out_string(c, "CLIENT_ERROR bad command line");
1667        return;
1668    }
1669
1670    command = tokens[COMMAND_TOKEN].value;
1671
1672    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1673        char temp[1024];
1674        pid_t pid = getpid();
1675        char *pos = temp;
1676
1677#ifndef WIN32
1678        struct rusage usage;
1679        getrusage(RUSAGE_SELF, &usage);
1680#endif /* !WIN32 */
1681
1682        STATS_LOCK();
1683        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1684        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1685        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1686        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1687        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1688#ifndef WIN32
1689        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1690        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1691#endif /* !WIN32 */
1692        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1693        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1694        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1695        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1696        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1697        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1698        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1699        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1700        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1701        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1702        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1703        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1704        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1705        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1706        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1707        pos += sprintf(pos, "END");
1708        STATS_UNLOCK();
1709        out_string(c, temp);
1710        return;
1711    }
1712
1713    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1714
1715    if (strcmp(subcommand, "reset") == 0) {
1716        stats_reset();
1717        out_string(c, "RESET");
1718        return;
1719    }
1720
1721#ifdef HAVE_MALLOC_H
1722#ifdef HAVE_STRUCT_MALLINFO
1723    if (strcmp(subcommand, "malloc") == 0) {
1724        char temp[512];
1725        struct mallinfo info;
1726        char *pos = temp;
1727
1728        info = mallinfo();
1729        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1730        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1731        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1732        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1733        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1734        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1735        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1736        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1737        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1738        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1739        out_string(c, temp);
1740        return;
1741    }
1742#endif /* HAVE_STRUCT_MALLINFO */
1743#endif /* HAVE_MALLOC_H */
1744
1745#if !defined(WIN32) || !defined(__APPLE__)
1746    if (strcmp(subcommand, "maps") == 0) {
1747        char *wbuf;
1748        int wsize = 8192; /* should be enough */
1749        int fd;
1750        int res;
1751
1752        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1753            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1754            return;
1755        }
1756
1757        fd = open("/proc/self/maps", O_RDONLY);
1758        if (fd == -1) {
1759            out_string(c, "SERVER_ERROR cannot open the maps file");
1760            free(wbuf);
1761            return;
1762        }
1763
1764        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1765        if (res == wsize - 6) {
1766            out_string(c, "SERVER_ERROR buffer overflow");
1767            free(wbuf); close(fd);
1768            return;
1769        }
1770        if (res == 0 || res == -1) {
1771            out_string(c, "SERVER_ERROR can't read the maps file");
1772            free(wbuf); close(fd);
1773            return;
1774        }
1775        memcpy(wbuf + res, "END\r\n", 5);
1776        write_and_free(c, wbuf, res + 5);
1777        close(fd);
1778        return;
1779    }
1780#endif
1781
1782    if (strcmp(subcommand, "cachedump") == 0) {
1783
1784        char *buf;
1785        unsigned int bytes, id, limit = 0;
1786
1787        if(ntokens < 5) {
1788            out_string(c, "CLIENT_ERROR bad command line");
1789            return;
1790        }
1791
1792        id = strtoul(tokens[2].value, NULL, 10);
1793        limit = strtoul(tokens[3].value, NULL, 10);
1794
1795        if(errno == ERANGE) {
1796            out_string(c, "CLIENT_ERROR bad command line format");
1797            return;
1798        }
1799
1800        buf = item_cachedump(id, limit, &bytes);
1801        write_and_free(c, buf, bytes);
1802        return;
1803    }
1804
1805    if (strcmp(subcommand, "slabs") == 0) {
1806        int bytes = 0;
1807        char *buf = slabs_stats(&bytes);
1808        write_and_free(c, buf, bytes);
1809        return;
1810    }
1811
1812    if (strcmp(subcommand, "items") == 0) {
1813        int bytes = 0;
1814        char *buf = item_stats(&bytes);
1815        write_and_free(c, buf, bytes);
1816        return;
1817    }
1818
1819    if (strcmp(subcommand, "detail") == 0) {
1820        if (ntokens < 4)
1821            process_stats_detail(c, "");  /* outputs the error message */
1822        else
1823            process_stats_detail(c, tokens[2].value);
1824        return;
1825    }
1826
1827    if (strcmp(subcommand, "sizes") == 0) {
1828        int bytes = 0;
1829        char *buf = item_stats_sizes(&bytes);
1830        write_and_free(c, buf, bytes);
1831        return;
1832    }
1833
1834    out_string(c, "ERROR");
1835}
1836
1837/* ntokens is overwritten here... shrug.. */
1838static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1839    char *key;
1840    size_t nkey;
1841    int i = 0;
1842    item *it;
1843    token_t *key_token = &tokens[KEY_TOKEN];
1844    char *suffix;
1845    int stats_get_cmds   = 0;
1846    int stats_get_hits   = 0;
1847    int stats_get_misses = 0;
1848    assert(c != NULL);
1849
1850    if (settings.managed) {
1851        int bucket = c->bucket;
1852        if (bucket == -1) {
1853            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1854            return;
1855        }
1856        c->bucket = -1;
1857        if (buckets[bucket] != c->gen) {
1858            out_string(c, "ERROR_NOT_OWNER");
1859            return;
1860        }
1861    }
1862
1863    do {
1864        while(key_token->length != 0) {
1865
1866            key = key_token->value;
1867            nkey = key_token->length;
1868
1869            if(nkey > KEY_MAX_LENGTH) {
1870                STATS_LOCK();
1871                stats.get_cmds   += stats_get_cmds;
1872                stats.get_hits   += stats_get_hits;
1873                stats.get_misses += stats_get_misses;
1874                STATS_UNLOCK();
1875                out_string(c, "CLIENT_ERROR bad command line format");
1876                return;
1877            }
1878
1879            stats_get_cmds++;
1880            it = item_get(key, nkey);
1881            if (settings.detail_enabled) {
1882                stats_prefix_record_get(key, NULL != it);
1883            }
1884            if (it) {
1885                if (i >= c->isize) {
1886                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1887                    if (new_list) {
1888                        c->isize *= 2;
1889                        c->ilist = new_list;
1890                    } else break;
1891                }
1892
1893                /*
1894                 * Construct the response. Each hit adds three elements to the
1895                 * outgoing data list:
1896                 *   "VALUE "
1897                 *   key
1898                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1899                 */
1900
1901                if(return_cas == true)
1902                {
1903                  /* Goofy mid-flight realloc. */
1904                  if (i >= c->suffixsize) {
1905                    char **new_suffix_list = realloc(c->suffixlist,
1906                                           sizeof(char *) * c->suffixsize * 2);
1907                    if (new_suffix_list) {
1908                      c->suffixsize *= 2;
1909                      c->suffixlist  = new_suffix_list;
1910                    } else break;
1911                  }
1912
1913                  suffix = suffix_from_freelist();
1914                  if (suffix == NULL) {
1915                    STATS_LOCK();
1916                    stats.get_cmds   += stats_get_cmds;
1917                    stats.get_hits   += stats_get_hits;
1918                    stats.get_misses += stats_get_misses;
1919                    STATS_UNLOCK();
1920                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1921                    return;
1922                  }
1923                  *(c->suffixlist + i) = suffix;
1924                  sprintf(suffix, " %llu\r\n", it->cas_id);
1925                  if (add_iov(c, "VALUE ", 6) != 0 ||
1926                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1927                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1928                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1929                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1930                      {
1931                          break;
1932                      }
1933                }
1934                else
1935                {
1936                  if (add_iov(c, "VALUE ", 6) != 0 ||
1937                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1938                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1939                      {
1940                          break;
1941                      }
1942                }
1943
1944
1945                if (settings.verbose > 1)
1946                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1947
1948                /* item_get() has incremented it->refcount for us */
1949                stats_get_hits++;
1950                item_update(it);
1951                *(c->ilist + i) = it;
1952                i++;
1953
1954            } else {
1955                stats_get_misses++;
1956            }
1957
1958            key_token++;
1959        }
1960
1961        /*
1962         * If the command string hasn't been fully processed, get the next set
1963         * of tokens.
1964         */
1965        if(key_token->value != NULL) {
1966            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1967            key_token = tokens;
1968        }
1969
1970    } while(key_token->value != NULL);
1971
1972    c->icurr = c->ilist;
1973    c->ileft = i;
1974    if (return_cas) {
1975        c->suffixcurr = c->suffixlist;
1976        c->suffixleft = i;
1977    }
1978
1979    if (settings.verbose > 1)
1980        fprintf(stderr, ">%d END\n", c->sfd);
1981
1982    /*
1983        If the loop was terminated because of out-of-memory, it is not
1984        reliable to add END\r\n to the buffer, because it might not end
1985        in \r\n. So we send SERVER_ERROR instead.
1986    */
1987    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1988        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1989        out_string(c, "SERVER_ERROR out of memory writing get response");
1990    }
1991    else {
1992        conn_set_state(c, conn_mwrite);
1993        c->msgcurr = 0;
1994    }
1995
1996    STATS_LOCK();
1997    stats.get_cmds   += stats_get_cmds;
1998    stats.get_hits   += stats_get_hits;
1999    stats.get_misses += stats_get_misses;
2000    STATS_UNLOCK();
2001
2002    return;
2003}
2004
2005static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2006    char *key;
2007    size_t nkey;
2008    int flags;
2009    time_t exptime;
2010    int vlen, old_vlen;
2011    uint64_t req_cas_id;
2012    item *it, *old_it;
2013
2014    assert(c != NULL);
2015
2016    set_noreply_maybe(c, tokens, ntokens);
2017
2018    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2019        out_string(c, "CLIENT_ERROR bad command line format");
2020        return;
2021    }
2022
2023    key = tokens[KEY_TOKEN].value;
2024    nkey = tokens[KEY_TOKEN].length;
2025
2026    flags = strtoul(tokens[2].value, NULL, 10);
2027    exptime = strtol(tokens[3].value, NULL, 10);
2028    vlen = strtol(tokens[4].value, NULL, 10);
2029
2030    // does cas value exist?
2031    if(handle_cas)
2032    {
2033      req_cas_id = strtoull(tokens[5].value, NULL, 10);
2034    }
2035
2036    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
2037        out_string(c, "CLIENT_ERROR bad command line format");
2038        return;
2039    }
2040
2041    if (settings.detail_enabled) {
2042        stats_prefix_record_set(key);
2043    }
2044
2045    if (settings.managed) {
2046        int bucket = c->bucket;
2047        if (bucket == -1) {
2048            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2049            return;
2050        }
2051        c->bucket = -1;
2052        if (buckets[bucket] != c->gen) {
2053            out_string(c, "ERROR_NOT_OWNER");
2054            return;
2055        }
2056    }
2057
2058    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2059
2060    if (it == 0) {
2061        if (! item_size_ok(nkey, flags, vlen + 2))
2062            out_string(c, "SERVER_ERROR object too large for cache");
2063        else
2064            out_string(c, "SERVER_ERROR out of memory storing object");
2065        /* swallow the data line */
2066        c->write_and_go = conn_swallow;
2067        c->sbytes = vlen + 2;
2068        return;
2069    }
2070    if(handle_cas)
2071      it->cas_id = req_cas_id;
2072
2073    c->item = it;
2074    c->ritem = ITEM_data(it);
2075    c->rlbytes = it->nbytes;
2076    c->item_comm = comm;
2077    conn_set_state(c, conn_nread);
2078}
2079
2080static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2081    char temp[sizeof("18446744073709551615")];
2082    item *it;
2083    int64_t delta;
2084    char *key;
2085    size_t nkey;
2086
2087    assert(c != NULL);
2088
2089    set_noreply_maybe(c, tokens, ntokens);
2090
2091    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2092        out_string(c, "CLIENT_ERROR bad command line format");
2093        return;
2094    }
2095
2096    key = tokens[KEY_TOKEN].value;
2097    nkey = tokens[KEY_TOKEN].length;
2098
2099    if (settings.managed) {
2100        int bucket = c->bucket;
2101        if (bucket == -1) {
2102            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2103            return;
2104        }
2105        c->bucket = -1;
2106        if (buckets[bucket] != c->gen) {
2107            out_string(c, "ERROR_NOT_OWNER");
2108            return;
2109        }
2110    }
2111
2112    delta = strtoll(tokens[2].value, NULL, 10);
2113
2114    if(errno == ERANGE) {
2115        out_string(c, "CLIENT_ERROR bad command line format");
2116        return;
2117    }
2118
2119    it = item_get(key, nkey);
2120    if (!it) {
2121        out_string(c, "NOT_FOUND");
2122        return;
2123    }
2124
2125    out_string(c, add_delta(it, incr, delta, temp));
2126    item_remove(it);         /* release our reference */
2127}
2128
2129/*
2130 * adds a delta value to a numeric item.
2131 *
2132 * it    item to adjust
2133 * incr  true to increment value, false to decrement
2134 * delta amount to adjust value by
2135 * buf   buffer for response string
2136 *
2137 * returns a response string to send back to the client.
2138 */
2139char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2140    char *ptr;
2141    int64_t value;
2142    int res;
2143
2144    ptr = ITEM_data(it);
2145    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2146
2147    value = strtoull(ptr, NULL, 10);
2148
2149    if(errno == ERANGE) {
2150        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2151    }
2152
2153    if (incr)
2154        value += delta;
2155    else {
2156        value -= delta;
2157    }
2158    if(value < 0) {
2159        value=0;
2160    }
2161    sprintf(buf, "%llu", value);
2162    res = strlen(buf);
2163    if (res + 2 > it->nbytes) { /* need to realloc */
2164        item *new_it;
2165        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2166        if (new_it == 0) {
2167            return "SERVER_ERROR out of memory in incr/decr";
2168        }
2169        memcpy(ITEM_data(new_it), buf, res);
2170        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2171        do_item_replace(it, new_it);
2172        do_item_remove(new_it);       /* release our reference */
2173    } else { /* replace in-place */
2174        memcpy(ITEM_data(it), buf, res);
2175        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2176    }
2177
2178    return buf;
2179}
2180
2181static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2182    char *key;
2183    size_t nkey;
2184    item *it;
2185    time_t exptime = 0;
2186
2187    assert(c != NULL);
2188
2189    set_noreply_maybe(c, tokens, ntokens);
2190
2191    if (settings.managed) {
2192        int bucket = c->bucket;
2193        if (bucket == -1) {
2194            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2195            return;
2196        }
2197        c->bucket = -1;
2198        if (buckets[bucket] != c->gen) {
2199            out_string(c, "ERROR_NOT_OWNER");
2200            return;
2201        }
2202    }
2203
2204    key = tokens[KEY_TOKEN].value;
2205    nkey = tokens[KEY_TOKEN].length;
2206
2207    if(nkey > KEY_MAX_LENGTH) {
2208        out_string(c, "CLIENT_ERROR bad command line format");
2209        return;
2210    }
2211
2212    if(ntokens == (c->noreply ? 5 : 4)) {
2213        exptime = strtol(tokens[2].value, NULL, 10);
2214
2215        if(errno == ERANGE) {
2216            out_string(c, "CLIENT_ERROR bad command line format");
2217            return;
2218        }
2219    }
2220
2221    if (settings.detail_enabled) {
2222        stats_prefix_record_delete(key);
2223    }
2224
2225    it = item_get(key, nkey);
2226    if (it) {
2227        if (exptime == 0) {
2228            item_unlink(it);
2229            item_remove(it);      /* release our reference */
2230            out_string(c, "DELETED");
2231        } else {
2232            /* our reference will be transfered to the delete queue */
2233            out_string(c, defer_delete(it, exptime));
2234        }
2235    } else {
2236        out_string(c, "NOT_FOUND");
2237    }
2238}
2239
2240/*
2241 * Adds an item to the deferred-delete list so it can be reaped later.
2242 *
2243 * Returns the result to send to the client.
2244 */
2245char *do_defer_delete(item *it, time_t exptime)
2246{
2247    if (delcurr >= deltotal) {
2248        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2249        if (new_delete) {
2250            todelete = new_delete;
2251            deltotal *= 2;
2252        } else {
2253            /*
2254             * can't delete it immediately, user wants a delay,
2255             * but we ran out of memory for the delete queue
2256             */
2257            item_remove(it);    /* release reference */
2258            return "SERVER_ERROR out of memory expanding delete queue";
2259        }
2260    }
2261
2262    /* use its expiration time as its deletion time now */
2263    it->exptime = realtime(exptime);
2264    it->it_flags |= ITEM_DELETED;
2265    todelete[delcurr++] = it;
2266
2267    return "DELETED";
2268}
2269
2270static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2271    unsigned int level;
2272
2273    assert(c != NULL);
2274
2275    set_noreply_maybe(c, tokens, ntokens);
2276
2277    level = strtoul(tokens[1].value, NULL, 10);
2278    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2279    out_string(c, "OK");
2280    return;
2281}
2282
2283static void process_command(conn *c, char *command) {
2284
2285    token_t tokens[MAX_TOKENS];
2286    size_t ntokens;
2287    int comm;
2288
2289    assert(c != NULL);
2290
2291    if (settings.verbose > 1)
2292        fprintf(stderr, "<%d %s\n", c->sfd, command);
2293
2294    /*
2295     * for commands set/add/replace, we build an item and read the data
2296     * directly into it, then continue in nread_complete().
2297     */
2298
2299    c->msgcurr = 0;
2300    c->msgused = 0;
2301    c->iovused = 0;
2302    if (add_msghdr(c) != 0) {
2303        out_string(c, "SERVER_ERROR out of memory preparing response");
2304        return;
2305    }
2306
2307    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2308    if (ntokens >= 3 &&
2309        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2310         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2311
2312        process_get_command(c, tokens, ntokens, false);
2313
2314    } else if ((ntokens == 6 || ntokens == 7) &&
2315               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2316                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2317                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2318                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2319                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2320
2321        process_update_command(c, tokens, ntokens, comm, false);
2322
2323    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2324
2325        process_update_command(c, tokens, ntokens, comm, true);
2326
2327    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2328
2329        process_arithmetic_command(c, tokens, ntokens, 1);
2330
2331    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2332
2333        process_get_command(c, tokens, ntokens, true);
2334
2335    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2336
2337        process_arithmetic_command(c, tokens, ntokens, 0);
2338
2339    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2340
2341        process_delete_command(c, tokens, ntokens);
2342
2343    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2344        unsigned int bucket, gen;
2345        if (!settings.managed) {
2346            out_string(c, "CLIENT_ERROR not a managed instance");
2347            return;
2348        }
2349
2350        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2351            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2352                out_string(c, "CLIENT_ERROR bucket number out of range");
2353                return;
2354            }
2355            buckets[bucket] = gen;
2356            out_string(c, "OWNED");
2357            return;
2358        } else {
2359            out_string(c, "CLIENT_ERROR bad format");
2360            return;
2361        }
2362
2363    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2364
2365        int bucket;
2366        if (!settings.managed) {
2367            out_string(c, "CLIENT_ERROR not a managed instance");
2368            return;
2369        }
2370        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2371            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2372                out_string(c, "CLIENT_ERROR bucket number out of range");
2373                return;
2374            }
2375            buckets[bucket] = 0;
2376            out_string(c, "DISOWNED");
2377            return;
2378        } else {
2379            out_string(c, "CLIENT_ERROR bad format");
2380            return;
2381        }
2382
2383    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2384        int bucket, gen;
2385        if (!settings.managed) {
2386            out_string(c, "CLIENT_ERROR not a managed instance");
2387            return;
2388        }
2389        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2390            /* we never write anything back, even if input's wrong */
2391            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2392                /* do nothing, bad input */
2393            } else {
2394                c->bucket = bucket;
2395                c->gen = gen;
2396            }
2397            conn_set_init_state(c);
2398            return;
2399        } else {
2400            out_string(c, "CLIENT_ERROR bad format");
2401            return;
2402        }
2403
2404    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2405
2406        process_stat(c, tokens, ntokens);
2407
2408    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2409        time_t exptime = 0;
2410        set_current_time();
2411
2412        set_noreply_maybe(c, tokens, ntokens);
2413
2414        if(ntokens == (c->noreply ? 3 : 2)) {
2415            settings.oldest_live = current_time - 1;
2416            item_flush_expired();
2417            out_string(c, "OK");
2418            return;
2419        }
2420
2421        exptime = strtol(tokens[1].value, NULL, 10);
2422        if(errno == ERANGE) {
2423            out_string(c, "CLIENT_ERROR bad command line format");
2424            return;
2425        }
2426
2427        /*
2428          If exptime is zero realtime() would return zero too, and
2429          realtime(exptime) - 1 would overflow to the max unsigned
2430          value.  So we process exptime == 0 the same way we do when
2431          no delay is given at all.
2432        */
2433        if (exptime > 0)
2434            settings.oldest_live = realtime(exptime) - 1;
2435        else /* exptime == 0 */
2436            settings.oldest_live = current_time - 1;
2437        item_flush_expired();
2438        out_string(c, "OK");
2439        return;
2440
2441    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2442
2443        out_string(c, "VERSION " VERSION);
2444
2445    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2446
2447        conn_set_state(c, conn_closing);
2448
2449    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2450                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2451#ifdef ALLOW_SLABS_REASSIGN
2452
2453        int src, dst, rv;
2454
2455        src = strtol(tokens[2].value, NULL, 10);
2456        dst  = strtol(tokens[3].value, NULL, 10);
2457
2458        if(errno == ERANGE) {
2459            out_string(c, "CLIENT_ERROR bad command line format");
2460            return;
2461        }
2462
2463        rv = slabs_reassign(src, dst);
2464        if (rv == 1) {
2465            out_string(c, "DONE");
2466            return;
2467        }
2468        if (rv == 0) {
2469            out_string(c, "CANT");
2470            return;
2471        }
2472        if (rv == -1) {
2473            out_string(c, "BUSY");
2474            return;
2475        }
2476#else
2477        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2478#endif
2479    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2480        process_verbosity_command(c, tokens, ntokens);
2481    } else {
2482        out_string(c, "ERROR");
2483    }
2484    return;
2485}
2486
2487/*
2488 * if we have a complete line in the buffer, process it.
2489 */
2490static int try_read_command(conn *c) {
2491    char *el, *cont;
2492
2493    assert(c != NULL);
2494    assert(c->rcurr <= (c->rbuf + c->rsize));
2495
2496    if (c->rbytes == 0)
2497        return 0;
2498    el = memchr(c->rcurr, '\n', c->rbytes);
2499    if (!el)
2500        return 0;
2501    cont = el + 1;
2502    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2503        el--;
2504    }
2505    *el = '\0';
2506
2507    assert(cont <= (c->rcurr + c->rbytes));
2508
2509    process_command(c, c->rcurr);
2510
2511    c->rbytes -= (cont - c->rcurr);
2512    c->rcurr = cont;
2513
2514    assert(c->rcurr <= (c->rbuf + c->rsize));
2515
2516    return 1;
2517}
2518
2519/*
2520 * read a UDP request.
2521 * return 0 if there's nothing to read.
2522 */
2523static int try_read_udp(conn *c) {
2524    int res;
2525
2526    assert(c != NULL);
2527
2528    c->request_addr_size = sizeof(c->request_addr);
2529    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2530                   0, &c->request_addr, &c->request_addr_size);
2531    if (res > 8) {
2532        unsigned char *buf = (unsigned char *)c->rbuf;
2533        STATS_LOCK();
2534        stats.bytes_read += res;
2535        STATS_UNLOCK();
2536
2537        /* Beginning of UDP packet is the request ID; save it. */
2538        c->request_id = buf[0] * 256 + buf[1];
2539
2540        /* If this is a multi-packet request, drop it. */
2541        if (buf[4] != 0 || buf[5] != 1) {
2542            out_string(c, "SERVER_ERROR multi-packet request not supported");
2543            return 0;
2544        }
2545
2546        /* Don't care about any of the rest of the header. */
2547        res -= 8;
2548        memmove(c->rbuf, c->rbuf + 8, res);
2549
2550        c->rbytes += res;
2551        c->rcurr = c->rbuf;
2552        return 1;
2553    }
2554    return 0;
2555}
2556
2557/*
2558 * read from network as much as we can, handle buffer overflow and connection
2559 * close.
2560 * before reading, move the remaining incomplete fragment of a command
2561 * (if any) to the beginning of the buffer.
2562 * return 0 if there's nothing to read on the first read.
2563 */
2564static int try_read_network(conn *c) {
2565    int gotdata = 0;
2566    int res;
2567
2568    assert(c != NULL);
2569
2570    if (c->rcurr != c->rbuf) {
2571        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2572            memmove(c->rbuf, c->rcurr, c->rbytes);
2573        c->rcurr = c->rbuf;
2574    }
2575
2576    while (1) {
2577        if (c->rbytes >= c->rsize) {
2578            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2579            if (!new_rbuf) {
2580                if (settings.verbose > 0)
2581                    fprintf(stderr, "Couldn't realloc input buffer\n");
2582                c->rbytes = 0; /* ignore what we read */
2583                out_string(c, "SERVER_ERROR out of memory reading request");
2584                c->write_and_go = conn_closing;
2585                return 1;
2586            }
2587            c->rcurr = c->rbuf = new_rbuf;
2588            c->rsize *= 2;
2589        }
2590
2591        int avail = c->rsize - c->rbytes;
2592        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2593        if (res > 0) {
2594            STATS_LOCK();
2595            stats.bytes_read += res;
2596            STATS_UNLOCK();
2597            gotdata = 1;
2598            c->rbytes += res;
2599            if (res == avail) {
2600                continue;
2601            } else {
2602                break;
2603            }
2604        }
2605        if (res == 0) {
2606            /* connection closed */
2607            conn_set_state(c, conn_closing);
2608            return 1;
2609        }
2610        if (res == -1) {
2611            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2612            /* Should close on unhandled errors. */
2613            conn_set_state(c, conn_closing);
2614            return 1;
2615        }
2616    }
2617    return gotdata;
2618}
2619
2620static bool update_event(conn *c, const int new_flags) {
2621    assert(c != NULL);
2622
2623    struct event_base *base = c->event.ev_base;
2624    if (c->ev_flags == new_flags)
2625        return true;
2626    if (event_del(&c->event) == -1) return false;
2627    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2628    event_base_set(base, &c->event);
2629    c->ev_flags = new_flags;
2630    if (event_add(&c->event, 0) == -1) return false;
2631    return true;
2632}
2633
2634/*
2635 * Sets whether we are listening for new connections or not.
2636 */
2637void accept_new_conns(const bool do_accept) {
2638    conn *next;
2639
2640    if (! is_listen_thread())
2641        return;
2642
2643    for (next = listen_conn; next; next = next->next) {
2644        if (do_accept) {
2645            update_event(next, EV_READ | EV_PERSIST);
2646            if (listen(next->sfd, 1024) != 0) {
2647                perror("listen");
2648            }
2649        }
2650        else {
2651            update_event(next, 0);
2652            if (listen(next->sfd, 0) != 0) {
2653                perror("listen");
2654            }
2655        }
2656  }
2657}
2658
2659/*
2660 * Transmit the next chunk of data from our list of msgbuf structures.
2661 *
2662 * Returns:
2663 *   TRANSMIT_COMPLETE   All done writing.
2664 *   TRANSMIT_INCOMPLETE More data remaining to write.
2665 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2666 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2667 */
2668static int transmit(conn *c) {
2669    assert(c != NULL);
2670
2671    if (c->msgcurr < c->msgused &&
2672            c->msglist[c->msgcurr].msg_iovlen == 0) {
2673        /* Finished writing the current msg; advance to the next. */
2674        c->msgcurr++;
2675    }
2676    if (c->msgcurr < c->msgused) {
2677        ssize_t res;
2678        struct msghdr *m = &c->msglist[c->msgcurr];
2679
2680        res = sendmsg(c->sfd, m, 0);
2681        if (res > 0) {
2682            STATS_LOCK();
2683            stats.bytes_written += res;
2684            STATS_UNLOCK();
2685
2686            /* We've written some of the data. Remove the completed
2687               iovec entries from the list of pending writes. */
2688            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2689                res -= m->msg_iov->iov_len;
2690                m->msg_iovlen--;
2691                m->msg_iov++;
2692            }
2693
2694            /* Might have written just part of the last iovec entry;
2695               adjust it so the next write will do the rest. */
2696            if (res > 0) {
2697                m->msg_iov->iov_base += res;
2698                m->msg_iov->iov_len -= res;
2699            }
2700            return TRANSMIT_INCOMPLETE;
2701        }
2702        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2703            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2704                if (settings.verbose > 0)
2705                    fprintf(stderr, "Couldn't update event\n");
2706                conn_set_state(c, conn_closing);
2707                return TRANSMIT_HARD_ERROR;
2708            }
2709            return TRANSMIT_SOFT_ERROR;
2710        }
2711        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2712           we have a real error, on which we close the connection */
2713        if (settings.verbose > 0)
2714            perror("Failed to write, and not due to blocking");
2715
2716        if (IS_UDP(c->protocol))
2717            conn_set_state(c, conn_read);
2718        else
2719            conn_set_state(c, conn_closing);
2720        return TRANSMIT_HARD_ERROR;
2721    } else {
2722        return TRANSMIT_COMPLETE;
2723    }
2724}
2725
2726static void drive_machine(conn *c) {
2727    bool stop = false;
2728    int sfd, flags = 1;
2729    enum conn_states init_state; /* initial state for a new connection */
2730    socklen_t addrlen;
2731    struct sockaddr_storage addr;
2732    int res;
2733
2734    assert(c != NULL);
2735
2736    while (!stop) {
2737
2738        switch(c->state) {
2739        case conn_listening:
2740            addrlen = sizeof(addr);
2741            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2742                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2743                    /* these are transient, so don't log anything */
2744                    stop = true;
2745                } else if (errno == EMFILE) {
2746                    if (settings.verbose > 0)
2747                        fprintf(stderr, "Too many open connections\n");
2748                    accept_new_conns(false);
2749                    stop = true;
2750                } else {
2751                    perror("accept()");
2752                    stop = true;
2753                }
2754                break;
2755            }
2756            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2757                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2758                perror("setting O_NONBLOCK");
2759                close(sfd);
2760                break;
2761            }
2762            init_state = get_init_state(c);
2763
2764            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2765                                     DATA_BUFFER_SIZE, c->protocol);
2766
2767            break;
2768
2769        case conn_negotiate:
2770            if (settings.verbose > 1)
2771                fprintf(stderr, "Negotiating protocol for a new connection\n");
2772            c->rlbytes = 1;
2773            c->ritem = c->rbuf;
2774            c->rcurr = c->rbuf;
2775            c->wcurr = c->wbuf;
2776            conn_set_state(c, conn_nread);
2777            break;
2778
2779        case conn_read:
2780            if (try_read_command(c) != 0) {
2781                continue;
2782            }
2783            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2784                continue;
2785            }
2786            /* we have no command line and no data to read from network */
2787            if (!update_event(c, EV_READ | EV_PERSIST)) {
2788                if (settings.verbose > 0)
2789                    fprintf(stderr, "Couldn't update event\n");
2790                conn_set_state(c, conn_closing);
2791                break;
2792            }
2793            stop = true;
2794            break;
2795
2796        case conn_bin_init: /* Reinitialize a binary connection */
2797            reinit_bin_connection(c);
2798            break;
2799
2800        case conn_nread:
2801            if (c->rlbytes == 0) {
2802                complete_nread(c);
2803                break;
2804            }
2805            /* first check if we have leftovers in the conn_read buffer */
2806            if (c->rbytes > 0) {
2807                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2808                memcpy(c->ritem, c->rcurr, tocopy);
2809                c->ritem += tocopy;
2810                c->rlbytes -= tocopy;
2811                c->rcurr += tocopy;
2812                c->rbytes -= tocopy;
2813                break;
2814            }
2815
2816            /*  now try reading from the socket */
2817            res = read(c->sfd, c->ritem, c->rlbytes);
2818            if (res > 0) {
2819                STATS_LOCK();
2820                stats.bytes_read += res;
2821                STATS_UNLOCK();
2822                c->ritem += res;
2823                c->rlbytes -= res;
2824                break;
2825            }
2826            if (res == 0) { /* end of stream */
2827                conn_set_state(c, conn_closing);
2828                break;
2829            }
2830            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2831                if (!update_event(c, EV_READ | EV_PERSIST)) {
2832                    if (settings.verbose > 0)
2833                        fprintf(stderr, "Couldn't update event\n");
2834                    conn_set_state(c, conn_closing);
2835                    break;
2836                }
2837                stop = true;
2838                break;
2839            }
2840            /* otherwise we have a real error, on which we close the connection */
2841            if (settings.verbose > 0)
2842                fprintf(stderr, "Failed to read, and not due to blocking\n");
2843            conn_set_state(c, conn_closing);
2844            break;
2845
2846        case conn_swallow:
2847            /* we are reading sbytes and throwing them away */
2848            if (c->sbytes == 0) {
2849                conn_set_init_state(c);
2850                break;
2851            }
2852
2853            /* first check if we have leftovers in the conn_read buffer */
2854            if (c->rbytes > 0) {
2855                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2856                c->sbytes -= tocopy;
2857                c->rcurr += tocopy;
2858                c->rbytes -= tocopy;
2859                break;
2860            }
2861
2862            /*  now try reading from the socket */
2863            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2864            if (res > 0) {
2865                STATS_LOCK();
2866                stats.bytes_read += res;
2867                STATS_UNLOCK();
2868                c->sbytes -= res;
2869                break;
2870            }
2871            if (res == 0) { /* end of stream */
2872                conn_set_state(c, conn_closing);
2873                break;
2874            }
2875            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2876                if (!update_event(c, EV_READ | EV_PERSIST)) {
2877                    if (settings.verbose > 0)
2878                        fprintf(stderr, "Couldn't update event\n");
2879                    conn_set_state(c, conn_closing);
2880                    break;
2881                }
2882                stop = true;
2883                break;
2884            }
2885            /* otherwise we have a real error, on which we close the connection */
2886            if (settings.verbose > 0)
2887                fprintf(stderr, "Failed to read, and not due to blocking\n");
2888            conn_set_state(c, conn_closing);
2889            break;
2890
2891        case conn_write:
2892            /*
2893             * We want to write out a simple response. If we haven't already,
2894             * assemble it into a msgbuf list (this will be a single-entry
2895             * list for TCP or a two-entry list for UDP).
2896             */
2897            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2898                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2899                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2900                    if (settings.verbose > 0)
2901                        fprintf(stderr, "Couldn't build response\n");
2902                    conn_set_state(c, conn_closing);
2903                    break;
2904                }
2905            }
2906
2907            /* fall through... */
2908
2909        case conn_mwrite:
2910            switch (transmit(c)) {
2911            case TRANSMIT_COMPLETE:
2912                if (c->state == conn_mwrite) {
2913                    while (c->ileft > 0) {
2914                        item *it = *(c->icurr);
2915                        assert((it->it_flags & ITEM_SLABBED) == 0);
2916                        item_remove(it);
2917                        c->icurr++;
2918                        c->ileft--;
2919                    }
2920                    while (c->suffixleft > 0) {
2921                        char *suffix = *(c->suffixcurr);
2922                        if(suffix_add_to_freelist(suffix)) {
2923                            /* Failed to add to freelist, don't leak */
2924                            free(suffix);
2925                        }
2926                        c->suffixcurr++;
2927                        c->suffixleft--;
2928                    }
2929                    /* XXX:  I don't know why this wasn't the general case */
2930                    if(c->protocol == binary_prot) {
2931                        conn_set_state(c, c->write_and_go);
2932                    } else {
2933                        conn_set_init_state(c);
2934                    }
2935                } else if (c->state == conn_write) {
2936                    if (c->write_and_free) {
2937                        free(c->write_and_free);
2938                        c->write_and_free = 0;
2939                    }
2940                    conn_set_state(c, c->write_and_go);
2941                } else {
2942                    if (settings.verbose > 0)
2943                        fprintf(stderr, "Unexpected state %d\n", c->state);
2944                    conn_set_state(c, conn_closing);
2945                }
2946                break;
2947
2948            case TRANSMIT_INCOMPLETE:
2949            case TRANSMIT_HARD_ERROR:
2950                break;                   /* Continue in state machine. */
2951
2952            case TRANSMIT_SOFT_ERROR:
2953                stop = true;
2954                break;
2955            }
2956            break;
2957
2958        case conn_closing:
2959            if (IS_UDP(c->protocol))
2960                conn_cleanup(c);
2961            else
2962                conn_close(c);
2963            stop = true;
2964            break;
2965        }
2966    }
2967
2968    return;
2969}
2970
2971void event_handler(const int fd, const short which,