root/branches/binary/server/memcached.c @ 734

Revision 734, 111.9 kB (checked in by dsallings, 21 months ago)

removed dead code

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63/*
64 * forward declarations
65 */
66static void drive_machine(conn *c);
67static int new_socket(struct addrinfo *ai);
68static int server_socket(const int port, const int prot);
69static int try_read_command(conn *c);
70static int try_read_network(conn *c);
71static int try_read_udp(conn *c);
72static void conn_set_state(conn *, int);
73
74/* stats */
75static void stats_reset(void);
76static void stats_init(void);
77
78/* defaults */
79static void settings_init(void);
80
81/* event handling, network IO */
82static void event_handler(const int fd, const short which, void *arg);
83static void conn_close(conn *c);
84static void conn_init(void);
85static void accept_new_conns(const bool do_accept);
86static bool update_event(conn *c, const int new_flags);
87static void complete_nread(conn *c);
88static void process_command(conn *c, char *command);
89static int transmit(conn *c);
90static int ensure_iov_space(conn *c);
91static int add_iov(conn *c, const void *buf, int len);
92static int add_msghdr(conn *c);
93
94/* time handling */
95static void set_current_time(void);  /* update the global variable holding
96                              global 32-bit seconds-since-start time
97                              (to avoid 64 bit time_t) */
98
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = NULL;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn = NULL;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.access=0700;
169    settings.port = 11211;
170    settings.udpport = 0;
171    /* By default this string should be NULL for getaddrinfo() */
172    settings.inter = NULL;
173    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
174    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
175    settings.verbose = 0;
176    settings.oldest_live = 0;
177    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
178    settings.socketpath = NULL;       /* by default, not using a unix socket */
179    settings.managed = false;
180    settings.factor = 1.25;
181    settings.chunk_size = 48;         /* space for a modest key and value */
182#ifdef USE_THREADS
183    settings.num_threads = 4;
184#else
185    settings.num_threads = 1;
186#endif
187    settings.prefix_delimiter = ':';
188    settings.detail_enabled = 0;
189}
190
191/* returns true if a deleted item's delete-locked-time is over, and it
192   should be removed from the namespace */
193static bool item_delete_lock_over (item *it) {
194    assert(it->it_flags & ITEM_DELETED);
195    return (current_time >= it->exptime);
196}
197
198/*
199 * Adds a message header to a connection.
200 *
201 * Returns 0 on success, -1 on out-of-memory.
202 */
203static int add_msghdr(conn *c)
204{
205    struct msghdr *msg;
206
207    assert(c != NULL);
208
209    if (c->msgsize == c->msgused) {
210        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
211        if (! msg)
212            return -1;
213        c->msglist = msg;
214        c->msgsize *= 2;
215    }
216
217    msg = c->msglist + c->msgused;
218
219    /* this wipes msg_iovlen, msg_control, msg_controllen, and
220       msg_flags, the last 3 of which aren't defined on solaris: */
221    memset(msg, 0, sizeof(struct msghdr));
222
223    msg->msg_iov = &c->iov[c->iovused];
224
225    if (c->request_addr_size > 0) {
226        msg->msg_name = &c->request_addr;
227        msg->msg_namelen = c->request_addr_size;
228    }
229
230    c->msgbytes = 0;
231    c->msgused++;
232
233    if (IS_UDP(c->protocol)) {
234        /* Leave room for the UDP header, which we'll fill in later. */
235        return add_iov(c, NULL, UDP_HEADER_SIZE);
236    }
237
238    return 0;
239}
240
241
242/*
243 * Free list management for connections.
244 */
245
246static conn **freeconns;
247static int freetotal;
248static int freecurr;
249
250
251static void conn_init(void) {
252    freetotal = 200;
253    freecurr = 0;
254    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
255        fprintf(stderr, "malloc()\n");
256    }
257    return;
258}
259
260/*
261 * Returns a connection from the freelist, if any. Should call this using
262 * conn_from_freelist() for thread safety.
263 */
264conn *do_conn_from_freelist() {
265    conn *c;
266
267    if (freecurr > 0) {
268        c = freeconns[--freecurr];
269    } else {
270        c = NULL;
271    }
272
273    return c;
274}
275
276/*
277 * Adds a connection to the freelist. 0 = success. Should call this using
278 * conn_add_to_freelist() for thread safety.
279 */
280bool do_conn_add_to_freelist(conn *c) {
281    if (freecurr < freetotal) {
282        freeconns[freecurr++] = c;
283        return false;
284    } else {
285        /* try to enlarge free connections array */
286        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
287        if (new_freeconns) {
288            freetotal *= 2;
289            freeconns = new_freeconns;
290            freeconns[freecurr++] = c;
291            return false;
292        }
293    }
294    return true;
295}
296
297static char *prot_text(const int prot) {
298    char *rv="unknown";
299    switch(prot) {
300        case ascii_prot:
301            rv="ascii";
302            break;
303        case binary_prot:
304            rv="binary";
305            break;
306        case ascii_udp_prot:
307            rv="ascii-udp";
308            break;
309        case negotiating_prot:
310            rv="auto-negotiate";
311            break;
312    }
313    return rv;
314}
315
316conn *conn_new(const int sfd, const int init_state, const int event_flags,
317                const int read_buffer_size, const int prot,
318                struct event_base *base) {
319    conn *c = conn_from_freelist();
320
321    if (NULL == c) {
322        if (!(c = (conn *)malloc(sizeof(conn)))) {
323            fprintf(stderr, "malloc()\n");
324            return NULL;
325        }
326        c->rbuf = c->wbuf = 0;
327        c->ilist = 0;
328        c->suffixlist = 0;
329        c->iov = 0;
330        c->msglist = 0;
331        c->hdrbuf = 0;
332
333        c->rsize = read_buffer_size;
334        c->wsize = DATA_BUFFER_SIZE;
335        c->isize = ITEM_LIST_INITIAL;
336        c->suffixsize = SUFFIX_LIST_INITIAL;
337        c->iovsize = IOV_LIST_INITIAL;
338        c->msgsize = MSG_LIST_INITIAL;
339        c->hdrsize = 0;
340
341        c->rbuf = (char *)malloc((size_t)c->rsize);
342        c->wbuf = (char *)malloc((size_t)c->wsize);
343        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
344        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
345        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
346        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
347
348        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
349                c->msglist == 0 || c->suffixlist == 0) {
350            if (c->rbuf != 0) free(c->rbuf);
351            if (c->wbuf != 0) free(c->wbuf);
352            if (c->ilist !=0) free(c->ilist);
353            if (c->suffixlist != 0) free(c->suffixlist);
354            if (c->iov != 0) free(c->iov);
355            if (c->msglist != 0) free(c->msglist);
356            free(c);
357            fprintf(stderr, "malloc()\n");
358            return NULL;
359        }
360
361        STATS_LOCK();
362        stats.conn_structs++;
363        STATS_UNLOCK();
364    }
365
366    /* unix socket mode doesn't need this, so zeroed out.  but why
367     * is this done for every command?  presumably for UDP
368     * mode.  */
369    if (!settings.socketpath) {
370        c->request_addr_size = sizeof(c->request_addr);
371    } else {
372        c->request_addr_size = 0;
373    }
374
375    if (settings.verbose > 1) {
376        if (init_state == conn_listening) {
377            fprintf(stderr, "<%d server listening (%s)\n", sfd,
378                prot_text(prot));
379        } else if (IS_UDP(prot)) {
380            fprintf(stderr, "<%d server listening (udp)\n", sfd);
381        } else if (prot == binary_prot) {
382            fprintf(stderr, "<%d new binary client connection\n", sfd);
383        } else if (prot == ascii_prot) {
384            fprintf(stderr, "<%d new ascii client connection\n", sfd);
385        } else if (prot == negotiating_prot) {
386            fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd);
387        } else {
388            fprintf(stderr, "<%d new unknown (%d) client connection\n",
389                sfd, prot);
390            abort();
391        }
392    }
393
394    c->sfd = sfd;
395    c->protocol = prot;
396    c->state = init_state;
397    c->rlbytes = 0;
398    c->cmd = -1;
399    c->rbytes = c->wbytes = 0;
400    c->wcurr = c->wbuf;
401    c->rcurr = c->rbuf;
402    c->ritem = 0;
403    c->icurr = c->ilist;
404    c->suffixcurr = c->suffixlist;
405    c->ileft = 0;
406    c->suffixleft = 0;
407    c->iovused = 0;
408    c->msgcurr = 0;
409    c->msgused = 0;
410
411    c->write_and_go = init_state;
412    c->write_and_free = 0;
413    c->item = 0;
414    c->bucket = -1;
415    c->gen = 0;
416
417    c->noreply = false;
418
419    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
420    event_base_set(base, &c->event);
421    c->ev_flags = event_flags;
422
423    if (event_add(&c->event, 0) == -1) {
424        if (conn_add_to_freelist(c)) {
425            conn_free(c);
426        }
427        perror("event_add");
428        return NULL;
429    }
430
431    STATS_LOCK();
432    stats.curr_conns++;
433    stats.total_conns++;
434    STATS_UNLOCK();
435
436    return c;
437}
438
439static void conn_cleanup(conn *c) {
440    assert(c != NULL);
441
442    if (c->item) {
443        item_remove(c->item);
444        c->item = 0;
445    }
446
447    if (c->ileft != 0) {
448        for (; c->ileft > 0; c->ileft--,c->icurr++) {
449            item_remove(*(c->icurr));
450        }
451    }
452
453    if (c->suffixleft != 0) {
454        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
455            if(suffix_add_to_freelist(*(c->suffixcurr))) {
456                free(*(c->suffixcurr));
457            }
458        }
459    }
460
461    if (c->write_and_free) {
462        free(c->write_and_free);
463        c->write_and_free = 0;
464    }
465}
466
467/*
468 * Frees a connection.
469 */
470void conn_free(conn *c) {
471    if (c) {
472        if (c->hdrbuf)
473            free(c->hdrbuf);
474        if (c->msglist)
475            free(c->msglist);
476        if (c->rbuf)
477            free(c->rbuf);
478        if (c->wbuf)
479            free(c->wbuf);
480        if (c->ilist)
481            free(c->ilist);
482        if (c->suffixlist)
483            free(c->suffixlist);
484        if (c->iov)
485            free(c->iov);
486        free(c);
487    }
488}
489
490static void conn_close(conn *c) {
491    assert(c != NULL);
492
493    /* delete the event, the socket and the conn */
494    event_del(&c->event);
495
496    if (settings.verbose > 1)
497        fprintf(stderr, "<%d connection closed.\n", c->sfd);
498
499    close(c->sfd);
500    accept_new_conns(true);
501    conn_cleanup(c);
502
503    /* if the connection has big buffers, just free it */
504    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
505        conn_free(c);
506    }
507
508    STATS_LOCK();
509    stats.curr_conns--;
510    STATS_UNLOCK();
511
512    return;
513}
514
515static int get_init_state(conn *c) {
516    int rv=0;
517    assert(c != NULL);
518
519    switch(c->protocol) {
520        case binary_prot:
521            rv=conn_bin_init;
522            break;
523        case negotiating_prot:
524            rv=conn_negotiate;
525            break;
526        default:
527            rv=conn_read;
528    }
529    return rv;
530}
531
532/* Set the given connection to its initial state.  The initial state will vary
533 * base don protocol type. */
534static void conn_set_init_state(conn *c) {
535    assert(c != NULL);
536
537    conn_set_state(c, get_init_state(c));
538}
539
540/*
541 * Shrinks a connection's buffers if they're too big.  This prevents
542 * periodic large "get" requests from permanently chewing lots of server
543 * memory.
544 *
545 * This should only be called in between requests since it can wipe output
546 * buffers!
547 */
548static void conn_shrink(conn *c) {
549    assert(c != NULL);
550
551    if (IS_UDP(c->protocol))
552        return;
553
554    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
555        char *newbuf;
556
557        if (c->rcurr != c->rbuf)
558            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
559
560        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
561
562        if (newbuf) {
563            c->rbuf = newbuf;
564            c->rsize = DATA_BUFFER_SIZE;
565        }
566        /* TODO check other branch... */
567        c->rcurr = c->rbuf;
568    }
569
570    if (c->isize > ITEM_LIST_HIGHWAT) {
571        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
572        if (newbuf) {
573            c->ilist = newbuf;
574            c->isize = ITEM_LIST_INITIAL;
575        }
576    /* TODO check error condition? */
577    }
578
579    if (c->msgsize > MSG_LIST_HIGHWAT) {
580        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
581        if (newbuf) {
582            c->msglist = newbuf;
583            c->msgsize = MSG_LIST_INITIAL;
584        }
585    /* TODO check error condition? */
586    }
587
588    if (c->iovsize > IOV_LIST_HIGHWAT) {
589        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
590        if (newbuf) {
591            c->iov = newbuf;
592            c->iovsize = IOV_LIST_INITIAL;
593        }
594    /* TODO check return value */
595    }
596}
597
598/*
599 * Sets a connection's current state in the state machine. Any special
600 * processing that needs to happen on certain state transitions can
601 * happen here.
602 */
603static void conn_set_state(conn *c, int state) {
604    assert(c != NULL);
605
606    if (state != c->state) {
607        if (state == conn_read) {
608            conn_shrink(c);
609            assoc_move_next_bucket();
610        }
611        c->state = state;
612    }
613}
614
615/*
616 * Free list management for suffix buffers.
617 */
618
619static char **freesuffix;
620static int freesuffixtotal;
621static int freesuffixcurr;
622
623static void suffix_init(void) {
624    freesuffixtotal = 500;
625    freesuffixcurr  = 0;
626
627    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
628    if (freesuffix == NULL) {
629        fprintf(stderr, "malloc()\n");
630    }
631    return;
632}
633
634/*
635 * Returns a suffix buffer from the freelist, if any. Should call this using
636 * suffix_from_freelist() for thread safety.
637 */
638char *do_suffix_from_freelist() {
639    char *s;
640
641    if (freesuffixcurr > 0) {
642        s = freesuffix[--freesuffixcurr];
643    } else {
644        /* If malloc fails, let the logic fall through without spamming
645         * STDERR on the server. */
646        s = malloc( SUFFIX_SIZE );
647    }
648
649    return s;
650}
651
652/*
653 * Adds a connection to the freelist. 0 = success. Should call this using
654 * conn_add_to_freelist() for thread safety.
655 */
656bool do_suffix_add_to_freelist(char *s) {
657    if (freesuffixcurr < freesuffixtotal) {
658        freesuffix[freesuffixcurr++] = s;
659        return false;
660    } else {
661        /* try to enlarge free connections array */
662        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
663        if (new_freesuffix) {
664            freesuffixtotal *= 2;
665            freesuffix = new_freesuffix;
666            freesuffix[freesuffixcurr++] = s;
667            return false;
668        }
669    }
670    return true;
671}
672
673/*
674 * Ensures that there is room for another struct iovec in a connection's
675 * iov list.
676 *
677 * Returns 0 on success, -1 on out-of-memory.
678 */
679static int ensure_iov_space(conn *c) {
680    assert(c != NULL);
681
682    if (c->iovused >= c->iovsize) {
683        int i, iovnum;
684        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
685                                (c->iovsize * 2) * sizeof(struct iovec));
686        if (! new_iov)
687            return -1;
688        c->iov = new_iov;
689        c->iovsize *= 2;
690
691        /* Point all the msghdr structures at the new list. */
692        for (i = 0, iovnum = 0; i < c->msgused; i++) {
693            c->msglist[i].msg_iov = &c->iov[iovnum];
694            iovnum += c->msglist[i].msg_iovlen;
695        }
696    }
697
698    return 0;
699}
700
701
702/*
703 * Adds data to the list of pending data that will be written out to a
704 * connection.
705 *
706 * Returns 0 on success, -1 on out-of-memory.
707 */
708
709static int add_iov(conn *c, const void *buf, int len) {
710    struct msghdr *m;
711    int leftover;
712    bool limit_to_mtu;
713
714    assert(c != NULL);
715
716    do {
717        m = &c->msglist[c->msgused - 1];
718
719        /*
720         * Limit UDP packets, and the first payloads of TCP replies, to
721         * UDP_MAX_PAYLOAD_SIZE bytes.
722         */
723        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
724
725        /* We may need to start a new msghdr if this one is full. */
726        if (m->msg_iovlen == IOV_MAX ||
727            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
728            add_msghdr(c);
729            m = &c->msglist[c->msgused - 1];
730        }
731
732        if (ensure_iov_space(c) != 0)
733            return -1;
734
735        /* If the fragment is too big to fit in the datagram, split it up */
736        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
737            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
738            len -= leftover;
739        } else {
740            leftover = 0;
741        }
742
743        m = &c->msglist[c->msgused - 1];
744        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
745        m->msg_iov[m->msg_iovlen].iov_len = len;
746
747        c->msgbytes += len;
748        c->iovused++;
749        m->msg_iovlen++;
750
751        buf = ((char *)buf) + len;
752        len = leftover;
753    } while (leftover > 0);
754
755    return 0;
756}
757
758
759/*
760 * Constructs a set of UDP headers and attaches them to the outgoing messages.
761 */
762static int build_udp_headers(conn *c) {
763    int i;
764    unsigned char *hdr;
765
766    assert(c != NULL);
767
768    if (c->msgused > c->hdrsize) {
769        void *new_hdrbuf;
770        if (c->hdrbuf)
771            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
772        else
773            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
774        if (! new_hdrbuf)
775            return -1;
776        c->hdrbuf = (unsigned char *)new_hdrbuf;
777        c->hdrsize = c->msgused * 2;
778    }
779
780    hdr = c->hdrbuf;
781    for (i = 0; i < c->msgused; i++) {
782        c->msglist[i].msg_iov[0].iov_base = hdr;
783        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
784        *hdr++ = c->request_id / 256;
785        *hdr++ = c->request_id % 256;
786        *hdr++ = i / 256;
787        *hdr++ = i % 256;
788        *hdr++ = c->msgused / 256;
789        *hdr++ = c->msgused % 256;
790        *hdr++ = 0;
791        *hdr++ = 0;
792        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
793    }
794
795    return 0;
796}
797
798
799static void out_string(conn *c, const char *str) {
800    size_t len;
801
802    assert(c != NULL);
803
804    if (c->noreply) {
805        if (settings.verbose > 1)
806            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
807        c->noreply = false;
808        conn_set_state(c, conn_read);
809        return;
810    }
811
812    if (settings.verbose > 1)
813        fprintf(stderr, ">%d %s\n", c->sfd, str);
814
815    len = strlen(str);
816    if ((len + 2) > c->wsize) {
817        /* ought to be always enough. just fail for simplicity */
818        str = "SERVER_ERROR output line too long";
819        len = strlen(str);
820    }
821
822    memcpy(c->wbuf, str, len);
823    memcpy(c->wbuf + len, "\r\n", 3);
824    c->wbytes = len + 2;
825    c->wcurr = c->wbuf;
826
827    conn_set_state(c, conn_write);
828    c->write_and_go = get_init_state(c);
829    return;
830}
831
832/*
833 * we get here after reading the value in set/add/replace commands. The command
834 * has been stored in c->item_comm, and the item is ready in c->item.
835 */
836static void complete_nread_ascii(conn *c) {
837    assert(c != NULL);
838
839    item *it = c->item;
840    int comm = c->item_comm;
841    int ret;
842
843    STATS_LOCK();
844    stats.set_cmds++;
845    STATS_UNLOCK();
846
847    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
848        out_string(c, "CLIENT_ERROR bad data chunk");
849    } else {
850      ret = store_item(it, comm);
851      if (ret == 1)
852          out_string(c, "STORED");
853      else if(ret == 2)
854          out_string(c, "EXISTS");
855      else if(ret == 3)
856          out_string(c, "NOT_FOUND");
857      else
858          out_string(c, "NOT_STORED");
859    }
860
861    item_remove(c->item);       /* release the c->item reference */
862    c->item = 0;
863}
864
865static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
866    int i=0;
867    uint32_t res_header[BIN_PKT_HDR_WORDS];
868
869    assert(c);
870    assert(body_len >= 0);
871
872    c->msgcurr = 0;
873    c->msgused = 0;
874    c->iovused = 0;
875    if (add_msghdr(c) != 0) {
876        /* XXX:  out_string is inappropriate here */
877        out_string(c, "SERVER_ERROR out of memory");
878        return;
879    }
880
881    res_header[0] = BIN_RES_MAGIC << 24;
882    res_header[0] |= ((0xff & c->cmd) << 16);
883    res_header[0] |= err & 0xffff;
884
885    res_header[1] = hdr_len << 24;
886    /* TODO:  Support datatype */
887    res_header[2] = body_len;
888    res_header[3] = c->opaque;
889
890    if(settings.verbose > 1) {
891        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
892            res_header[0], res_header[1], res_header[2], res_header[3]);
893    }
894
895    for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
896        res_header[i] = htonl(res_header[i]);
897    }
898
899    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
900    memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH);
901    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
902}
903
904static void write_bin_error(conn *c, int err, int swallow) {
905    char *errstr="Unknown error";
906    switch(err) {
907        case ERR_UNKNOWN_CMD:
908            errstr="Unknown command";
909            break;
910        case ERR_NOT_FOUND:
911            errstr="Not found";
912            break;
913        case ERR_INVALID_ARGUMENTS:
914            errstr="Invalid arguments";
915            break;
916        case ERR_EXISTS:
917            errstr="Data exists for key.";
918            break;
919        case ERR_TOO_LARGE:
920            errstr="Too large.";
921            break;
922        case ERR_NOT_STORED:
923            errstr="Not stored.";
924            break;
925        default:
926            errstr="UNHANDLED ERROR";
927            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
928    }
929    if(settings.verbose > 0) {
930        fprintf(stderr, "Writing an error:  %s\n", errstr);
931    }
932    add_bin_header(c, err, 0, strlen(errstr));
933    add_iov(c, errstr, strlen(errstr));
934
935    conn_set_state(c, conn_mwrite);
936    if(swallow > 0) {
937        c->sbytes=swallow;
938        c->write_and_go = conn_swallow;
939    } else {
940        c->write_and_go = conn_bin_init;
941    }
942}
943
944/* Form and send a response to a command over the binary protocol */
945static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
946    add_bin_header(c, 0, hlen, dlen);
947    if(dlen > 0) {
948        add_iov(c, d, dlen);
949    }
950    conn_set_state(c, conn_mwrite);
951    c->write_and_go = conn_bin_init;
952}
953
954/* Byte swap a 64-bit number */
955static int64_t swap64(int64_t in) {
956#ifdef ENDIAN_LITTLE
957    /* Little endian, flip the bytes around until someone makes a faster/better
958    * way to do this. */
959    int64_t rv=0;
960    int i=0;
961     for(i=0; i<8; i++) {
962        rv = (rv << 8) | (in & 0xff);
963        in >>= 8;
964     }
965    return rv;
966#else
967    /* big-endian machines don't need byte swapping */
968    return in;
969#endif
970}
971
972static void complete_incr_bin(conn *c) {
973    item *it;
974    int64_t delta;
975    uint64_t initial;
976    int32_t exptime;
977    char *key;
978    size_t nkey;
979    int i;
980    uint64_t *responseBuf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
981
982    assert(c != NULL);
983
984    key=c->rbuf + BIN_INCR_HDR_LEN;
985    nkey=c->keylen;
986    key[nkey]=0x00;
987
988    delta = swap64(*((int64_t*)(c->rbuf)));
989    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
990    exptime = ntohl(*((int*)(c->rbuf + 16)));
991
992    if(settings.verbose) {
993        fprintf(stderr, "incr ");
994        for(i=0; i<nkey; i++) {
995            fprintf(stderr, "%c", key[i]);
996        }
997        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
998    }
999
1000    /* XXX:  Not sure what to do with these yet
1001    if (settings.managed) {
1002        int bucket = c->bucket;
1003        if (bucket == -1) {
1004            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1005            return;
1006        }
1007        c->bucket = -1;
1008        if (buckets[bucket] != c->gen) {
1009            out_string(c, "ERROR_NOT_OWNER");
1010            return;
1011        }
1012    }
1013    */
1014
1015    it = item_get(key, nkey);
1016    if (it) {
1017        /* Weird magic in add_delta forces me to pad here */
1018        char tmpbuf[INCR_MAX_STORAGE_LEN];
1019        uint64_t l=0;
1020        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1021        tmpbuf[INCR_MAX_STORAGE_LEN]=0x00;
1022        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1023        *responseBuf=swap64(strtoull(tmpbuf, NULL, 10));
1024
1025        write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1026        item_remove(it);         /* release our reference */
1027    } else {
1028        if(exptime >= 0) {
1029            /* Save some room for the response */
1030            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1031            *responseBuf=swap64(initial);
1032            it = item_alloc(key, nkey, 0, realtime(exptime),
1033                INCR_MAX_STORAGE_LEN);
1034            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1035
1036            if(store_item(it, NREAD_SET)) {
1037                write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN,
1038                    INCR_RES_LEN);
1039            } else {
1040                write_bin_error(c, ERR_NOT_STORED, 0);
1041            }
1042            item_remove(it);         /* release our reference */
1043        } else {
1044            write_bin_error(c, ERR_NOT_FOUND, 0);
1045        }
1046    }
1047}
1048
1049static void complete_update_bin(conn *c) {
1050    int eno=-1, ret=0;
1051    assert(c != NULL);
1052
1053    item *it = c->item;
1054
1055    STATS_LOCK();
1056    stats.set_cmds++;
1057    STATS_UNLOCK();
1058
1059    /* We don't actually receive the trailing two characters in the bin
1060     * protocol, so we're going to just set them here */
1061    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1062    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1063
1064    switch (store_item(it, c->item_comm)) {
1065        case 1:
1066            /* Stored */
1067            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1068            break;
1069        case 2:
1070            write_bin_error(c, ERR_EXISTS, 0);
1071            break;
1072        case 3:
1073            write_bin_error(c, ERR_NOT_FOUND, 0);
1074            break;
1075        default:
1076            if(c->item_comm == NREAD_ADD) {
1077                eno=ERR_EXISTS;
1078            } else if(c->item_comm == NREAD_REPLACE) {
1079                eno=ERR_NOT_FOUND;
1080            } else {
1081                eno=ERR_NOT_STORED;
1082            }
1083            write_bin_error(c, eno, 0);
1084    }
1085
1086    item_remove(c->item);       /* release the c->item reference */
1087    c->item = 0;
1088}
1089
1090static void process_bin_get(conn *c) {
1091    item *it;
1092
1093    it = item_get(c->rbuf, c->keylen);
1094    if (it) {
1095        int *flags;
1096        uint64_t* identifier;
1097
1098        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1099
1100        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place
1101        this is int in far enough to cover the header */
1102        flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1103        *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10));
1104
1105        /* the length has two unnecessary bytes, and then we write four more */
1106        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1107        /* Flags */
1108        add_iov(c, flags, 4);
1109        identifier=(uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4);
1110        *identifier=swap64((uint32_t)it->cas_id);
1111        add_iov(c, identifier, 8);
1112        /* bytes minus the CRLF */
1113        add_iov(c, ITEM_data(it), it->nbytes - 2);
1114        conn_set_state(c, conn_mwrite);
1115    } else {
1116        if(c->cmd == CMD_GETQ) {
1117            conn_set_state(c, conn_bin_init);
1118        } else {
1119            write_bin_error(c, ERR_NOT_FOUND, 0);
1120        }
1121    }
1122}
1123
1124static void bin_read_key(conn *c, int next_substate, int extra) {
1125    assert(c);
1126    c->substate = next_substate;
1127    c->rlbytes = c->keylen + extra;
1128    assert(c->rsize >= c->rlbytes);
1129    c->ritem = c->rbuf;
1130    conn_set_state(c, conn_nread);
1131}
1132
1133static void dispatch_bin_command(conn *c) {
1134    time_t exptime = 0;
1135    switch(c->cmd) {
1136        case CMD_VERSION:
1137            write_bin_response(c, VERSION, 0, strlen(VERSION));
1138            break;
1139        case CMD_FLUSH:
1140            set_current_time();
1141
1142            settings.oldest_live = current_time - 1;
1143            item_flush_expired();
1144            write_bin_response(c, NULL, 0, 0);
1145            break;
1146        case CMD_NOOP:
1147            write_bin_response(c, NULL, 0, 0);
1148            break;
1149        case CMD_SET:
1150            /* Fallthrough */
1151        case CMD_ADD:
1152            /* Fallthrough */
1153        case CMD_REPLACE:
1154            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1155            break;
1156        case CMD_GETQ:
1157        case CMD_GET:
1158            bin_read_key(c, bin_reading_get_key, 0);
1159            break;
1160        case CMD_DELETE:
1161            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1162            break;
1163        case CMD_INCR:
1164        case CMD_DECR:
1165            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1166            break;
1167        default:
1168            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1169    }
1170}
1171
1172static void process_bin_update(conn *c) {
1173    char *key;
1174    int nkey;
1175    int vlen;
1176    int flags;
1177    int exptime;
1178    item *it;
1179    int comm;
1180    int hdrlen=BIN_SET_HDR_LEN;
1181
1182    assert(c != NULL);
1183
1184    key=c->rbuf + hdrlen;
1185    nkey=c->keylen;
1186    key[nkey]=0x00;
1187
1188    flags = ntohl(*((int*)(c->rbuf)));
1189    exptime = ntohl(*((int*)(c->rbuf + 4)));
1190    vlen = c->bin_header[2] - (nkey + hdrlen);
1191
1192    if(settings.verbose) {
1193        fprintf(stderr, "Value len is %d\n", vlen);
1194    }
1195
1196    if (settings.detail_enabled) {
1197        stats_prefix_record_set(key);
1198    }
1199
1200    /* Not sure what to do with this.
1201    if (settings.managed) {
1202        int bucket = c->bucket;
1203        if (bucket == -1) {
1204            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1205            return;
1206        }
1207        c->bucket = -1;
1208        if (buckets[bucket] != c->gen) {
1209            out_string(c, "ERROR_NOT_OWNER");
1210            return;
1211        }
1212    }
1213    */
1214
1215    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1216
1217    if (it == 0) {
1218        if (! item_size_ok(nkey, flags, vlen + 2)) {
1219            write_bin_error(c, ERR_TOO_LARGE, vlen);
1220        } else {
1221            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1222        }
1223        /* swallow the data line */
1224        c->write_and_go = conn_swallow;
1225        return;
1226    }
1227
1228    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
1229
1230    switch(c->cmd) {
1231        case CMD_ADD:
1232            c->item_comm = NREAD_ADD;
1233            break;
1234        case CMD_SET:
1235            c->item_comm = NREAD_SET;
1236            break;
1237        case CMD_REPLACE:
1238            c->item_comm = NREAD_REPLACE;
1239            break;
1240        default:
1241            assert(0);
1242    }
1243
1244    if(it->cas_id != 0) {
1245        c->item_comm = NREAD_CAS;
1246    }
1247
1248    c->item = it;
1249    c->ritem = ITEM_data(it);
1250    c->rlbytes = vlen;
1251    conn_set_state(c, conn_nread);
1252    c->substate = bin_read_set_value;
1253}
1254
1255static void process_bin_delete(conn *c) {
1256    char *key;
1257    size_t nkey;
1258    item *it;
1259    time_t exptime = 0;
1260
1261    assert(c != NULL);
1262
1263    /* XXX:  I don't know what to do with this yet
1264    if (settings.managed) {
1265        int bucket = c->bucket;
1266        if (bucket == -1) {
1267            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1268            return;
1269        }
1270        c->bucket = -1;
1271        if (buckets[bucket] != c->gen) {
1272            out_string(c, "ERROR_NOT_OWNER");
1273            return;
1274        }
1275    }
1276    */
1277
1278    exptime = ntohl(*((int*)(c->rbuf)));
1279    key = c->rbuf + 4;
1280    nkey = c->keylen;
1281    key[nkey]=0x00;
1282
1283    if(settings.verbose) {
1284        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1285    }
1286
1287    if (settings.detail_enabled) {
1288        stats_prefix_record_delete(key);
1289    }
1290
1291    it = item_get(key, nkey);
1292    if (it) {
1293        if (exptime == 0) {
1294            item_unlink(it);
1295            item_remove(it);      /* release our reference */
1296            write_bin_response(c, NULL, 0, 0);
1297        } else {
1298            /* XXX:  This is really lame, but defer_delete returns a string */
1299            char *res=defer_delete(it, exptime);
1300            if(res[0] == 'D') {
1301                write_bin_response(c, NULL, 0, 0);
1302            } else {
1303                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1304            }
1305        }
1306    } else {
1307        write_bin_error(c, ERR_NOT_FOUND, 0);
1308    }
1309}
1310
1311static void complete_nread_binary(conn *c) {
1312    assert(c != NULL);
1313
1314    if(c->cmd < 0) {
1315        /* No command defined.  Figure out what they're trying to say. */
1316        int i=0;
1317        /* I did a bit of hard-coding around the packet sizes */
1318        assert(BIN_PKT_HDR_WORDS == 3);
1319        for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
1320            c->bin_header[i] = ntohl(c->bin_header[i]);
1321        }
1322        if(settings.verbose) {
1323            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1324                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1325                c->bin_header[3]);
1326        }
1327        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1328            if(settings.verbose) {
1329                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1330            }
1331            conn_set_state(c, conn_closing);
1332            return;
1333        }
1334   
1335        c->msgcurr = 0;
1336        c->msgused = 0;
1337        c->iovused = 0;
1338        if (add_msghdr(c) != 0) {
1339            out_string(c, "SERVER_ERROR out of memory");
1340            return;
1341        }
1342   
1343        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1344        c->keylen = c->bin_header[0] & 0xffff;
1345        c->opaque = c->bin_header[3];
1346        if(settings.verbose > 1) {
1347            fprintf(stderr,
1348                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1349                c->opaque, c->keylen, c->bin_header[2]);
1350        }
1351        dispatch_bin_command(c);
1352    } else {
1353        switch(c->substate) {
1354            case bin_reading_set_header:
1355                process_bin_update(c);
1356                break;
1357            case bin_read_set_value:
1358                complete_update_bin(c);
1359                break;
1360            case bin_reading_get_key:
1361                process_bin_get(c);
1362                break;
1363            case bin_reading_del_header:
1364                process_bin_delete(c);
1365                break;
1366            case bin_reading_incr_header:
1367                complete_incr_bin(c);
1368                break;
1369            default:
1370                fprintf(stderr, "Not handling substate %d\n", c->substate);
1371                assert(0);
1372        }
1373    }
1374}
1375
1376static void reinit_bin_connection(conn *c) {
1377    if (settings.verbose > 0)
1378        fprintf(stderr, "*** Reinitializing binary connection.\n");
1379    c->rlbytes = MIN_BIN_PKT_LENGTH;
1380    c->write_and_go = conn_bin_init;
1381    c->cmd = -1;
1382    c->substate = bin_no_state;
1383    c->rbytes = c->wbytes = 0;
1384    c->ritem = (char*)c->bin_header;
1385    c->rcurr = c->rbuf;
1386    c->wcurr = c->wbuf;
1387    conn_shrink(c);
1388    conn_set_state(c, conn_nread);
1389}
1390
1391/* These do the initial protocol switch.  At this point, we should've read
1392 * exactly one byte, and must treat that byte as the beginning of a command. */
1393static void setup_bin_protocol(conn *c) {
1394    char *loc = (char*)c->bin_header;
1395    if (settings.verbose > 0)
1396        fprintf(stderr, "Negotiated protocol as binary.\n");
1397
1398    c->protocol = binary_prot;
1399    reinit_bin_connection(c);
1400    /* Emulate a read of the first byte */
1401    c->ritem[0] = c->rbuf[0];
1402    c->ritem++;
1403    c->rlbytes--;
1404}
1405
1406static void setup_ascii_protocol(conn *c) {
1407    if (settings.verbose > 0)
1408        fprintf(stderr, "Negotiated protocol as ascii.\n");
1409    c->protocol = ascii_prot;
1410
1411    /* We've already got the first letter of the command, so pretend like we
1412     * Did a single byte read from try_read_command */
1413    c->rcurr=c->rbuf;
1414    c->rbytes=1;
1415    conn_set_state(c, conn_read);
1416}
1417
1418static void complete_nread(conn *c) {
1419    assert(c != NULL);
1420
1421    if(c->protocol == ascii_prot) {
1422        complete_nread_ascii(c);
1423    } else if(c->protocol == binary_prot) {
1424        complete_nread_binary(c);
1425    } else if(c->protocol == negotiating_prot) {
1426        /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */
1427        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)
1428            setup_bin_protocol(c);
1429        else
1430            setup_ascii_protocol(c);
1431    } else {
1432        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1433    }
1434}
1435
1436/*
1437 * Stores an item in the cache according to the semantics of one of the set
1438 * commands. In threaded mode, this is protected by the cache lock.
1439 *
1440 * Returns true if the item was stored.
1441 */
1442int do_store_item(item *it, int comm) {
1443    char *key = ITEM_key(it);
1444    bool delete_locked = false;
1445    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1446    int stored = 0;
1447
1448    item *new_it = NULL;
1449    int flags;
1450
1451    if (old_it != NULL && comm == NREAD_ADD) {
1452        /* add only adds a nonexistent item, but promote to head of LRU */
1453        do_item_update(old_it);
1454    } else if (!old_it && (comm == NREAD_REPLACE
1455        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1456    {
1457        /* replace only replaces an existing value; don't store */
1458    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1459        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1460    {
1461        /* replace and add can't override delete locks; don't store */
1462    } else if (comm == NREAD_CAS) {
1463        /* validate cas operation */
1464        if (delete_locked)
1465            old_it = do_item_get_nocheck(key, it->nkey);
1466
1467        if(old_it == NULL) {
1468          // LRU expired
1469          stored = 3;
1470        }
1471        else if(it->cas_id == old_it->cas_id) {
1472          // cas validates
1473          do_item_replace(old_it, it);
1474          stored = 1;
1475        } else {
1476          if(settings.verbose > 1) {
1477            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1478                old_it->cas_id, it->cas_id);
1479          }
1480          stored = 2;
1481        }
1482    } else {
1483        /*
1484         * Append - combine new and old record into single one. Here it's
1485         * atomic and thread-safe.
1486         */
1487
1488        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1489
1490            /* we have it and old_it here - alloc memory to hold both */
1491            /* flags was already lost - so recover them from ITEM_suffix(it) */
1492
1493            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1494
1495            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1496
1497            if (new_it == NULL) {
1498                /* SERVER_ERROR out of memory */
1499                return 0;
1500            }
1501
1502            /* copy data from it and old_it to new_it */
1503
1504            if (comm == NREAD_APPEND) {
1505                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1506                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1507            } else {
1508                /* NREAD_PREPEND */
1509                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1510                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1511            }
1512
1513            it = new_it;
1514        }
1515
1516        /* "set" commands can override the delete lock
1517           window... in which case we have to find the old hidden item
1518           that's in the namespace/LRU but wasn't returned by
1519           item_get.... because we need to replace it */
1520        if (delete_locked)
1521            old_it = do_item_get_nocheck(key, it->nkey);
1522
1523        if (old_it != NULL)
1524            do_item_replace(old_it, it);
1525        else
1526            do_item_link(it);
1527
1528        stored = 1;
1529    }
1530
1531    if (old_it != NULL)
1532        do_item_remove(old_it);         /* release our reference */
1533    if (new_it != NULL)
1534        do_item_remove(new_it);
1535
1536    return stored;
1537}
1538
1539typedef struct token_s {
1540    char *value;
1541    size_t length;
1542} token_t;
1543
1544#define COMMAND_TOKEN 0
1545#define SUBCOMMAND_TOKEN 1
1546#define KEY_TOKEN 1
1547#define KEY_MAX_LENGTH 250
1548
1549#define MAX_TOKENS 8
1550
1551/*
1552 * Tokenize the command string by replacing whitespace with '\0' and update
1553 * the token array tokens with pointer to start of each token and length.
1554 * Returns total number of tokens.  The last valid token is the terminal
1555 * token (value points to the first unprocessed character of the string and
1556 * length zero).
1557 *
1558 * Usage example:
1559 *
1560 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1561 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1562 *          ...
1563 *      }
1564 *      ncommand = tokens[ix].value - command;
1565 *      command  = tokens[ix].value;
1566 *   }
1567 */
1568static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1569    char *s, *e;
1570    size_t ntokens = 0;
1571
1572    assert(command != NULL && tokens != NULL && max_tokens > 1);
1573
1574    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1575        if (*e == ' ') {
1576            if (s != e) {
1577                tokens[ntokens].value = s;
1578                tokens[ntokens].length = e - s;
1579                ntokens++;
1580                *e = '\0';
1581            }
1582            s = e + 1;
1583        }
1584        else if (*e == '\0') {
1585            if (s != e) {
1586                tokens[ntokens].value = s;
1587                tokens[ntokens].length = e - s;
1588                ntokens++;
1589            }
1590
1591            break; /* string end */
1592        }
1593    }
1594
1595    /*
1596     * If we scanned the whole string, the terminal value pointer is null,
1597     * otherwise it is the first unprocessed character.
1598     */
1599    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1600    tokens[ntokens].length = 0;
1601    ntokens++;
1602
1603    return ntokens;
1604}
1605
1606/* set up a connection to write a buffer then free it, used for stats */
1607static void write_and_free(conn *c, char *buf, int bytes) {
1608    if (buf) {
1609        c->write_and_free = buf;
1610        c->wcurr = buf;
1611        c->wbytes = bytes;
1612        conn_set_state(c, conn_write);
1613        c->write_and_go = get_init_state(c);
1614    } else {
1615        out_string(c, "SERVER_ERROR out of memory");
1616    }
1617}
1618
1619static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1620{
1621    int noreply_index = ntokens - 2;
1622
1623    /*
1624      NOTE: this function is not the first place where we are going to
1625      send the reply.  We could send it instead from process_command()
1626      if the request line has wrong number of tokens.  However parsing
1627      malformed line for "noreply" option is not reliable anyway, so
1628      it can't be helped.
1629    */
1630    if (tokens[noreply_index].value
1631        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1632        c->noreply = true;
1633    }
1634}
1635
1636inline static void process_stats_detail(conn *c, const char *command) {
1637    assert(c != NULL);
1638
1639    if (strcmp(command, "on") == 0) {
1640        settings.detail_enabled = 1;
1641        out_string(c, "OK");
1642    }
1643    else if (strcmp(command, "off") == 0) {
1644        settings.detail_enabled = 0;
1645        out_string(c, "OK");
1646    }
1647    else if (strcmp(command, "dump") == 0) {
1648        int len;
1649        char *stats = stats_prefix_dump(&len);
1650        write_and_free(c, stats, len);
1651    }
1652    else {
1653        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1654    }
1655}
1656
1657static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1658    rel_time_t now = current_time;
1659    char *command;
1660    char *subcommand;
1661
1662    assert(c != NULL);
1663
1664    if(ntokens < 2) {
1665        out_string(c, "CLIENT_ERROR bad command line");
1666        return;
1667    }
1668
1669    command = tokens[COMMAND_TOKEN].value;
1670
1671    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1672        char temp[1024];
1673        pid_t pid = getpid();
1674        char *pos = temp;
1675
1676#ifndef WIN32
1677        struct rusage usage;
1678        getrusage(RUSAGE_SELF, &usage);
1679#endif /* !WIN32 */
1680
1681        STATS_LOCK();
1682        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1683        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1684        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1685        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1686        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1687#ifndef WIN32
1688        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1689        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1690#endif /* !WIN32 */
1691        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1692        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1693        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1694        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1695        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1696        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1697        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1698        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1699        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1700        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1701        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1702        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1703        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1704        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1705        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1706        pos += sprintf(pos, "END");
1707        STATS_UNLOCK();
1708        out_string(c, temp);
1709        return;
1710    }
1711
1712    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1713
1714    if (strcmp(subcommand, "reset") == 0) {
1715        stats_reset();
1716        out_string(c, "RESET");
1717        return;
1718    }
1719
1720#ifdef HAVE_MALLOC_H
1721#ifdef HAVE_STRUCT_MALLINFO
1722    if (strcmp(subcommand, "malloc") == 0) {
1723        char temp[512];
1724        struct mallinfo info;
1725        char *pos = temp;
1726
1727        info = mallinfo();
1728        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1729        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1730        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1731        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1732        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1733        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1734        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1735        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1736        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1737        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1738        out_string(c, temp);
1739        return;
1740    }
1741#endif /* HAVE_STRUCT_MALLINFO */
1742#endif /* HAVE_MALLOC_H */
1743
1744#if !defined(WIN32) || !defined(__APPLE__)
1745    if (strcmp(subcommand, "maps") == 0) {
1746        char *wbuf;
1747        int wsize = 8192; /* should be enough */
1748        int fd;
1749        int res;
1750
1751        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1752            out_string(c, "SERVER_ERROR out of memory");
1753            return;
1754        }
1755
1756        fd = open("/proc/self/maps", O_RDONLY);
1757        if (fd == -1) {
1758            out_string(c, "SERVER_ERROR cannot open the maps file");
1759            free(wbuf);
1760            return;
1761        }
1762
1763        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1764        if (res == wsize - 6) {
1765            out_string(c, "SERVER_ERROR buffer overflow");
1766            free(wbuf); close(fd);
1767            return;
1768        }
1769        if (res == 0 || res == -1) {
1770            out_string(c, "SERVER_ERROR can't read the maps file");
1771            free(wbuf); close(fd);
1772            return;
1773        }
1774        memcpy(wbuf + res, "END\r\n", 5);
1775        write_and_free(c, wbuf, res + 5);
1776        close(fd);
1777        return;
1778    }
1779#endif
1780
1781    if (strcmp(subcommand, "cachedump") == 0) {
1782
1783        char *buf;
1784        unsigned int bytes, id, limit = 0;
1785
1786        if(ntokens < 5) {
1787            out_string(c, "CLIENT_ERROR bad command line");
1788            return;
1789        }
1790
1791        id = strtoul(tokens[2].value, NULL, 10);
1792        limit = strtoul(tokens[3].value, NULL, 10);
1793
1794        if(errno == ERANGE) {
1795            out_string(c, "CLIENT_ERROR bad command line format");
1796            return;
1797        }
1798
1799        buf = item_cachedump(id, limit, &bytes);
1800        write_and_free(c, buf, bytes);
1801        return;
1802    }
1803
1804    if (strcmp(subcommand, "slabs") == 0) {
1805        int bytes = 0;
1806        char *buf = slabs_stats(&bytes);
1807        write_and_free(c, buf, bytes);
1808        return;
1809    }
1810
1811    if (strcmp(subcommand, "items") == 0) {
1812        int bytes = 0;
1813        char *buf = item_stats(&bytes);
1814        write_and_free(c, buf, bytes);
1815        return;
1816    }
1817
1818    if (strcmp(subcommand, "detail") == 0) {
1819        if (ntokens < 4)
1820            process_stats_detail(c, "");  /* outputs the error message */
1821        else
1822            process_stats_detail(c, tokens[2].value);
1823        return;
1824    }
1825
1826    if (strcmp(subcommand, "sizes") == 0) {
1827        int bytes = 0;
1828        char *buf = item_stats_sizes(&bytes);
1829        write_and_free(c, buf, bytes);
1830        return;
1831    }
1832
1833    out_string(c, "ERROR");
1834}
1835
1836/* ntokens is overwritten here... shrug.. */
1837static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1838    char *key;
1839    size_t nkey;
1840    int i = 0;
1841    item *it;
1842    token_t *key_token = &tokens[KEY_TOKEN];
1843    char *suffix;
1844    int stats_get_cmds   = 0;
1845    int stats_get_hits   = 0;
1846    int stats_get_misses = 0;
1847    assert(c != NULL);
1848
1849    if (settings.managed) {
1850        int bucket = c->bucket;
1851        if (bucket == -1) {
1852            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1853            return;
1854        }
1855        c->bucket = -1;
1856        if (buckets[bucket] != c->gen) {
1857            out_string(c, "ERROR_NOT_OWNER");
1858            return;
1859        }
1860    }
1861
1862    do {
1863        while(key_token->length != 0) {
1864
1865            key = key_token->value;
1866            nkey = key_token->length;
1867
1868            if(nkey > KEY_MAX_LENGTH) {
1869                STATS_LOCK();
1870                stats.get_cmds   += stats_get_cmds;
1871                stats.get_hits   += stats_get_hits;
1872                stats.get_misses += stats_get_misses;
1873                STATS_UNLOCK();
1874                out_string(c, "CLIENT_ERROR bad command line format");
1875                return;
1876            }
1877
1878            stats_get_cmds++;
1879            it = item_get(key, nkey);
1880            if (settings.detail_enabled) {
1881                stats_prefix_record_get(key, NULL != it);
1882            }
1883            if (it) {
1884                if (i >= c->isize) {
1885                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1886                    if (new_list) {
1887                        c->isize *= 2;
1888                        c->ilist = new_list;
1889                    } else break;
1890                }
1891
1892                /*
1893                 * Construct the response. Each hit adds three elements to the
1894                 * outgoing data list:
1895                 *   "VALUE "
1896                 *   key
1897                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1898                 */
1899
1900                if(return_cas == true)
1901                {
1902                  /* Goofy mid-flight realloc. */
1903                  if (i >= c->suffixsize) {
1904                    char **new_suffix_list = realloc(c->suffixlist,
1905                                           sizeof(char *) * c->suffixsize * 2);
1906                    if (new_suffix_list) {
1907                      c->suffixsize *= 2;
1908                      c->suffixlist  = new_suffix_list;
1909                    } else break;
1910                  }
1911
1912                  suffix = suffix_from_freelist();
1913                  if (suffix == NULL) {
1914                    STATS_LOCK();
1915                    stats.get_cmds   += stats_get_cmds;
1916                    stats.get_hits   += stats_get_hits;
1917                    stats.get_misses += stats_get_misses;
1918                    STATS_UNLOCK();
1919                    out_string(c, "SERVER_ERROR out of memory");
1920                    return;
1921                  }
1922                  *(c->suffixlist + i) = suffix;
1923                  sprintf(suffix, " %llu\r\n", it->cas_id);
1924                  if (add_iov(c, "VALUE ", 6) != 0 ||
1925                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1926                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1927                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1928                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1929                      {
1930                          break;
1931                      }
1932                }
1933                else
1934                {
1935                  if (add_iov(c, "VALUE ", 6) != 0 ||
1936                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1937                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1938                      {
1939                          break;
1940                      }
1941                }
1942
1943
1944                if (settings.verbose > 1)
1945                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1946
1947                /* item_get() has incremented it->refcount for us */
1948                stats_get_hits++;
1949                item_update(it);
1950                *(c->ilist + i) = it;
1951                i++;
1952
1953            } else {
1954                stats_get_misses++;
1955            }
1956
1957            key_token++;
1958        }
1959
1960        /*
1961         * If the command string hasn't been fully processed, get the next set
1962         * of tokens.
1963         */
1964        if(key_token->value != NULL) {
1965            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1966            key_token = tokens;
1967        }
1968
1969    } while(key_token->value != NULL);
1970
1971    c->icurr = c->ilist;
1972    c->ileft = i;
1973    if (return_cas) {
1974        c->suffixcurr = c->suffixlist;
1975        c->suffixleft = i;
1976    }
1977
1978    if (settings.verbose > 1)
1979        fprintf(stderr, ">%d END\n", c->sfd);
1980
1981    /*
1982        If the loop was terminated because of out-of-memory, it is not
1983        reliable to add END\r\n to the buffer, because it might not end
1984        in \r\n. So we send SERVER_ERROR instead.
1985    */
1986    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1987        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1988        out_string(c, "SERVER_ERROR out of memory");
1989    }
1990    else {
1991        conn_set_state(c, conn_mwrite);
1992        c->msgcurr = 0;
1993    }
1994
1995    STATS_LOCK();
1996    stats.get_cmds   += stats_get_cmds;
1997    stats.get_hits   += stats_get_hits;
1998    stats.get_misses += stats_get_misses;
1999    STATS_UNLOCK();
2000
2001    return;
2002}
2003
2004static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2005    char *key;
2006    size_t nkey;
2007    int flags;
2008    time_t exptime;
2009    int vlen, old_vlen;
2010    uint64_t req_cas_id;
2011    item *it, *old_it;
2012
2013    assert(c != NULL);
2014
2015    set_noreply_maybe(c, tokens, ntokens);
2016
2017    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2018        out_string(c, "CLIENT_ERROR bad command line format");
2019        return;
2020    }
2021
2022    key = tokens[KEY_TOKEN].value;
2023    nkey = tokens[KEY_TOKEN].length;
2024
2025    flags = strtoul(tokens[2].value, NULL, 10);
2026    exptime = strtol(tokens[3].value, NULL, 10);
2027    vlen = strtol(tokens[4].value, NULL, 10);
2028
2029    // does cas value exist?
2030    if(handle_cas)
2031    {
2032      req_cas_id = strtoull(tokens[5].value, NULL, 10);
2033    }
2034
2035    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
2036        out_string(c, "CLIENT_ERROR bad command line format");
2037        return;
2038    }
2039
2040    if (settings.detail_enabled) {
2041        stats_prefix_record_set(key);
2042    }
2043
2044    if (settings.managed) {
2045        int bucket = c->bucket;
2046        if (bucket == -1) {
2047            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2048            return;
2049        }
2050        c->bucket = -1;
2051        if (buckets[bucket] != c->gen) {
2052            out_string(c, "ERROR_NOT_OWNER");
2053            return;
2054        }
2055    }
2056
2057    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2058
2059    if (it == 0) {
2060        if (! item_size_ok(nkey, flags, vlen + 2))
2061            out_string(c, "SERVER_ERROR object too large for cache");
2062        else
2063            out_string(c, "SERVER_ERROR out of memory");
2064        /* swallow the data line */
2065        c->write_and_go = conn_swallow;
2066        c->sbytes = vlen + 2;
2067        return;
2068    }
2069    if(handle_cas)
2070      it->cas_id = req_cas_id;
2071
2072    c->item = it;
2073    c->ritem = ITEM_data(it);
2074    c->rlbytes = it->nbytes;
2075    c->item_comm = comm;
2076    conn_set_state(c, conn_nread);
2077}
2078
2079static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2080    char temp[sizeof("18446744073709551615")];
2081    item *it;
2082    int64_t delta;
2083    char *key;
2084    size_t nkey;
2085
2086    assert(c != NULL);
2087
2088    set_noreply_maybe(c, tokens, ntokens);
2089
2090    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2091        out_string(c, "CLIENT_ERROR bad command line format");
2092        return;
2093    }
2094
2095    key = tokens[KEY_TOKEN].value;
2096    nkey = tokens[KEY_TOKEN].length;
2097
2098    if (settings.managed) {
2099        int bucket = c->bucket;
2100        if (bucket == -1) {
2101            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2102            return;
2103        }
2104        c->bucket = -1;
2105        if (buckets[bucket] != c->gen) {
2106            out_string(c, "ERROR_NOT_OWNER");
2107            return;
2108        }
2109    }
2110
2111    delta = strtoll(tokens[2].value, NULL, 10);
2112
2113    if(errno == ERANGE) {
2114        out_string(c, "CLIENT_ERROR bad command line format");
2115        return;
2116    }
2117
2118    it = item_get(key, nkey);
2119    if (!it) {
2120        out_string(c, "NOT_FOUND");
2121        return;
2122    }
2123
2124    out_string(c, add_delta(it, incr, delta, temp));
2125    item_remove(it);         /* release our reference */
2126}
2127
2128/*
2129 * adds a delta value to a numeric item.
2130 *
2131 * it    item to adjust
2132 * incr  true to increment value, false to decrement
2133 * delta amount to adjust value by
2134 * buf   buffer for response string
2135 *
2136 * returns a response string to send back to the client.
2137 */
2138char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2139    char *ptr;
2140    int64_t value;
2141    int res;
2142
2143    ptr = ITEM_data(it);
2144    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2145
2146    value = strtoull(ptr, NULL, 10);
2147
2148    if(errno == ERANGE) {
2149        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2150    }
2151
2152    if (incr)
2153        value += delta;
2154    else {
2155        value -= delta;
2156    }
2157    if(value < 0) {
2158        value=0;
2159    }
2160    sprintf(buf, "%llu", value);
2161    res = strlen(buf);
2162    if (res + 2 > it->nbytes) { /* need to realloc */
2163        item *new_it;
2164        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2165        if (new_it == 0) {
2166            return "SERVER_ERROR out of memory";
2167        }
2168        memcpy(ITEM_data(new_it), buf, res);
2169        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2170        do_item_replace(it, new_it);
2171        do_item_remove(new_it);       /* release our reference */
2172    } else { /* replace in-place */
2173        memcpy(ITEM_data(it), buf, res);
2174        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2175    }
2176
2177    return buf;
2178}
2179
2180static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2181    char *key;
2182    size_t nkey;
2183    item *it;
2184    time_t exptime = 0;
2185
2186    assert(c != NULL);
2187
2188    set_noreply_maybe(c, tokens, ntokens);
2189
2190    if (settings.managed) {
2191        int bucket = c->bucket;
2192        if (bucket == -1) {
2193            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2194            return;
2195        }
2196        c->bucket = -1;
2197        if (buckets[bucket] != c->gen) {
2198            out_string(c, "ERROR_NOT_OWNER");
2199            return;
2200        }
2201    }
2202
2203    key = tokens[KEY_TOKEN].value;
2204    nkey = tokens[KEY_TOKEN].length;
2205
2206    if(nkey > KEY_MAX_LENGTH) {
2207        out_string(c, "CLIENT_ERROR bad command line format");
2208        return;
2209    }
2210
2211    if(ntokens == (c->noreply ? 5 : 4)) {
2212        exptime = strtol(tokens[2].value, NULL, 10);
2213
2214        if(errno == ERANGE) {
2215            out_string(c, "CLIENT_ERROR bad command line format");
2216            return;
2217        }
2218    }
2219
2220    if (settings.detail_enabled) {
2221        stats_prefix_record_delete(key);
2222    }
2223
2224    it = item_get(key, nkey);
2225    if (it) {
2226        if (exptime == 0) {
2227            item_unlink(it);
2228            item_remove(it);      /* release our reference */
2229            out_string(c, "DELETED");
2230        } else {
2231            /* our reference will be transfered to the delete queue */
2232            out_string(c, defer_delete(it, exptime));
2233        }
2234    } else {
2235        out_string(c, "NOT_FOUND");
2236    }
2237}
2238
2239/*
2240 * Adds an item to the deferred-delete list so it can be reaped later.
2241 *
2242 * Returns the result to send to the client.
2243 */
2244char *do_defer_delete(item *it, time_t exptime)
2245{
2246    if (delcurr >= deltotal) {
2247        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2248        if (new_delete) {
2249            todelete = new_delete;
2250            deltotal *= 2;
2251        } else {
2252            /*
2253             * can't delete it immediately, user wants a delay,
2254             * but we ran out of memory for the delete queue
2255             */
2256            item_remove(it);    /* release reference */
2257            return "SERVER_ERROR out of memory";
2258        }
2259    }
2260
2261    /* use its expiration time as its deletion time now */
2262    it->exptime = realtime(exptime);
2263    it->it_flags |= ITEM_DELETED;
2264    todelete[delcurr++] = it;
2265
2266    return "DELETED";
2267}
2268
2269static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2270    unsigned int level;
2271
2272    assert(c != NULL);
2273
2274    set_noreply_maybe(c, tokens, ntokens);
2275
2276    level = strtoul(tokens[1].value, NULL, 10);
2277    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2278    out_string(c, "OK");
2279    return;
2280}
2281
2282static void process_command(conn *c, char *command) {
2283
2284    token_t tokens[MAX_TOKENS];
2285    size_t ntokens;
2286    int comm;
2287
2288    assert(c != NULL);
2289
2290    if (settings.verbose > 1)
2291        fprintf(stderr, "<%d %s\n", c->sfd, command);
2292
2293    /*
2294     * for commands set/add/replace, we build an item and read the data
2295     * directly into it, then continue in nread_complete().
2296     */
2297
2298    c->msgcurr = 0;
2299    c->msgused = 0;
2300    c->iovused = 0;
2301    if (add_msghdr(c) != 0) {
2302        out_string(c, "SERVER_ERROR out of memory");
2303        return;
2304    }
2305
2306    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2307    if (ntokens >= 3 &&
2308        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2309         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2310
2311        process_get_command(c, tokens, ntokens, false);
2312
2313    } else if ((ntokens == 6 || ntokens == 7) &&
2314               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2315                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2316                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2317                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2318                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2319
2320        process_update_command(c, tokens, ntokens, comm, false);
2321
2322    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2323
2324        process_update_command(c, tokens, ntokens, comm, true);
2325
2326    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2327
2328        process_arithmetic_command(c, tokens, ntokens, 1);
2329
2330    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2331
2332        process_get_command(c, tokens, ntokens, true);
2333
2334    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2335
2336        process_arithmetic_command(c, tokens, ntokens, 0);
2337
2338    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2339
2340        process_delete_command(c, tokens, ntokens);
2341
2342    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2343        unsigned int bucket, gen;
2344        if (!settings.managed) {
2345            out_string(c, "CLIENT_ERROR not a managed instance");
2346            return;
2347        }
2348
2349        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2350            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2351                out_string(c, "CLIENT_ERROR bucket number out of range");
2352                return;
2353            }
2354            buckets[bucket] = gen;
2355            out_string(c, "OWNED");
2356            return;
2357        } else {
2358            out_string(c, "CLIENT_ERROR bad format");
2359            return;
2360        }
2361
2362    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2363
2364        int bucket;
2365        if (!settings.managed) {
2366            out_string(c, "CLIENT_ERROR not a managed instance");
2367            return;
2368        }
2369        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2370            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2371                out_string(c, "CLIENT_ERROR bucket number out of range");
2372                return;
2373            }
2374            buckets[bucket] = 0;
2375            out_string(c, "DISOWNED");
2376            return;
2377        } else {
2378            out_string(c, "CLIENT_ERROR bad format");
2379            return;
2380        }
2381
2382    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2383        int bucket, gen;
2384        if (!settings.managed) {
2385            out_string(c, "CLIENT_ERROR not a managed instance");
2386            return;
2387        }
2388        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2389            /* we never write anything back, even if input's wrong */
2390            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2391                /* do nothing, bad input */
2392            } else {
2393                c->bucket = bucket;
2394                c->gen = gen;
2395            }
2396            conn_set_init_state(c);
2397            return;
2398        } else {
2399            out_string(c, "CLIENT_ERROR bad format");
2400            return;
2401        }
2402
2403    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2404
2405        process_stat(c, tokens, ntokens);
2406
2407    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2408        time_t exptime = 0;
2409        set_current_time();
2410
2411        set_noreply_maybe(c, tokens, ntokens);
2412
2413        if(ntokens == (c->noreply ? 3 : 2)) {
2414            settings.oldest_live = current_time - 1;
2415            item_flush_expired();
2416            out_string(c, "OK");
2417            return;
2418        }
2419
2420        exptime = strtol(tokens[1].value, NULL, 10);
2421        if(errno == ERANGE) {
2422            out_string(c, "CLIENT_ERROR bad command line format");
2423            return;
2424        }
2425
2426        /*
2427          If exptime is zero realtime() would return zero too, and
2428          realtime(exptime) - 1 would overflow to the max unsigned
2429          value.  So we process exptime == 0 the same way we do when
2430          no delay is given at all.
2431        */
2432        if (exptime > 0)
2433            settings.oldest_live = realtime(exptime) - 1;
2434        else /* exptime == 0 */
2435            settings.oldest_live = current_time - 1;
2436        item_flush_expired();
2437        out_string(c, "OK");
2438        return;
2439
2440    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2441
2442        out_string(c, "VERSION " VERSION);
2443
2444    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2445
2446        conn_set_state(c, conn_closing);
2447
2448    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2449                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2450#ifdef ALLOW_SLABS_REASSIGN
2451
2452        int src, dst, rv;
2453
2454        src = strtol(tokens[2].value, NULL, 10);
2455        dst  = strtol(tokens[3].value, NULL, 10);
2456
2457        if(errno == ERANGE) {
2458            out_string(c, "CLIENT_ERROR bad command line format");
2459            return;
2460        }
2461
2462        rv = slabs_reassign(src, dst);
2463        if (rv == 1) {
2464            out_string(c, "DONE");
2465            return;
2466        }
2467        if (rv == 0) {
2468            out_string(c, "CANT");
2469            return;
2470        }
2471        if (rv == -1) {
2472            out_string(c, "BUSY");
2473            return;
2474        }
2475#else
2476        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2477#endif
2478    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2479        process_verbosity_command(c, tokens, ntokens);
2480    } else {
2481        out_string(c, "ERROR");
2482    }
2483    return;
2484}
2485
2486/*
2487 * if we have a complete line in the buffer, process it.
2488 */
2489static int try_read_command(conn *c) {
2490    char *el, *cont;
2491
2492    assert(c != NULL);
2493    assert(c->rcurr <= (c->rbuf + c->rsize));
2494
2495    if (c->rbytes == 0)
2496        return 0;
2497    el = memchr(c->rcurr, '\n', c->rbytes);
2498    if (!el)
2499        return 0;
2500    cont = el + 1;
2501    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2502        el--;
2503    }
2504    *el = '\0';
2505
2506    assert(cont <= (c->rcurr + c->rbytes));
2507
2508    process_command(c, c->rcurr);
2509
2510    c->rbytes -= (cont - c->rcurr);
2511    c->rcurr = cont;
2512
2513    assert(c->rcurr <= (c->rbuf + c->rsize));
2514
2515    return 1;
2516}
2517
2518/*
2519 * read a UDP request.
2520 * return 0 if there's nothing to read.
2521 */
2522static int try_read_udp(conn *c) {
2523    int res;
2524
2525    assert(c != NULL);
2526
2527    c->request_addr_size = sizeof(c->request_addr);
2528    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2529                   0, &c->request_addr, &c->request_addr_size);
2530    if (res > 8) {
2531        unsigned char *buf = (unsigned char *)c->rbuf;
2532        STATS_LOCK();
2533        stats.bytes_read += res;
2534        STATS_UNLOCK();
2535
2536        /* Beginning of UDP packet is the request ID; save it. */
2537        c->request_id = buf[0] * 256 + buf[1];
2538
2539        /* If this is a multi-packet request, drop it. */
2540        if (buf[4] != 0 || buf[5] != 1) {
2541            out_string(c, "SERVER_ERROR multi-packet request not supported");
2542            return 0;
2543        }
2544
2545        /* Don't care about any of the rest of the header. */
2546        res -= 8;
2547        memmove(c->rbuf, c->rbuf + 8, res);
2548
2549        c->rbytes += res;
2550        c->rcurr = c->rbuf;
2551        return 1;
2552    }
2553    return 0;
2554}
2555
2556/*
2557 * read from network as much as we can, handle buffer overflow and connection
2558 * close.
2559 * before reading, move the remaining incomplete fragment of a command
2560 * (if any) to the beginning of the buffer.
2561 * return 0 if there's nothing to read on the first read.
2562 */
2563static int try_read_network(conn *c) {
2564    int gotdata = 0;
2565    int res;
2566
2567    assert(c != NULL);
2568
2569    if (c->rcurr != c->rbuf) {
2570        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2571            memmove(c->rbuf, c->rcurr, c->rbytes);
2572        c->rcurr = c->rbuf;
2573    }
2574
2575    while (1) {
2576        if (c->rbytes >= c->rsize) {
2577            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2578            if (!new_rbuf) {
2579                if (settings.verbose > 0)
2580                    fprintf(stderr, "Couldn't realloc input buffer\n");
2581                c->rbytes = 0; /* ignore what we read */
2582                out_string(c, "SERVER_ERROR out of memory");
2583                c->write_and_go = conn_closing;
2584                return 1;
2585            }
2586            c->rcurr = c->rbuf = new_rbuf;
2587            c->rsize *= 2;
2588        }
2589
2590        int avail = c->rsize - c->rbytes;
2591        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2592        if (res > 0) {
2593            STATS_LOCK();
2594            stats.bytes_read += res;
2595            STATS_UNLOCK();
2596            gotdata = 1;
2597            c->rbytes += res;
2598            if (res == avail) {
2599                continue;
2600            } else {
2601                break;
2602            }
2603        }
2604        if (res == 0) {
2605            /* connection closed */
2606            conn_set_state(c, conn_closing);
2607            return 1;
2608        }
2609        if (res == -1) {
2610            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2611            /* Should close on unhandled errors. */
2612            conn_set_state(c, conn_closing);
2613            return 1;
2614        }
2615    }
2616    return gotdata;
2617}
2618
2619static bool update_event(conn *c, const int new_flags) {
2620    assert(c != NULL);
2621
2622    struct event_base *base = c->event.ev_base;
2623    if (c->ev_flags == new_flags)
2624        return true;
2625    if (event_del(&c->event) == -1) return false;
2626    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2627    event_base_set(base, &c->event);
2628    c->ev_flags = new_flags;
2629    if (event_add(&c->event, 0) == -1) return false;
2630    return true;
2631}
2632
2633/*
2634 * Sets whether we are listening for new connections or not.
2635 */
2636void accept_new_conns(const bool do_accept) {
2637    conn *next;
2638
2639    if (! is_listen_thread())
2640        return;
2641
2642    for (next = listen_conn; next; next = next->next) {
2643        if (do_accept) {
2644            update_event(next, EV_READ | EV_PERSIST);
2645            if (listen(next->sfd, 1024) != 0) {
2646                perror("listen");
2647            }
2648        }
2649        else {
2650            update_event(next, 0);
2651            if (listen(next->sfd, 0) != 0) {
2652                perror("listen");
2653            }
2654        }
2655  }
2656}
2657
2658/*
2659 * Transmit the next chunk of data from our list of msgbuf structures.
2660 *
2661 * Returns:
2662 *   TRANSMIT_COMPLETE   All done writing.
2663 *   TRANSMIT_INCOMPLETE More data remaining to write.
2664 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2665 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2666 */
2667static int transmit(conn *c) {
2668    assert(c != NULL);
2669
2670    if (c->msgcurr < c->msgused &&
2671            c->msglist[c->msgcurr].msg_iovlen == 0) {
2672        /* Finished writing the current msg; advance to the next. */
2673        c->msgcurr++;
2674    }
2675    if (c->msgcurr < c->msgused) {
2676        ssize_t res;
2677        struct msghdr *m = &c->msglist[c->msgcurr];
2678
2679        res = sendmsg(c->sfd, m, 0);
2680        if (res > 0) {
2681            STATS_LOCK();
2682            stats.bytes_written += res;
2683            STATS_UNLOCK();
2684
2685            /* We've written some of the data. Remove the completed
2686               iovec entries from the list of pending writes. */
2687            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2688                res -= m->msg_iov->iov_len;
2689                m->msg_iovlen--;
2690                m->msg_iov++;
2691            }
2692
2693            /* Might have written just part of the last iovec entry;
2694               adjust it so the next write will do the rest. */
2695            if (res > 0) {
2696                m->msg_iov->iov_base += res;
2697                m->msg_iov->iov_len -= res;
2698            }
2699            return TRANSMIT_INCOMPLETE;
2700        }
2701        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2702            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2703                if (settings.verbose > 0)
2704                    fprintf(stderr, "Couldn't update event\n");
2705                conn_set_state(c, conn_closing);
2706                return TRANSMIT_HARD_ERROR;
2707            }
2708            return TRANSMIT_SOFT_ERROR;
2709        }
2710        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2711           we have a real error, on which we close the connection */
2712        if (settings.verbose > 0)
2713            perror("Failed to write, and not due to blocking");
2714
2715        if (IS_UDP(c->protocol))
2716            conn_set_state(c, conn_read);
2717        else
2718            conn_set_state(c, conn_closing);
2719        return TRANSMIT_HARD_ERROR;
2720    } else {
2721        return TRANSMIT_COMPLETE;
2722    }
2723}
2724
2725static void drive_machine(conn *c) {
2726    bool stop = false;
2727    int sfd, flags = 1;
2728    int init_state; /* initial state for a new connection */
2729    socklen_t addrlen;
2730    struct sockaddr_storage addr;
2731    int res;
2732
2733    assert(c != NULL);
2734
2735    while (!stop) {
2736
2737        switch(c->state) {
2738        case conn_listening:
2739            addrlen = sizeof(addr);
2740            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2741                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2742                    /* these are transient, so don't log anything */
2743                    stop = true;
2744                } else if (errno == EMFILE) {
2745                    if (settings.verbose > 0)
2746                        fprintf(stderr, "Too many open connections\n");
2747                    accept_new_conns(false);
2748                    stop = true;
2749                } else {
2750                    perror("accept()");
2751                    stop = true;
2752                }
2753                break;
2754            }
2755            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2756                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2757                perror("setting O_NONBLOCK");
2758                close(sfd);
2759                break;
2760            }
2761            init_state = get_init_state(c);
2762
2763            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2764                                     DATA_BUFFER_SIZE, c->protocol);
2765
2766            break;
2767
2768        case conn_negotiate:
2769            if (settings.verbose > 0)
2770                fprintf(stderr, "Negotiating protocol for a new connection\n");
2771            c->rlbytes = 1;
2772            c->ritem = c->rbuf;
2773            c->rcurr = c->rbuf;
2774            c->wcurr = c->wbuf;
2775            conn_set_state(c, conn_nread);
2776            break;
2777
2778        case conn_read:
2779            if (try_read_command(c) != 0) {
2780                continue;
2781            }
2782            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2783                continue;
2784            }
2785            /* we have no command line and no data to read from network */
2786            if (!update_event(c, EV_READ | EV_PERSIST)) {
2787                if (settings.verbose > 0)
2788                    fprintf(stderr, "Couldn't update event\n");
2789                conn_set_state(c, conn_closing);
2790                break;
2791            }
2792            stop = true;
2793            break;
2794
2795        case conn_bin_init: /* Reinitialize a binary connection */
2796            reinit_bin_connection(c);
2797            break;
2798
2799        case conn_nread:
2800            if (c->rlbytes == 0) {
2801                complete_nread(c);
2802                break;
2803            }
2804            /* first check if we have leftovers in the conn_read buffer */
2805            if (c->rbytes > 0) {
2806                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2807                memcpy(c->ritem, c->rcurr, tocopy);
2808                c->ritem += tocopy;
2809                c->rlbytes -= tocopy;
2810                c->rcurr += tocopy;
2811                c->rbytes -= tocopy;
2812                break;
2813            }
2814
2815            /*  now try reading from the socket */
2816            res = read(c->sfd, c->ritem, c->rlbytes);
2817            if (res > 0) {
2818                STATS_LOCK();
2819                stats.bytes_read += res;
2820                STATS_UNLOCK();
2821                c->ritem += res;
2822                c->rlbytes -= res;
2823                break;
2824            }
2825            if (res == 0) { /* end of stream */
2826                conn_set_state(c, conn_closing);
2827                break;
2828            }
2829            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2830                if (!update_event(c, EV_READ | EV_PERSIST)) {
2831                    if (settings.verbose > 0)
2832                        fprintf(stderr, "Couldn't update event\n");
2833                    conn_set_state(c, conn_closing);
2834                    break;
2835                }
2836                stop = true;
2837                break;
2838            }
2839            /* otherwise we have a real error, on which we close the connection */
2840            if (settings.verbose > 0)
2841                fprintf(stderr, "Failed to read, and not due to blocking\n");
2842            conn_set_state(c, conn_closing);
2843            break;
2844
2845        case conn_swallow:
2846            /* we are reading sbytes and throwing them away */
2847            if (c->sbytes == 0) {
2848                conn_set_init_state(c);
2849                break;
2850            }
2851
2852            /* first check if we have leftovers in the conn_read buffer */
2853            if (c->rbytes > 0) {
2854                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2855                c->sbytes -= tocopy;
2856                c->rcurr += tocopy;
2857                c->rbytes -= tocopy;
2858                break;
2859            }
2860
2861            /*  now try reading from the socket */
2862            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2863            if (res > 0) {
2864                STATS_LOCK();
2865                stats.bytes_read += res;
2866                STATS_UNLOCK();
2867                c->sbytes -= res;
2868                break;
2869            }
2870            if (res == 0) { /* end of stream */
2871                conn_set_state(c, conn_closing);
2872                break;
2873            }
2874            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2875                if (!update_event(c, EV_READ | EV_PERSIST)) {
2876                    if (settings.verbose > 0)
2877                        fprintf(stderr, "Couldn't update event\n");
2878                    conn_set_state(c, conn_closing);
2879                    break;
2880                }
2881                stop = true;
2882                break;
2883            }
2884            /* otherwise we have a real error, on which we close the connection */
2885            if (settings.verbose > 0)
2886                fprintf(stderr, "Failed to read, and not due to blocking\n");
2887            conn_set_state(c, conn_closing);
2888            break;
2889
2890        case conn_write:
2891            /*
2892             * We want to write out a simple response. If we haven't already,
2893             * assemble it into a msgbuf list (this will be a single-entry
2894             * list for TCP or a two-entry list for UDP).
2895             */
2896            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2897                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2898                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2899                    if (settings.verbose > 0)
2900                        fprintf(stderr, "Couldn't build response\n");
2901                    conn_set_state(c, conn_closing);
2902                    break;
2903                }
2904            }
2905
2906            /* fall through... */
2907
2908        case conn_mwrite:
2909            switch (transmit(c)) {
2910            case TRANSMIT_COMPLETE:
2911                if (c->state == conn_mwrite) {
2912                    while (c->ileft > 0) {
2913                        item *it = *(c->icurr);
2914                        assert((it->it_flags & ITEM_SLABBED) == 0);
2915                        item_remove(it);
2916                        c->icurr++;
2917                        c->ileft--;
2918                    }
2919                    while (c->suffixleft > 0) {
2920                        char *suffix = *(c->suffixcurr);
2921                        if(suffix_add_to_freelist(suffix)) {
2922                            /* Failed to add to freelist, don't leak */
2923                            free(suffix);
2924                        }
2925                        c->suffixcurr++;
2926                        c->suffixleft--;
2927                    }
2928                    /* XXX:  I don't know why this wasn't the general case */
2929                    if(c->protocol == binary_prot) {
2930                        conn_set_state(c, c->write_and_go);
2931                    } else {
2932                        conn_set_init_state(c);
2933                    }
2934                } else if (c->state == conn_write) {
2935                    if (c->write_and_free) {
2936                        free(c->write_and_free);
2937                        c->write_and_free = 0;
2938                    }
2939                    conn_set_state(c, c->write_and_go);
2940                } else {
2941                    if (settings.verbose > 0)
2942                        fprintf(stderr, "Unexpected state %d\n", c->state);
2943                    conn_set_state(c, conn_closing);
2944                }
2945                break;
2946
2947            case TRANSMIT_INCOMPLETE:
2948            case TRANSMIT_HARD_ERROR:
2949                break;                   /* Continue in state machine. */
2950
2951            case TRANSMIT_SOFT_ERROR:
2952                stop = true;
2953                break;
2954            }
2955            break;
2956
2957        case conn_closing:
2958            if (IS_UDP(c->protocol))
2959                conn_cleanup(c);
2960            else
2961                conn_close(c);
2962            stop = true;
2963            break;
2964        }
2965    }
2966
2967    return;
2968}
2969
2970void event_handler(const int fd, const short which, void *arg) {
2971    conn *c;
2972