root/branches/binary/server/memcached.c @ 778

Revision 778, 111.5 kB (checked in by dsallings, 20 months ago)

Binary quit command from Trond Norbye.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63#ifndef HAVE_DAEMON
64extern int daemon(int nochdir, int noclose);
65#endif
66
67/*
68 * forward declarations
69 */
70static void drive_machine(conn *c);
71static int new_socket(struct addrinfo *ai);
72static int server_socket(const int port, enum protocol prot);
73static int try_read_command(conn *c);
74static int try_read_network(conn *c);
75static int try_read_udp(conn *c);
76static void conn_set_state(conn *c, enum conn_states state);
77
78/* stats */
79static void stats_reset(void);
80static void stats_init(void);
81
82/* defaults */
83static void settings_init(void);
84
85/* event handling, network IO */
86static void event_handler(const int fd, const short which, void *arg);
87static void conn_close(conn *c);
88static void conn_init(void);
89static void accept_new_conns(const bool do_accept);
90static bool update_event(conn *c, const int new_flags);
91static void complete_nread(conn *c);
92static void process_command(conn *c, char *command);
93static int transmit(conn *c);
94static int ensure_iov_space(conn *c);
95static int add_iov(conn *c, const void *buf, int len);
96static int add_msghdr(conn *c);
97
98/* time handling */
99static void set_current_time(void);  /* update the global variable holding
100                              global 32-bit seconds-since-start time
101                              (to avoid 64 bit time_t) */
102
103static void conn_free(conn *c);
104
105/** exported globals **/
106struct stats stats;
107struct settings settings;
108
109/** file scope variables **/
110static item **todelete = NULL;
111static int delcurr;
112static int deltotal;
113static conn *listen_conn = NULL;
114static struct event_base *main_base;
115
116#define TRANSMIT_COMPLETE   0
117#define TRANSMIT_INCOMPLETE 1
118#define TRANSMIT_SOFT_ERROR 2
119#define TRANSMIT_HARD_ERROR 3
120
121static int *buckets = 0; /* bucket->generation array for a managed instance */
122
123#define REALTIME_MAXDELTA 60*60*24*30
124/*
125 * given time value that's either unix time or delta from current unix time, return
126 * unix time. Use the fact that delta can't exceed one month (and real time value can't
127 * be that low).
128 */
129static rel_time_t realtime(const time_t exptime) {
130    /* no. of seconds in 30 days - largest possible delta exptime */
131
132    if (exptime == 0) return 0; /* 0 means never expire */
133
134    if (exptime > REALTIME_MAXDELTA) {
135        /* if item expiration is at/before the server started, give it an
136           expiration time of 1 second after the server started.
137           (because 0 means don't expire).  without this, we'd
138           underflow and wrap around to some large value way in the
139           future, effectively making items expiring in the past
140           really expiring never */
141        if (exptime <= stats.started)
142            return (rel_time_t)1;
143        return (rel_time_t)(exptime - stats.started);
144    } else {
145        return (rel_time_t)(exptime + current_time);
146    }
147}
148
149static void stats_init(void) {
150    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
151    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
152    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
153
154    /* make the time we started always be 2 seconds before we really
155       did, so time(0) - time.started is never zero.  if so, things
156       like 'settings.oldest_live' which act as booleans as well as
157       values are now false in boolean context... */
158    stats.started = time(0) - 2;
159    stats_prefix_init();
160}
161
162static void stats_reset(void) {
163    STATS_LOCK();
164    stats.total_items = stats.total_conns = 0;
165    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
166    stats.bytes_read = stats.bytes_written = 0;
167    stats_prefix_clear();
168    STATS_UNLOCK();
169}
170
171static void settings_init(void) {
172    settings.access = 0700;
173    settings.port = 11211;
174    settings.udpport = 0;
175    /* By default this string should be NULL for getaddrinfo() */
176    settings.inter = NULL;
177    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
178    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
179    settings.verbose = 0;
180    settings.oldest_live = 0;
181    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
182    settings.socketpath = NULL;       /* by default, not using a unix socket */
183    settings.managed = false;
184    settings.factor = 1.25;
185    settings.chunk_size = 48;         /* space for a modest key and value */
186#ifdef USE_THREADS
187    settings.num_threads = 4;
188#else
189    settings.num_threads = 1;
190#endif
191    settings.prefix_delimiter = ':';
192    settings.detail_enabled = 0;
193}
194
195/* returns true if a deleted item's delete-locked-time is over, and it
196   should be removed from the namespace */
197static bool item_delete_lock_over (item *it) {
198    assert(it->it_flags & ITEM_DELETED);
199    return (current_time >= it->exptime);
200}
201
202/*
203 * Adds a message header to a connection.
204 *
205 * Returns 0 on success, -1 on out-of-memory.
206 */
207static int add_msghdr(conn *c)
208{
209    struct msghdr *msg;
210
211    assert(c != NULL);
212
213    if (c->msgsize == c->msgused) {
214        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
215        if (! msg)
216            return -1;
217        c->msglist = msg;
218        c->msgsize *= 2;
219    }
220
221    msg = c->msglist + c->msgused;
222
223    /* this wipes msg_iovlen, msg_control, msg_controllen, and
224       msg_flags, the last 3 of which aren't defined on solaris: */
225    memset(msg, 0, sizeof(struct msghdr));
226
227    msg->msg_iov = &c->iov[c->iovused];
228
229    if (c->request_addr_size > 0) {
230        msg->msg_name = &c->request_addr;
231        msg->msg_namelen = c->request_addr_size;
232    }
233
234    c->msgbytes = 0;
235    c->msgused++;
236
237    if (IS_UDP(c->protocol)) {
238        /* Leave room for the UDP header, which we'll fill in later. */
239        return add_iov(c, NULL, UDP_HEADER_SIZE);
240    }
241
242    return 0;
243}
244
245
246/*
247 * Free list management for connections.
248 */
249
250static conn **freeconns;
251static int freetotal;
252static int freecurr;
253
254
255static void conn_init(void) {
256    freetotal = 200;
257    freecurr = 0;
258    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
259        fprintf(stderr, "malloc()\n");
260    }
261    return;
262}
263
264/*
265 * Returns a connection from the freelist, if any. Should call this using
266 * conn_from_freelist() for thread safety.
267 */
268conn *do_conn_from_freelist() {
269    conn *c;
270
271    if (freecurr > 0) {
272        c = freeconns[--freecurr];
273    } else {
274        c = NULL;
275    }
276
277    return c;
278}
279
280/*
281 * Adds a connection to the freelist. 0 = success. Should call this using
282 * conn_add_to_freelist() for thread safety.
283 */
284bool do_conn_add_to_freelist(conn *c) {
285    if (freecurr < freetotal) {
286        freeconns[freecurr++] = c;
287        return false;
288    } else {
289        /* try to enlarge free connections array */
290        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
291        if (new_freeconns) {
292            freetotal *= 2;
293            freeconns = new_freeconns;
294            freeconns[freecurr++] = c;
295            return false;
296        }
297    }
298    return true;
299}
300
301static const char *prot_text(enum protocol prot) {
302    char *rv = "unknown";
303    switch(prot) {
304        case ascii_prot:
305            rv = "ascii";
306            break;
307        case binary_prot:
308            rv = "binary";
309            break;
310        case ascii_udp_prot:
311            rv = "ascii-udp";
312            break;
313        case negotiating_prot:
314            rv = "auto-negotiate";
315            break;
316    }
317    return rv;
318}
319
320conn *conn_new(const int sfd, enum conn_states init_state,
321                const int event_flags,
322                const int read_buffer_size, enum protocol prot,
323                struct event_base *base) {
324    conn *c = conn_from_freelist();
325
326    if (NULL == c) {
327        if (!(c = (conn *)malloc(sizeof(conn)))) {
328            fprintf(stderr, "malloc()\n");
329            return NULL;
330        }
331        c->rbuf = c->wbuf = 0;
332        c->ilist = 0;
333        c->suffixlist = 0;
334        c->iov = 0;
335        c->msglist = 0;
336        c->hdrbuf = 0;
337
338        c->rsize = read_buffer_size;
339        c->wsize = DATA_BUFFER_SIZE;
340        c->isize = ITEM_LIST_INITIAL;
341        c->suffixsize = SUFFIX_LIST_INITIAL;
342        c->iovsize = IOV_LIST_INITIAL;
343        c->msgsize = MSG_LIST_INITIAL;
344        c->hdrsize = 0;
345
346        c->rbuf = (char *)malloc((size_t)c->rsize);
347        c->wbuf = (char *)malloc((size_t)c->wsize);
348        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
349        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
350        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
351        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
352
353        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
354                c->msglist == 0 || c->suffixlist == 0) {
355            if (c->rbuf != 0) free(c->rbuf);
356            if (c->wbuf != 0) free(c->wbuf);
357            if (c->ilist != 0) free(c->ilist);
358            if (c->suffixlist != 0) free(c->suffixlist);
359            if (c->iov != 0) free(c->iov);
360            if (c->msglist != 0) free(c->msglist);
361            free(c);
362            fprintf(stderr, "malloc()\n");
363            return NULL;
364        }
365
366        STATS_LOCK();
367        stats.conn_structs++;
368        STATS_UNLOCK();
369    }
370
371    /* unix socket mode doesn't need this, so zeroed out.  but why
372     * is this done for every command?  presumably for UDP
373     * mode.  */
374    if (!settings.socketpath) {
375        c->request_addr_size = sizeof(c->request_addr);
376    } else {
377        c->request_addr_size = 0;
378    }
379
380    if (settings.verbose > 1) {
381        if (init_state == conn_listening) {
382            fprintf(stderr, "<%d server listening (%s)\n", sfd,
383                prot_text(prot));
384        } else if (IS_UDP(prot)) {
385            fprintf(stderr, "<%d server listening (udp)\n", sfd);
386        } else if (prot == negotiating_prot) {
387            fprintf(stderr, "<%d new auto-negotiating client connection\n",
388                    sfd);
389        } else {
390            fprintf(stderr, "<%d new unknown (%d) client connection\n",
391                sfd, prot);
392            abort();
393        }
394    }
395
396    c->sfd = sfd;
397    c->protocol = prot;
398    c->state = init_state;
399    c->rlbytes = 0;
400    c->cmd = -1;
401    c->rbytes = c->wbytes = 0;
402    c->wcurr = c->wbuf;
403    c->rcurr = c->rbuf;
404    c->ritem = 0;
405    c->icurr = c->ilist;
406    c->suffixcurr = c->suffixlist;
407    c->ileft = 0;
408    c->suffixleft = 0;
409    c->iovused = 0;
410    c->msgcurr = 0;
411    c->msgused = 0;
412
413    c->write_and_go = init_state;
414    c->write_and_free = 0;
415    c->item = 0;
416    c->bucket = -1;
417    c->gen = 0;
418
419    c->noreply = false;
420
421    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
422    event_base_set(base, &c->event);
423    c->ev_flags = event_flags;
424
425    if (event_add(&c->event, 0) == -1) {
426        if (conn_add_to_freelist(c)) {
427            conn_free(c);
428        }
429        perror("event_add");
430        return NULL;
431    }
432
433    STATS_LOCK();
434    stats.curr_conns++;
435    stats.total_conns++;
436    STATS_UNLOCK();
437
438    return c;
439}
440
441static void conn_cleanup(conn *c) {
442    assert(c != NULL);
443
444    if (c->item) {
445        item_remove(c->item);
446        c->item = 0;
447    }
448
449    if (c->ileft != 0) {
450        for (; c->ileft > 0; c->ileft--,c->icurr++) {
451            item_remove(*(c->icurr));
452        }
453    }
454
455    if (c->suffixleft != 0) {
456        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
457            if(suffix_add_to_freelist(*(c->suffixcurr))) {
458                free(*(c->suffixcurr));
459            }
460        }
461    }
462
463    if (c->write_and_free) {
464        free(c->write_and_free);
465        c->write_and_free = 0;
466    }
467}
468
469/*
470 * Frees a connection.
471 */
472void conn_free(conn *c) {
473    if (c) {
474        if (c->hdrbuf)
475            free(c->hdrbuf);
476        if (c->msglist)
477            free(c->msglist);
478        if (c->rbuf)
479            free(c->rbuf);
480        if (c->wbuf)
481            free(c->wbuf);
482        if (c->ilist)
483            free(c->ilist);
484        if (c->suffixlist)
485            free(c->suffixlist);
486        if (c->iov)
487            free(c->iov);
488        free(c);
489    }
490}
491
492static void conn_close(conn *c) {
493    assert(c != NULL);
494
495    /* delete the event, the socket and the conn */
496    event_del(&c->event);
497
498    if (settings.verbose > 1)
499        fprintf(stderr, "<%d connection closed.\n", c->sfd);
500
501    close(c->sfd);
502    accept_new_conns(true);
503    conn_cleanup(c);
504
505    /* if the connection has big buffers, just free it */
506    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
507        conn_free(c);
508    }
509
510    STATS_LOCK();
511    stats.curr_conns--;
512    STATS_UNLOCK();
513
514    return;
515}
516
517/*
518 * Shrinks a connection's buffers if they're too big.  This prevents
519 * periodic large "get" requests from permanently chewing lots of server
520 * memory.
521 *
522 * This should only be called in between requests since it can wipe output
523 * buffers!
524 */
525static void conn_shrink(conn *c) {
526    assert(c != NULL);
527
528    if (IS_UDP(c->protocol))
529        return;
530
531    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
532        char *newbuf;
533
534        if (c->rcurr != c->rbuf)
535            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
536
537        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
538
539        if (newbuf) {
540            c->rbuf = newbuf;
541            c->rsize = DATA_BUFFER_SIZE;
542        }
543        /* TODO check other branch... */
544        c->rcurr = c->rbuf;
545    }
546
547    if (c->isize > ITEM_LIST_HIGHWAT) {
548        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
549        if (newbuf) {
550            c->ilist = newbuf;
551            c->isize = ITEM_LIST_INITIAL;
552        }
553    /* TODO check error condition? */
554    }
555
556    if (c->msgsize > MSG_LIST_HIGHWAT) {
557        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
558        if (newbuf) {
559            c->msglist = newbuf;
560            c->msgsize = MSG_LIST_INITIAL;
561        }
562    /* TODO check error condition? */
563    }
564
565    if (c->iovsize > IOV_LIST_HIGHWAT) {
566        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
567        if (newbuf) {
568            c->iov = newbuf;
569            c->iovsize = IOV_LIST_INITIAL;
570        }
571    /* TODO check return value */
572    }
573}
574
575/**
576 * Convert a state name to a human readable form.
577 */
578static const char *state_text(enum conn_states state) {
579    const char* const statenames[] = { "conn_listening",
580                                       "conn_new_cmd",
581                                       "conn_waiting",
582                                       "conn_read",
583                                       "conn_parse_cmd",
584                                       "conn_write",
585                                       "conn_nread",
586                                       "conn_swallow",
587                                       "conn_closing",
588                                       "conn_mwrite" };
589    return statenames[state];
590}
591
592/*
593 * Sets a connection's current state in the state machine. Any special
594 * processing that needs to happen on certain state transitions can
595 * happen here.
596 */
597static void conn_set_state(conn *c, enum conn_states state) {
598    assert(c != NULL);
599    assert(state >= conn_listening && state < conn_max_state);
600
601    if (state != c->state) {
602        if (settings.verbose > 2) {
603            fprintf(stderr, "%d: going from %s to %s\n",
604                    c->sfd, state_text(c->state),
605                    state_text(state));
606        }
607
608        c->state = state;
609    }
610}
611
612/*
613 * Free list management for suffix buffers.
614 */
615
616static char **freesuffix;
617static int freesuffixtotal;
618static int freesuffixcurr;
619
620static void suffix_init(void) {
621    freesuffixtotal = 500;
622    freesuffixcurr  = 0;
623
624    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
625    if (freesuffix == NULL) {
626        fprintf(stderr, "malloc()\n");
627    }
628    return;
629}
630
631/*
632 * Returns a suffix buffer from the freelist, if any. Should call this using
633 * suffix_from_freelist() for thread safety.
634 */
635char *do_suffix_from_freelist() {
636    char *s;
637
638    if (freesuffixcurr > 0) {
639        s = freesuffix[--freesuffixcurr];
640    } else {
641        /* If malloc fails, let the logic fall through without spamming
642         * STDERR on the server. */
643        s = malloc( SUFFIX_SIZE );
644    }
645
646    return s;
647}
648
649/*
650 * Adds a connection to the freelist. 0 = success. Should call this using
651 * conn_add_to_freelist() for thread safety.
652 */
653bool do_suffix_add_to_freelist(char *s) {
654    if (freesuffixcurr < freesuffixtotal) {
655        freesuffix[freesuffixcurr++] = s;
656        return false;
657    } else {
658        /* try to enlarge free connections array */
659        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
660        if (new_freesuffix) {
661            freesuffixtotal *= 2;
662            freesuffix = new_freesuffix;
663            freesuffix[freesuffixcurr++] = s;
664            return false;
665        }
666    }
667    return true;
668}
669
670/*
671 * Ensures that there is room for another struct iovec in a connection's
672 * iov list.
673 *
674 * Returns 0 on success, -1 on out-of-memory.
675 */
676static int ensure_iov_space(conn *c) {
677    assert(c != NULL);
678
679    if (c->iovused >= c->iovsize) {
680        int i, iovnum;
681        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
682                                (c->iovsize * 2) * sizeof(struct iovec));
683        if (! new_iov)
684            return -1;
685        c->iov = new_iov;
686        c->iovsize *= 2;
687
688        /* Point all the msghdr structures at the new list. */
689        for (i = 0, iovnum = 0; i < c->msgused; i++) {
690            c->msglist[i].msg_iov = &c->iov[iovnum];
691            iovnum += c->msglist[i].msg_iovlen;
692        }
693    }
694
695    return 0;
696}
697
698
699/*
700 * Adds data to the list of pending data that will be written out to a
701 * connection.
702 *
703 * Returns 0 on success, -1 on out-of-memory.
704 */
705
706static int add_iov(conn *c, const void *buf, int len) {
707    struct msghdr *m;
708    int leftover;
709    bool limit_to_mtu;
710
711    assert(c != NULL);
712
713    do {
714        m = &c->msglist[c->msgused - 1];
715
716        /*
717         * Limit UDP packets, and the first payloads of TCP replies, to
718         * UDP_MAX_PAYLOAD_SIZE bytes.
719         */
720        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
721
722        /* We may need to start a new msghdr if this one is full. */
723        if (m->msg_iovlen == IOV_MAX ||
724            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
725            add_msghdr(c);
726            m = &c->msglist[c->msgused - 1];
727        }
728
729        if (ensure_iov_space(c) != 0)
730            return -1;
731
732        /* If the fragment is too big to fit in the datagram, split it up */
733        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
734            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
735            len -= leftover;
736        } else {
737            leftover = 0;
738        }
739
740        m = &c->msglist[c->msgused - 1];
741        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
742        m->msg_iov[m->msg_iovlen].iov_len = len;
743
744        c->msgbytes += len;
745        c->iovused++;
746        m->msg_iovlen++;
747
748        buf = ((char *)buf) + len;
749        len = leftover;
750    } while (leftover > 0);
751
752    return 0;
753}
754
755
756/*
757 * Constructs a set of UDP headers and attaches them to the outgoing messages.
758 */
759static int build_udp_headers(conn *c) {
760    int i;
761    unsigned char *hdr;
762
763    assert(c != NULL);
764
765    if (c->msgused > c->hdrsize) {
766        void *new_hdrbuf;
767        if (c->hdrbuf)
768            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
769        else
770            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
771        if (! new_hdrbuf)
772            return -1;
773        c->hdrbuf = (unsigned char *)new_hdrbuf;
774        c->hdrsize = c->msgused * 2;
775    }
776
777    hdr = c->hdrbuf;
778    for (i = 0; i < c->msgused; i++) {
779        c->msglist[i].msg_iov[0].iov_base = hdr;
780        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
781        *hdr++ = c->request_id / 256;
782        *hdr++ = c->request_id % 256;
783        *hdr++ = i / 256;
784        *hdr++ = i % 256;
785        *hdr++ = c->msgused / 256;
786        *hdr++ = c->msgused % 256;
787        *hdr++ = 0;
788        *hdr++ = 0;
789        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
790    }
791
792    return 0;
793}
794
795
796static void out_string(conn *c, const char *str) {
797    size_t len;
798
799    assert(c != NULL);
800
801    if (c->noreply) {
802        if (settings.verbose > 1)
803            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
804        c->noreply = false;
805        conn_set_state(c, conn_new_cmd);
806        return;
807    }
808
809    if (settings.verbose > 1)
810        fprintf(stderr, ">%d %s\n", c->sfd, str);
811
812    len = strlen(str);
813    if ((len + 2) > c->wsize) {
814        /* ought to be always enough. just fail for simplicity */
815        str = "SERVER_ERROR output line too long";
816        len = strlen(str);
817    }
818
819    memcpy(c->wbuf, str, len);
820    memcpy(c->wbuf + len, "\r\n", 3);
821    c->wbytes = len + 2;
822    c->wcurr = c->wbuf;
823
824    conn_set_state(c, conn_write);
825    c->write_and_go = conn_new_cmd;
826    return;
827}
828
829/*
830 * we get here after reading the value in set/add/replace commands. The command
831 * has been stored in c->item_comm, and the item is ready in c->item.
832 */
833static void complete_nread_ascii(conn *c) {
834    assert(c != NULL);
835
836    item *it = c->item;
837    int comm = c->item_comm;
838    int ret;
839
840    STATS_LOCK();
841    stats.set_cmds++;
842    STATS_UNLOCK();
843
844    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
845        out_string(c, "CLIENT_ERROR bad data chunk");
846    } else {
847      ret = store_item(it, comm);
848      if (ret == 1)
849          out_string(c, "STORED");
850      else if(ret == 2)
851          out_string(c, "EXISTS");
852      else if(ret == 3)
853          out_string(c, "NOT_FOUND");
854      else
855          out_string(c, "NOT_STORED");
856    }
857
858    item_remove(c->item);       /* release the c->item reference */
859    c->item = 0;
860}
861
862static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
863    int i = 0;
864    uint32_t *res_header;
865
866    assert(c);
867    assert(body_len >= 0);
868
869    c->msgcurr = 0;
870    c->msgused = 0;
871    c->iovused = 0;
872    if (add_msghdr(c) != 0) {
873        /* XXX:  out_string is inappropriate here */
874        out_string(c, "SERVER_ERROR out of memory");
875        return;
876    }
877
878    res_header = (uint32_t *)c->wbuf;
879
880    res_header[0] = ((uint32_t)BIN_RES_MAGIC) << 24;
881    res_header[0] |= ((0xff & c->cmd) << 16);
882    res_header[0] |= err & 0xffff;
883
884    res_header[1] = hdr_len << 24;
885    /* TODO:  Support datatype */
886    res_header[2] = body_len;
887    res_header[3] = c->opaque;
888
889    if(settings.verbose > 1) {
890        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
891            res_header[0], res_header[1], res_header[2], res_header[3]);
892    }
893
894    for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
895        res_header[i] = htonl(res_header[i]);
896    }
897
898    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
899    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
900}
901
902static void write_bin_error(conn *c, int err, int swallow) {
903    const char *errstr = "Unknown error";
904    switch(err) {
905        case ERR_OUT_OF_MEMORY:
906            errstr = "Out of memory";
907            break;
908        case ERR_UNKNOWN_CMD:
909            errstr = "Unknown command";
910            break;
911        case ERR_NOT_FOUND:
912            errstr = "Not found";
913            break;
914        case ERR_INVALID_ARGUMENTS:
915            errstr = "Invalid arguments";
916            break;
917        case ERR_EXISTS:
918            errstr = "Data exists for key.";
919            break;
920        case ERR_TOO_LARGE:
921            errstr = "Too large.";
922            break;
923        case ERR_NOT_STORED:
924            errstr = "Not stored.";
925            break;
926        default:
927            errstr = "UNHANDLED ERROR";
928            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
929    }
930    if(settings.verbose > 0) {
931        fprintf(stderr, "Writing an error:  %s\n", errstr);
932    }
933    add_bin_header(c, err, 0, strlen(errstr));
934    add_iov(c, errstr, strlen(errstr));
935
936    conn_set_state(c, conn_mwrite);
937    if(swallow > 0) {
938        c->sbytes = swallow;
939        c->write_and_go = conn_swallow;
940    } else {
941        c->write_and_go = conn_new_cmd;
942    }
943}
944
945/* Form and send a response to a command over the binary protocol */
946static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
947    add_bin_header(c, 0, hlen, dlen);
948    if(dlen > 0) {
949        add_iov(c, d, dlen);
950    }
951    conn_set_state(c, conn_mwrite);
952    c->write_and_go = conn_new_cmd;
953}
954
955/* Byte swap a 64-bit number */
956static int64_t swap64(int64_t in) {
957#ifdef ENDIAN_LITTLE
958    /* Little endian, flip the bytes around until someone makes a faster/better
959    * way to do this. */
960    int64_t rv = 0;
961    int i = 0;
962     for(i = 0; i<8; i++) {
963        rv = (rv << 8) | (in & 0xff);
964        in >>= 8;
965     }
966    return rv;
967#else
968    /* big-endian machines don't need byte swapping */
969    return in;
970#endif
971}
972
973static void complete_incr_bin(conn *c) {
974    item *it;
975    int64_t delta;
976    uint64_t initial;
977    int32_t exptime;
978    char *key;
979    size_t nkey;
980    int i;
981    uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
982
983    assert(c != NULL);
984
985    key = c->rbuf + BIN_INCR_HDR_LEN;
986    nkey = c->keylen;
987    key[nkey] = 0x00;
988
989    delta = swap64(*((int64_t*)(c->rbuf)));
990    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
991    exptime = ntohl(*((int*)(c->rbuf + 16)));
992
993    if(settings.verbose) {
994        fprintf(stderr, "incr ");
995        for(i = 0; i<nkey; i++) {
996            fprintf(stderr, "%c", key[i]);
997        }
998        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
999    }
1000
1001    it = item_get(key, nkey);
1002    if (it) {
1003        /* Weird magic in add_delta forces me to pad here */
1004        char tmpbuf[INCR_MAX_STORAGE_LEN];
1005        uint64_t l = 0;
1006        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1007        tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
1008        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1009        *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
1010
1011        write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1012        item_remove(it);         /* release our reference */
1013    } else {
1014        if(exptime >= 0) {
1015            /* Save some room for the response */
1016            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1017            *response_buf = swap64(initial);
1018            it = item_alloc(key, nkey, 0, realtime(exptime),
1019                INCR_MAX_STORAGE_LEN);
1020            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1021
1022            if(store_item(it, NREAD_SET)) {
1023                write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
1024                    INCR_RES_LEN);
1025            } else {
1026                write_bin_error(c, ERR_NOT_STORED, 0);
1027            }
1028            item_remove(it);         /* release our reference */
1029        } else {
1030            write_bin_error(c, ERR_NOT_FOUND, 0);
1031        }
1032    }
1033}
1034
1035static void complete_update_bin(conn *c) {
1036    int eno = -1, ret = 0;
1037    assert(c != NULL);
1038
1039    item *it = c->item;
1040
1041    STATS_LOCK();
1042    stats.set_cmds++;
1043    STATS_UNLOCK();
1044
1045    /* We don't actually receive the trailing two characters in the bin
1046     * protocol, so we're going to just set them here */
1047    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1048    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1049
1050    switch (store_item(it, c->item_comm)) {
1051        case 1:
1052            /* Stored */
1053            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1054            break;
1055        case 2:
1056            write_bin_error(c, ERR_EXISTS, 0);
1057            break;
1058        case 3:
1059            write_bin_error(c, ERR_NOT_FOUND, 0);
1060            break;
1061        default:
1062            if(c->item_comm == NREAD_ADD) {
1063                eno = ERR_EXISTS;
1064            } else if(c->item_comm == NREAD_REPLACE) {
1065                eno = ERR_NOT_FOUND;
1066            } else {
1067                eno = ERR_NOT_STORED;
1068            }
1069            write_bin_error(c, eno, 0);
1070    }
1071
1072    item_remove(c->item);       /* release the c->item reference */
1073    c->item = 0;
1074}
1075
1076static void process_bin_get(conn *c) {
1077    item *it;
1078
1079    it = item_get(c->rbuf, c->keylen);
1080    if (it) {
1081        int *flags;
1082        uint64_t* identifier;
1083
1084        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1085
1086        /* the length has two unnecessary bytes, and then we write four more */
1087        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1088
1089        /* Add the "extras" field: CAS-id followed by flags. The cas is a 64-
1090           bit datatype and require alignment to 8-byte boundaries on some
1091           architechtures. Verify that the size of the packet header is of
1092           the correct size (if not the following code generates SIGBUS on
1093           sparc hardware).
1094        */
1095        assert(MIN_BIN_PKT_LENGTH % 8 == 0);
1096        identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1097        *identifier = swap64(it->cas_id);
1098        add_iov(c, identifier, 8);
1099
1100        /* Add the flags */
1101        flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH + 8);
1102        *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1103        add_iov(c, flags, 4);
1104
1105        /* Add the data minus the CRLF */
1106        add_iov(c, ITEM_data(it), it->nbytes - 2);
1107        conn_set_state(c, conn_mwrite);
1108    } else {
1109        if(c->cmd == CMD_GETQ) {
1110            conn_set_state(c, conn_new_cmd);
1111        } else {
1112            write_bin_error(c, ERR_NOT_FOUND, 0);
1113        }
1114    }
1115}
1116
1117static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1118    assert(c);
1119    c->substate = next_substate;
1120    c->rlbytes = c->keylen + extra;
1121    assert(c->rsize >= c->rlbytes);
1122    c->ritem = c->rbuf;
1123    conn_set_state(c, conn_nread);
1124}
1125
1126static void dispatch_bin_command(conn *c) {
1127    time_t exptime = 0;
1128    switch(c->cmd) {
1129        case CMD_VERSION:
1130            write_bin_response(c, VERSION, 0, strlen(VERSION));
1131            break;
1132        case CMD_FLUSH:
1133            set_current_time();
1134
1135            settings.oldest_live = current_time - 1;
1136            item_flush_expired();
1137            write_bin_response(c, NULL, 0, 0);
1138            break;
1139        case CMD_NOOP:
1140            write_bin_response(c, NULL, 0, 0);
1141            break;
1142        case CMD_SET:
1143            /* Fallthrough */
1144        case CMD_ADD:
1145            /* Fallthrough */
1146        case CMD_REPLACE:
1147            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1148            break;
1149        case CMD_GETQ:
1150        case CMD_GET:
1151            bin_read_key(c, bin_reading_get_key, 0);
1152            break;
1153        case CMD_DELETE:
1154            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1155            break;
1156        case CMD_INCR:
1157        case CMD_DECR:
1158            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1159            break;
1160        case CMD_QUIT:
1161            write_bin_response(c, NULL, 0, 0);
1162            c->write_and_go = conn_closing;
1163            break;
1164        default:
1165            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1166    }
1167}
1168
1169static void process_bin_update(conn *c) {
1170    char *key;
1171    int nkey;
1172    int vlen;
1173    int flags;
1174    int exptime;
1175    item *it;
1176    int comm;
1177    int hdrlen = BIN_SET_HDR_LEN;
1178
1179    assert(c != NULL);
1180
1181    key = c->rbuf + hdrlen;
1182    nkey = c->keylen;
1183    key[nkey] = 0x00;
1184
1185    flags = ntohl(*((int*)(c->rbuf + 8)));
1186    exptime = ntohl(*((int*)(c->rbuf + 12)));
1187    vlen = c->bin_header[2] - (nkey + hdrlen);
1188
1189    if(settings.verbose > 1) {
1190        fprintf(stderr, "Value len is %d\n", vlen);
1191    }
1192
1193    if (settings.detail_enabled) {
1194        stats_prefix_record_set(key);
1195    }
1196
1197    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1198
1199    if (it == 0) {
1200        if (! item_size_ok(nkey, flags, vlen + 2)) {
1201            write_bin_error(c, ERR_TOO_LARGE, vlen);
1202        } else {
1203            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1204        }
1205        /* swallow the data line */
1206        c->write_and_go = conn_swallow;
1207        return;
1208    }
1209
1210    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf)));
1211
1212    switch(c->cmd) {
1213        case CMD_ADD:
1214            c->item_comm = NREAD_ADD;
1215            break;
1216        case CMD_SET:
1217            c->item_comm = NREAD_SET;
1218            break;
1219        case CMD_REPLACE:
1220            c->item_comm = NREAD_REPLACE;
1221            break;
1222        default:
1223            assert(0);
1224    }
1225
1226    if(it->cas_id != 0) {
1227        c->item_comm = NREAD_CAS;
1228    }
1229
1230    c->item = it;
1231    c->ritem = ITEM_data(it);
1232    c->rlbytes = vlen;
1233    conn_set_state(c, conn_nread);
1234    c->substate = bin_read_set_value;
1235}
1236
1237static void process_bin_delete(conn *c) {
1238    char *key;
1239    size_t nkey;
1240    item *it;
1241    time_t exptime = 0;
1242
1243    assert(c != NULL);
1244
1245    exptime = ntohl(*((int*)(c->rbuf)));
1246    key = c->rbuf + 4;
1247    nkey = c->keylen;
1248    key[nkey] = 0x00;
1249
1250    if(settings.verbose) {
1251        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1252    }
1253
1254    if (settings.detail_enabled) {
1255        stats_prefix_record_delete(key);
1256    }
1257
1258    it = item_get(key, nkey);
1259    if (it) {
1260        if (exptime == 0) {
1261            item_unlink(it);
1262            item_remove(it);      /* release our reference */
1263            write_bin_response(c, NULL, 0, 0);
1264        } else {
1265            /* XXX:  This is really lame, but defer_delete returns a string */
1266            char *res = defer_delete(it, exptime);
1267            if(res[0] == 'D') {
1268                write_bin_response(c, NULL, 0, 0);
1269            } else {
1270                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1271            }
1272        }
1273    } else {
1274        write_bin_error(c, ERR_NOT_FOUND, 0);
1275    }
1276}
1277
1278static void complete_nread_binary(conn *c) {
1279    assert(c != NULL);
1280    assert(c->cmd >= 0);
1281
1282    switch(c->substate) {
1283    case bin_reading_set_header:
1284        process_bin_update(c);
1285        break;
1286    case bin_read_set_value:
1287        complete_update_bin(c);
1288        break;
1289    case bin_reading_get_key:
1290        process_bin_get(c);
1291        break;
1292    case bin_reading_del_header:
1293        process_bin_delete(c);
1294        break;
1295    case bin_reading_incr_header:
1296        complete_incr_bin(c);
1297        break;
1298    default:
1299        fprintf(stderr, "Not handling substate %d\n", c->substate);
1300        assert(0);
1301    }
1302}
1303
1304static void reset_cmd_handler(conn *c) {
1305    c->cmd = -1;
1306    c->substate = bin_no_state;
1307    conn_shrink(c);
1308    if (c->rbytes > 0) {
1309        conn_set_state(c, conn_parse_cmd);
1310    } else {
1311        conn_set_state(c, conn_waiting);
1312    }
1313}
1314
1315static void complete_nread(conn *c) {
1316    assert(c != NULL);
1317    assert(c->protocol == ascii_prot || c->protocol == binary_prot);
1318
1319    if(c->protocol == ascii_prot) {
1320        complete_nread_ascii(c);
1321    } else if (c->protocol == binary_prot) {
1322        complete_nread_binary(c);
1323    }
1324}
1325
1326/*
1327 * Stores an item in the cache according to the semantics of one of the set
1328 * commands. In threaded mode, this is protected by the cache lock.
1329 *
1330 * Returns true if the item was stored.
1331 */
1332int do_store_item(item *it, int comm) {
1333    char *key = ITEM_key(it);
1334    bool delete_locked = false;
1335    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1336    int stored = 0;
1337
1338    item *new_it = NULL;
1339    int flags;
1340
1341    if (old_it != NULL && comm == NREAD_ADD) {
1342        /* add only adds a nonexistent item, but promote to head of LRU */
1343        do_item_update(old_it);
1344    } else if (!old_it && (comm == NREAD_REPLACE
1345        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1346    {
1347        /* replace only replaces an existing value; don't store */
1348    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1349        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1350    {
1351        /* replace and add can't override delete locks; don't store */
1352    } else if (comm == NREAD_CAS) {
1353        /* validate cas operation */
1354        if (delete_locked)
1355            old_it = do_item_get_nocheck(key, it->nkey);
1356
1357        if(old_it == NULL) {
1358          // LRU expired
1359          stored = 3;
1360        }
1361        else if(it->cas_id == old_it->cas_id) {
1362          // cas validates
1363          do_item_replace(old_it, it);
1364          stored = 1;
1365        } else {
1366          if(settings.verbose > 1) {
1367            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1368                old_it->cas_id, it->cas_id);
1369          }
1370          stored = 2;
1371        }
1372    } else {
1373        /*
1374         * Append - combine new and old record into single one. Here it's
1375         * atomic and thread-safe.
1376         */
1377
1378        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1379
1380            /* we have it and old_it here - alloc memory to hold both */
1381            /* flags was already lost - so recover them from ITEM_suffix(it) */
1382
1383            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1384
1385            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1386
1387            if (new_it == NULL) {
1388                /* SERVER_ERROR out of memory */
1389                return 0;
1390            }
1391
1392            /* copy data from it and old_it to new_it */
1393
1394            if (comm == NREAD_APPEND) {
1395                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1396                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1397            } else {
1398                /* NREAD_PREPEND */
1399                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1400                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1401            }
1402
1403            it = new_it;
1404        }
1405
1406        /* "set" commands can override the delete lock
1407           window... in which case we have to find the old hidden item
1408           that's in the namespace/LRU but wasn't returned by
1409           item_get.... because we need to replace it */
1410        if (delete_locked)
1411            old_it = do_item_get_nocheck(key, it->nkey);
1412
1413        if (old_it != NULL)
1414            do_item_replace(old_it, it);
1415        else
1416            do_item_link(it);
1417
1418        stored = 1;
1419    }
1420
1421    if (old_it != NULL)
1422        do_item_remove(old_it);         /* release our reference */
1423    if (new_it != NULL)
1424        do_item_remove(new_it);
1425
1426    return stored;
1427}
1428
1429typedef struct token_s {
1430    char *value;
1431    size_t length;
1432} token_t;
1433
1434#define COMMAND_TOKEN 0
1435#define SUBCOMMAND_TOKEN 1
1436#define KEY_TOKEN 1
1437#define KEY_MAX_LENGTH 250
1438
1439#define MAX_TOKENS 8
1440
1441/*
1442 * Tokenize the command string by replacing whitespace with '\0' and update
1443 * the token array tokens with pointer to start of each token and length.
1444 * Returns total number of tokens.  The last valid token is the terminal
1445 * token (value points to the first unprocessed character of the string and
1446 * length zero).
1447 *
1448 * Usage example:
1449 *
1450 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1451 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1452 *          ...
1453 *      }
1454 *      ncommand = tokens[ix].value - command;
1455 *      command  = tokens[ix].value;
1456 *   }
1457 */
1458static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1459    char *s, *e;
1460    size_t ntokens = 0;
1461
1462    assert(command != NULL && tokens != NULL && max_tokens > 1);
1463
1464    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1465        if (*e == ' ') {
1466            if (s != e) {
1467                tokens[ntokens].value = s;
1468                tokens[ntokens].length = e - s;
1469                ntokens++;
1470                *e = '\0';
1471            }
1472            s = e + 1;
1473        }
1474        else if (*e == '\0') {
1475            if (s != e) {
1476                tokens[ntokens].value = s;
1477                tokens[ntokens].length = e - s;
1478                ntokens++;
1479            }
1480
1481            break; /* string end */
1482        }
1483    }
1484
1485    /*
1486     * If we scanned the whole string, the terminal value pointer is null,
1487     * otherwise it is the first unprocessed character.
1488     */
1489    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1490    tokens[ntokens].length = 0;
1491    ntokens++;
1492
1493    return ntokens;
1494}
1495
1496/* set up a connection to write a buffer then free it, used for stats */
1497static void write_and_free(conn *c, char *buf, int bytes) {
1498    if (buf) {
1499        c->write_and_free = buf;
1500        c->wcurr = buf;
1501        c->wbytes = bytes;
1502        conn_set_state(c, conn_write);
1503        c->write_and_go = conn_new_cmd;
1504    } else {
1505        out_string(c, "SERVER_ERROR out of memory writing stats");
1506    }
1507}
1508
1509static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1510{
1511    int noreply_index = ntokens - 2;
1512
1513    /*
1514      NOTE: this function is not the first place where we are going to
1515      send the reply.  We could send it instead from process_command()
1516      if the request line has wrong number of tokens.  However parsing
1517      malformed line for "noreply" option is not reliable anyway, so
1518      it can't be helped.
1519    */
1520    if (tokens[noreply_index].value
1521        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1522        c->noreply = true;
1523    }
1524}
1525
1526inline static void process_stats_detail(conn *c, const char *command) {
1527    assert(c != NULL);
1528
1529    if (strcmp(command, "on") == 0) {
1530        settings.detail_enabled = 1;
1531        out_string(c, "OK");
1532    }
1533    else if (strcmp(command, "off") == 0) {
1534        settings.detail_enabled = 0;
1535        out_string(c, "OK");
1536    }
1537    else if (strcmp(command, "dump") == 0) {
1538        int len;
1539        char *stats = stats_prefix_dump(&len);
1540        write_and_free(c, stats, len);
1541    }
1542    else {
1543        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1544    }
1545}
1546
1547static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1548    rel_time_t now = current_time;
1549    char *command;
1550    char *subcommand;
1551
1552    assert(c != NULL);
1553
1554    if(ntokens < 2) {
1555        out_string(c, "CLIENT_ERROR bad command line");
1556        return;
1557    }
1558
1559    command = tokens[COMMAND_TOKEN].value;
1560
1561    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1562        char temp[1024];
1563        pid_t pid = getpid();
1564        char *pos = temp;
1565
1566#ifndef WIN32
1567        struct rusage usage;
1568        getrusage(RUSAGE_SELF, &usage);
1569#endif /* !WIN32 */
1570
1571        STATS_LOCK();
1572        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1573        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1574        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1575        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1576        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1577#ifndef WIN32
1578        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1579        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1580#endif /* !WIN32 */
1581        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1582        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1583        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1584        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1585        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1586        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1587        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1588        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1589        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1590        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1591        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1592        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1593        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1594        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1595        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1596        pos += sprintf(pos, "END");
1597        STATS_UNLOCK();
1598        out_string(c, temp);
1599        return;
1600    }
1601
1602    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1603
1604    if (strcmp(subcommand, "reset") == 0) {
1605        stats_reset();
1606        out_string(c, "RESET");
1607        return;
1608    }
1609
1610#ifdef HAVE_MALLOC_H
1611#ifdef HAVE_STRUCT_MALLINFO
1612    if (strcmp(subcommand, "malloc") == 0) {
1613        char temp[512];
1614        struct mallinfo info;
1615        char *pos = temp;
1616
1617        info = mallinfo();
1618        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1619        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1620        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1621        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1622        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1623        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1624        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1625        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1626        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1627        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1628        out_string(c, temp);
1629        return;
1630    }
1631#endif /* HAVE_STRUCT_MALLINFO */
1632#endif /* HAVE_MALLOC_H */
1633
1634#if !defined(WIN32) || !defined(__APPLE__)
1635    if (strcmp(subcommand, "maps") == 0) {
1636        char *wbuf;
1637        int wsize = 8192; /* should be enough */
1638        int fd;
1639        int res;
1640
1641        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1642            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1643            return;
1644        }
1645
1646        fd = open("/proc/self/maps", O_RDONLY);
1647        if (fd == -1) {
1648            out_string(c, "SERVER_ERROR cannot open the maps file");
1649            free(wbuf);
1650            return;
1651        }
1652
1653        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1654        if (res == wsize - 6) {
1655            out_string(c, "SERVER_ERROR buffer overflow");
1656            free(wbuf); close(fd);
1657            return;
1658        }
1659        if (res == 0 || res == -1) {
1660            out_string(c, "SERVER_ERROR can't read the maps file");
1661            free(wbuf); close(fd);
1662            return;
1663        }
1664        memcpy(wbuf + res, "END\r\n", 5);
1665        write_and_free(c, wbuf, res + 5);
1666        close(fd);
1667        return;
1668    }
1669#endif
1670
1671    if (strcmp(subcommand, "cachedump") == 0) {
1672
1673        char *buf;
1674        unsigned int bytes, id, limit = 0;
1675
1676        if(ntokens < 5) {
1677            out_string(c, "CLIENT_ERROR bad command line");
1678            return;
1679        }
1680
1681        id = strtoul(tokens[2].value, NULL, 10);
1682        limit = strtoul(tokens[3].value, NULL, 10);
1683
1684        if(errno == ERANGE) {
1685            out_string(c, "CLIENT_ERROR bad command line format");
1686            return;
1687        }
1688
1689        buf = item_cachedump(id, limit, &bytes);
1690        write_and_free(c, buf, bytes);
1691        return;
1692    }
1693
1694    if (strcmp(subcommand, "slabs") == 0) {
1695        int bytes = 0;
1696        char *buf = slabs_stats(&bytes);
1697        write_and_free(c, buf, bytes);
1698        return;
1699    }
1700
1701    if (strcmp(subcommand, "items") == 0) {
1702        int bytes = 0;
1703        char *buf = item_stats(&bytes);
1704        write_and_free(c, buf, bytes);
1705        return;
1706    }
1707
1708    if (strcmp(subcommand, "detail") == 0) {
1709        if (ntokens < 4)
1710            process_stats_detail(c, "");  /* outputs the error message */
1711        else
1712            process_stats_detail(c, tokens[2].value);
1713        return;
1714    }
1715
1716    if (strcmp(subcommand, "sizes") == 0) {
1717        int bytes = 0;
1718        char *buf = item_stats_sizes(&bytes);
1719        write_and_free(c, buf, bytes);
1720        return;
1721    }
1722
1723    out_string(c, "ERROR");
1724}
1725
1726/* ntokens is overwritten here... shrug.. */
1727static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1728    char *key;
1729    size_t nkey;
1730    int i = 0;
1731    item *it;
1732    token_t *key_token = &tokens[KEY_TOKEN];
1733    char *suffix;
1734    int stats_get_cmds   = 0;
1735    int stats_get_hits   = 0;
1736    int stats_get_misses = 0;
1737    assert(c != NULL);
1738
1739    if (settings.managed) {
1740        int bucket = c->bucket;
1741        if (bucket == -1) {
1742            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1743            return;
1744        }
1745        c->bucket = -1;
1746        if (buckets[bucket] != c->gen) {
1747            out_string(c, "ERROR_NOT_OWNER");
1748            return;
1749        }
1750    }
1751
1752    do {
1753        while(key_token->length != 0) {
1754
1755            key = key_token->value;
1756            nkey = key_token->length;
1757
1758            if(nkey > KEY_MAX_LENGTH) {
1759                STATS_LOCK();
1760                stats.get_cmds   += stats_get_cmds;
1761                stats.get_hits   += stats_get_hits;
1762                stats.get_misses += stats_get_misses;
1763                STATS_UNLOCK();
1764                out_string(c, "CLIENT_ERROR bad command line format");
1765                return;
1766            }
1767
1768            stats_get_cmds++;
1769            it = item_get(key, nkey);
1770            if (settings.detail_enabled) {
1771                stats_prefix_record_get(key, NULL != it);
1772            }
1773            if (it) {
1774                if (i >= c->isize) {
1775                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1776                    if (new_list) {
1777                        c->isize *= 2;
1778                        c->ilist = new_list;
1779                    } else break;
1780                }
1781
1782                /*
1783                 * Construct the response. Each hit adds three elements to the
1784                 * outgoing data list:
1785                 *   "VALUE "
1786                 *   key
1787                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1788                 */
1789
1790                if (return_cas)
1791                {
1792                  /* Goofy mid-flight realloc. */
1793                  if (i >= c->suffixsize) {
1794                    char **new_suffix_list = realloc(c->suffixlist,
1795                                           sizeof(char *) * c->suffixsize * 2);
1796                    if (new_suffix_list) {
1797                      c->suffixsize *= 2;
1798                      c->suffixlist  = new_suffix_list;
1799                    } else break;
1800                  }
1801
1802                  suffix = suffix_from_freelist();
1803                  if (suffix == NULL) {
1804                    STATS_LOCK();
1805                    stats.get_cmds   += stats_get_cmds;
1806                    stats.get_hits   += stats_get_hits;
1807                    stats.get_misses += stats_get_misses;
1808                    STATS_UNLOCK();
1809                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1810                    return;
1811                  }
1812                  *(c->suffixlist + i) = suffix;
1813                  sprintf(suffix, " %llu\r\n", it->cas_id);
1814                  if (add_iov(c, "VALUE ", 6) != 0 ||
1815                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1816                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1817                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1818                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1819                      {
1820                          break;
1821                      }
1822                }
1823                else
1824                {
1825                  if (add_iov(c, "VALUE ", 6) != 0 ||
1826                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1827                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1828                      {
1829                          break;
1830                      }
1831                }
1832
1833
1834                if (settings.verbose > 1)
1835                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1836
1837                /* item_get() has incremented it->refcount for us */
1838                stats_get_hits++;
1839                item_update(it);
1840                *(c->ilist + i) = it;
1841                i++;
1842
1843            } else {
1844                stats_get_misses++;
1845            }
1846
1847            key_token++;
1848        }
1849
1850        /*
1851         * If the command string hasn't been fully processed, get the next set
1852         * of tokens.
1853         */
1854        if(key_token->value != NULL) {
1855            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1856            key_token = tokens;
1857        }
1858
1859    } while(key_token->value != NULL);
1860
1861    c->icurr = c->ilist;
1862    c->ileft = i;
1863    if (return_cas) {
1864        c->suffixcurr = c->suffixlist;
1865        c->suffixleft = i;
1866    }
1867
1868    if (settings.verbose > 1)
1869        fprintf(stderr, ">%d END\n", c->sfd);
1870
1871    /*
1872        If the loop was terminated because of out-of-memory, it is not
1873        reliable to add END\r\n to the buffer, because it might not end
1874        in \r\n. So we send SERVER_ERROR instead.
1875    */
1876    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1877        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1878        out_string(c, "SERVER_ERROR out of memory writing get response");
1879    }
1880    else {
1881        conn_set_state(c, conn_mwrite);
1882        c->msgcurr = 0;
1883    }
1884
1885    STATS_LOCK();
1886    stats.get_cmds   += stats_get_cmds;
1887    stats.get_hits   += stats_get_hits;
1888    stats.get_misses += stats_get_misses;
1889    STATS_UNLOCK();
1890
1891    return;
1892}
1893
1894static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1895    char *key;
1896    size_t nkey;
1897    int flags;
1898    time_t exptime;
1899    int vlen, old_vlen;
1900    uint64_t req_cas_id;
1901    item *it, *old_it;
1902
1903    assert(c != NULL);
1904
1905    set_noreply_maybe(c, tokens, ntokens);
1906
1907    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1908        out_string(c, "CLIENT_ERROR bad command line format");
1909        return;
1910    }
1911
1912    key = tokens[KEY_TOKEN].value;
1913    nkey = tokens[KEY_TOKEN].length;
1914
1915    flags = strtoul(tokens[2].value, NULL, 10);
1916    exptime = strtol(tokens[3].value, NULL, 10);
1917    vlen = strtol(tokens[4].value, NULL, 10);
1918
1919    // does cas value exist?
1920    if(handle_cas)
1921    {
1922      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1923    }
1924
1925    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1926        out_string(c, "CLIENT_ERROR bad command line format");
1927        return;
1928    }
1929
1930    if (settings.detail_enabled) {
1931        stats_prefix_record_set(key);
1932    }
1933
1934    if (settings.managed) {
1935        int bucket = c->bucket;
1936        if (bucket == -1) {
1937            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1938            return;
1939        }
1940        c->bucket = -1;
1941        if (buckets[bucket] != c->gen) {
1942            out_string(c, "ERROR_NOT_OWNER");
1943            return;
1944        }
1945    }
1946
1947    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1948
1949    if (it == 0) {
1950        if (! item_size_ok(nkey, flags, vlen + 2))
1951            out_string(c, "SERVER_ERROR object too large for cache");
1952        else
1953            out_string(c, "SERVER_ERROR out of memory storing object");
1954        /* swallow the data line */
1955        c->write_and_go = conn_swallow;
1956        c->sbytes = vlen + 2;
1957        return;
1958    }
1959    if(handle_cas)
1960      it->cas_id = req_cas_id;
1961
1962    c->item = it;
1963    c->ritem = ITEM_data(it);
1964    c->rlbytes = it->nbytes;
1965    c->item_comm = comm;
1966    conn_set_state(c, conn_nread);
1967}
1968
1969static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1970    char temp[sizeof("18446744073709551615")];
1971    item *it;
1972    int64_t delta;
1973    char *key;
1974    size_t nkey;
1975
1976    assert(c != NULL);
1977
1978    set_noreply_maybe(c, tokens, ntokens);
1979
1980    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1981        out_string(c, "CLIENT_ERROR bad command line format");
1982        return;
1983    }
1984
1985    key = tokens[KEY_TOKEN].value;
1986    nkey = tokens[KEY_TOKEN].length;
1987
1988    if (settings.managed) {
1989        int bucket = c->bucket;
1990        if (bucket == -1) {
1991            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1992            return;
1993        }
1994        c->bucket = -1;
1995        if (buckets[bucket] != c->gen) {
1996            out_string(c, "ERROR_NOT_OWNER");
1997            return;
1998        }
1999    }
2000
2001    delta = strtoll(tokens[2].value, NULL, 10);
2002
2003    if(errno == ERANGE) {
2004        out_string(c, "CLIENT_ERROR bad command line format");
2005        return;
2006    }
2007
2008    it = item_get(key, nkey);
2009    if (!it) {
2010        out_string(c, "NOT_FOUND");
2011        return;
2012    }
2013
2014    out_string(c, add_delta(it, incr, delta, temp));
2015    item_remove(it);         /* release our reference */
2016}
2017
2018/*
2019 * adds a delta value to a numeric item.
2020 *
2021 * it    item to adjust
2022 * incr  true to increment value, false to decrement
2023 * delta amount to adjust value by
2024 * buf   buffer for response string
2025 *
2026 * returns a response string to send back to the client.
2027 */
2028char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2029    char *ptr;
2030    int64_t value;
2031    int res;
2032
2033    ptr = ITEM_data(it);
2034    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2035
2036    value = strtoull(ptr, NULL, 10);
2037
2038    if(errno == ERANGE) {
2039        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2040    }
2041
2042    if (incr)
2043        value += delta;
2044    else {
2045        value -= delta;
2046    }
2047    if(value < 0) {
2048        value = 0;
2049    }
2050    sprintf(buf, "%llu", value);
2051    res = strlen(buf);
2052    if (res + 2 > it->nbytes) { /* need to realloc */
2053        item *new_it;
2054        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2055        if (new_it == 0) {
2056            return "SERVER_ERROR out of memory in incr/decr";
2057        }
2058        memcpy(ITEM_data(new_it), buf, res);
2059        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2060        do_item_replace(it, new_it);
2061        do_item_remove(new_it);       /* release our reference */
2062    } else { /* replace in-place */
2063        memcpy(ITEM_data(it), buf, res);
2064        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2065    }
2066
2067    return buf;
2068}
2069
2070static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2071    char *key;
2072    size_t nkey;
2073    item *it;
2074    time_t exptime = 0;
2075
2076    assert(c != NULL);
2077
2078    set_noreply_maybe(c, tokens, ntokens);
2079
2080    if (settings.managed) {
2081        int bucket = c->bucket;
2082        if (bucket == -1) {
2083            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2084            return;
2085        }
2086        c->bucket = -1;
2087        if (buckets[bucket] != c->gen) {
2088            out_string(c, "ERROR_NOT_OWNER");
2089            return;
2090        }
2091    }
2092
2093    key = tokens[KEY_TOKEN].value;
2094    nkey = tokens[KEY_TOKEN].length;
2095
2096    if(nkey > KEY_MAX_LENGTH) {
2097        out_string(c, "CLIENT_ERROR bad command line format");
2098        return;
2099    }
2100
2101    if(ntokens == (c->noreply ? 5 : 4)) {
2102        exptime = strtol(tokens[2].value, NULL, 10);
2103
2104        if(errno == ERANGE) {
2105            out_string(c, "CLIENT_ERROR bad command line format");
2106            return;
2107        }
2108    }
2109
2110    if (settings.detail_enabled) {
2111        stats_prefix_record_delete(key);
2112    }
2113
2114    it = item_get(key, nkey);
2115    if (it) {
2116        if (exptime == 0) {
2117            item_unlink(it);
2118            item_remove(it);      /* release our reference */
2119            out_string(c, "DELETED");
2120        } else {
2121            /* our reference will be transfered to the delete queue */
2122            out_string(c, defer_delete(it, exptime));
2123        }
2124    } else {
2125        out_string(c, "NOT_FOUND");
2126    }
2127}
2128
2129/*
2130 * Adds an item to the deferred-delete list so it can be reaped later.
2131 *
2132 * Returns the result to send to the client.
2133 */
2134char *do_defer_delete(item *it, time_t exptime)
2135{
2136    if (delcurr >= deltotal) {
2137        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2138        if (new_delete) {
2139            todelete = new_delete;
2140            deltotal *= 2;
2141        } else {
2142            /*
2143             * can't delete it immediately, user wants a delay,
2144             * but we ran out of memory for the delete queue
2145             */
2146            item_remove(it);    /* release reference */
2147            return "SERVER_ERROR out of memory expanding delete queue";
2148        }
2149    }
2150
2151    /* use its expiration time as its deletion time now */
2152    it->exptime = realtime(exptime);
2153    it->it_flags |= ITEM_DELETED;
2154    todelete[delcurr++] = it;
2155
2156    return "DELETED";
2157}
2158
2159static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2160    unsigned int level;
2161
2162    assert(c != NULL);
2163
2164    set_noreply_maybe(c, tokens, ntokens);
2165
2166    level = strtoul(tokens[1].value, NULL, 10);
2167    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2168    out_string(c, "OK");
2169    return;
2170}
2171
2172static void process_command(conn *c, char *command) {
2173
2174    token_t tokens[MAX_TOKENS];
2175    size_t ntokens;
2176    int comm;
2177
2178    assert(c != NULL);
2179
2180    if (settings.verbose > 1)
2181        fprintf(stderr, "<%d %s\n", c->sfd, command);
2182
2183    /*
2184     * for commands set/add/replace, we build an item and read the data
2185     * directly into it, then continue in nread_complete().
2186     */
2187
2188    c->msgcurr = 0;
2189    c->msgused = 0;
2190    c->iovused = 0;
2191    if (add_msghdr(c) != 0) {
2192        out_string(c, "SERVER_ERROR out of memory preparing response");
2193        return;
2194    }
2195
2196    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2197    if (ntokens >= 3 &&
2198        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2199         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2200
2201        process_get_command(c, tokens, ntokens, false);
2202
2203    } else if ((ntokens == 6 || ntokens == 7) &&
2204               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2205                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2206                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2207                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2208                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2209
2210        process_update_command(c, tokens, ntokens, comm, false);
2211
2212    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2213
2214        process_update_command(c, tokens, ntokens, comm, true);
2215
2216    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2217
2218        process_arithmetic_command(c, tokens, ntokens, 1);
2219
2220    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2221
2222        process_get_command(c, tokens, ntokens, true);
2223
2224    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2225
2226        process_arithmetic_command(c, tokens, ntokens, 0);
2227
2228    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2229
2230        process_delete_command(c, tokens, ntokens);
2231
2232    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2233        unsigned int bucket, gen;
2234        if (!settings.managed) {
2235            out_string(c, "CLIENT_ERROR not a managed instance");
2236            return;
2237        }
2238
2239        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2240            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2241                out_string(c, "CLIENT_ERROR bucket number out of range");
2242                return;
2243            }
2244            buckets[bucket] = gen;
2245            out_string(c, "OWNED");
2246            return;
2247        } else {
2248            out_string(c, "CLIENT_ERROR bad format");
2249            return;
2250        }
2251
2252    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2253
2254        int bucket;
2255        if (!settings.managed) {
2256            out_string(c, "CLIENT_ERROR not a managed instance");
2257            return;
2258        }
2259        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2260            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2261                out_string(c, "CLIENT_ERROR bucket number out of range");
2262                return;
2263            }
2264            buckets[bucket] = 0;
2265            out_string(c, "DISOWNED");
2266            return;
2267        } else {
2268            out_string(c, "CLIENT_ERROR bad format");
2269            return;
2270        }
2271
2272    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2273        int bucket, gen;
2274        if (!settings.managed) {
2275            out_string(c, "CLIENT_ERROR not a managed instance");
2276            return;
2277        }
2278        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2279            /* we never write anything back, even if input's wrong */
2280            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2281                /* do nothing, bad input */
2282            } else {
2283                c->bucket = bucket;
2284                c->gen = gen;
2285            }
2286            conn_set_state(c, conn_new_cmd);
2287            return;
2288        } else {
2289            out_string(c, "CLIENT_ERROR bad format");
2290            return;
2291        }
2292
2293    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2294
2295        process_stat(c, tokens, ntokens);
2296
2297    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2298        time_t exptime = 0;
2299        set_current_time();
2300
2301        set_noreply_maybe(c, tokens, ntokens);
2302
2303        if(ntokens == (c->noreply ? 3 : 2)) {
2304            settings.oldest_live = current_time - 1;
2305            item_flush_expired();
2306            out_string(c, "OK");
2307            return;
2308        }
2309
2310        exptime = strtol(tokens[1].value, NULL, 10);
2311        if(errno == ERANGE) {
2312            out_string(c, "CLIENT_ERROR bad command line format");
2313            return;
2314        }
2315
2316        /*
2317          If exptime is zero realtime() would return zero too, and
2318          realtime(exptime) - 1 would overflow to the max unsigned
2319          value.  So we process exptime == 0 the same way we do when
2320          no delay is given at all.
2321        */
2322        if (exptime > 0)
2323            settings.oldest_live = realtime(exptime) - 1;
2324        else /* exptime == 0 */
2325            settings.oldest_live = current_time - 1;
2326        item_flush_expired();
2327        out_string(c, "OK");
2328        return;
2329
2330    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2331
2332        out_string(c, "VERSION " VERSION);
2333
2334    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2335
2336        conn_set_state(c, conn_closing);
2337
2338    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2339                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2340#ifdef ALLOW_SLABS_REASSIGN
2341
2342        int src, dst, rv;
2343
2344        src = strtol(tokens[2].value, NULL, 10);
2345        dst  = strtol(tokens[3].value, NULL, 10);
2346
2347        if(errno == ERANGE) {
2348            out_string(c, "CLIENT_ERROR bad command line format");
2349            return;
2350        }
2351
2352        rv = slabs_reassign(src, dst);
2353        if (rv == 1) {
2354            out_string(c, "DONE");
2355            return;
2356        }
2357        if (rv == 0) {
2358            out_string(c, "CANT");
2359            return;
2360        }
2361        if (rv == -1) {
2362            out_string(c, "BUSY");
2363            return;
2364        }
2365#else
2366        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2367#endif
2368    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2369        process_verbosity_command(c, tokens, ntokens);
2370    } else {
2371        out_string(c, "ERROR");
2372    }
2373    return;
2374}
2375
2376/*
2377 * if we have a complete line in the buffer, process it.
2378 */
2379static int try_read_command(conn *c) {
2380    assert(c != NULL);
2381    assert(c->rcurr <= (c->rbuf + c->rsize));
2382    assert(c->rbytes > 0);
2383
2384    if (c->protocol == negotiating_prot)  {
2385        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC) {
2386            c->protocol = binary_prot;
2387        } else {
2388            c->protocol = ascii_prot;
2389        }
2390
2391        if (settings.verbose) {
2392            fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
2393                    prot_text(c->protocol));
2394        }
2395    }
2396
2397    if (c->protocol == binary_prot) {
2398        /* Do we have the complete packet header? */
2399        if (c->rbytes < MIN_BIN_PKT_LENGTH) {
2400            /* need more data! */
2401            return 0;
2402        } else {
2403            int i = 0;
2404            memcpy(c->bin_header, c->rcurr, sizeof(c->bin_header));
2405            assert(BIN_PKT_HDR_WORDS == 4);
2406            for (i = 0; i<BIN_PKT_HDR_WORDS; i++) {
2407                c->bin_header[i] = ntohl(c->bin_header[i]);
2408            }
2409
2410            if (settings.verbose) {
2411                fprintf(stderr,
2412                        "Read binary protocol data:  %08x %08x %08x %08x\n",
2413                        c->bin_header[0], c->bin_header[1], c->bin_header[2],
2414                        c->bin_header[3]);
2415            }
2416
2417            if ((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
2418                if (settings.verbose) {
2419                    fprintf(stderr, "Invalid magic:  %x\n",
2420                            c->bin_header[0] >> 24);
2421                }
2422                conn_set_state(c, conn_closing);
2423                return 0;
2424            }
2425
2426            c->msgcurr = 0;
2427            c->msgused = 0;
2428            c->iovused = 0;
2429            if (add_msghdr(c) != 0) {
2430                out_string(c, "SERVER_ERROR out of memory");
2431                return 0;
2432            }
2433
2434            c->cmd = (c->bin_header[0] >> 16) & 0xff;
2435            c->keylen = c->bin_header[0] & 0xffff;
2436            c->opaque = c->bin_header[3];
2437            if (settings.verbose > 1) {
2438                fprintf(stderr,
2439                        "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n",
2440                        c->cmd, c->opaque, c->keylen, c->bin_header[2]);
2441            }
2442
2443            dispatch_bin_command(c);
2444
2445            c->rbytes -= MIN_BIN_PKT_LENGTH;
2446            c->rcurr += MIN_BIN_PKT_LENGTH;
2447        }
2448    } else {
2449        char *el, *cont;
2450
2451        if (c->rbytes == 0)
2452            return 0;
2453        el = memchr(c->rcurr, '\n', c->rbytes);
2454        if (!el)
2455            return 0;
2456        cont = el + 1;
2457        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2458            el--;
2459        }
2460        *el = '\0';
2461
2462        assert(cont <= (c->rcurr + c->rbytes));
2463
2464        process_command(c, c->rcurr);
2465
2466        c->rbytes -= (cont - c->rcurr);
2467        c->rcurr = cont;
2468
2469        assert(c->rcurr <= (c->rbuf + c->rsize));
2470    }
2471
2472    return 1;
2473}
2474
2475/*
2476 * read a UDP request.
2477 * return 0 if there's nothing to read.
2478 */
2479static int try_read_udp(conn *c) {
2480    int res;
2481
2482    assert(c != NULL);
2483
2484    c->request_addr_size = sizeof(c->request_addr);
2485    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2486                   0, &c->request_addr, &c->request_addr_size);
2487    if (res > 8) {
2488        unsigned char *buf = (unsigned char *)c->rbuf;
2489        STATS_LOCK();
2490        stats.bytes_read += res;
2491        STATS_UNLOCK();
2492
2493        /* Beginning of UDP packet is the request ID; save it. */
2494        c->request_id = buf[0] * 256 + buf[1];
2495
2496        /* If this is a multi-packet request, drop it. */
2497        if (buf[4] != 0 || buf[5] != 1) {
2498            out_string(c, "SERVER_ERROR multi-packet request not supported");
2499            return 0;
2500        }
2501
2502        /* Don't care about any of the rest of the header. */
2503        res -= 8;
2504        memmove(c->rbuf, c->rbuf + 8, res);
2505
2506        c->rbytes += res;
2507        c->rcurr = c->rbuf;
2508        return 1;
2509    }
2510    return 0;
2511}
2512
2513/*
2514 * read from network as much as we can, handle buffer overflow and connection
2515 * close.
2516 * before reading, move the remaining incomplete fragment of a command
2517 * (if any) to the beginning of the buffer.
2518 * @return 1 data received
2519 *         0 no data received
2520 *        -1 an error occured (on the socket) (or client closed connection)
2521 *        -2 memory error (failed to allocate more memory)
2522 */
2523static int try_read_network(conn *c) {
2524    int gotdata = 0;
2525    int res;
2526
2527    assert(c != NULL);
2528
2529    if (c->rcurr != c->rbuf) {
2530        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2531            memmove(c->rbuf, c->rcurr, c->rbytes);
2532        c->rcurr = c->rbuf;
2533    }
2534
2535    while (1) {
2536        if (c->rbytes >= c->rsize) {
2537            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2538            if (!new_rbuf) {
2539                if (settings.verbose > 0)
2540                    fprintf(stderr, "Couldn't realloc input buffer\n");
2541                c->rbytes = 0; /* ignore what we read */
2542                out_string(c, "SERVER_ERROR out of memory reading request");
2543                c->write_and_go = conn_closing;
2544                return -2;
2545            }
2546            c->rcurr = c->rbuf = new_rbuf;
2547            c->rsize *= 2;
2548        }
2549
2550        int avail = c->rsize - c->rbytes;
2551        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2552        if (res > 0) {
2553            STATS_LOCK();
2554            stats.bytes_read += res;
2555            STATS_UNLOCK();
2556            gotdata = 1;
2557            c->rbytes += res;
2558            if (res == avail) {
2559                continue;
2560            } else {
2561                break;
2562            }
2563        }
2564        if (res == 0) {
2565            return -1;
2566        }
2567        if (res == -1) {
2568            if (errno == EAGAIN || errno == EWOULDBLOCK) {
2569                break;
2570            }
2571            return -1;
2572        }
2573    }
2574    return gotdata;
2575}
2576
2577static bool update_event(conn *c, const int new_flags) {
2578    assert(c != NULL);
2579
2580    struct event_base *base = c->event.ev_base;
2581    if (c->ev_flags == new_flags)
2582        return true;
2583    if (event_del(&c->event) == -1) return false;
2584    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2585    event_base_set(base, &c->event);
2586    c->ev_flags = new_flags;
2587    if (event_add(&c->event, 0) == -1) return false;
2588    return true;
2589}
2590
2591/*
2592 * Sets whether we are listening for new connections or not.
2593 */
2594void accept_new_conns(const bool do_accept) {
2595    conn *next;
2596
2597    if (! is_listen_thread())
2598        return;
2599
2600    for (next = listen_conn; next; next = next->next) {
2601        if (do_accept) {
2602            update_event(next, EV_READ | EV_PERSIST);
2603            if (listen(next->sfd, 1024) != 0) {
2604                perror("listen");
2605            }
2606        }
2607        else {
2608            update_event(next, 0);
2609            if (listen(next->sfd, 0) != 0) {
2610                perror("listen");
2611            }
2612        }
2613  }
2614}
2615
2616/*
2617 * Transmit the next chunk of data from our list of msgbuf structures.
2618 *
2619 * Returns:
2620 *   TRANSMIT_COMPLETE   All done writing.
2621 *   TRANSMIT_INCOMPLETE More data remaining to write.
2622 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2623 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2624 */
2625static int transmit(conn *c) {
2626    assert(c != NULL);
2627
2628    if (c->msgcurr < c->msgused &&
2629            c->msglist[c->msgcurr].msg_iovlen == 0) {
2630        /* Finished writing the current msg; advance to the next. */
2631        c->msgcurr++;
2632    }
2633    if (c->msgcurr < c->msgused) {
2634        ssize_t res;
2635        struct msghdr *m = &c->msglist[c->msgcurr];
2636
2637        res = sendmsg(c->sfd, m, 0);
2638        if (res > 0) {
2639            STATS_LOCK();
2640            stats.bytes_written += res;
2641            STATS_UNLOCK();
2642
2643            /* We've written some of the data. Remove the completed
2644               iovec entries from the list of pending writes. */
2645            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2646                res -= m->msg_iov->iov_len;
2647                m->msg_iovlen--;
2648                m->msg_iov++;
2649            }
2650
2651            /* Might have written just part of the last iovec entry;
2652               adjust it so the next write will do the rest. */
2653            if (res > 0) {
2654                m->msg_iov->iov_base += res;
2655                m->msg_iov->iov_len -= res;
2656            }
2657            return TRANSMIT_INCOMPLETE;
2658        }
2659        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2660            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2661                if (settings.verbose > 0)
2662                    fprintf(stderr, "Couldn't update event\n");
2663                conn_set_state(c, conn_closing);
2664                return TRANSMIT_HARD_ERROR;
2665            }
2666            return TRANSMIT_SOFT_ERROR;
2667        }
2668        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
2669           we have a real error, on which we close the connection */
2670        if (settings.verbose > 0)
2671            perror("Failed to write, and not due to blocking");
2672
2673        if (IS_UDP(c->protocol))
2674            conn_set_state(c, conn_read);
2675        else
2676            conn_set_state(c, conn_closing);
2677        return TRANSMIT_HARD_ERROR;
2678    } else {
2679        return TRANSMIT_COMPLETE;
2680    }
2681}
2682
2683static void drive_machine(conn *c) {
2684    bool stop = false;
2685    int sfd, flags = 1;
2686    socklen_t addrlen;
2687    struct sockaddr_storage addr;
2688    int res;
2689
2690    assert(c != NULL);
2691
2692    while (!stop) {
2693
2694        switch(c->state) {
2695        case conn_listening:
2696            addrlen = sizeof(addr);
2697            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2698                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2699                    /* these are transient, so don't log anything */
2700                    stop = true;
2701                } else if (errno == EMFILE) {
2702                    if (settings.verbose > 0)
2703                        fprintf(stderr, "Too many open connections\n");
2704                    accept_new_conns(false);
2705                    stop = true;
2706                } else {
2707                    perror("accept()");
2708                    stop = true;
2709                }
2710                break;
2711            }
2712            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2713                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2714                perror("setting O_NONBLOCK");
2715                close(sfd);
2716                break;
2717            }
2718
2719            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
2720                                     DATA_BUFFER_SIZE, c->protocol);
2721            stop = true;
2722            break;
2723
2724        case conn_waiting:
2725            if (!update_event(c, EV_READ | EV_PERSIST)) {
2726                if (settings.verbose > 0)
2727                    fprintf(stderr, "Couldn't update event\n");
2728                conn_set_state(c, conn_closing);
2729                break;
2730            }
2731
2732            conn_set_state(c, conn_read);
2733            stop = true;
2734            break;
2735
2736        case conn_read:
2737            res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c);
2738            switch (res) {
2739            case 0 :
2740                conn_set_state(c, conn_waiting);
2741                break;
2742            case 1:
2743                conn_set_state(c, conn_parse_cmd);
2744                break;
2745            case -1:
2746                conn_set_state(c, conn_closing);
2747                break;
2748            case -2: /* Failed to allocate more memory */
2749                /* State already set by try_read_network */
2750                break;
2751            }
2752            break;
2753
2754        case conn_parse_cmd :
2755            if (try_read_command(c) == 0) {
2756                /* wee need more data! */
2757                conn_set_state(c, conn_waiting);
2758            }
2759
2760            break;
2761
2762        case conn_new_cmd:
2763            reset_cmd_handler(c);
2764            assoc_move_next_bucket();
2765            break;
2766
2767        case conn_nread:
2768            if (c->rlbytes == 0) {
2769                complete_nread(c);
2770                break;
2771            }
2772            /* first check if we have leftovers in the conn_read buffer */
2773            if (c->rbytes > 0) {
2774                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2775                memmove(c->ritem, c->rcurr, tocopy);
2776                c->ritem += tocopy;
2777                c->rlbytes -= tocopy;
2778                c->rcurr += tocopy;
2779                c->rbytes -= tocopy;
2780                if (c->rlbytes == 0) {
2781                    break;
2782                }
2783            }
2784
2785            /*  now try reading from the socket */
2786            res = read(c->sfd, c->ritem, c->rlbytes);
2787            if (res > 0) {
2788                STATS_LOCK();
2789                stats.bytes_read += res;
2790                STATS_UNLOCK();
2791                c->ritem += res;
2792                c->rlbytes -= res;
2793                break;
2794            }
2795            if (res == 0) { /* end of stream */
2796                conn_set_state(c, conn_closing);
2797                break;
2798            }
2799            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2800                if (!update_event(c, EV_READ | EV_PERSIST)) {
2801                    if (settings.verbose > 0)
2802                        fprintf(stderr, "Couldn't update event\n");
2803                    conn_set_state(c, conn_closing);
2804                    break;
2805                }
2806                stop = true;
2807                break;
2808            }
2809            /* otherwise we have a real error, on which we close the connection */
2810            if (settings.verbose > 0)
2811                fprintf(stderr, "Failed to read, and not due to blocking\n");
2812            conn_set_state(c, conn_closing);
2813            break;
2814
2815        case conn_swallow:
2816            /* we are reading sbytes and throwing them away */
2817            if (c->sbytes == 0) {
2818                conn_set_state(c, conn_new_cmd);
2819                break;
2820            }
2821
2822            /* first check if we have leftovers in the conn_read buffer */
2823            if (c->rbytes > 0) {
2824                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2825                c->sbytes -= tocopy;
2826                c->rcurr += tocopy;
2827                c->rbytes -= tocopy;
2828                break;
2829            }
2830
2831            /*  now try reading from the socket */
2832            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2833            if (res > 0) {
2834                STATS_LOCK();
2835                stats.bytes_read += res;
2836                STATS_UNLOCK();
2837                c->sbytes -= res;
2838                break;
2839            }
2840            if (res == 0) { /* end of stream */
2841                conn_set_state(c, conn_closing);
2842                break;
2843            }
2844            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2845                if (!update_event(c, EV_READ | EV_PERSIST)) {
2846                    if (settings.verbose > 0)
2847                        fprintf(stderr, "Couldn't update event\n");
2848                    conn_set_state(c, conn_closing);
2849                    break;
2850                }
2851                stop = true;
2852                break;
2853            }
2854            /* otherwise we have a real error, on which we close the connection */
2855            if (settings.verbose > 0)
2856                fprintf(stderr, "Failed to read, and not due to blocking\n");
2857            conn_set_state(c, conn_closing);
2858            break;
2859
2860        case conn_write:
2861            /*
2862             * We want to write out a simple response. If we haven't already,
2863             * assemble it into a msgbuf list (this will be a single-entry
2864             * list for TCP or a two-entry list for UDP).
2865             */
2866            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2867                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2868                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2869                    if (settings.verbose > 0)
2870                        fprintf(stderr, "Couldn't build response\n");
2871                    conn_set_state(c, conn_closing);
2872                    break;
2873                }
2874            }
2875
2876            /* fall through... */
2877
2878        case conn_mwrite:
2879            switch (transmit(c)) {
2880            case TRANSMIT_COMPLETE:
2881                if (c->state == conn_mwrite) {
2882                    while (c->ileft > 0) {
2883                        item *it = *(c->icurr);
2884                        assert((it->it_flags & ITEM_SLABBED) == 0);
2885                        item_remove(it);
2886                        c->icurr++;
2887                        c->ileft--;
2888                    }
2889                    while (c->suffixleft > 0) {
2890                        char *suffix = *(c->suffixcurr);
2891                        if(suffix_add_to_freelist(suffix)) {
2892                            /* Failed to add to freelist, don't leak */
2893                            free(suffix);
2894                        }
2895                        c->suffixcurr++;
2896                        c->suffixleft--;
2897                    }
2898                    /* XXX:  I don't know why this wasn't the general case */
2899                    if(c->protocol == binary_prot) {
2900                        conn_set_state(c, c->write_and_go);
2901                    } else {
2902                        conn_set_state(c, conn_new_cmd);
2903                    }
2904                } else if (c->state == conn_write) {
2905                    if (c->write_and_free) {
2906                        free(c->write_and_free);
2907                        c->write_and_free = 0;
2908                    }
2909                    conn_set_state(c, c->write_and_go);
2910                } else {
2911                    if (settings.verbose > 0)
2912                        fprintf(stderr, "Unexpected state %d\n", c->state);
2913                    conn_set_state(c, conn_closing);
2914                }
2915                break;
2916
2917            case TRANSMIT_INCOMPLETE:
2918            case TRANSMIT_HARD_ERROR:
2919                break;                   /* Continue in state machine. */
2920
2921            case TRANSMIT_SOFT_ERROR:
2922                stop = true;
2923                break;
2924            }
2925            break;
2926
2927        case conn_closing:
2928            if (IS_UDP(c->protocol))
2929                conn_cleanup(c);
2930            else
2931                conn_close(c);
2932            stop = true;
2933            break;
2934        }
2935    }
2936
2937    return;
2938}
2939
2940void event_handler(const int fd, const short which, void *arg) {
2941    conn *c;
2942
2943    c = (conn *)arg;
2944    assert(c != NULL);
2945
2946    c->which = which;
2947
2948    /* sanity */
2949    if (fd != c->sfd) {
2950        if (settings.verbose > 0)
2951            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2952        conn_close(c);
2953        return;
2954    }
2955
2956    drive_machine(c);
2957
2958    /* wait for next event */
2959    return;
2960}
2961
2962static int new_socket(struct addrinfo *ai) {
2963    int sfd;
2964    int flags;
2965
2966    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2967        perror("socket()");