root/branches/binary/server/memcached.c @ 771

Revision 771, 111.5 kB (checked in by dormando, 20 months ago)

16/4 == 4.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63#ifndef HAVE_DAEMON
64extern int daemon(int nochdir, int noclose);
65#endif
66
67/*
68 * forward declarations
69 */
70static void drive_machine(conn *c);
71static int new_socket(struct addrinfo *ai);
72static int server_socket(const int port, enum protocol prot);
73static int try_read_command(conn *c);
74static int try_read_network(conn *c);
75static int try_read_udp(conn *c);
76static void conn_set_state(conn *c, enum conn_states state);
77
78/* stats */
79static void stats_reset(void);
80static void stats_init(void);
81
82/* defaults */
83static void settings_init(void);
84
85/* event handling, network IO */
86static void event_handler(const int fd, const short which, void *arg);
87static void conn_close(conn *c);
88static void conn_init(void);
89static void accept_new_conns(const bool do_accept);
90static bool update_event(conn *c, const int new_flags);
91static void complete_nread(conn *c);
92static void process_command(conn *c, char *command);
93static int transmit(conn *c);
94static int ensure_iov_space(conn *c);
95static int add_iov(conn *c, const void *buf, int len);
96static int add_msghdr(conn *c);
97
98/* time handling */
99static void set_current_time(void);  /* update the global variable holding
100                              global 32-bit seconds-since-start time
101                              (to avoid 64 bit time_t) */
102
103static void conn_free(conn *c);
104
105/** exported globals **/
106struct stats stats;
107struct settings settings;
108
109/** file scope variables **/
110static item **todelete = NULL;
111static int delcurr;
112static int deltotal;
113static conn *listen_conn = NULL;
114static struct event_base *main_base;
115
116#define TRANSMIT_COMPLETE   0
117#define TRANSMIT_INCOMPLETE 1
118#define TRANSMIT_SOFT_ERROR 2
119#define TRANSMIT_HARD_ERROR 3
120
121static int *buckets = 0; /* bucket->generation array for a managed instance */
122
123#define REALTIME_MAXDELTA 60*60*24*30
124/*
125 * given time value that's either unix time or delta from current unix time, return
126 * unix time. Use the fact that delta can't exceed one month (and real time value can't
127 * be that low).
128 */
129static rel_time_t realtime(const time_t exptime) {
130    /* no. of seconds in 30 days - largest possible delta exptime */
131
132    if (exptime == 0) return 0; /* 0 means never expire */
133
134    if (exptime > REALTIME_MAXDELTA) {
135        /* if item expiration is at/before the server started, give it an
136           expiration time of 1 second after the server started.
137           (because 0 means don't expire).  without this, we'd
138           underflow and wrap around to some large value way in the
139           future, effectively making items expiring in the past
140           really expiring never */
141        if (exptime <= stats.started)
142            return (rel_time_t)1;
143        return (rel_time_t)(exptime - stats.started);
144    } else {
145        return (rel_time_t)(exptime + current_time);
146    }
147}
148
149static void stats_init(void) {
150    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
151    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
152    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
153
154    /* make the time we started always be 2 seconds before we really
155       did, so time(0) - time.started is never zero.  if so, things
156       like 'settings.oldest_live' which act as booleans as well as
157       values are now false in boolean context... */
158    stats.started = time(0) - 2;
159    stats_prefix_init();
160}
161
162static void stats_reset(void) {
163    STATS_LOCK();
164    stats.total_items = stats.total_conns = 0;
165    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
166    stats.bytes_read = stats.bytes_written = 0;
167    stats_prefix_clear();
168    STATS_UNLOCK();
169}
170
171static void settings_init(void) {
172    settings.access = 0700;
173    settings.port = 11211;
174    settings.udpport = 0;
175    /* By default this string should be NULL for getaddrinfo() */
176    settings.inter = NULL;
177    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
178    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
179    settings.verbose = 0;
180    settings.oldest_live = 0;
181    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
182    settings.socketpath = NULL;       /* by default, not using a unix socket */
183    settings.managed = false;
184    settings.factor = 1.25;
185    settings.chunk_size = 48;         /* space for a modest key and value */
186#ifdef USE_THREADS
187    settings.num_threads = 4;
188#else
189    settings.num_threads = 1;
190#endif
191    settings.prefix_delimiter = ':';
192    settings.detail_enabled = 0;
193}
194
195/* returns true if a deleted item's delete-locked-time is over, and it
196   should be removed from the namespace */
197static bool item_delete_lock_over (item *it) {
198    assert(it->it_flags & ITEM_DELETED);
199    return (current_time >= it->exptime);
200}
201
202/*
203 * Adds a message header to a connection.
204 *
205 * Returns 0 on success, -1 on out-of-memory.
206 */
207static int add_msghdr(conn *c)
208{
209    struct msghdr *msg;
210
211    assert(c != NULL);
212
213    if (c->msgsize == c->msgused) {
214        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
215        if (! msg)
216            return -1;
217        c->msglist = msg;
218        c->msgsize *= 2;
219    }
220
221    msg = c->msglist + c->msgused;
222
223    /* this wipes msg_iovlen, msg_control, msg_controllen, and
224       msg_flags, the last 3 of which aren't defined on solaris: */
225    memset(msg, 0, sizeof(struct msghdr));
226
227    msg->msg_iov = &c->iov[c->iovused];
228
229    if (c->request_addr_size > 0) {
230        msg->msg_name = &c->request_addr;
231        msg->msg_namelen = c->request_addr_size;
232    }
233
234    c->msgbytes = 0;
235    c->msgused++;
236
237    if (IS_UDP(c->protocol)) {
238        /* Leave room for the UDP header, which we'll fill in later. */
239        return add_iov(c, NULL, UDP_HEADER_SIZE);
240    }
241
242    return 0;
243}
244
245
246/*
247 * Free list management for connections.
248 */
249
250static conn **freeconns;
251static int freetotal;
252static int freecurr;
253
254
255static void conn_init(void) {
256    freetotal = 200;
257    freecurr = 0;
258    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
259        fprintf(stderr, "malloc()\n");
260    }
261    return;
262}
263
264/*
265 * Returns a connection from the freelist, if any. Should call this using
266 * conn_from_freelist() for thread safety.
267 */
268conn *do_conn_from_freelist() {
269    conn *c;
270
271    if (freecurr > 0) {
272        c = freeconns[--freecurr];
273    } else {
274        c = NULL;
275    }
276
277    return c;
278}
279
280/*
281 * Adds a connection to the freelist. 0 = success. Should call this using
282 * conn_add_to_freelist() for thread safety.
283 */
284bool do_conn_add_to_freelist(conn *c) {
285    if (freecurr < freetotal) {
286        freeconns[freecurr++] = c;
287        return false;
288    } else {
289        /* try to enlarge free connections array */
290        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
291        if (new_freeconns) {
292            freetotal *= 2;
293            freeconns = new_freeconns;
294            freeconns[freecurr++] = c;
295            return false;
296        }
297    }
298    return true;
299}
300
301static char *prot_text(enum protocol prot) {
302    char *rv = "unknown";
303    switch(prot) {
304        case ascii_prot:
305            rv = "ascii";
306            break;
307        case binary_prot:
308            rv = "binary";
309            break;
310        case ascii_udp_prot:
311            rv = "ascii-udp";
312            break;
313        case negotiating_prot:
314            rv = "auto-negotiate";
315            break;
316    }
317    return rv;
318}
319
320conn *conn_new(const int sfd, enum conn_states init_state,
321                const int event_flags,
322                const int read_buffer_size, enum protocol prot,
323                struct event_base *base) {
324    conn *c = conn_from_freelist();
325
326    if (NULL == c) {
327        if (!(c = (conn *)malloc(sizeof(conn)))) {
328            fprintf(stderr, "malloc()\n");
329            return NULL;
330        }
331        c->rbuf = c->wbuf = 0;
332        c->ilist = 0;
333        c->suffixlist = 0;
334        c->iov = 0;
335        c->msglist = 0;
336        c->hdrbuf = 0;
337
338        c->rsize = read_buffer_size;
339        c->wsize = DATA_BUFFER_SIZE;
340        c->isize = ITEM_LIST_INITIAL;
341        c->suffixsize = SUFFIX_LIST_INITIAL;
342        c->iovsize = IOV_LIST_INITIAL;
343        c->msgsize = MSG_LIST_INITIAL;
344        c->hdrsize = 0;
345
346        c->rbuf = (char *)malloc((size_t)c->rsize);
347        c->wbuf = (char *)malloc((size_t)c->wsize);
348        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
349        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
350        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
351        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
352
353        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
354                c->msglist == 0 || c->suffixlist == 0) {
355            if (c->rbuf != 0) free(c->rbuf);
356            if (c->wbuf != 0) free(c->wbuf);
357            if (c->ilist != 0) free(c->ilist);
358            if (c->suffixlist != 0) free(c->suffixlist);
359            if (c->iov != 0) free(c->iov);
360            if (c->msglist != 0) free(c->msglist);
361            free(c);
362            fprintf(stderr, "malloc()\n");
363            return NULL;
364        }
365
366        STATS_LOCK();
367        stats.conn_structs++;
368        STATS_UNLOCK();
369    }
370
371    /* unix socket mode doesn't need this, so zeroed out.  but why
372     * is this done for every command?  presumably for UDP
373     * mode.  */
374    if (!settings.socketpath) {
375        c->request_addr_size = sizeof(c->request_addr);
376    } else {
377        c->request_addr_size = 0;
378    }
379
380    if (settings.verbose > 1) {
381        if (init_state == conn_listening) {
382            fprintf(stderr, "<%d server listening (%s)\n", sfd,
383                prot_text(prot));
384        } else if (IS_UDP(prot)) {
385            fprintf(stderr, "<%d server listening (udp)\n", sfd);
386        } else if (prot == binary_prot) {
387            fprintf(stderr, "<%d new binary client connection\n", sfd);
388        } else if (prot == ascii_prot) {
389            fprintf(stderr, "<%d new ascii client connection\n", sfd);
390        } else if (prot == negotiating_prot) {
391            fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd);
392        } else {
393            fprintf(stderr, "<%d new unknown (%d) client connection\n",
394                sfd, prot);
395            abort();
396        }
397    }
398
399    c->sfd = sfd;
400    c->protocol = prot;
401    c->state = init_state;
402    c->rlbytes = 0;
403    c->cmd = -1;
404    c->rbytes = c->wbytes = 0;
405    c->wcurr = c->wbuf;
406    c->rcurr = c->rbuf;
407    c->ritem = 0;
408    c->icurr = c->ilist;
409    c->suffixcurr = c->suffixlist;
410    c->ileft = 0;
411    c->suffixleft = 0;
412    c->iovused = 0;
413    c->msgcurr = 0;
414    c->msgused = 0;
415
416    c->write_and_go = init_state;
417    c->write_and_free = 0;
418    c->item = 0;
419    c->bucket = -1;
420    c->gen = 0;
421
422    c->noreply = false;
423
424    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
425    event_base_set(base, &c->event);
426    c->ev_flags = event_flags;
427
428    if (event_add(&c->event, 0) == -1) {
429        if (conn_add_to_freelist(c)) {
430            conn_free(c);
431        }
432        perror("event_add");
433        return NULL;
434    }
435
436    STATS_LOCK();
437    stats.curr_conns++;
438    stats.total_conns++;
439    STATS_UNLOCK();
440
441    return c;
442}
443
444static void conn_cleanup(conn *c) {
445    assert(c != NULL);
446
447    if (c->item) {
448        item_remove(c->item);
449        c->item = 0;
450    }
451
452    if (c->ileft != 0) {
453        for (; c->ileft > 0; c->ileft--,c->icurr++) {
454            item_remove(*(c->icurr));
455        }
456    }
457
458    if (c->suffixleft != 0) {
459        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
460            if(suffix_add_to_freelist(*(c->suffixcurr))) {
461                free(*(c->suffixcurr));
462            }
463        }
464    }
465
466    if (c->write_and_free) {
467        free(c->write_and_free);
468        c->write_and_free = 0;
469    }
470}
471
472/*
473 * Frees a connection.
474 */
475void conn_free(conn *c) {
476    if (c) {
477        if (c->hdrbuf)
478            free(c->hdrbuf);
479        if (c->msglist)
480            free(c->msglist);
481        if (c->rbuf)
482            free(c->rbuf);
483        if (c->wbuf)
484            free(c->wbuf);
485        if (c->ilist)
486            free(c->ilist);
487        if (c->suffixlist)
488            free(c->suffixlist);
489        if (c->iov)
490            free(c->iov);
491        free(c);
492    }
493}
494
495static void conn_close(conn *c) {
496    assert(c != NULL);
497
498    /* delete the event, the socket and the conn */
499    event_del(&c->event);
500
501    if (settings.verbose > 1)
502        fprintf(stderr, "<%d connection closed.\n", c->sfd);
503
504    close(c->sfd);
505    accept_new_conns(true);
506    conn_cleanup(c);
507
508    /* if the connection has big buffers, just free it */
509    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
510        conn_free(c);
511    }
512
513    STATS_LOCK();
514    stats.curr_conns--;
515    STATS_UNLOCK();
516
517    return;
518}
519
520static enum conn_states get_init_state(conn *c) {
521    int rv = 0;
522    assert(c != NULL);
523
524    switch(c->protocol) {
525        case binary_prot:
526            rv = conn_bin_init;
527            break;
528        case negotiating_prot:
529            rv = conn_negotiate;
530            break;
531        default:
532            rv = conn_read;
533    }
534    return rv;
535}
536
537/* Set the given connection to its initial state.  The initial state will vary
538 * base don protocol type. */
539static void conn_set_init_state(conn *c) {
540    assert(c != NULL);
541
542    conn_set_state(c, get_init_state(c));
543}
544
545/*
546 * Shrinks a connection's buffers if they're too big.  This prevents
547 * periodic large "get" requests from permanently chewing lots of server
548 * memory.
549 *
550 * This should only be called in between requests since it can wipe output
551 * buffers!
552 */
553static void conn_shrink(conn *c) {
554    assert(c != NULL);
555
556    if (IS_UDP(c->protocol))
557        return;
558
559    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
560        char *newbuf;
561
562        if (c->rcurr != c->rbuf)
563            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
564
565        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
566
567        if (newbuf) {
568            c->rbuf = newbuf;
569            c->rsize = DATA_BUFFER_SIZE;
570        }
571        /* TODO check other branch... */
572        c->rcurr = c->rbuf;
573    }
574
575    if (c->isize > ITEM_LIST_HIGHWAT) {
576        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
577        if (newbuf) {
578            c->ilist = newbuf;
579            c->isize = ITEM_LIST_INITIAL;
580        }
581    /* TODO check error condition? */
582    }
583
584    if (c->msgsize > MSG_LIST_HIGHWAT) {
585        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
586        if (newbuf) {
587            c->msglist = newbuf;
588            c->msgsize = MSG_LIST_INITIAL;
589        }
590    /* TODO check error condition? */
591    }
592
593    if (c->iovsize > IOV_LIST_HIGHWAT) {
594        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
595        if (newbuf) {
596            c->iov = newbuf;
597            c->iovsize = IOV_LIST_INITIAL;
598        }
599    /* TODO check return value */
600    }
601}
602
603/*
604 * Sets a connection's current state in the state machine. Any special
605 * processing that needs to happen on certain state transitions can
606 * happen here.
607 */
608static void conn_set_state(conn *c, enum conn_states state) {
609    assert(c != NULL);
610
611    if (state != c->state) {
612        if (state == conn_read) {
613            conn_shrink(c);
614            assoc_move_next_bucket();
615        }
616        c->state = state;
617    }
618}
619
620/*
621 * Free list management for suffix buffers.
622 */
623
624static char **freesuffix;
625static int freesuffixtotal;
626static int freesuffixcurr;
627
628static void suffix_init(void) {
629    freesuffixtotal = 500;
630    freesuffixcurr  = 0;
631
632    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
633    if (freesuffix == NULL) {
634        fprintf(stderr, "malloc()\n");
635    }
636    return;
637}
638
639/*
640 * Returns a suffix buffer from the freelist, if any. Should call this using
641 * suffix_from_freelist() for thread safety.
642 */
643char *do_suffix_from_freelist() {
644    char *s;
645
646    if (freesuffixcurr > 0) {
647        s = freesuffix[--freesuffixcurr];
648    } else {
649        /* If malloc fails, let the logic fall through without spamming
650         * STDERR on the server. */
651        s = malloc( SUFFIX_SIZE );
652    }
653
654    return s;
655}
656
657/*
658 * Adds a connection to the freelist. 0 = success. Should call this using
659 * conn_add_to_freelist() for thread safety.
660 */
661bool do_suffix_add_to_freelist(char *s) {
662    if (freesuffixcurr < freesuffixtotal) {
663        freesuffix[freesuffixcurr++] = s;
664        return false;
665    } else {
666        /* try to enlarge free connections array */
667        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
668        if (new_freesuffix) {
669            freesuffixtotal *= 2;
670            freesuffix = new_freesuffix;
671            freesuffix[freesuffixcurr++] = s;
672            return false;
673        }
674    }
675    return true;
676}
677
678/*
679 * Ensures that there is room for another struct iovec in a connection's
680 * iov list.
681 *
682 * Returns 0 on success, -1 on out-of-memory.
683 */
684static int ensure_iov_space(conn *c) {
685    assert(c != NULL);
686
687    if (c->iovused >= c->iovsize) {
688        int i, iovnum;
689        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
690                                (c->iovsize * 2) * sizeof(struct iovec));
691        if (! new_iov)
692            return -1;
693        c->iov = new_iov;
694        c->iovsize *= 2;
695
696        /* Point all the msghdr structures at the new list. */
697        for (i = 0, iovnum = 0; i < c->msgused; i++) {
698            c->msglist[i].msg_iov = &c->iov[iovnum];
699            iovnum += c->msglist[i].msg_iovlen;
700        }
701    }
702
703    return 0;
704}
705
706
707/*
708 * Adds data to the list of pending data that will be written out to a
709 * connection.
710 *
711 * Returns 0 on success, -1 on out-of-memory.
712 */
713
714static int add_iov(conn *c, const void *buf, int len) {
715    struct msghdr *m;
716    int leftover;
717    bool limit_to_mtu;
718
719    assert(c != NULL);
720
721    do {
722        m = &c->msglist[c->msgused - 1];
723
724        /*
725         * Limit UDP packets, and the first payloads of TCP replies, to
726         * UDP_MAX_PAYLOAD_SIZE bytes.
727         */
728        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
729
730        /* We may need to start a new msghdr if this one is full. */
731        if (m->msg_iovlen == IOV_MAX ||
732            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
733            add_msghdr(c);
734            m = &c->msglist[c->msgused - 1];
735        }
736
737        if (ensure_iov_space(c) != 0)
738            return -1;
739
740        /* If the fragment is too big to fit in the datagram, split it up */
741        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
742            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
743            len -= leftover;
744        } else {
745            leftover = 0;
746        }
747
748        m = &c->msglist[c->msgused - 1];
749        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
750        m->msg_iov[m->msg_iovlen].iov_len = len;
751
752        c->msgbytes += len;
753        c->iovused++;
754        m->msg_iovlen++;
755
756        buf = ((char *)buf) + len;
757        len = leftover;
758    } while (leftover > 0);
759
760    return 0;
761}
762
763
764/*
765 * Constructs a set of UDP headers and attaches them to the outgoing messages.
766 */
767static int build_udp_headers(conn *c) {
768    int i;
769    unsigned char *hdr;
770
771    assert(c != NULL);
772
773    if (c->msgused > c->hdrsize) {
774        void *new_hdrbuf;
775        if (c->hdrbuf)
776            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
777        else
778            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
779        if (! new_hdrbuf)
780            return -1;
781        c->hdrbuf = (unsigned char *)new_hdrbuf;
782        c->hdrsize = c->msgused * 2;
783    }
784
785    hdr = c->hdrbuf;
786    for (i = 0; i < c->msgused; i++) {
787        c->msglist[i].msg_iov[0].iov_base = hdr;
788        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
789        *hdr++ = c->request_id / 256;
790        *hdr++ = c->request_id % 256;
791        *hdr++ = i / 256;
792        *hdr++ = i % 256;
793        *hdr++ = c->msgused / 256;
794        *hdr++ = c->msgused % 256;
795        *hdr++ = 0;
796        *hdr++ = 0;
797        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
798    }
799
800    return 0;
801}
802
803
804static void out_string(conn *c, const char *str) {
805    size_t len;
806
807    assert(c != NULL);
808
809    if (c->noreply) {
810        if (settings.verbose > 1)
811            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
812        c->noreply = false;
813        conn_set_state(c, conn_read);
814        return;
815    }
816
817    if (settings.verbose > 1)
818        fprintf(stderr, ">%d %s\n", c->sfd, str);
819
820    len = strlen(str);
821    if ((len + 2) > c->wsize) {
822        /* ought to be always enough. just fail for simplicity */
823        str = "SERVER_ERROR output line too long";
824        len = strlen(str);
825    }
826
827    memcpy(c->wbuf, str, len);
828    memcpy(c->wbuf + len, "\r\n", 3);
829    c->wbytes = len + 2;
830    c->wcurr = c->wbuf;
831
832    conn_set_state(c, conn_write);
833    c->write_and_go = get_init_state(c);
834    return;
835}
836
837/*
838 * we get here after reading the value in set/add/replace commands. The command
839 * has been stored in c->item_comm, and the item is ready in c->item.
840 */
841static void complete_nread_ascii(conn *c) {
842    assert(c != NULL);
843
844    item *it = c->item;
845    int comm = c->item_comm;
846    int ret;
847
848    STATS_LOCK();
849    stats.set_cmds++;
850    STATS_UNLOCK();
851
852    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
853        out_string(c, "CLIENT_ERROR bad data chunk");
854    } else {
855      ret = store_item(it, comm);
856      if (ret == 1)
857          out_string(c, "STORED");
858      else if(ret == 2)
859          out_string(c, "EXISTS");
860      else if(ret == 3)
861          out_string(c, "NOT_FOUND");
862      else
863          out_string(c, "NOT_STORED");
864    }
865
866    item_remove(c->item);       /* release the c->item reference */
867    c->item = 0;
868}
869
870static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
871    int i = 0;
872    uint32_t *res_header;
873
874    assert(c);
875    assert(body_len >= 0);
876
877    c->msgcurr = 0;
878    c->msgused = 0;
879    c->iovused = 0;
880    if (add_msghdr(c) != 0) {
881        /* XXX:  out_string is inappropriate here */
882        out_string(c, "SERVER_ERROR out of memory");
883        return;
884    }
885
886    res_header = (uint32_t *)c->wbuf;
887
888    res_header[0] = ((uint32_t)BIN_RES_MAGIC) << 24;
889    res_header[0] |= ((0xff & c->cmd) << 16);
890    res_header[0] |= err & 0xffff;
891
892    res_header[1] = hdr_len << 24;
893    /* TODO:  Support datatype */
894    res_header[2] = body_len;
895    res_header[3] = c->opaque;
896
897    if(settings.verbose > 1) {
898        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
899            res_header[0], res_header[1], res_header[2], res_header[3]);
900    }
901
902    for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
903        res_header[i] = htonl(res_header[i]);
904    }
905
906    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
907    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
908}
909
910static void write_bin_error(conn *c, int err, int swallow) {
911    char *errstr = "Unknown error";
912    switch(err) {
913        case ERR_UNKNOWN_CMD:
914            errstr = "Unknown command";
915            break;
916        case ERR_NOT_FOUND:
917            errstr = "Not found";
918            break;
919        case ERR_INVALID_ARGUMENTS:
920            errstr = "Invalid arguments";
921            break;
922        case ERR_EXISTS:
923            errstr = "Data exists for key.";
924            break;
925        case ERR_TOO_LARGE:
926            errstr = "Too large.";
927            break;
928        case ERR_NOT_STORED:
929            errstr = "Not stored.";
930            break;
931        default:
932            errstr = "UNHANDLED ERROR";
933            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
934    }
935    if(settings.verbose > 0) {
936        fprintf(stderr, "Writing an error:  %s\n", errstr);
937    }
938    add_bin_header(c, err, 0, strlen(errstr));
939    add_iov(c, errstr, strlen(errstr));
940
941    conn_set_state(c, conn_mwrite);
942    if(swallow > 0) {
943        c->sbytes = swallow;
944        c->write_and_go = conn_swallow;
945    } else {
946        c->write_and_go = conn_bin_init;
947    }
948}
949
950/* Form and send a response to a command over the binary protocol */
951static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
952    add_bin_header(c, 0, hlen, dlen);
953    if(dlen > 0) {
954        add_iov(c, d, dlen);
955    }
956    conn_set_state(c, conn_mwrite);
957    c->write_and_go = conn_bin_init;
958}
959
960/* Byte swap a 64-bit number */
961static int64_t swap64(int64_t in) {
962#ifdef ENDIAN_LITTLE
963    /* Little endian, flip the bytes around until someone makes a faster/better
964    * way to do this. */
965    int64_t rv = 0;
966    int i = 0;
967     for(i = 0; i<8; i++) {
968        rv = (rv << 8) | (in & 0xff);
969        in >>= 8;
970     }
971    return rv;
972#else
973    /* big-endian machines don't need byte swapping */
974    return in;
975#endif
976}
977
978static void complete_incr_bin(conn *c) {
979    item *it;
980    int64_t delta;
981    uint64_t initial;
982    int32_t exptime;
983    char *key;
984    size_t nkey;
985    int i;
986    uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
987
988    assert(c != NULL);
989
990    key = c->rbuf + BIN_INCR_HDR_LEN;
991    nkey = c->keylen;
992    key[nkey] = 0x00;
993
994    delta = swap64(*((int64_t*)(c->rbuf)));
995    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
996    exptime = ntohl(*((int*)(c->rbuf + 16)));
997
998    if(settings.verbose) {
999        fprintf(stderr, "incr ");
1000        for(i = 0; i<nkey; i++) {
1001            fprintf(stderr, "%c", key[i]);
1002        }
1003        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
1004    }
1005
1006    it = item_get(key, nkey);
1007    if (it) {
1008        /* Weird magic in add_delta forces me to pad here */
1009        char tmpbuf[INCR_MAX_STORAGE_LEN];
1010        uint64_t l = 0;
1011        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1012        tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
1013        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1014        *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
1015
1016        write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1017        item_remove(it);         /* release our reference */
1018    } else {
1019        if(exptime >= 0) {
1020            /* Save some room for the response */
1021            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1022            *response_buf = swap64(initial);
1023            it = item_alloc(key, nkey, 0, realtime(exptime),
1024                INCR_MAX_STORAGE_LEN);
1025            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1026
1027            if(store_item(it, NREAD_SET)) {
1028                write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
1029                    INCR_RES_LEN);
1030            } else {
1031                write_bin_error(c, ERR_NOT_STORED, 0);
1032            }
1033            item_remove(it);         /* release our reference */
1034        } else {
1035            write_bin_error(c, ERR_NOT_FOUND, 0);
1036        }
1037    }
1038}
1039
1040static void complete_update_bin(conn *c) {
1041    int eno = -1, ret = 0;
1042    assert(c != NULL);
1043
1044    item *it = c->item;
1045
1046    STATS_LOCK();
1047    stats.set_cmds++;
1048    STATS_UNLOCK();
1049
1050    /* We don't actually receive the trailing two characters in the bin
1051     * protocol, so we're going to just set them here */
1052    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1053    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1054
1055    switch (store_item(it, c->item_comm)) {
1056        case 1:
1057            /* Stored */
1058            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1059            break;
1060        case 2:
1061            write_bin_error(c, ERR_EXISTS, 0);
1062            break;
1063        case 3:
1064            write_bin_error(c, ERR_NOT_FOUND, 0);
1065            break;
1066        default:
1067            if(c->item_comm == NREAD_ADD) {
1068                eno = ERR_EXISTS;
1069            } else if(c->item_comm == NREAD_REPLACE) {
1070                eno = ERR_NOT_FOUND;
1071            } else {
1072                eno = ERR_NOT_STORED;
1073            }
1074            write_bin_error(c, eno, 0);
1075    }
1076
1077    item_remove(c->item);       /* release the c->item reference */
1078    c->item = 0;
1079}
1080
1081static void process_bin_get(conn *c) {
1082    item *it;
1083
1084    it = item_get(c->rbuf, c->keylen);
1085    if (it) {
1086        int *flags;
1087        uint64_t* identifier;
1088
1089        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1090
1091        /* the length has two unnecessary bytes, and then we write four more */
1092        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1093
1094        /* Add the "extras" field: CAS-id followed by flags. The cas is a 64-
1095           bit datatype and require alignment to 8-byte boundaries on some
1096           architechtures. Verify that the size of the packet header is of
1097           the correct size (if not the following code generates SIGBUS on
1098           sparc hardware).
1099        */
1100        assert(MIN_BIN_PKT_LENGTH % 8 == 0);
1101        identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1102        *identifier = swap64(it->cas_id);
1103        add_iov(c, identifier, 8);
1104
1105        /* Add the flags */
1106        flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH + 8);
1107        *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1108        add_iov(c, flags, 4);
1109
1110        /* Add the data minus the CRLF */
1111        add_iov(c, ITEM_data(it), it->nbytes - 2);
1112        conn_set_state(c, conn_mwrite);
1113    } else {
1114        if(c->cmd == CMD_GETQ) {
1115            conn_set_state(c, conn_bin_init);
1116        } else {
1117            write_bin_error(c, ERR_NOT_FOUND, 0);
1118        }
1119    }
1120}
1121
1122static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1123    assert(c);
1124    c->substate = next_substate;
1125    c->rlbytes = c->keylen + extra;
1126    assert(c->rsize >= c->rlbytes);
1127    c->ritem = c->rbuf;
1128    conn_set_state(c, conn_nread);
1129}
1130
1131static void dispatch_bin_command(conn *c) {
1132    time_t exptime = 0;
1133    switch(c->cmd) {
1134        case CMD_VERSION:
1135            write_bin_response(c, VERSION, 0, strlen(VERSION));
1136            break;
1137        case CMD_FLUSH:
1138            set_current_time();
1139
1140            settings.oldest_live = current_time - 1;
1141            item_flush_expired();
1142            write_bin_response(c, NULL, 0, 0);
1143            break;
1144        case CMD_NOOP:
1145            write_bin_response(c, NULL, 0, 0);
1146            break;
1147        case CMD_SET:
1148            /* Fallthrough */
1149        case CMD_ADD:
1150            /* Fallthrough */
1151        case CMD_REPLACE:
1152            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1153            break;
1154        case CMD_GETQ:
1155        case CMD_GET:
1156            bin_read_key(c, bin_reading_get_key, 0);
1157            break;
1158        case CMD_DELETE:
1159            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1160            break;
1161        case CMD_INCR:
1162        case CMD_DECR:
1163            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1164            break;
1165        default:
1166            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1167    }
1168}
1169
1170static void process_bin_update(conn *c) {
1171    char *key;
1172    int nkey;
1173    int vlen;
1174    int flags;
1175    int exptime;
1176    item *it;
1177    int comm;
1178    int hdrlen = BIN_SET_HDR_LEN;
1179
1180    assert(c != NULL);
1181
1182    key = c->rbuf + hdrlen;
1183    nkey = c->keylen;
1184    key[nkey] = 0x00;
1185
1186    flags = ntohl(*((int*)(c->rbuf + 8)));
1187    exptime = ntohl(*((int*)(c->rbuf + 12)));
1188    vlen = c->bin_header[2] - (nkey + hdrlen);
1189
1190    if(settings.verbose > 1) {
1191        fprintf(stderr, "Value len is %d\n", vlen);
1192    }
1193
1194    if (settings.detail_enabled) {
1195        stats_prefix_record_set(key);
1196    }
1197
1198    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1199
1200    if (it == 0) {
1201        if (! item_size_ok(nkey, flags, vlen + 2)) {
1202            write_bin_error(c, ERR_TOO_LARGE, vlen);
1203        } else {
1204            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1205        }
1206        /* swallow the data line */
1207        c->write_and_go = conn_swallow;
1208        return;
1209    }
1210
1211    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf)));
1212
1213    switch(c->cmd) {
1214        case CMD_ADD:
1215            c->item_comm = NREAD_ADD;
1216            break;
1217        case CMD_SET:
1218            c->item_comm = NREAD_SET;
1219            break;
1220        case CMD_REPLACE:
1221            c->item_comm = NREAD_REPLACE;
1222            break;
1223        default:
1224            assert(0);
1225    }
1226
1227    if(it->cas_id != 0) {
1228        c->item_comm = NREAD_CAS;
1229    }
1230
1231    c->item = it;
1232    c->ritem = ITEM_data(it);
1233    c->rlbytes = vlen;
1234    conn_set_state(c, conn_nread);
1235    c->substate = bin_read_set_value;
1236}
1237
1238static void process_bin_delete(conn *c) {
1239    char *key;
1240    size_t nkey;
1241    item *it;
1242    time_t exptime = 0;
1243
1244    assert(c != NULL);
1245
1246    exptime = ntohl(*((int*)(c->rbuf)));
1247    key = c->rbuf + 4;
1248    nkey = c->keylen;
1249    key[nkey] = 0x00;
1250
1251    if(settings.verbose) {
1252        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1253    }
1254
1255    if (settings.detail_enabled) {
1256        stats_prefix_record_delete(key);
1257    }
1258
1259    it = item_get(key, nkey);
1260    if (it) {
1261        if (exptime == 0) {
1262            item_unlink(it);
1263            item_remove(it);      /* release our reference */
1264            write_bin_response(c, NULL, 0, 0);
1265        } else {
1266            /* XXX:  This is really lame, but defer_delete returns a string */
1267            char *res = defer_delete(it, exptime);
1268            if(res[0] == 'D') {
1269                write_bin_response(c, NULL, 0, 0);
1270            } else {
1271                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1272            }
1273        }
1274    } else {
1275        write_bin_error(c, ERR_NOT_FOUND, 0);
1276    }
1277}
1278
1279static void complete_nread_binary(conn *c) {
1280    assert(c != NULL);
1281
1282    if(c->cmd < 0) {
1283        /* No command defined.  Figure out what they're trying to say. */
1284        int i = 0;
1285        /* I did a bit of hard-coding around the packet sizes */
1286        assert(BIN_PKT_HDR_WORDS == 4);
1287        for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
1288            c->bin_header[i] = ntohl(c->bin_header[i]);
1289        }
1290        if(settings.verbose) {
1291            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1292                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1293                c->bin_header[3]);
1294        }
1295        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1296            if(settings.verbose) {
1297                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1298            }
1299            conn_set_state(c, conn_closing);
1300            return;
1301        }
1302
1303        c->msgcurr = 0;
1304        c->msgused = 0;
1305        c->iovused = 0;
1306        if (add_msghdr(c) != 0) {
1307            out_string(c, "SERVER_ERROR out of memory");
1308            return;
1309        }
1310
1311        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1312        c->keylen = c->bin_header[0] & 0xffff;
1313        c->opaque = c->bin_header[3];
1314        if(settings.verbose > 1) {
1315            fprintf(stderr,
1316                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1317                c->opaque, c->keylen, c->bin_header[2]);
1318        }
1319        dispatch_bin_command(c);
1320    } else {
1321        switch(c->substate) {
1322            case bin_reading_set_header:
1323                process_bin_update(c);
1324                break;
1325            case bin_read_set_value:
1326                complete_update_bin(c);
1327                break;
1328            case bin_reading_get_key:
1329                process_bin_get(c);
1330                break;
1331            case bin_reading_del_header:
1332                process_bin_delete(c);
1333                break;
1334            case bin_reading_incr_header:
1335                complete_incr_bin(c);
1336                break;
1337            default:
1338                fprintf(stderr, "Not handling substate %d\n", c->substate);
1339                assert(0);
1340        }
1341    }
1342}
1343
1344static void reinit_bin_connection(conn *c) {
1345    if (settings.verbose > 1)
1346        fprintf(stderr, "*** Reinitializing binary connection.\n");
1347    c->rlbytes = MIN_BIN_PKT_LENGTH;
1348    c->write_and_go = conn_bin_init;
1349    c->cmd = -1;
1350    c->substate = bin_no_state;
1351    c->rbytes = c->wbytes = 0;
1352    c->ritem = (char*)c->bin_header;
1353    c->rcurr = c->rbuf;
1354    c->wcurr = c->wbuf;
1355    conn_shrink(c);
1356    conn_set_state(c, conn_nread);
1357}
1358
1359/* These do the initial protocol switch.  At this point, we should've read
1360 * exactly one byte, and must treat that byte as the beginning of a command. */
1361static void setup_bin_protocol(conn *c) {
1362    char *loc = (char*)c->bin_header;
1363    if (settings.verbose > 1)
1364        fprintf(stderr, "Negotiated protocol as binary.\n");
1365
1366    c->protocol = binary_prot;
1367    reinit_bin_connection(c);
1368    /* Emulate a read of the first byte */
1369    c->ritem[0] = c->rbuf[0];
1370    c->ritem++;
1371    c->rlbytes--;
1372}
1373
1374static void setup_ascii_protocol(conn *c) {
1375    if (settings.verbose > 1)
1376        fprintf(stderr, "Negotiated protocol as ascii.\n");
1377    c->protocol = ascii_prot;
1378
1379    /* We've already got the first letter of the command, so pretend like we
1380     * Did a single byte read from try_read_command */
1381    c->rcurr = c->rbuf;
1382    c->rbytes = 1;
1383    conn_set_state(c, conn_read);
1384}
1385
1386static void complete_nread(conn *c) {
1387    assert(c != NULL);
1388
1389    if(c->protocol == ascii_prot) {
1390        complete_nread_ascii(c);
1391    } else if(c->protocol == binary_prot) {
1392        complete_nread_binary(c);
1393    } else if(c->protocol == negotiating_prot) {
1394        /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */
1395        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)
1396            setup_bin_protocol(c);
1397        else
1398            setup_ascii_protocol(c);
1399    } else {
1400        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1401    }
1402}
1403
1404/*
1405 * Stores an item in the cache according to the semantics of one of the set
1406 * commands. In threaded mode, this is protected by the cache lock.
1407 *
1408 * Returns true if the item was stored.
1409 */
1410int do_store_item(item *it, int comm) {
1411    char *key = ITEM_key(it);
1412    bool delete_locked = false;
1413    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1414    int stored = 0;
1415
1416    item *new_it = NULL;
1417    int flags;
1418
1419    if (old_it != NULL && comm == NREAD_ADD) {
1420        /* add only adds a nonexistent item, but promote to head of LRU */
1421        do_item_update(old_it);
1422    } else if (!old_it && (comm == NREAD_REPLACE
1423        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1424    {
1425        /* replace only replaces an existing value; don't store */
1426    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1427        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1428    {
1429        /* replace and add can't override delete locks; don't store */
1430    } else if (comm == NREAD_CAS) {
1431        /* validate cas operation */
1432        if (delete_locked)
1433            old_it = do_item_get_nocheck(key, it->nkey);
1434
1435        if(old_it == NULL) {
1436          // LRU expired
1437          stored = 3;
1438        }
1439        else if(it->cas_id == old_it->cas_id) {
1440          // cas validates
1441          do_item_replace(old_it, it);
1442          stored = 1;
1443        } else {
1444          if(settings.verbose > 1) {
1445            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1446                old_it->cas_id, it->cas_id);
1447          }
1448          stored = 2;
1449        }
1450    } else {
1451        /*
1452         * Append - combine new and old record into single one. Here it's
1453         * atomic and thread-safe.
1454         */
1455
1456        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1457
1458            /* we have it and old_it here - alloc memory to hold both */
1459            /* flags was already lost - so recover them from ITEM_suffix(it) */
1460
1461            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1462
1463            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1464
1465            if (new_it == NULL) {
1466                /* SERVER_ERROR out of memory */
1467                return 0;
1468            }
1469
1470            /* copy data from it and old_it to new_it */
1471
1472            if (comm == NREAD_APPEND) {
1473                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1474                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1475            } else {
1476                /* NREAD_PREPEND */
1477                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1478                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1479            }
1480
1481            it = new_it;
1482        }
1483
1484        /* "set" commands can override the delete lock
1485           window... in which case we have to find the old hidden item
1486           that's in the namespace/LRU but wasn't returned by
1487           item_get.... because we need to replace it */
1488        if (delete_locked)
1489            old_it = do_item_get_nocheck(key, it->nkey);
1490
1491        if (old_it != NULL)
1492            do_item_replace(old_it, it);
1493        else
1494            do_item_link(it);
1495
1496        stored = 1;
1497    }
1498
1499    if (old_it != NULL)
1500        do_item_remove(old_it);         /* release our reference */
1501    if (new_it != NULL)
1502        do_item_remove(new_it);
1503
1504    return stored;
1505}
1506
1507typedef struct token_s {
1508    char *value;
1509    size_t length;
1510} token_t;
1511
1512#define COMMAND_TOKEN 0
1513#define SUBCOMMAND_TOKEN 1
1514#define KEY_TOKEN 1
1515#define KEY_MAX_LENGTH 250
1516
1517#define MAX_TOKENS 8
1518
1519/*
1520 * Tokenize the command string by replacing whitespace with '\0' and update
1521 * the token array tokens with pointer to start of each token and length.
1522 * Returns total number of tokens.  The last valid token is the terminal
1523 * token (value points to the first unprocessed character of the string and
1524 * length zero).
1525 *
1526 * Usage example:
1527 *
1528 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1529 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1530 *          ...
1531 *      }
1532 *      ncommand = tokens[ix].value - command;
1533 *      command  = tokens[ix].value;
1534 *   }
1535 */
1536static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1537    char *s, *e;
1538    size_t ntokens = 0;
1539
1540    assert(command != NULL && tokens != NULL && max_tokens > 1);
1541
1542    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1543        if (*e == ' ') {
1544            if (s != e) {
1545                tokens[ntokens].value = s;
1546                tokens[ntokens].length = e - s;
1547                ntokens++;
1548                *e = '\0';
1549            }
1550            s = e + 1;
1551        }
1552        else if (*e == '\0') {
1553            if (s != e) {
1554                tokens[ntokens].value = s;
1555                tokens[ntokens].length = e - s;
1556                ntokens++;
1557            }
1558
1559            break; /* string end */
1560        }
1561    }
1562
1563    /*
1564     * If we scanned the whole string, the terminal value pointer is null,
1565     * otherwise it is the first unprocessed character.
1566     */
1567    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1568    tokens[ntokens].length = 0;
1569    ntokens++;
1570
1571    return ntokens;
1572}
1573
1574/* set up a connection to write a buffer then free it, used for stats */
1575static void write_and_free(conn *c, char *buf, int bytes) {
1576    if (buf) {
1577        c->write_and_free = buf;
1578        c->wcurr = buf;
1579        c->wbytes = bytes;
1580        conn_set_state(c, conn_write);
1581        c->write_and_go = get_init_state(c);
1582    } else {
1583        out_string(c, "SERVER_ERROR out of memory writing stats");
1584    }
1585}
1586
1587static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1588{
1589    int noreply_index = ntokens - 2;
1590
1591    /*
1592      NOTE: this function is not the first place where we are going to
1593      send the reply.  We could send it instead from process_command()
1594      if the request line has wrong number of tokens.  However parsing
1595      malformed line for "noreply" option is not reliable anyway, so
1596      it can't be helped.
1597    */
1598    if (tokens[noreply_index].value
1599        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1600        c->noreply = true;
1601    }
1602}
1603
1604inline static void process_stats_detail(conn *c, const char *command) {
1605    assert(c != NULL);
1606
1607    if (strcmp(command, "on") == 0) {
1608        settings.detail_enabled = 1;
1609        out_string(c, "OK");
1610    }
1611    else if (strcmp(command, "off") == 0) {
1612        settings.detail_enabled = 0;
1613        out_string(c, "OK");
1614    }
1615    else if (strcmp(command, "dump") == 0) {
1616        int len;
1617        char *stats = stats_prefix_dump(&len);
1618        write_and_free(c, stats, len);
1619    }
1620    else {
1621        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1622    }
1623}
1624
1625static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1626    rel_time_t now = current_time;
1627    char *command;
1628    char *subcommand;
1629
1630    assert(c != NULL);
1631
1632    if(ntokens < 2) {
1633        out_string(c, "CLIENT_ERROR bad command line");
1634        return;
1635    }
1636
1637    command = tokens[COMMAND_TOKEN].value;
1638
1639    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1640        char temp[1024];
1641        pid_t pid = getpid();
1642        char *pos = temp;
1643
1644#ifndef WIN32
1645        struct rusage usage;
1646        getrusage(RUSAGE_SELF, &usage);
1647#endif /* !WIN32 */
1648
1649        STATS_LOCK();
1650        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1651        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1652        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1653        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1654        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1655#ifndef WIN32
1656        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1657        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1658#endif /* !WIN32 */
1659        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1660        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1661        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1662        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1663        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1664        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1665        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1666        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1667        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1668        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1669        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1670        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1671        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1672        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1673        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1674        pos += sprintf(pos, "END");
1675        STATS_UNLOCK();
1676        out_string(c, temp);
1677        return;
1678    }
1679
1680    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1681
1682    if (strcmp(subcommand, "reset") == 0) {
1683        stats_reset();
1684        out_string(c, "RESET");
1685        return;
1686    }
1687
1688#ifdef HAVE_MALLOC_H
1689#ifdef HAVE_STRUCT_MALLINFO
1690    if (strcmp(subcommand, "malloc") == 0) {
1691        char temp[512];
1692        struct mallinfo info;
1693        char *pos = temp;
1694
1695        info = mallinfo();
1696        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1697        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1698        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1699        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1700        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1701        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1702        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1703        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1704        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1705        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1706        out_string(c, temp);
1707        return;
1708    }
1709#endif /* HAVE_STRUCT_MALLINFO */
1710#endif /* HAVE_MALLOC_H */
1711
1712#if !defined(WIN32) || !defined(__APPLE__)
1713    if (strcmp(subcommand, "maps") == 0) {
1714        char *wbuf;
1715        int wsize = 8192; /* should be enough */
1716        int fd;
1717        int res;
1718
1719        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1720            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1721            return;
1722        }
1723
1724        fd = open("/proc/self/maps", O_RDONLY);
1725        if (fd == -1) {
1726            out_string(c, "SERVER_ERROR cannot open the maps file");
1727            free(wbuf);
1728            return;
1729        }
1730
1731        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1732        if (res == wsize - 6) {
1733            out_string(c, "SERVER_ERROR buffer overflow");
1734            free(wbuf); close(fd);
1735            return;
1736        }
1737        if (res == 0 || res == -1) {
1738            out_string(c, "SERVER_ERROR can't read the maps file");
1739            free(wbuf); close(fd);
1740            return;
1741        }
1742        memcpy(wbuf + res, "END\r\n", 5);
1743        write_and_free(c, wbuf, res + 5);
1744        close(fd);
1745        return;
1746    }
1747#endif
1748
1749    if (strcmp(subcommand, "cachedump") == 0) {
1750
1751        char *buf;
1752        unsigned int bytes, id, limit = 0;
1753
1754        if(ntokens < 5) {
1755            out_string(c, "CLIENT_ERROR bad command line");
1756            return;
1757        }
1758
1759        id = strtoul(tokens[2].value, NULL, 10);
1760        limit = strtoul(tokens[3].value, NULL, 10);
1761
1762        if(errno == ERANGE) {
1763            out_string(c, "CLIENT_ERROR bad command line format");
1764            return;
1765        }
1766
1767        buf = item_cachedump(id, limit, &bytes);
1768        write_and_free(c, buf, bytes);
1769        return;
1770    }
1771
1772    if (strcmp(subcommand, "slabs") == 0) {
1773        int bytes = 0;
1774        char *buf = slabs_stats(&bytes);
1775        write_and_free(c, buf, bytes);
1776        return;
1777    }
1778
1779    if (strcmp(subcommand, "items") == 0) {
1780        int bytes = 0;
1781        char *buf = item_stats(&bytes);
1782        write_and_free(c, buf, bytes);
1783        return;
1784    }
1785
1786    if (strcmp(subcommand, "detail") == 0) {
1787        if (ntokens < 4)
1788            process_stats_detail(c, "");  /* outputs the error message */
1789        else
1790            process_stats_detail(c, tokens[2].value);
1791        return;
1792    }
1793
1794    if (strcmp(subcommand, "sizes") == 0) {
1795        int bytes = 0;
1796        char *buf = item_stats_sizes(&bytes);
1797        write_and_free(c, buf, bytes);
1798        return;
1799    }
1800
1801    out_string(c, "ERROR");
1802}
1803
1804/* ntokens is overwritten here... shrug.. */
1805static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1806    char *key;
1807    size_t nkey;
1808    int i = 0;
1809    item *it;
1810    token_t *key_token = &tokens[KEY_TOKEN];
1811    char *suffix;
1812    int stats_get_cmds   = 0;
1813    int stats_get_hits   = 0;
1814    int stats_get_misses = 0;
1815    assert(c != NULL);
1816
1817    if (settings.managed) {
1818        int bucket = c->bucket;
1819        if (bucket == -1) {
1820            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1821            return;
1822        }
1823        c->bucket = -1;
1824        if (buckets[bucket] != c->gen) {
1825            out_string(c, "ERROR_NOT_OWNER");
1826            return;
1827        }
1828    }
1829
1830    do {
1831        while(key_token->length != 0) {
1832
1833            key = key_token->value;
1834            nkey = key_token->length;
1835
1836            if(nkey > KEY_MAX_LENGTH) {
1837                STATS_LOCK();
1838                stats.get_cmds   += stats_get_cmds;
1839                stats.get_hits   += stats_get_hits;
1840                stats.get_misses += stats_get_misses;
1841                STATS_UNLOCK();
1842                out_string(c, "CLIENT_ERROR bad command line format");
1843                return;
1844            }
1845
1846            stats_get_cmds++;
1847            it = item_get(key, nkey);
1848            if (settings.detail_enabled) {
1849                stats_prefix_record_get(key, NULL != it);
1850            }
1851            if (it) {
1852                if (i >= c->isize) {
1853                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1854                    if (new_list) {
1855                        c->isize *= 2;
1856                        c->ilist = new_list;
1857                    } else break;
1858                }
1859
1860                /*
1861                 * Construct the response. Each hit adds three elements to the
1862                 * outgoing data list:
1863                 *   "VALUE "
1864                 *   key
1865                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1866                 */
1867
1868                if(return_cas == true)
1869                {
1870                  /* Goofy mid-flight realloc. */
1871                  if (i >= c->suffixsize) {
1872                    char **new_suffix_list = realloc(c->suffixlist,
1873                                           sizeof(char *) * c->suffixsize * 2);
1874                    if (new_suffix_list) {
1875                      c->suffixsize *= 2;
1876                      c->suffixlist  = new_suffix_list;
1877                    } else break;
1878                  }
1879
1880                  suffix = suffix_from_freelist();
1881                  if (suffix == NULL) {
1882                    STATS_LOCK();
1883                    stats.get_cmds   += stats_get_cmds;
1884                    stats.get_hits   += stats_get_hits;
1885                    stats.get_misses += stats_get_misses;
1886                    STATS_UNLOCK();
1887                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1888                    return;
1889                  }
1890                  *(c->suffixlist + i) = suffix;
1891                  sprintf(suffix, " %llu\r\n", it->cas_id);
1892                  if (add_iov(c, "VALUE ", 6) != 0 ||
1893                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1894                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1895                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1896                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1897                      {
1898                          break;
1899                      }
1900                }
1901                else
1902                {
1903                  if (add_iov(c, "VALUE ", 6) != 0 ||
1904                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1905                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1906                      {
1907                          break;
1908                      }
1909                }
1910
1911
1912                if (settings.verbose > 1)
1913                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1914
1915                /* item_get() has incremented it->refcount for us */
1916                stats_get_hits++;
1917                item_update(it);
1918                *(c->ilist + i) = it;
1919                i++;
1920
1921            } else {
1922                stats_get_misses++;
1923            }
1924
1925            key_token++;
1926        }
1927
1928        /*
1929         * If the command string hasn't been fully processed, get the next set
1930         * of tokens.
1931         */
1932        if(key_token->value != NULL) {
1933            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1934            key_token = tokens;
1935        }
1936
1937    } while(key_token->value != NULL);
1938
1939    c->icurr = c->ilist;
1940    c->ileft = i;
1941    if (return_cas) {
1942        c->suffixcurr = c->suffixlist;
1943        c->suffixleft = i;
1944    }
1945
1946    if (settings.verbose > 1)
1947        fprintf(stderr, ">%d END\n", c->sfd);
1948
1949    /*
1950        If the loop was terminated because of out-of-memory, it is not
1951        reliable to add END\r\n to the buffer, because it might not end
1952        in \r\n. So we send SERVER_ERROR instead.
1953    */
1954    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1955        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1956        out_string(c, "SERVER_ERROR out of memory writing get response");
1957    }
1958    else {
1959        conn_set_state(c, conn_mwrite);
1960        c->msgcurr = 0;
1961    }
1962
1963    STATS_LOCK();
1964    stats.get_cmds   += stats_get_cmds;
1965    stats.get_hits   += stats_get_hits;
1966    stats.get_misses += stats_get_misses;
1967    STATS_UNLOCK();
1968
1969    return;
1970}
1971
1972static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1973    char *key;
1974    size_t nkey;
1975    int flags;
1976    time_t exptime;
1977    int vlen, old_vlen;
1978    uint64_t req_cas_id;
1979    item *it, *old_it;
1980
1981    assert(c != NULL);
1982
1983    set_noreply_maybe(c, tokens, ntokens);
1984
1985    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1986        out_string(c, "CLIENT_ERROR bad command line format");
1987        return;
1988    }
1989
1990    key = tokens[KEY_TOKEN].value;
1991    nkey = tokens[KEY_TOKEN].length;
1992
1993    flags = strtoul(tokens[2].value, NULL, 10);
1994    exptime = strtol(tokens[3].value, NULL, 10);
1995    vlen = strtol(tokens[4].value, NULL, 10);
1996
1997    // does cas value exist?
1998    if(handle_cas)
1999    {
2000      req_cas_id = strtoull(tokens[5].value, NULL, 10);
2001    }
2002
2003    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
2004        out_string(c, "CLIENT_ERROR bad command line format");
2005        return;
2006    }
2007
2008    if (settings.detail_enabled) {
2009        stats_prefix_record_set(key);
2010    }
2011
2012    if (settings.managed) {
2013        int bucket = c->bucket;
2014        if (bucket == -1) {
2015            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2016            return;
2017        }
2018        c->bucket = -1;
2019        if (buckets[bucket] != c->gen) {
2020            out_string(c, "ERROR_NOT_OWNER");
2021            return;
2022        }
2023    }
2024
2025    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2026
2027    if (it == 0) {
2028        if (! item_size_ok(nkey, flags, vlen + 2))
2029            out_string(c, "SERVER_ERROR object too large for cache");
2030        else
2031            out_string(c, "SERVER_ERROR out of memory storing object");
2032        /* swallow the data line */
2033        c->write_and_go = conn_swallow;
2034        c->sbytes = vlen + 2;
2035        return;
2036    }
2037    if(handle_cas)
2038      it->cas_id = req_cas_id;
2039
2040    c->item = it;
2041    c->ritem = ITEM_data(it);
2042    c->rlbytes = it->nbytes;
2043    c->item_comm = comm;
2044    conn_set_state(c, conn_nread);
2045}
2046
2047static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2048    char temp[sizeof("18446744073709551615")];
2049    item *it;
2050    int64_t delta;
2051    char *key;
2052    size_t nkey;
2053
2054    assert(c != NULL);
2055
2056    set_noreply_maybe(c, tokens, ntokens);
2057
2058    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2059        out_string(c, "CLIENT_ERROR bad command line format");
2060        return;
2061    }
2062
2063    key = tokens[KEY_TOKEN].value;
2064    nkey = tokens[KEY_TOKEN].length;
2065
2066    if (settings.managed) {
2067        int bucket = c->bucket;
2068        if (bucket == -1) {
2069            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2070            return;
2071        }
2072        c->bucket = -1;
2073        if (buckets[bucket] != c->gen) {
2074            out_string(c, "ERROR_NOT_OWNER");
2075            return;
2076        }
2077    }
2078
2079    delta = strtoll(tokens[2].value, NULL, 10);
2080
2081    if(errno == ERANGE) {
2082        out_string(c, "CLIENT_ERROR bad command line format");
2083        return;
2084    }
2085
2086    it = item_get(key, nkey);
2087    if (!it) {
2088        out_string(c, "NOT_FOUND");
2089        return;
2090    }
2091
2092    out_string(c, add_delta(it, incr, delta, temp));
2093    item_remove(it);         /* release our reference */
2094}
2095
2096/*
2097 * adds a delta value to a numeric item.
2098 *
2099 * it    item to adjust
2100 * incr  true to increment value, false to decrement
2101 * delta amount to adjust value by
2102 * buf   buffer for response string
2103 *
2104 * returns a response string to send back to the client.
2105 */
2106char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2107    char *ptr;
2108    int64_t value;
2109    int res;
2110
2111    ptr = ITEM_data(it);
2112    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2113
2114    value = strtoull(ptr, NULL, 10);
2115
2116    if(errno == ERANGE) {
2117        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2118    }
2119
2120    if (incr)
2121        value += delta;
2122    else {
2123        value -= delta;
2124    }
2125    if(value < 0) {
2126        value = 0;
2127    }
2128    sprintf(buf, "%llu", value);
2129    res = strlen(buf);
2130    if (res + 2 > it->nbytes) { /* need to realloc */
2131        item *new_it;
2132        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2133        if (new_it == 0) {
2134            return "SERVER_ERROR out of memory in incr/decr";
2135        }
2136        memcpy(ITEM_data(new_it), buf, res);
2137        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2138        do_item_replace(it, new_it);
2139        do_item_remove(new_it);       /* release our reference */
2140    } else { /* replace in-place */
2141        memcpy(ITEM_data(it), buf, res);
2142        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2143    }
2144
2145    return buf;
2146}
2147
2148static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2149    char *key;
2150    size_t nkey;
2151    item *it;
2152    time_t exptime = 0;
2153
2154    assert(c != NULL);
2155
2156    set_noreply_maybe(c, tokens, ntokens);
2157
2158    if (settings.managed) {
2159        int bucket = c->bucket;
2160        if (bucket == -1) {
2161            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2162            return;
2163        }
2164        c->bucket = -1;
2165        if (buckets[bucket] != c->gen) {
2166            out_string(c, "ERROR_NOT_OWNER");
2167            return;
2168        }
2169    }
2170
2171    key = tokens[KEY_TOKEN].value;
2172    nkey = tokens[KEY_TOKEN].length;
2173
2174    if(nkey > KEY_MAX_LENGTH) {
2175        out_string(c, "CLIENT_ERROR bad command line format");
2176        return;
2177    }
2178
2179    if(ntokens == (c->noreply ? 5 : 4)) {
2180        exptime = strtol(tokens[2].value, NULL, 10);
2181
2182        if(errno == ERANGE) {
2183            out_string(c, "CLIENT_ERROR bad command line format");
2184            return;
2185        }
2186    }
2187
2188    if (settings.detail_enabled) {
2189        stats_prefix_record_delete(key);
2190    }
2191
2192    it = item_get(key, nkey);
2193    if (it) {
2194        if (exptime == 0) {
2195            item_unlink(it);
2196            item_remove(it);      /* release our reference */
2197            out_string(c, "DELETED");
2198        } else {
2199            /* our reference will be transfered to the delete queue */
2200            out_string(c, defer_delete(it, exptime));
2201        }
2202    } else {
2203        out_string(c, "NOT_FOUND");
2204    }
2205}
2206
2207/*
2208 * Adds an item to the deferred-delete list so it can be reaped later.
2209 *
2210 * Returns the result to send to the client.
2211 */
2212char *do_defer_delete(item *it, time_t exptime)
2213{
2214    if (delcurr >= deltotal) {
2215        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2216        if (new_delete) {
2217            todelete = new_delete;
2218            deltotal *= 2;
2219        } else {
2220            /*
2221             * can't delete it immediately, user wants a delay,
2222             * but we ran out of memory for the delete queue
2223             */
2224            item_remove(it);    /* release reference */
2225            return "SERVER_ERROR out of memory expanding delete queue";
2226        }
2227    }
2228
2229    /* use its expiration time as its deletion time now */
2230    it->exptime = realtime(exptime);
2231    it->it_flags |= ITEM_DELETED;
2232    todelete[delcurr++] = it;
2233
2234    return "DELETED";
2235}
2236
2237static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2238    unsigned int level;
2239
2240    assert(c != NULL);
2241
2242    set_noreply_maybe(c, tokens, ntokens);
2243
2244    level = strtoul(tokens[1].value, NULL, 10);
2245    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2246    out_string(c, "OK");
2247    return;
2248}
2249
2250static void process_command(conn *c, char *command) {
2251
2252    token_t tokens[MAX_TOKENS];
2253    size_t ntokens;
2254    int comm;
2255
2256    assert(c != NULL);
2257
2258    if (settings.verbose > 1)
2259        fprintf(stderr, "<%d %s\n", c->sfd, command);
2260
2261    /*
2262     * for commands set/add/replace, we build an item and read the data
2263     * directly into it, then continue in nread_complete().
2264     */
2265
2266    c->msgcurr = 0;
2267    c->msgused = 0;
2268    c->iovused = 0;
2269    if (add_msghdr(c) != 0) {
2270        out_string(c, "SERVER_ERROR out of memory preparing response");
2271        return;
2272    }
2273
2274    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2275    if (ntokens >= 3 &&
2276        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2277         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2278
2279        process_get_command(c, tokens, ntokens, false);
2280
2281    } else if ((ntokens == 6 || ntokens == 7) &&
2282               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2283                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2284                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2285                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2286                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2287
2288        process_update_command(c, tokens, ntokens, comm, false);
2289
2290    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2291
2292        process_update_command(c, tokens, ntokens, comm, true);
2293
2294    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2295
2296        process_arithmetic_command(c, tokens, ntokens, 1);
2297
2298    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2299
2300        process_get_command(c, tokens, ntokens, true);
2301
2302    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2303
2304        process_arithmetic_command(c, tokens, ntokens, 0);
2305
2306    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2307
2308        process_delete_command(c, tokens, ntokens);
2309
2310    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2311        unsigned int bucket, gen;
2312        if (!settings.managed) {
2313            out_string(c, "CLIENT_ERROR not a managed instance");
2314            return;
2315        }
2316
2317        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2318            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2319                out_string(c, "CLIENT_ERROR bucket number out of range");
2320                return;
2321            }
2322            buckets[bucket] = gen;
2323            out_string(c, "OWNED");
2324            return;
2325        } else {
2326            out_string(c, "CLIENT_ERROR bad format");
2327            return;
2328        }
2329
2330    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2331
2332        int bucket;
2333        if (!settings.managed) {
2334            out_string(c, "CLIENT_ERROR not a managed instance");
2335            return;
2336        }
2337        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2338            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2339                out_string(c, "CLIENT_ERROR bucket number out of range");
2340                return;
2341            }
2342            buckets[bucket] = 0;
2343            out_string(c, "DISOWNED");
2344            return;
2345        } else {
2346            out_string(c, "CLIENT_ERROR bad format");
2347            return;
2348        }
2349
2350    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2351        int bucket, gen;
2352        if (!settings.managed) {
2353            out_string(c, "CLIENT_ERROR not a managed instance");
2354            return;
2355        }
2356        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2357            /* we never write anything back, even if input's wrong */
2358            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2359                /* do nothing, bad input */
2360            } else {
2361                c->bucket = bucket;
2362                c->gen = gen;
2363            }
2364            conn_set_init_state(c);
2365            return;
2366        } else {
2367            out_string(c, "CLIENT_ERROR bad format");
2368            return;
2369        }
2370
2371    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2372
2373        process_stat(c, tokens, ntokens);
2374
2375    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2376        time_t exptime = 0;
2377        set_current_time();
2378
2379        set_noreply_maybe(c, tokens, ntokens);
2380
2381        if(ntokens == (c->noreply ? 3 : 2)) {
2382            settings.oldest_live = current_time - 1;
2383            item_flush_expired();
2384            out_string(c, "OK");
2385            return;
2386        }
2387
2388        exptime = strtol(tokens[1].value, NULL, 10);
2389        if(errno == ERANGE) {
2390            out_string(c, "CLIENT_ERROR bad command line format");
2391            return;
2392        }
2393
2394        /*
2395          If exptime is zero realtime() would return zero too, and
2396          realtime(exptime) - 1 would overflow to the max unsigned
2397          value.  So we process exptime == 0 the same way we do when
2398          no delay is given at all.
2399        */
2400        if (exptime > 0)
2401            settings.oldest_live = realtime(exptime) - 1;
2402        else /* exptime == 0 */
2403            settings.oldest_live = current_time - 1;
2404        item_flush_expired();
2405        out_string(c, "OK");
2406        return;
2407
2408    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2409
2410        out_string(c, "VERSION " VERSION);
2411
2412    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2413
2414        conn_set_state(c, conn_closing);
2415
2416    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2417                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2418#ifdef ALLOW_SLABS_REASSIGN
2419
2420        int src, dst, rv;
2421
2422        src = strtol(tokens[2].value, NULL, 10);
2423        dst  = strtol(tokens[3].value, NULL, 10);
2424
2425        if(errno == ERANGE) {
2426            out_string(c, "CLIENT_ERROR bad command line format");
2427            return;
2428        }
2429
2430        rv = slabs_reassign(src, dst);
2431        if (rv == 1) {
2432            out_string(c, "DONE");
2433            return;
2434        }
2435        if (rv == 0) {
2436            out_string(c, "CANT");
2437            return;
2438        }
2439        if (rv == -1) {
2440            out_string(c, "BUSY");
2441            return;
2442        }
2443#else
2444        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2445#endif
2446    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2447        process_verbosity_command(c, tokens, ntokens);
2448    } else {
2449        out_string(c, "ERROR");
2450    }
2451    return;
2452}
2453
2454/*
2455 * if we have a complete line in the buffer, process it.
2456 */
2457static int try_read_command(conn *c) {
2458    char *el, *cont;
2459
2460    assert(c != NULL);
2461    assert(c->rcurr <= (c->rbuf + c->rsize));
2462
2463    if (c->rbytes == 0)
2464        return 0;
2465    el = memchr(c->rcurr, '\n', c->rbytes);
2466    if (!el)
2467        return 0;
2468    cont = el + 1;
2469    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2470        el--;
2471    }
2472    *el = '\0';
2473
2474    assert(cont <= (c->rcurr + c->rbytes));
2475
2476    process_command(c, c->rcurr);
2477
2478    c->rbytes -= (cont - c->rcurr);
2479    c->rcurr = cont;
2480
2481    assert(c->rcurr <= (c->rbuf + c->rsize));
2482
2483    return 1;
2484}
2485
2486/*
2487 * read a UDP request.
2488 * return 0 if there's nothing to read.
2489 */
2490static int try_read_udp(conn *c) {
2491    int res;
2492
2493    assert(c != NULL);
2494
2495    c->request_addr_size = sizeof(c->request_addr);
2496    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2497                   0, &c->request_addr, &c->request_addr_size);
2498    if (res > 8) {
2499        unsigned char *buf = (unsigned char *)c->rbuf;
2500        STATS_LOCK();
2501        stats.bytes_read += res;
2502        STATS_UNLOCK();
2503
2504        /* Beginning of UDP packet is the request ID; save it. */
2505        c->request_id = buf[0] * 256 + buf[1];
2506
2507        /* If this is a multi-packet request, drop it. */
2508        if (buf[4] != 0 || buf[5] != 1) {
2509            out_string(c, "SERVER_ERROR multi-packet request not supported");
2510            return 0;
2511        }
2512
2513        /* Don't care about any of the rest of the header. */
2514        res -= 8;
2515        memmove(c->rbuf, c->rbuf + 8, res);
2516
2517        c->rbytes += res;
2518        c->rcurr = c->rbuf;
2519        return 1;
2520    }
2521    return 0;
2522}
2523
2524/*
2525 * read from network as much as we can, handle buffer overflow and connection
2526 * close.
2527 * before reading, move the remaining incomplete fragment of a command
2528 * (if any) to the beginning of the buffer.
2529 * return 0 if there's nothing to read on the first read.
2530 */
2531static int try_read_network(conn *c) {
2532    int gotdata = 0;
2533    int res;
2534
2535    assert(c != NULL);
2536
2537    if (c->rcurr != c->rbuf) {
2538        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2539            memmove(c->rbuf, c->rcurr, c->rbytes);
2540        c->rcurr = c->rbuf;
2541    }
2542
2543    while (1) {
2544        if (c->rbytes >= c->rsize) {
2545            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2546            if (!new_rbuf) {
2547                if (settings.verbose > 0)
2548                    fprintf(stderr, "Couldn't realloc input buffer\n");
2549                c->rbytes = 0; /* ignore what we read */
2550                out_string(c, "SERVER_ERROR out of memory reading request");
2551                c->write_and_go = conn_closing;
2552                return 1;
2553            }
2554            c->rcurr = c->rbuf = new_rbuf;
2555            c->rsize *= 2;
2556        }
2557
2558        int avail = c->rsize - c->rbytes;
2559        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2560        if (res > 0) {
2561            STATS_LOCK();
2562            stats.bytes_read += res;
2563            STATS_UNLOCK();
2564            gotdata = 1;
2565            c->rbytes += res;
2566            if (res == avail) {
2567                continue;
2568            } else {
2569                break;
2570            }
2571        }
2572        if (res == 0) {
2573            /* connection closed */
2574            conn_set_state(c, conn_closing);
2575            return 1;
2576        }
2577        if (res == -1) {
2578            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2579            /* Should close on unhandled errors. */
2580            conn_set_state(c, conn_closing);
2581            return 1;
2582        }
2583    }
2584    return gotdata;
2585}
2586
2587static bool update_event(conn *c, const int new_flags) {
2588    assert(c != NULL);
2589
2590    struct event_base *base = c->event.ev_base;
2591    if (c->ev_flags == new_flags)
2592        return true;
2593    if (event_del(&c->event) == -1) return false;
2594    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2595    event_base_set(base, &c->event);
2596    c->ev_flags = new_flags;
2597    if (event_add(&c->event, 0) == -1) return false;
2598    return true;
2599}
2600
2601/*
2602 * Sets whether we are listening for new connections or not.
2603 */
2604void accept_new_conns(const bool do_accept) {
2605    conn *next;
2606
2607    if (! is_listen_thread())
2608        return;
2609
2610    for (next = listen_conn; next; next = next->next) {
2611        if (do_accept) {
2612            update_event(next, EV_READ | EV_PERSIST);
2613            if (listen(next->sfd, 1024) != 0) {
2614                perror("listen");
2615            }
2616        }
2617        else {
2618            update_event(next, 0);
2619            if (listen(next->sfd, 0) != 0) {
2620                perror("listen");
2621            }
2622        }
2623  }
2624}
2625
2626/*
2627 * Transmit the next chunk of data from our list of msgbuf structures.
2628 *
2629 * Returns:
2630 *   TRANSMIT_COMPLETE   All done writing.
2631 *   TRANSMIT_INCOMPLETE More data remaining to write.
2632 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2633 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2634 */
2635static int transmit(conn *c) {
2636    assert(c != NULL);
2637
2638    if (c->msgcurr < c->msgused &&
2639            c->msglist[c->msgcurr].msg_iovlen == 0) {
2640        /* Finished writing the current msg; advance to the next. */
2641        c->msgcurr++;
2642    }
2643    if (c->msgcurr < c->msgused) {
2644        ssize_t res;
2645        struct msghdr *m = &c->msglist[c->msgcurr];
2646
2647        res = sendmsg(c->sfd, m, 0);
2648        if (res > 0) {
2649            STATS_LOCK();
2650            stats.bytes_written += res;
2651            STATS_UNLOCK();
2652
2653            /* We've written some of the data. Remove the completed
2654               iovec entries from the list of pending writes. */
2655            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2656                res -= m->msg_iov->iov_len;
2657                m->msg_iovlen--;
2658                m->msg_iov++;
2659            }
2660
2661            /* Might have written just part of the last iovec entry;
2662               adjust it so the next write will do the rest. */
2663            if (res > 0) {
2664                m->msg_iov->iov_base += res;
2665                m->msg_iov->iov_len -= res;
2666            }
2667            return TRANSMIT_INCOMPLETE;
2668        }
2669        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2670            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2671                if (settings.verbose > 0)
2672                    fprintf(stderr, "Couldn't update event\n");
2673                conn_set_state(c, conn_closing);
2674                return TRANSMIT_HARD_ERROR;
2675            }
2676            return TRANSMIT_SOFT_ERROR;
2677        }
2678        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
2679           we have a real error, on which we close the connection */
2680        if (settings.verbose > 0)
2681            perror("Failed to write, and not due to blocking");
2682
2683        if (IS_UDP(c->protocol))
2684            conn_set_state(c, conn_read);
2685        else
2686            conn_set_state(c, conn_closing);
2687        return TRANSMIT_HARD_ERROR;
2688    } else {
2689        return TRANSMIT_COMPLETE;
2690    }
2691}
2692
2693static void drive_machine(conn *c) {
2694    bool stop = false;
2695    int sfd, flags = 1;
2696    enum conn_states init_state; /* initial state for a new connection */
2697    socklen_t addrlen;
2698    struct sockaddr_storage addr;
2699    int res;
2700
2701    assert(c != NULL);
2702
2703    while (!stop) {
2704
2705        switch(c->state) {
2706        case conn_listening:
2707            addrlen = sizeof(addr);
2708            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2709                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2710                    /* these are transient, so don't log anything */
2711                    stop = true;
2712                } else if (errno == EMFILE) {
2713                    if (settings.verbose > 0)
2714                        fprintf(stderr, "Too many open connections\n");
2715                    accept_new_conns(false);
2716                    stop = true;
2717                } else {
2718                    perror("accept()");
2719                    stop = true;
2720                }
2721                break;
2722            }
2723            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2724                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2725                perror("setting O_NONBLOCK");
2726                close(sfd);
2727                break;
2728            }
2729            init_state = get_init_state(c);
2730
2731            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2732                                     DATA_BUFFER_SIZE, c->protocol);
2733
2734            break;
2735
2736        case conn_negotiate:
2737            if (settings.verbose > 1)
2738                fprintf(stderr, "Negotiating protocol for a new connection\n");
2739            c->rlbytes = 1;
2740            c->ritem = c->rbuf;
2741            c->rcurr = c->rbuf;
2742            c->wcurr = c->wbuf;
2743            conn_set_state(c, conn_nread);
2744            break;
2745
2746        case conn_read:
2747            if (try_read_command(c) != 0) {
2748                continue;
2749            }
2750            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2751                continue;
2752            }
2753            /* we have no command line and no data to read from network */
2754            if (!update_event(c, EV_READ | EV_PERSIST)) {
2755                if (settings.verbose > 0)
2756                    fprintf(stderr, "Couldn't update event\n");
2757                conn_set_state(c, conn_closing);
2758                break;
2759            }
2760            stop = true;
2761            break;
2762
2763        case conn_bin_init: /* Reinitialize a binary connection */
2764            reinit_bin_connection(c);
2765            break;
2766
2767        case conn_nread:
2768            if (c->rlbytes == 0) {
2769                complete_nread(c);
2770                break;
2771            }
2772            /* first check if we have leftovers in the conn_read buffer */
2773            if (c->rbytes > 0) {
2774                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2775                memcpy(c->ritem, c->rcurr, tocopy);
2776                c->ritem += tocopy;
2777                c->rlbytes -= tocopy;
2778                c->rcurr += tocopy;
2779                c->rbytes -= tocopy;
2780                break;
2781            }
2782
2783            /*  now try reading from the socket */
2784            res = read(c->sfd, c->ritem, c->rlbytes);
2785            if (res > 0) {
2786                STATS_LOCK();
2787                stats.bytes_read += res;
2788                STATS_UNLOCK();
2789                c->ritem += res;
2790                c->rlbytes -= res;
2791                break;
2792            }
2793            if (res == 0) { /* end of stream */
2794                conn_set_state(c, conn_closing);
2795                break;
2796            }
2797            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2798                if (!update_event(c, EV_READ | EV_PERSIST)) {
2799                    if (settings.verbose > 0)
2800                        fprintf(stderr, "Couldn't update event\n");
2801                    conn_set_state(c, conn_closing);
2802                    break;
2803                }
2804                stop = true;
2805                break;
2806            }
2807            /* otherwise we have a real error, on which we close the connection */
2808            if (settings.verbose > 0)
2809                fprintf(stderr, "Failed to read, and not due to blocking\n");
2810            conn_set_state(c, conn_closing);
2811            break;
2812
2813        case conn_swallow:
2814            /* we are reading sbytes and throwing them away */
2815            if (c->sbytes == 0) {
2816                conn_set_init_state(c);
2817                break;
2818            }
2819
2820            /* first check if we have leftovers in the conn_read buffer */
2821            if (c->rbytes > 0) {
2822                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2823                c->sbytes -= tocopy;
2824                c->rcurr += tocopy;
2825                c->rbytes -= tocopy;
2826                break;
2827            }
2828
2829            /*  now try reading from the socket */
2830            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2831            if (res > 0) {
2832                STATS_LOCK();
2833                stats.bytes_read += res;
2834                STATS_UNLOCK();
2835                c->sbytes -= res;
2836                break;
2837            }
2838            if (res == 0) { /* end of stream */
2839                conn_set_state(c, conn_closing);
2840                break;
2841            }
2842            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2843                if (!update_event(c, EV_READ | EV_PERSIST)) {
2844                    if (settings.verbose > 0)
2845                        fprintf(stderr, "Couldn't update event\n");
2846                    conn_set_state(c, conn_closing);
2847                    break;
2848                }
2849                stop = true;
2850                break;
2851            }
2852            /* otherwise we have a real error, on which we close the connection */
2853            if (settings.verbose > 0)
2854                fprintf(stderr, "Failed to read, and not due to blocking\n");
2855            conn_set_state(c, conn_closing);
2856            break;
2857
2858        case conn_write:
2859            /*
2860             * We want to write out a simple response. If we haven't already,
2861             * assemble it into a msgbuf list (this will be a single-entry
2862             * list for TCP or a two-entry list for UDP).
2863             */
2864            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2865                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2866                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2867                    if (settings.verbose > 0)
2868                        fprintf(stderr, "Couldn't build response\n");
2869                    conn_set_state(c, conn_closing);
2870                    break;
2871                }
2872            }
2873
2874            /* fall through... */
2875
2876        case conn_mwrite:
2877            switch (transmit(c)) {
2878            case TRANSMIT_COMPLETE:
2879                if (c->state == conn_mwrite) {
2880                    while (c->ileft > 0) {
2881                        item *it = *(c->icurr);
2882                        assert((it->it_flags & ITEM_SLABBED) == 0);
2883                        item_remove(it);
2884                        c->icurr++;
2885                        c->ileft--;
2886                    }
2887                    while (c->suffixleft > 0) {
2888                        char *suffix = *(c->suffixcurr);
2889                        if(suffix_add_to_freelist(suffix)) {
2890                            /* Failed to add to freelist, don't leak */
2891                            free(suffix);
2892                        }
2893                        c->suffixcurr++;
2894                        c->suffixleft--;
2895                    }
2896                    /* XXX:  I don't know why this wasn't the general case */
2897                    if(c->protocol == binary_prot) {
2898                        conn_set_state(c, c->write_and_go);
2899                    } else {
2900                        conn_set_init_state(c);
2901                    }
2902                } else if (c->state == conn_write) {
2903                    if (c->write_and_free) {
2904                        free(c->write_and_free);
2905                        c->write_and_free = 0;
2906                    }
2907                    conn_set_state(c, c->write_and_go);
2908                } else {
2909                    if (settings.verbose > 0)
2910                        fprintf(stderr, "Unexpected state %d\n", c->state);
2911                    conn_set_state(c, conn_closing);
2912                }
2913                break;
2914
2915            case TRANSMIT_INCOMPLETE:
2916            case TRANSMIT_HARD_ERROR:
2917                break;                   /* Continue in state machine. */
2918
2919            case TRANSMIT_SOFT_ERROR:
2920                stop = true;
2921                break;
2922            }
2923            break;
2924
2925        case conn_closing:
2926            if (IS_UDP(c->protocol))
2927                conn_cleanup(c);
2928            else
2929                conn_close(c);
2930            stop = true;
2931            break;
2932        }
2933    }
2934
2935    return;
2936}
2937
2938void event_handler(const int fd, const short which, void *arg) {
2939    conn *c;
2940
2941    c = (conn *)arg;
2942    assert(c != NULL);
2943
2944    c->which = which;
2945
2946    /* sanity */
2947    if (fd != c->sfd) {
2948        if (settings.verbose > 0)
2949            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2950        conn_close(c);
2951        return;
2952    }
2953
2954    drive_machine(c);
2955
2956    /* wait for next event */
2957    return;
2958}
2959
2960static int new_socket(struct addrinfo *ai) {
2961    int sfd;
2962