root/branches/binary/server/memcached.c @ 760

Revision 760, 111.4 kB (checked in by dsallings, 21 months ago)

Alignment fixes for 64-bit processors.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63/*
64 * forward declarations
65 */
66static void drive_machine(conn *c);
67static int new_socket(struct addrinfo *ai);
68static int server_socket(const int port, enum protocol prot);
69static int try_read_command(conn *c);
70static int try_read_network(conn *c);
71static int try_read_udp(conn *c);
72static void conn_set_state(conn *c, enum conn_states state);
73
74/* stats */
75static void stats_reset(void);
76static void stats_init(void);
77
78/* defaults */
79static void settings_init(void);
80
81/* event handling, network IO */
82static void event_handler(const int fd, const short which, void *arg);
83static void conn_close(conn *c);
84static void conn_init(void);
85static void accept_new_conns(const bool do_accept);
86static bool update_event(conn *c, const int new_flags);
87static void complete_nread(conn *c);
88static void process_command(conn *c, char *command);
89static int transmit(conn *c);
90static int ensure_iov_space(conn *c);
91static int add_iov(conn *c, const void *buf, int len);
92static int add_msghdr(conn *c);
93
94/* time handling */
95static void set_current_time(void);  /* update the global variable holding
96                              global 32-bit seconds-since-start time
97                              (to avoid 64 bit time_t) */
98
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = NULL;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn = NULL;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.access = 0700;
169    settings.port = 11211;
170    settings.udpport = 0;
171    /* By default this string should be NULL for getaddrinfo() */
172    settings.inter = NULL;
173    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
174    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
175    settings.verbose = 0;
176    settings.oldest_live = 0;
177    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
178    settings.socketpath = NULL;       /* by default, not using a unix socket */
179    settings.managed = false;
180    settings.factor = 1.25;
181    settings.chunk_size = 48;         /* space for a modest key and value */
182#ifdef USE_THREADS
183    settings.num_threads = 4;
184#else
185    settings.num_threads = 1;
186#endif
187    settings.prefix_delimiter = ':';
188    settings.detail_enabled = 0;
189}
190
191/* returns true if a deleted item's delete-locked-time is over, and it
192   should be removed from the namespace */
193static bool item_delete_lock_over (item *it) {
194    assert(it->it_flags & ITEM_DELETED);
195    return (current_time >= it->exptime);
196}
197
198/*
199 * Adds a message header to a connection.
200 *
201 * Returns 0 on success, -1 on out-of-memory.
202 */
203static int add_msghdr(conn *c)
204{
205    struct msghdr *msg;
206
207    assert(c != NULL);
208
209    if (c->msgsize == c->msgused) {
210        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
211        if (! msg)
212            return -1;
213        c->msglist = msg;
214        c->msgsize *= 2;
215    }
216
217    msg = c->msglist + c->msgused;
218
219    /* this wipes msg_iovlen, msg_control, msg_controllen, and
220       msg_flags, the last 3 of which aren't defined on solaris: */
221    memset(msg, 0, sizeof(struct msghdr));
222
223    msg->msg_iov = &c->iov[c->iovused];
224
225    if (c->request_addr_size > 0) {
226        msg->msg_name = &c->request_addr;
227        msg->msg_namelen = c->request_addr_size;
228    }
229
230    c->msgbytes = 0;
231    c->msgused++;
232
233    if (IS_UDP(c->protocol)) {
234        /* Leave room for the UDP header, which we'll fill in later. */
235        return add_iov(c, NULL, UDP_HEADER_SIZE);
236    }
237
238    return 0;
239}
240
241
242/*
243 * Free list management for connections.
244 */
245
246static conn **freeconns;
247static int freetotal;
248static int freecurr;
249
250
251static void conn_init(void) {
252    freetotal = 200;
253    freecurr = 0;
254    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
255        fprintf(stderr, "malloc()\n");
256    }
257    return;
258}
259
260/*
261 * Returns a connection from the freelist, if any. Should call this using
262 * conn_from_freelist() for thread safety.
263 */
264conn *do_conn_from_freelist() {
265    conn *c;
266
267    if (freecurr > 0) {
268        c = freeconns[--freecurr];
269    } else {
270        c = NULL;
271    }
272
273    return c;
274}
275
276/*
277 * Adds a connection to the freelist. 0 = success. Should call this using
278 * conn_add_to_freelist() for thread safety.
279 */
280bool do_conn_add_to_freelist(conn *c) {
281    if (freecurr < freetotal) {
282        freeconns[freecurr++] = c;
283        return false;
284    } else {
285        /* try to enlarge free connections array */
286        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
287        if (new_freeconns) {
288            freetotal *= 2;
289            freeconns = new_freeconns;
290            freeconns[freecurr++] = c;
291            return false;
292        }
293    }
294    return true;
295}
296
297static char *prot_text(enum protocol prot) {
298    char *rv = "unknown";
299    switch(prot) {
300        case ascii_prot:
301            rv = "ascii";
302            break;
303        case binary_prot:
304            rv = "binary";
305            break;
306        case ascii_udp_prot:
307            rv = "ascii-udp";
308            break;
309        case negotiating_prot:
310            rv = "auto-negotiate";
311            break;
312    }
313    return rv;
314}
315
316conn *conn_new(const int sfd, enum conn_states init_state,
317                const int event_flags,
318                const int read_buffer_size, enum protocol prot,
319                struct event_base *base) {
320    conn *c = conn_from_freelist();
321
322    if (NULL == c) {
323        if (!(c = (conn *)malloc(sizeof(conn)))) {
324            fprintf(stderr, "malloc()\n");
325            return NULL;
326        }
327        c->rbuf = c->wbuf = 0;
328        c->ilist = 0;
329        c->suffixlist = 0;
330        c->iov = 0;
331        c->msglist = 0;
332        c->hdrbuf = 0;
333
334        c->rsize = read_buffer_size;
335        c->wsize = DATA_BUFFER_SIZE;
336        c->isize = ITEM_LIST_INITIAL;
337        c->suffixsize = SUFFIX_LIST_INITIAL;
338        c->iovsize = IOV_LIST_INITIAL;
339        c->msgsize = MSG_LIST_INITIAL;
340        c->hdrsize = 0;
341
342        c->rbuf = (char *)malloc((size_t)c->rsize);
343        c->wbuf = (char *)malloc((size_t)c->wsize);
344        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
345        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
346        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
347        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
348
349        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
350                c->msglist == 0 || c->suffixlist == 0) {
351            if (c->rbuf != 0) free(c->rbuf);
352            if (c->wbuf != 0) free(c->wbuf);
353            if (c->ilist != 0) free(c->ilist);
354            if (c->suffixlist != 0) free(c->suffixlist);
355            if (c->iov != 0) free(c->iov);
356            if (c->msglist != 0) free(c->msglist);
357            free(c);
358            fprintf(stderr, "malloc()\n");
359            return NULL;
360        }
361
362        STATS_LOCK();
363        stats.conn_structs++;
364        STATS_UNLOCK();
365    }
366
367    /* unix socket mode doesn't need this, so zeroed out.  but why
368     * is this done for every command?  presumably for UDP
369     * mode.  */
370    if (!settings.socketpath) {
371        c->request_addr_size = sizeof(c->request_addr);
372    } else {
373        c->request_addr_size = 0;
374    }
375
376    if (settings.verbose > 1) {
377        if (init_state == conn_listening) {
378            fprintf(stderr, "<%d server listening (%s)\n", sfd,
379                prot_text(prot));
380        } else if (IS_UDP(prot)) {
381            fprintf(stderr, "<%d server listening (udp)\n", sfd);
382        } else if (prot == binary_prot) {
383            fprintf(stderr, "<%d new binary client connection\n", sfd);
384        } else if (prot == ascii_prot) {
385            fprintf(stderr, "<%d new ascii client connection\n", sfd);
386        } else if (prot == negotiating_prot) {
387            fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd);
388        } else {
389            fprintf(stderr, "<%d new unknown (%d) client connection\n",
390                sfd, prot);
391            abort();
392        }
393    }
394
395    c->sfd = sfd;
396    c->protocol = prot;
397    c->state = init_state;
398    c->rlbytes = 0;
399    c->cmd = -1;
400    c->rbytes = c->wbytes = 0;
401    c->wcurr = c->wbuf;
402    c->rcurr = c->rbuf;
403    c->ritem = 0;
404    c->icurr = c->ilist;
405    c->suffixcurr = c->suffixlist;
406    c->ileft = 0;
407    c->suffixleft = 0;
408    c->iovused = 0;
409    c->msgcurr = 0;
410    c->msgused = 0;
411
412    c->write_and_go = init_state;
413    c->write_and_free = 0;
414    c->item = 0;
415    c->bucket = -1;
416    c->gen = 0;
417
418    c->noreply = false;
419
420    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
421    event_base_set(base, &c->event);
422    c->ev_flags = event_flags;
423
424    if (event_add(&c->event, 0) == -1) {
425        if (conn_add_to_freelist(c)) {
426            conn_free(c);
427        }
428        perror("event_add");
429        return NULL;
430    }
431
432    STATS_LOCK();
433    stats.curr_conns++;
434    stats.total_conns++;
435    STATS_UNLOCK();
436
437    return c;
438}
439
440static void conn_cleanup(conn *c) {
441    assert(c != NULL);
442
443    if (c->item) {
444        item_remove(c->item);
445        c->item = 0;
446    }
447
448    if (c->ileft != 0) {
449        for (; c->ileft > 0; c->ileft--,c->icurr++) {
450            item_remove(*(c->icurr));
451        }
452    }
453
454    if (c->suffixleft != 0) {
455        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
456            if(suffix_add_to_freelist(*(c->suffixcurr))) {
457                free(*(c->suffixcurr));
458            }
459        }
460    }
461
462    if (c->write_and_free) {
463        free(c->write_and_free);
464        c->write_and_free = 0;
465    }
466}
467
468/*
469 * Frees a connection.
470 */
471void conn_free(conn *c) {
472    if (c) {
473        if (c->hdrbuf)
474            free(c->hdrbuf);
475        if (c->msglist)
476            free(c->msglist);
477        if (c->rbuf)
478            free(c->rbuf);
479        if (c->wbuf)
480            free(c->wbuf);
481        if (c->ilist)
482            free(c->ilist);
483        if (c->suffixlist)
484            free(c->suffixlist);
485        if (c->iov)
486            free(c->iov);
487        free(c);
488    }
489}
490
491static void conn_close(conn *c) {
492    assert(c != NULL);
493
494    /* delete the event, the socket and the conn */
495    event_del(&c->event);
496
497    if (settings.verbose > 1)
498        fprintf(stderr, "<%d connection closed.\n", c->sfd);
499
500    close(c->sfd);
501    accept_new_conns(true);
502    conn_cleanup(c);
503
504    /* if the connection has big buffers, just free it */
505    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
506        conn_free(c);
507    }
508
509    STATS_LOCK();
510    stats.curr_conns--;
511    STATS_UNLOCK();
512
513    return;
514}
515
516static enum conn_states get_init_state(conn *c) {
517    int rv = 0;
518    assert(c != NULL);
519
520    switch(c->protocol) {
521        case binary_prot:
522            rv = conn_bin_init;
523            break;
524        case negotiating_prot:
525            rv = conn_negotiate;
526            break;
527        default:
528            rv = conn_read;
529    }
530    return rv;
531}
532
533/* Set the given connection to its initial state.  The initial state will vary
534 * base don protocol type. */
535static void conn_set_init_state(conn *c) {
536    assert(c != NULL);
537
538    conn_set_state(c, get_init_state(c));
539}
540
541/*
542 * Shrinks a connection's buffers if they're too big.  This prevents
543 * periodic large "get" requests from permanently chewing lots of server
544 * memory.
545 *
546 * This should only be called in between requests since it can wipe output
547 * buffers!
548 */
549static void conn_shrink(conn *c) {
550    assert(c != NULL);
551
552    if (IS_UDP(c->protocol))
553        return;
554
555    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
556        char *newbuf;
557
558        if (c->rcurr != c->rbuf)
559            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
560
561        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
562
563        if (newbuf) {
564            c->rbuf = newbuf;
565            c->rsize = DATA_BUFFER_SIZE;
566        }
567        /* TODO check other branch... */
568        c->rcurr = c->rbuf;
569    }
570
571    if (c->isize > ITEM_LIST_HIGHWAT) {
572        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
573        if (newbuf) {
574            c->ilist = newbuf;
575            c->isize = ITEM_LIST_INITIAL;
576        }
577    /* TODO check error condition? */
578    }
579
580    if (c->msgsize > MSG_LIST_HIGHWAT) {
581        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
582        if (newbuf) {
583            c->msglist = newbuf;
584            c->msgsize = MSG_LIST_INITIAL;
585        }
586    /* TODO check error condition? */
587    }
588
589    if (c->iovsize > IOV_LIST_HIGHWAT) {
590        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
591        if (newbuf) {
592            c->iov = newbuf;
593            c->iovsize = IOV_LIST_INITIAL;
594        }
595    /* TODO check return value */
596    }
597}
598
599/*
600 * Sets a connection's current state in the state machine. Any special
601 * processing that needs to happen on certain state transitions can
602 * happen here.
603 */
604static void conn_set_state(conn *c, enum conn_states state) {
605    assert(c != NULL);
606
607    if (state != c->state) {
608        if (state == conn_read) {
609            conn_shrink(c);
610            assoc_move_next_bucket();
611        }
612        c->state = state;
613    }
614}
615
616/*
617 * Free list management for suffix buffers.
618 */
619
620static char **freesuffix;
621static int freesuffixtotal;
622static int freesuffixcurr;
623
624static void suffix_init(void) {
625    freesuffixtotal = 500;
626    freesuffixcurr  = 0;
627
628    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
629    if (freesuffix == NULL) {
630        fprintf(stderr, "malloc()\n");
631    }
632    return;
633}
634
635/*
636 * Returns a suffix buffer from the freelist, if any. Should call this using
637 * suffix_from_freelist() for thread safety.
638 */
639char *do_suffix_from_freelist() {
640    char *s;
641
642    if (freesuffixcurr > 0) {
643        s = freesuffix[--freesuffixcurr];
644    } else {
645        /* If malloc fails, let the logic fall through without spamming
646         * STDERR on the server. */
647        s = malloc( SUFFIX_SIZE );
648    }
649
650    return s;
651}
652
653/*
654 * Adds a connection to the freelist. 0 = success. Should call this using
655 * conn_add_to_freelist() for thread safety.
656 */
657bool do_suffix_add_to_freelist(char *s) {
658    if (freesuffixcurr < freesuffixtotal) {
659        freesuffix[freesuffixcurr++] = s;
660        return false;
661    } else {
662        /* try to enlarge free connections array */
663        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
664        if (new_freesuffix) {
665            freesuffixtotal *= 2;
666            freesuffix = new_freesuffix;
667            freesuffix[freesuffixcurr++] = s;
668            return false;
669        }
670    }
671    return true;
672}
673
674/*
675 * Ensures that there is room for another struct iovec in a connection's
676 * iov list.
677 *
678 * Returns 0 on success, -1 on out-of-memory.
679 */
680static int ensure_iov_space(conn *c) {
681    assert(c != NULL);
682
683    if (c->iovused >= c->iovsize) {
684        int i, iovnum;
685        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
686                                (c->iovsize * 2) * sizeof(struct iovec));
687        if (! new_iov)
688            return -1;
689        c->iov = new_iov;
690        c->iovsize *= 2;
691
692        /* Point all the msghdr structures at the new list. */
693        for (i = 0, iovnum = 0; i < c->msgused; i++) {
694            c->msglist[i].msg_iov = &c->iov[iovnum];
695            iovnum += c->msglist[i].msg_iovlen;
696        }
697    }
698
699    return 0;
700}
701
702
703/*
704 * Adds data to the list of pending data that will be written out to a
705 * connection.
706 *
707 * Returns 0 on success, -1 on out-of-memory.
708 */
709
710static int add_iov(conn *c, const void *buf, int len) {
711    struct msghdr *m;
712    int leftover;
713    bool limit_to_mtu;
714
715    assert(c != NULL);
716
717    do {
718        m = &c->msglist[c->msgused - 1];
719
720        /*
721         * Limit UDP packets, and the first payloads of TCP replies, to
722         * UDP_MAX_PAYLOAD_SIZE bytes.
723         */
724        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
725
726        /* We may need to start a new msghdr if this one is full. */
727        if (m->msg_iovlen == IOV_MAX ||
728            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
729            add_msghdr(c);
730            m = &c->msglist[c->msgused - 1];
731        }
732
733        if (ensure_iov_space(c) != 0)
734            return -1;
735
736        /* If the fragment is too big to fit in the datagram, split it up */
737        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
738            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
739            len -= leftover;
740        } else {
741            leftover = 0;
742        }
743
744        m = &c->msglist[c->msgused - 1];
745        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
746        m->msg_iov[m->msg_iovlen].iov_len = len;
747
748        c->msgbytes += len;
749        c->iovused++;
750        m->msg_iovlen++;
751
752        buf = ((char *)buf) + len;
753        len = leftover;
754    } while (leftover > 0);
755
756    return 0;
757}
758
759
760/*
761 * Constructs a set of UDP headers and attaches them to the outgoing messages.
762 */
763static int build_udp_headers(conn *c) {
764    int i;
765    unsigned char *hdr;
766
767    assert(c != NULL);
768
769    if (c->msgused > c->hdrsize) {
770        void *new_hdrbuf;
771        if (c->hdrbuf)
772            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
773        else
774            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
775        if (! new_hdrbuf)
776            return -1;
777        c->hdrbuf = (unsigned char *)new_hdrbuf;
778        c->hdrsize = c->msgused * 2;
779    }
780
781    hdr = c->hdrbuf;
782    for (i = 0; i < c->msgused; i++) {
783        c->msglist[i].msg_iov[0].iov_base = hdr;
784        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
785        *hdr++ = c->request_id / 256;
786        *hdr++ = c->request_id % 256;
787        *hdr++ = i / 256;
788        *hdr++ = i % 256;
789        *hdr++ = c->msgused / 256;
790        *hdr++ = c->msgused % 256;
791        *hdr++ = 0;
792        *hdr++ = 0;
793        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
794    }
795
796    return 0;
797}
798
799
800static void out_string(conn *c, const char *str) {
801    size_t len;
802
803    assert(c != NULL);
804
805    if (c->noreply) {
806        if (settings.verbose > 1)
807            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
808        c->noreply = false;
809        conn_set_state(c, conn_read);
810        return;
811    }
812
813    if (settings.verbose > 1)
814        fprintf(stderr, ">%d %s\n", c->sfd, str);
815
816    len = strlen(str);
817    if ((len + 2) > c->wsize) {
818        /* ought to be always enough. just fail for simplicity */
819        str = "SERVER_ERROR output line too long";
820        len = strlen(str);
821    }
822
823    memcpy(c->wbuf, str, len);
824    memcpy(c->wbuf + len, "\r\n", 3);
825    c->wbytes = len + 2;
826    c->wcurr = c->wbuf;
827
828    conn_set_state(c, conn_write);
829    c->write_and_go = get_init_state(c);
830    return;
831}
832
833/*
834 * we get here after reading the value in set/add/replace commands. The command
835 * has been stored in c->item_comm, and the item is ready in c->item.
836 */
837static void complete_nread_ascii(conn *c) {
838    assert(c != NULL);
839
840    item *it = c->item;
841    int comm = c->item_comm;
842    int ret;
843
844    STATS_LOCK();
845    stats.set_cmds++;
846    STATS_UNLOCK();
847
848    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
849        out_string(c, "CLIENT_ERROR bad data chunk");
850    } else {
851      ret = store_item(it, comm);
852      if (ret == 1)
853          out_string(c, "STORED");
854      else if(ret == 2)
855          out_string(c, "EXISTS");
856      else if(ret == 3)
857          out_string(c, "NOT_FOUND");
858      else
859          out_string(c, "NOT_STORED");
860    }
861
862    item_remove(c->item);       /* release the c->item reference */
863    c->item = 0;
864}
865
866static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
867    int i = 0;
868    uint32_t *res_header;
869
870    assert(c);
871    assert(body_len >= 0);
872
873    c->msgcurr = 0;
874    c->msgused = 0;
875    c->iovused = 0;
876    if (add_msghdr(c) != 0) {
877        /* XXX:  out_string is inappropriate here */
878        out_string(c, "SERVER_ERROR out of memory");
879        return;
880    }
881
882    res_header = (uint32_t *)c->wbuf;
883
884    res_header[0] = BIN_RES_MAGIC << 24;
885    res_header[0] |= ((0xff & c->cmd) << 16);
886    res_header[0] |= err & 0xffff;
887
888    res_header[1] = hdr_len << 24;
889    /* TODO:  Support datatype */
890    res_header[2] = body_len;
891    res_header[3] = c->opaque;
892
893    if(settings.verbose > 1) {
894        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
895            res_header[0], res_header[1], res_header[2], res_header[3]);
896    }
897
898    for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
899        res_header[i] = htonl(res_header[i]);
900    }
901
902    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
903    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
904}
905
906static void write_bin_error(conn *c, int err, int swallow) {
907    char *errstr = "Unknown error";
908    switch(err) {
909        case ERR_UNKNOWN_CMD:
910            errstr = "Unknown command";
911            break;
912        case ERR_NOT_FOUND:
913            errstr = "Not found";
914            break;
915        case ERR_INVALID_ARGUMENTS:
916            errstr = "Invalid arguments";
917            break;
918        case ERR_EXISTS:
919            errstr = "Data exists for key.";
920            break;
921        case ERR_TOO_LARGE:
922            errstr = "Too large.";
923            break;
924        case ERR_NOT_STORED:
925            errstr = "Not stored.";
926            break;
927        default:
928            errstr = "UNHANDLED ERROR";
929            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
930    }
931    if(settings.verbose > 0) {
932        fprintf(stderr, "Writing an error:  %s\n", errstr);
933    }
934    add_bin_header(c, err, 0, strlen(errstr));
935    add_iov(c, errstr, strlen(errstr));
936
937    conn_set_state(c, conn_mwrite);
938    if(swallow > 0) {
939        c->sbytes = swallow;
940        c->write_and_go = conn_swallow;
941    } else {
942        c->write_and_go = conn_bin_init;
943    }
944}
945
946/* Form and send a response to a command over the binary protocol */
947static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
948    add_bin_header(c, 0, hlen, dlen);
949    if(dlen > 0) {
950        add_iov(c, d, dlen);
951    }
952    conn_set_state(c, conn_mwrite);
953    c->write_and_go = conn_bin_init;
954}
955
956/* Byte swap a 64-bit number */
957static int64_t swap64(int64_t in) {
958#ifdef ENDIAN_LITTLE
959    /* Little endian, flip the bytes around until someone makes a faster/better
960    * way to do this. */
961    int64_t rv = 0;
962    int i = 0;
963     for(i = 0; i<8; i++) {
964        rv = (rv << 8) | (in & 0xff);
965        in >>= 8;
966     }
967    return rv;
968#else
969    /* big-endian machines don't need byte swapping */
970    return in;
971#endif
972}
973
974static void complete_incr_bin(conn *c) {
975    item *it;
976    int64_t delta;
977    uint64_t initial;
978    int32_t exptime;
979    char *key;
980    size_t nkey;
981    int i;
982    uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
983
984    assert(c != NULL);
985
986    key = c->rbuf + BIN_INCR_HDR_LEN;
987    nkey = c->keylen;
988    key[nkey] = 0x00;
989
990    delta = swap64(*((int64_t*)(c->rbuf)));
991    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
992    exptime = ntohl(*((int*)(c->rbuf + 16)));
993
994    if(settings.verbose) {
995        fprintf(stderr, "incr ");
996        for(i = 0; i<nkey; i++) {
997            fprintf(stderr, "%c", key[i]);
998        }
999        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
1000    }
1001
1002    it = item_get(key, nkey);
1003    if (it) {
1004        /* Weird magic in add_delta forces me to pad here */
1005        char tmpbuf[INCR_MAX_STORAGE_LEN];
1006        uint64_t l = 0;
1007        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1008        tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
1009        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1010        *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
1011
1012        write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1013        item_remove(it);         /* release our reference */
1014    } else {
1015        if(exptime >= 0) {
1016            /* Save some room for the response */
1017            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1018            *response_buf = swap64(initial);
1019            it = item_alloc(key, nkey, 0, realtime(exptime),
1020                INCR_MAX_STORAGE_LEN);
1021            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1022
1023            if(store_item(it, NREAD_SET)) {
1024                write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
1025                    INCR_RES_LEN);
1026            } else {
1027                write_bin_error(c, ERR_NOT_STORED, 0);
1028            }
1029            item_remove(it);         /* release our reference */
1030        } else {
1031            write_bin_error(c, ERR_NOT_FOUND, 0);
1032        }
1033    }
1034}
1035
1036static void complete_update_bin(conn *c) {
1037    int eno = -1, ret = 0;
1038    assert(c != NULL);
1039
1040    item *it = c->item;
1041
1042    STATS_LOCK();
1043    stats.set_cmds++;
1044    STATS_UNLOCK();
1045
1046    /* We don't actually receive the trailing two characters in the bin
1047     * protocol, so we're going to just set them here */
1048    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1049    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1050
1051    switch (store_item(it, c->item_comm)) {
1052        case 1:
1053            /* Stored */
1054            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1055            break;
1056        case 2:
1057            write_bin_error(c, ERR_EXISTS, 0);
1058            break;
1059        case 3:
1060            write_bin_error(c, ERR_NOT_FOUND, 0);
1061            break;
1062        default:
1063            if(c->item_comm == NREAD_ADD) {
1064                eno = ERR_EXISTS;
1065            } else if(c->item_comm == NREAD_REPLACE) {
1066                eno = ERR_NOT_FOUND;
1067            } else {
1068                eno = ERR_NOT_STORED;
1069            }
1070            write_bin_error(c, eno, 0);
1071    }
1072
1073    item_remove(c->item);       /* release the c->item reference */
1074    c->item = 0;
1075}
1076
1077static void process_bin_get(conn *c) {
1078    item *it;
1079
1080    it = item_get(c->rbuf, c->keylen);
1081    if (it) {
1082        int *flags;
1083        uint64_t* identifier;
1084
1085        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1086
1087        /* the length has two unnecessary bytes, and then we write four more */
1088        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1089
1090        /* Add the "extras" field: CAS-id followed by flags. The cas is a 64-
1091           bit datatype and require alignment to 8-byte boundaries on some
1092           architechtures. Verify that the size of the packet header is of
1093           the correct size (if not the following code generates SIGBUS on
1094           sparc hardware).
1095        */
1096        assert(MIN_BIN_PKT_LENGTH % 8 == 0);
1097        identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1098        *identifier = swap64(it->cas_id);
1099        add_iov(c, identifier, 8);
1100
1101        /* Add the flags */
1102        flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH + 8);
1103        *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1104        add_iov(c, flags, 4);
1105
1106        /* Add the data minus the CRLF */
1107        add_iov(c, ITEM_data(it), it->nbytes - 2);
1108        conn_set_state(c, conn_mwrite);
1109    } else {
1110        if(c->cmd == CMD_GETQ) {
1111            conn_set_state(c, conn_bin_init);
1112        } else {
1113            write_bin_error(c, ERR_NOT_FOUND, 0);
1114        }
1115    }
1116}
1117
1118static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1119    assert(c);
1120    c->substate = next_substate;
1121    c->rlbytes = c->keylen + extra;
1122    assert(c->rsize >= c->rlbytes);
1123    c->ritem = c->rbuf;
1124    conn_set_state(c, conn_nread);
1125}
1126
1127static void dispatch_bin_command(conn *c) {
1128    time_t exptime = 0;
1129    switch(c->cmd) {
1130        case CMD_VERSION:
1131            write_bin_response(c, VERSION, 0, strlen(VERSION));
1132            break;
1133        case CMD_FLUSH:
1134            set_current_time();
1135
1136            settings.oldest_live = current_time - 1;
1137            item_flush_expired();
1138            write_bin_response(c, NULL, 0, 0);
1139            break;
1140        case CMD_NOOP:
1141            write_bin_response(c, NULL, 0, 0);
1142            break;
1143        case CMD_SET:
1144            /* Fallthrough */
1145        case CMD_ADD:
1146            /* Fallthrough */
1147        case CMD_REPLACE:
1148            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1149            break;
1150        case CMD_GETQ:
1151        case CMD_GET:
1152            bin_read_key(c, bin_reading_get_key, 0);
1153            break;
1154        case CMD_DELETE:
1155            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1156            break;
1157        case CMD_INCR:
1158        case CMD_DECR:
1159            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1160            break;
1161        default:
1162            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1163    }
1164}
1165
1166static void process_bin_update(conn *c) {
1167    char *key;
1168    int nkey;
1169    int vlen;
1170    int flags;
1171    int exptime;
1172    item *it;
1173    int comm;
1174    int hdrlen = BIN_SET_HDR_LEN;
1175
1176    assert(c != NULL);
1177
1178    key = c->rbuf + hdrlen;
1179    nkey = c->keylen;
1180    key[nkey] = 0x00;
1181
1182    flags = ntohl(*((int*)(c->rbuf + 8)));
1183    exptime = ntohl(*((int*)(c->rbuf + 12)));
1184    vlen = c->bin_header[2] - (nkey + hdrlen);
1185
1186    if(settings.verbose > 1) {
1187        fprintf(stderr, "Value len is %d\n", vlen);
1188    }
1189
1190    if (settings.detail_enabled) {
1191        stats_prefix_record_set(key);
1192    }
1193
1194    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1195
1196    if (it == 0) {
1197        if (! item_size_ok(nkey, flags, vlen + 2)) {
1198            write_bin_error(c, ERR_TOO_LARGE, vlen);
1199        } else {
1200            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1201        }
1202        /* swallow the data line */
1203        c->write_and_go = conn_swallow;
1204        return;
1205    }
1206
1207    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf)));
1208
1209    switch(c->cmd) {
1210        case CMD_ADD:
1211            c->item_comm = NREAD_ADD;
1212            break;
1213        case CMD_SET:
1214            c->item_comm = NREAD_SET;
1215            break;
1216        case CMD_REPLACE:
1217            c->item_comm = NREAD_REPLACE;
1218            break;
1219        default:
1220            assert(0);
1221    }
1222
1223    if(it->cas_id != 0) {
1224        c->item_comm = NREAD_CAS;
1225    }
1226
1227    c->item = it;
1228    c->ritem = ITEM_data(it);
1229    c->rlbytes = vlen;
1230    conn_set_state(c, conn_nread);
1231    c->substate = bin_read_set_value;
1232}
1233
1234static void process_bin_delete(conn *c) {
1235    char *key;
1236    size_t nkey;
1237    item *it;
1238    time_t exptime = 0;
1239
1240    assert(c != NULL);
1241
1242    exptime = ntohl(*((int*)(c->rbuf)));
1243    key = c->rbuf + 4;
1244    nkey = c->keylen;
1245    key[nkey] = 0x00;
1246
1247    if(settings.verbose) {
1248        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1249    }
1250
1251    if (settings.detail_enabled) {
1252        stats_prefix_record_delete(key);
1253    }
1254
1255    it = item_get(key, nkey);
1256    if (it) {
1257        if (exptime == 0) {
1258            item_unlink(it);
1259            item_remove(it);      /* release our reference */
1260            write_bin_response(c, NULL, 0, 0);
1261        } else {
1262            /* XXX:  This is really lame, but defer_delete returns a string */
1263            char *res = defer_delete(it, exptime);
1264            if(res[0] == 'D') {
1265                write_bin_response(c, NULL, 0, 0);
1266            } else {
1267                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1268            }
1269        }
1270    } else {
1271        write_bin_error(c, ERR_NOT_FOUND, 0);
1272    }
1273}
1274
1275static void complete_nread_binary(conn *c) {
1276    assert(c != NULL);
1277
1278    if(c->cmd < 0) {
1279        /* No command defined.  Figure out what they're trying to say. */
1280        int i = 0;
1281        /* I did a bit of hard-coding around the packet sizes */
1282        assert(BIN_PKT_HDR_WORDS == 3);
1283        for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
1284            c->bin_header[i] = ntohl(c->bin_header[i]);
1285        }
1286        if(settings.verbose) {
1287            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1288                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1289                c->bin_header[3]);
1290        }
1291        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1292            if(settings.verbose) {
1293                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1294            }
1295            conn_set_state(c, conn_closing);
1296            return;
1297        }
1298
1299        c->msgcurr = 0;
1300        c->msgused = 0;
1301        c->iovused = 0;
1302        if (add_msghdr(c) != 0) {
1303            out_string(c, "SERVER_ERROR out of memory");
1304            return;
1305        }
1306
1307        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1308        c->keylen = c->bin_header[0] & 0xffff;
1309        c->opaque = c->bin_header[3];
1310        if(settings.verbose > 1) {
1311            fprintf(stderr,
1312                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1313                c->opaque, c->keylen, c->bin_header[2]);
1314        }
1315        dispatch_bin_command(c);
1316    } else {
1317        switch(c->substate) {
1318            case bin_reading_set_header:
1319                process_bin_update(c);
1320                break;
1321            case bin_read_set_value:
1322                complete_update_bin(c);
1323                break;
1324            case bin_reading_get_key:
1325                process_bin_get(c);
1326                break;
1327            case bin_reading_del_header:
1328                process_bin_delete(c);
1329                break;
1330            case bin_reading_incr_header:
1331                complete_incr_bin(c);
1332                break;
1333            default:
1334                fprintf(stderr, "Not handling substate %d\n", c->substate);
1335                assert(0);
1336        }
1337    }
1338}
1339
1340static void reinit_bin_connection(conn *c) {
1341    if (settings.verbose > 1)
1342        fprintf(stderr, "*** Reinitializing binary connection.\n");
1343    c->rlbytes = MIN_BIN_PKT_LENGTH;
1344    c->write_and_go = conn_bin_init;
1345    c->cmd = -1;
1346    c->substate = bin_no_state;
1347    c->rbytes = c->wbytes = 0;
1348    c->ritem = (char*)c->bin_header;
1349    c->rcurr = c->rbuf;
1350    c->wcurr = c->wbuf;
1351    conn_shrink(c);
1352    conn_set_state(c, conn_nread);
1353}
1354
1355/* These do the initial protocol switch.  At this point, we should've read
1356 * exactly one byte, and must treat that byte as the beginning of a command. */
1357static void setup_bin_protocol(conn *c) {
1358    char *loc = (char*)c->bin_header;
1359    if (settings.verbose > 1)
1360        fprintf(stderr, "Negotiated protocol as binary.\n");
1361
1362    c->protocol = binary_prot;
1363    reinit_bin_connection(c);
1364    /* Emulate a read of the first byte */
1365    c->ritem[0] = c->rbuf[0];
1366    c->ritem++;
1367    c->rlbytes--;
1368}
1369
1370static void setup_ascii_protocol(conn *c) {
1371    if (settings.verbose > 1)
1372        fprintf(stderr, "Negotiated protocol as ascii.\n");
1373    c->protocol = ascii_prot;
1374
1375    /* We've already got the first letter of the command, so pretend like we
1376     * Did a single byte read from try_read_command */
1377    c->rcurr = c->rbuf;
1378    c->rbytes = 1;
1379    conn_set_state(c, conn_read);
1380}
1381
1382static void complete_nread(conn *c) {
1383    assert(c != NULL);
1384
1385    if(c->protocol == ascii_prot) {
1386        complete_nread_ascii(c);
1387    } else if(c->protocol == binary_prot) {
1388        complete_nread_binary(c);
1389    } else if(c->protocol == negotiating_prot) {
1390        /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */
1391        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)
1392            setup_bin_protocol(c);
1393        else
1394            setup_ascii_protocol(c);
1395    } else {
1396        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1397    }
1398}
1399
1400/*
1401 * Stores an item in the cache according to the semantics of one of the set
1402 * commands. In threaded mode, this is protected by the cache lock.
1403 *
1404 * Returns true if the item was stored.
1405 */
1406int do_store_item(item *it, int comm) {
1407    char *key = ITEM_key(it);
1408    bool delete_locked = false;
1409    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1410    int stored = 0;
1411
1412    item *new_it = NULL;
1413    int flags;
1414
1415    if (old_it != NULL && comm == NREAD_ADD) {
1416        /* add only adds a nonexistent item, but promote to head of LRU */
1417        do_item_update(old_it);
1418    } else if (!old_it && (comm == NREAD_REPLACE
1419        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1420    {
1421        /* replace only replaces an existing value; don't store */
1422    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1423        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1424    {
1425        /* replace and add can't override delete locks; don't store */
1426    } else if (comm == NREAD_CAS) {
1427        /* validate cas operation */
1428        if (delete_locked)
1429            old_it = do_item_get_nocheck(key, it->nkey);
1430
1431        if(old_it == NULL) {
1432          // LRU expired
1433          stored = 3;
1434        }
1435        else if(it->cas_id == old_it->cas_id) {
1436          // cas validates
1437          do_item_replace(old_it, it);
1438          stored = 1;
1439        } else {
1440          if(settings.verbose > 1) {
1441            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1442                old_it->cas_id, it->cas_id);
1443          }
1444          stored = 2;
1445        }
1446    } else {
1447        /*
1448         * Append - combine new and old record into single one. Here it's
1449         * atomic and thread-safe.
1450         */
1451
1452        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1453
1454            /* we have it and old_it here - alloc memory to hold both */
1455            /* flags was already lost - so recover them from ITEM_suffix(it) */
1456
1457            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1458
1459            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1460
1461            if (new_it == NULL) {
1462                /* SERVER_ERROR out of memory */
1463                return 0;
1464            }
1465
1466            /* copy data from it and old_it to new_it */
1467
1468            if (comm == NREAD_APPEND) {
1469                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1470                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1471            } else {
1472                /* NREAD_PREPEND */
1473                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1474                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1475            }
1476
1477            it = new_it;
1478        }
1479
1480        /* "set" commands can override the delete lock
1481           window... in which case we have to find the old hidden item
1482           that's in the namespace/LRU but wasn't returned by
1483           item_get.... because we need to replace it */
1484        if (delete_locked)
1485            old_it = do_item_get_nocheck(key, it->nkey);
1486
1487        if (old_it != NULL)
1488            do_item_replace(old_it, it);
1489        else
1490            do_item_link(it);
1491
1492        stored = 1;
1493    }
1494
1495    if (old_it != NULL)
1496        do_item_remove(old_it);         /* release our reference */
1497    if (new_it != NULL)
1498        do_item_remove(new_it);
1499
1500    return stored;
1501}
1502
1503typedef struct token_s {
1504    char *value;
1505    size_t length;
1506} token_t;
1507
1508#define COMMAND_TOKEN 0
1509#define SUBCOMMAND_TOKEN 1
1510#define KEY_TOKEN 1
1511#define KEY_MAX_LENGTH 250
1512
1513#define MAX_TOKENS 8
1514
1515/*
1516 * Tokenize the command string by replacing whitespace with '\0' and update
1517 * the token array tokens with pointer to start of each token and length.
1518 * Returns total number of tokens.  The last valid token is the terminal
1519 * token (value points to the first unprocessed character of the string and
1520 * length zero).
1521 *
1522 * Usage example:
1523 *
1524 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1525 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1526 *          ...
1527 *      }
1528 *      ncommand = tokens[ix].value - command;
1529 *      command  = tokens[ix].value;
1530 *   }
1531 */
1532static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1533    char *s, *e;
1534    size_t ntokens = 0;
1535
1536    assert(command != NULL && tokens != NULL && max_tokens > 1);
1537
1538    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1539        if (*e == ' ') {
1540            if (s != e) {
1541                tokens[ntokens].value = s;
1542                tokens[ntokens].length = e - s;
1543                ntokens++;
1544                *e = '\0';
1545            }
1546            s = e + 1;
1547        }
1548        else if (*e == '\0') {
1549            if (s != e) {
1550                tokens[ntokens].value = s;
1551                tokens[ntokens].length = e - s;
1552                ntokens++;
1553            }
1554
1555            break; /* string end */
1556        }
1557    }
1558
1559    /*
1560     * If we scanned the whole string, the terminal value pointer is null,
1561     * otherwise it is the first unprocessed character.
1562     */
1563    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1564    tokens[ntokens].length = 0;
1565    ntokens++;
1566
1567    return ntokens;
1568}
1569
1570/* set up a connection to write a buffer then free it, used for stats */
1571static void write_and_free(conn *c, char *buf, int bytes) {
1572    if (buf) {
1573        c->write_and_free = buf;
1574        c->wcurr = buf;
1575        c->wbytes = bytes;
1576        conn_set_state(c, conn_write);
1577        c->write_and_go = get_init_state(c);
1578    } else {
1579        out_string(c, "SERVER_ERROR out of memory writing stats");
1580    }
1581}
1582
1583static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1584{
1585    int noreply_index = ntokens - 2;
1586
1587    /*
1588      NOTE: this function is not the first place where we are going to
1589      send the reply.  We could send it instead from process_command()
1590      if the request line has wrong number of tokens.  However parsing
1591      malformed line for "noreply" option is not reliable anyway, so
1592      it can't be helped.
1593    */
1594    if (tokens[noreply_index].value
1595        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1596        c->noreply = true;
1597    }
1598}
1599
1600inline static void process_stats_detail(conn *c, const char *command) {
1601    assert(c != NULL);
1602
1603    if (strcmp(command, "on") == 0) {
1604        settings.detail_enabled = 1;
1605        out_string(c, "OK");
1606    }
1607    else if (strcmp(command, "off") == 0) {
1608        settings.detail_enabled = 0;
1609        out_string(c, "OK");
1610    }
1611    else if (strcmp(command, "dump") == 0) {
1612        int len;
1613        char *stats = stats_prefix_dump(&len);
1614        write_and_free(c, stats, len);
1615    }
1616    else {
1617        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1618    }
1619}
1620
1621static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1622    rel_time_t now = current_time;
1623    char *command;
1624    char *subcommand;
1625
1626    assert(c != NULL);
1627
1628    if(ntokens < 2) {
1629        out_string(c, "CLIENT_ERROR bad command line");
1630        return;
1631    }
1632
1633    command = tokens[COMMAND_TOKEN].value;
1634
1635    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1636        char temp[1024];
1637        pid_t pid = getpid();
1638        char *pos = temp;
1639
1640#ifndef WIN32
1641        struct rusage usage;
1642        getrusage(RUSAGE_SELF, &usage);
1643#endif /* !WIN32 */
1644
1645        STATS_LOCK();
1646        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1647        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1648        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1649        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1650        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1651#ifndef WIN32
1652        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1653        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1654#endif /* !WIN32 */
1655        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1656        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1657        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1658        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1659        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1660        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1661        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1662        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1663        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1664        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1665        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1666        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1667        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1668        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1669        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1670        pos += sprintf(pos, "END");
1671        STATS_UNLOCK();
1672        out_string(c, temp);
1673        return;
1674    }
1675
1676    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1677
1678    if (strcmp(subcommand, "reset") == 0) {
1679        stats_reset();
1680        out_string(c, "RESET");
1681        return;
1682    }
1683
1684#ifdef HAVE_MALLOC_H
1685#ifdef HAVE_STRUCT_MALLINFO
1686    if (strcmp(subcommand, "malloc") == 0) {
1687        char temp[512];
1688        struct mallinfo info;
1689        char *pos = temp;
1690
1691        info = mallinfo();
1692        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1693        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1694        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1695        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1696        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1697        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1698        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1699        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1700        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1701        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1702        out_string(c, temp);
1703        return;
1704    }
1705#endif /* HAVE_STRUCT_MALLINFO */
1706#endif /* HAVE_MALLOC_H */
1707
1708#if !defined(WIN32) || !defined(__APPLE__)
1709    if (strcmp(subcommand, "maps") == 0) {
1710        char *wbuf;
1711        int wsize = 8192; /* should be enough */
1712        int fd;
1713        int res;
1714
1715        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1716            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1717            return;
1718        }
1719
1720        fd = open("/proc/self/maps", O_RDONLY);
1721        if (fd == -1) {
1722            out_string(c, "SERVER_ERROR cannot open the maps file");
1723            free(wbuf);
1724            return;
1725        }
1726
1727        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1728        if (res == wsize - 6) {
1729            out_string(c, "SERVER_ERROR buffer overflow");
1730            free(wbuf); close(fd);
1731            return;
1732        }
1733        if (res == 0 || res == -1) {
1734            out_string(c, "SERVER_ERROR can't read the maps file");
1735            free(wbuf); close(fd);
1736            return;
1737        }
1738        memcpy(wbuf + res, "END\r\n", 5);
1739        write_and_free(c, wbuf, res + 5);
1740        close(fd);
1741        return;
1742    }
1743#endif
1744
1745    if (strcmp(subcommand, "cachedump") == 0) {
1746
1747        char *buf;
1748        unsigned int bytes, id, limit = 0;
1749
1750        if(ntokens < 5) {
1751            out_string(c, "CLIENT_ERROR bad command line");
1752            return;
1753        }
1754
1755        id = strtoul(tokens[2].value, NULL, 10);
1756        limit = strtoul(tokens[3].value, NULL, 10);
1757
1758        if(errno == ERANGE) {
1759            out_string(c, "CLIENT_ERROR bad command line format");
1760            return;
1761        }
1762
1763        buf = item_cachedump(id, limit, &bytes);
1764        write_and_free(c, buf, bytes);
1765        return;
1766    }
1767
1768    if (strcmp(subcommand, "slabs") == 0) {
1769        int bytes = 0;
1770        char *buf = slabs_stats(&bytes);
1771        write_and_free(c, buf, bytes);
1772        return;
1773    }
1774
1775    if (strcmp(subcommand, "items") == 0) {
1776        int bytes = 0;
1777        char *buf = item_stats(&bytes);
1778        write_and_free(c, buf, bytes);
1779        return;
1780    }
1781
1782    if (strcmp(subcommand, "detail") == 0) {
1783        if (ntokens < 4)
1784            process_stats_detail(c, "");  /* outputs the error message */
1785        else
1786            process_stats_detail(c, tokens[2].value);
1787        return;
1788    }
1789
1790    if (strcmp(subcommand, "sizes") == 0) {
1791        int bytes = 0;
1792        char *buf = item_stats_sizes(&bytes);
1793        write_and_free(c, buf, bytes);
1794        return;
1795    }
1796
1797    out_string(c, "ERROR");
1798}
1799
1800/* ntokens is overwritten here... shrug.. */
1801static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1802    char *key;
1803    size_t nkey;
1804    int i = 0;
1805    item *it;
1806    token_t *key_token = &tokens[KEY_TOKEN];
1807    char *suffix;
1808    int stats_get_cmds   = 0;
1809    int stats_get_hits   = 0;
1810    int stats_get_misses = 0;
1811    assert(c != NULL);
1812
1813    if (settings.managed) {
1814        int bucket = c->bucket;
1815        if (bucket == -1) {
1816            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1817            return;
1818        }
1819        c->bucket = -1;
1820        if (buckets[bucket] != c->gen) {
1821            out_string(c, "ERROR_NOT_OWNER");
1822            return;
1823        }
1824    }
1825
1826    do {
1827        while(key_token->length != 0) {
1828
1829            key = key_token->value;
1830            nkey = key_token->length;
1831
1832            if(nkey > KEY_MAX_LENGTH) {
1833                STATS_LOCK();
1834                stats.get_cmds   += stats_get_cmds;
1835                stats.get_hits   += stats_get_hits;
1836                stats.get_misses += stats_get_misses;
1837                STATS_UNLOCK();
1838                out_string(c, "CLIENT_ERROR bad command line format");
1839                return;
1840            }
1841
1842            stats_get_cmds++;
1843            it = item_get(key, nkey);
1844            if (settings.detail_enabled) {
1845                stats_prefix_record_get(key, NULL != it);
1846            }
1847            if (it) {
1848                if (i >= c->isize) {
1849                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1850                    if (new_list) {
1851                        c->isize *= 2;
1852                        c->ilist = new_list;
1853                    } else break;
1854                }
1855
1856                /*
1857                 * Construct the response. Each hit adds three elements to the
1858                 * outgoing data list:
1859                 *   "VALUE "
1860                 *   key
1861                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1862                 */
1863
1864                if(return_cas == true)
1865                {
1866                  /* Goofy mid-flight realloc. */
1867                  if (i >= c->suffixsize) {
1868                    char **new_suffix_list = realloc(c->suffixlist,
1869                                           sizeof(char *) * c->suffixsize * 2);
1870                    if (new_suffix_list) {
1871                      c->suffixsize *= 2;
1872                      c->suffixlist  = new_suffix_list;
1873                    } else break;
1874                  }
1875
1876                  suffix = suffix_from_freelist();
1877                  if (suffix == NULL) {
1878                    STATS_LOCK();
1879                    stats.get_cmds   += stats_get_cmds;
1880                    stats.get_hits   += stats_get_hits;
1881                    stats.get_misses += stats_get_misses;
1882                    STATS_UNLOCK();
1883                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1884                    return;
1885                  }
1886                  *(c->suffixlist + i) = suffix;
1887                  sprintf(suffix, " %llu\r\n", it->cas_id);
1888                  if (add_iov(c, "VALUE ", 6) != 0 ||
1889                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1890                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1891                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1892                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1893                      {
1894                          break;
1895                      }
1896                }
1897                else
1898                {
1899                  if (add_iov(c, "VALUE ", 6) != 0 ||
1900                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1901                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1902                      {
1903                          break;
1904                      }
1905                }
1906
1907
1908                if (settings.verbose > 1)
1909                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1910
1911                /* item_get() has incremented it->refcount for us */
1912                stats_get_hits++;
1913                item_update(it);
1914                *(c->ilist + i) = it;
1915                i++;
1916
1917            } else {
1918                stats_get_misses++;
1919            }
1920
1921            key_token++;
1922        }
1923
1924        /*
1925         * If the command string hasn't been fully processed, get the next set
1926         * of tokens.
1927         */
1928        if(key_token->value != NULL) {
1929            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1930            key_token = tokens;
1931        }
1932
1933    } while(key_token->value != NULL);
1934
1935    c->icurr = c->ilist;
1936    c->ileft = i;
1937    if (return_cas) {
1938        c->suffixcurr = c->suffixlist;
1939        c->suffixleft = i;
1940    }
1941
1942    if (settings.verbose > 1)
1943        fprintf(stderr, ">%d END\n", c->sfd);
1944
1945    /*
1946        If the loop was terminated because of out-of-memory, it is not
1947        reliable to add END\r\n to the buffer, because it might not end
1948        in \r\n. So we send SERVER_ERROR instead.
1949    */
1950    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1951        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1952        out_string(c, "SERVER_ERROR out of memory writing get response");
1953    }
1954    else {
1955        conn_set_state(c, conn_mwrite);
1956        c->msgcurr = 0;
1957    }
1958
1959    STATS_LOCK();
1960    stats.get_cmds   += stats_get_cmds;
1961    stats.get_hits   += stats_get_hits;
1962    stats.get_misses += stats_get_misses;
1963    STATS_UNLOCK();
1964
1965    return;
1966}
1967
1968static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1969    char *key;
1970    size_t nkey;
1971    int flags;
1972    time_t exptime;
1973    int vlen, old_vlen;
1974    uint64_t req_cas_id;
1975    item *it, *old_it;
1976
1977    assert(c != NULL);
1978
1979    set_noreply_maybe(c, tokens, ntokens);
1980
1981    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1982        out_string(c, "CLIENT_ERROR bad command line format");
1983        return;
1984    }
1985
1986    key = tokens[KEY_TOKEN].value;
1987    nkey = tokens[KEY_TOKEN].length;
1988
1989    flags = strtoul(tokens[2].value, NULL, 10);
1990    exptime = strtol(tokens[3].value, NULL, 10);
1991    vlen = strtol(tokens[4].value, NULL, 10);
1992
1993    // does cas value exist?
1994    if(handle_cas)
1995    {
1996      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1997    }
1998
1999    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
2000        out_string(c, "CLIENT_ERROR bad command line format");
2001        return;
2002    }
2003
2004    if (settings.detail_enabled) {
2005        stats_prefix_record_set(key);
2006    }
2007
2008    if (settings.managed) {
2009        int bucket = c->bucket;
2010        if (bucket == -1) {
2011            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2012            return;
2013        }
2014        c->bucket = -1;
2015        if (buckets[bucket] != c->gen) {
2016            out_string(c, "ERROR_NOT_OWNER");
2017            return;
2018        }
2019    }
2020
2021    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2022
2023    if (it == 0) {
2024        if (! item_size_ok(nkey, flags, vlen + 2))
2025            out_string(c, "SERVER_ERROR object too large for cache");
2026        else
2027            out_string(c, "SERVER_ERROR out of memory storing object");
2028        /* swallow the data line */
2029        c->write_and_go = conn_swallow;
2030        c->sbytes = vlen + 2;
2031        return;
2032    }
2033    if(handle_cas)
2034      it->cas_id = req_cas_id;
2035
2036    c->item = it;
2037    c->ritem = ITEM_data(it);
2038    c->rlbytes = it->nbytes;
2039    c->item_comm = comm;
2040    conn_set_state(c, conn_nread);
2041}
2042
2043static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2044    char temp[sizeof("18446744073709551615")];
2045    item *it;
2046    int64_t delta;
2047    char *key;
2048    size_t nkey;
2049
2050    assert(c != NULL);
2051
2052    set_noreply_maybe(c, tokens, ntokens);
2053
2054    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2055        out_string(c, "CLIENT_ERROR bad command line format");
2056        return;
2057    }
2058
2059    key = tokens[KEY_TOKEN].value;
2060    nkey = tokens[KEY_TOKEN].length;
2061
2062    if (settings.managed) {
2063        int bucket = c->bucket;
2064        if (bucket == -1) {
2065            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2066            return;
2067        }
2068        c->bucket = -1;
2069        if (buckets[bucket] != c->gen) {
2070            out_string(c, "ERROR_NOT_OWNER");
2071            return;
2072        }
2073    }
2074
2075    delta = strtoll(tokens[2].value, NULL, 10);
2076
2077    if(errno == ERANGE) {
2078        out_string(c, "CLIENT_ERROR bad command line format");
2079        return;
2080    }
2081
2082    it = item_get(key, nkey);
2083    if (!it) {
2084        out_string(c, "NOT_FOUND");
2085        return;
2086    }
2087
2088    out_string(c, add_delta(it, incr, delta, temp));
2089    item_remove(it);         /* release our reference */
2090}
2091
2092/*
2093 * adds a delta value to a numeric item.
2094 *
2095 * it    item to adjust
2096 * incr  true to increment value, false to decrement
2097 * delta amount to adjust value by
2098 * buf   buffer for response string
2099 *
2100 * returns a response string to send back to the client.
2101 */
2102char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2103    char *ptr;
2104    int64_t value;
2105    int res;
2106
2107    ptr = ITEM_data(it);
2108    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2109
2110    value = strtoull(ptr, NULL, 10);
2111
2112    if(errno == ERANGE) {
2113        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2114    }
2115
2116    if (incr)
2117        value += delta;
2118    else {
2119        value -= delta;
2120    }
2121    if(value < 0) {
2122        value = 0;
2123    }
2124    sprintf(buf, "%llu", value);
2125    res = strlen(buf);
2126    if (res + 2 > it->nbytes) { /* need to realloc */
2127        item *new_it;
2128        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2129        if (new_it == 0) {
2130            return "SERVER_ERROR out of memory in incr/decr";
2131        }
2132        memcpy(ITEM_data(new_it), buf, res);
2133        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2134        do_item_replace(it, new_it);
2135        do_item_remove(new_it);       /* release our reference */
2136    } else { /* replace in-place */
2137        memcpy(ITEM_data(it), buf, res);
2138        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2139    }
2140
2141    return buf;
2142}
2143
2144static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2145    char *key;
2146    size_t nkey;
2147    item *it;
2148    time_t exptime = 0;
2149
2150    assert(c != NULL);
2151
2152    set_noreply_maybe(c, tokens, ntokens);
2153
2154    if (settings.managed) {
2155        int bucket = c->bucket;
2156        if (bucket == -1) {
2157            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2158            return;
2159        }
2160        c->bucket = -1;
2161        if (buckets[bucket] != c->gen) {
2162            out_string(c, "ERROR_NOT_OWNER");
2163            return;
2164        }
2165    }
2166
2167    key = tokens[KEY_TOKEN].value;
2168    nkey = tokens[KEY_TOKEN].length;
2169
2170    if(nkey > KEY_MAX_LENGTH) {
2171        out_string(c, "CLIENT_ERROR bad command line format");
2172        return;
2173    }
2174
2175    if(ntokens == (c->noreply ? 5 : 4)) {
2176        exptime = strtol(tokens[2].value, NULL, 10);
2177
2178        if(errno == ERANGE) {
2179            out_string(c, "CLIENT_ERROR bad command line format");
2180            return;
2181        }
2182    }
2183
2184    if (settings.detail_enabled) {
2185        stats_prefix_record_delete(key);
2186    }
2187
2188    it = item_get(key, nkey);
2189    if (it) {
2190        if (exptime == 0) {
2191            item_unlink(it);
2192            item_remove(it);      /* release our reference */
2193            out_string(c, "DELETED");
2194        } else {
2195            /* our reference will be transfered to the delete queue */
2196            out_string(c, defer_delete(it, exptime));
2197        }
2198    } else {
2199        out_string(c, "NOT_FOUND");
2200    }
2201}
2202
2203/*
2204 * Adds an item to the deferred-delete list so it can be reaped later.
2205 *
2206 * Returns the result to send to the client.
2207 */
2208char *do_defer_delete(item *it, time_t exptime)
2209{
2210    if (delcurr >= deltotal) {
2211        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2212        if (new_delete) {
2213            todelete = new_delete;
2214            deltotal *= 2;
2215        } else {
2216            /*
2217             * can't delete it immediately, user wants a delay,
2218             * but we ran out of memory for the delete queue
2219             */
2220            item_remove(it);    /* release reference */
2221            return "SERVER_ERROR out of memory expanding delete queue";
2222        }
2223    }
2224
2225    /* use its expiration time as its deletion time now */
2226    it->exptime = realtime(exptime);
2227    it->it_flags |= ITEM_DELETED;
2228    todelete[delcurr++] = it;
2229
2230    return "DELETED";
2231}
2232
2233static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2234    unsigned int level;
2235
2236    assert(c != NULL);
2237
2238    set_noreply_maybe(c, tokens, ntokens);
2239
2240    level = strtoul(tokens[1].value, NULL, 10);
2241    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2242    out_string(c, "OK");
2243    return;
2244}
2245
2246static void process_command(conn *c, char *command) {
2247
2248    token_t tokens[MAX_TOKENS];
2249    size_t ntokens;
2250    int comm;
2251
2252    assert(c != NULL);
2253
2254    if (settings.verbose > 1)
2255        fprintf(stderr, "<%d %s\n", c->sfd, command);
2256
2257    /*
2258     * for commands set/add/replace, we build an item and read the data
2259     * directly into it, then continue in nread_complete().
2260     */
2261
2262    c->msgcurr = 0;
2263    c->msgused = 0;
2264    c->iovused = 0;
2265    if (add_msghdr(c) != 0) {
2266        out_string(c, "SERVER_ERROR out of memory preparing response");
2267        return;
2268    }
2269
2270    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2271    if (ntokens >= 3 &&
2272        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2273         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2274
2275        process_get_command(c, tokens, ntokens, false);
2276
2277    } else if ((ntokens == 6 || ntokens == 7) &&
2278               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2279                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2280                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2281                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2282                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2283
2284        process_update_command(c, tokens, ntokens, comm, false);
2285
2286    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2287
2288        process_update_command(c, tokens, ntokens, comm, true);
2289
2290    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2291
2292        process_arithmetic_command(c, tokens, ntokens, 1);
2293
2294    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2295
2296        process_get_command(c, tokens, ntokens, true);
2297
2298    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2299
2300        process_arithmetic_command(c, tokens, ntokens, 0);
2301
2302    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2303
2304        process_delete_command(c, tokens, ntokens);
2305
2306    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2307        unsigned int bucket, gen;
2308        if (!settings.managed) {
2309            out_string(c, "CLIENT_ERROR not a managed instance");
2310            return;
2311        }
2312
2313        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2314            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2315                out_string(c, "CLIENT_ERROR bucket number out of range");
2316                return;
2317            }
2318            buckets[bucket] = gen;
2319            out_string(c, "OWNED");
2320            return;
2321        } else {
2322            out_string(c, "CLIENT_ERROR bad format");
2323            return;
2324        }
2325
2326    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2327
2328        int bucket;
2329        if (!settings.managed) {
2330            out_string(c, "CLIENT_ERROR not a managed instance");
2331            return;
2332        }
2333        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2334            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2335                out_string(c, "CLIENT_ERROR bucket number out of range");
2336                return;
2337            }
2338            buckets[bucket] = 0;
2339            out_string(c, "DISOWNED");
2340            return;
2341        } else {
2342            out_string(c, "CLIENT_ERROR bad format");
2343            return;
2344        }
2345
2346    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2347        int bucket, gen;
2348        if (!settings.managed) {
2349            out_string(c, "CLIENT_ERROR not a managed instance");
2350            return;
2351        }
2352        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2353            /* we never write anything back, even if input's wrong */
2354            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2355                /* do nothing, bad input */
2356            } else {
2357                c->bucket = bucket;
2358                c->gen = gen;
2359            }
2360            conn_set_init_state(c);
2361            return;
2362        } else {
2363            out_string(c, "CLIENT_ERROR bad format");
2364            return;
2365        }
2366
2367    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2368
2369        process_stat(c, tokens, ntokens);
2370
2371    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2372        time_t exptime = 0;
2373        set_current_time();
2374
2375        set_noreply_maybe(c, tokens, ntokens);
2376
2377        if(ntokens == (c->noreply ? 3 : 2)) {
2378            settings.oldest_live = current_time - 1;
2379            item_flush_expired();
2380            out_string(c, "OK");
2381            return;
2382        }
2383
2384        exptime = strtol(tokens[1].value, NULL, 10);
2385        if(errno == ERANGE) {
2386            out_string(c, "CLIENT_ERROR bad command line format");
2387            return;
2388        }
2389
2390        /*
2391          If exptime is zero realtime() would return zero too, and
2392          realtime(exptime) - 1 would overflow to the max unsigned
2393          value.  So we process exptime == 0 the same way we do when
2394          no delay is given at all.
2395        */
2396        if (exptime > 0)
2397            settings.oldest_live = realtime(exptime) - 1;
2398        else /* exptime == 0 */
2399            settings.oldest_live = current_time - 1;
2400        item_flush_expired();
2401        out_string(c, "OK");
2402        return;
2403
2404    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2405
2406        out_string(c, "VERSION " VERSION);
2407
2408    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2409
2410        conn_set_state(c, conn_closing);
2411
2412    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2413                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2414#ifdef ALLOW_SLABS_REASSIGN
2415
2416        int src, dst, rv;
2417
2418        src = strtol(tokens[2].value, NULL, 10);
2419        dst  = strtol(tokens[3].value, NULL, 10);
2420
2421        if(errno == ERANGE) {
2422            out_string(c, "CLIENT_ERROR bad command line format");
2423            return;
2424        }
2425
2426        rv = slabs_reassign(src, dst);
2427        if (rv == 1) {
2428            out_string(c, "DONE");
2429            return;
2430        }
2431        if (rv == 0) {
2432            out_string(c, "CANT");
2433            return;
2434        }
2435        if (rv == -1) {
2436            out_string(c, "BUSY");
2437            return;
2438        }
2439#else
2440        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2441#endif
2442    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2443        process_verbosity_command(c, tokens, ntokens);
2444    } else {
2445        out_string(c, "ERROR");
2446    }
2447    return;
2448}
2449
2450/*
2451 * if we have a complete line in the buffer, process it.
2452 */
2453static int try_read_command(conn *c) {
2454    char *el, *cont;
2455
2456    assert(c != NULL);
2457    assert(c->rcurr <= (c->rbuf + c->rsize));
2458
2459    if (c->rbytes == 0)
2460        return 0;
2461    el = memchr(c->rcurr, '\n', c->rbytes);
2462    if (!el)
2463        return 0;
2464    cont = el + 1;
2465    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2466        el--;
2467    }
2468    *el = '\0';
2469
2470    assert(cont <= (c->rcurr + c->rbytes));
2471
2472    process_command(c, c->rcurr);
2473
2474    c->rbytes -= (cont - c->rcurr);
2475    c->rcurr = cont;
2476
2477    assert(c->rcurr <= (c->rbuf + c->rsize));
2478
2479    return 1;
2480}
2481
2482/*
2483 * read a UDP request.
2484 * return 0 if there's nothing to read.
2485 */
2486static int try_read_udp(conn *c) {
2487    int res;
2488
2489    assert(c != NULL);
2490
2491    c->request_addr_size = sizeof(c->request_addr);
2492    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2493                   0, &c->request_addr, &c->request_addr_size);
2494    if (res > 8) {
2495        unsigned char *buf = (unsigned char *)c->rbuf;
2496        STATS_LOCK();
2497        stats.bytes_read += res;
2498        STATS_UNLOCK();
2499
2500        /* Beginning of UDP packet is the request ID; save it. */
2501        c->request_id = buf[0] * 256 + buf[1];
2502
2503        /* If this is a multi-packet request, drop it. */
2504        if (buf[4] != 0 || buf[5] != 1) {
2505            out_string(c, "SERVER_ERROR multi-packet request not supported");
2506            return 0;
2507        }
2508
2509        /* Don't care about any of the rest of the header. */
2510        res -= 8;
2511        memmove(c->rbuf, c->rbuf + 8, res);
2512
2513        c->rbytes += res;
2514        c->rcurr = c->rbuf;
2515        return 1;
2516    }
2517    return 0;
2518}
2519
2520/*
2521 * read from network as much as we can, handle buffer overflow and connection
2522 * close.
2523 * before reading, move the remaining incomplete fragment of a command
2524 * (if any) to the beginning of the buffer.
2525 * return 0 if there's nothing to read on the first read.
2526 */
2527static int try_read_network(conn *c) {
2528    int gotdata = 0;
2529    int res;
2530
2531    assert(c != NULL);
2532
2533    if (c->rcurr != c->rbuf) {
2534        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2535            memmove(c->rbuf, c->rcurr, c->rbytes);
2536        c->rcurr = c->rbuf;
2537    }
2538
2539    while (1) {
2540        if (c->rbytes >= c->rsize) {
2541            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2542            if (!new_rbuf) {
2543                if (settings.verbose > 0)
2544                    fprintf(stderr, "Couldn't realloc input buffer\n");
2545                c->rbytes = 0; /* ignore what we read */
2546                out_string(c, "SERVER_ERROR out of memory reading request");
2547                c->write_and_go = conn_closing;
2548                return 1;
2549            }
2550            c->rcurr = c->rbuf = new_rbuf;
2551            c->rsize *= 2;
2552        }
2553
2554        int avail = c->rsize - c->rbytes;
2555        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2556        if (res > 0) {
2557            STATS_LOCK();
2558            stats.bytes_read += res;
2559            STATS_UNLOCK();
2560            gotdata = 1;
2561            c->rbytes += res;
2562            if (res == avail) {
2563                continue;
2564            } else {
2565                break;
2566            }
2567        }
2568        if (res == 0) {
2569            /* connection closed */
2570            conn_set_state(c, conn_closing);
2571            return 1;
2572        }
2573        if (res == -1) {
2574            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2575            /* Should close on unhandled errors. */
2576            conn_set_state(c, conn_closing);
2577            return 1;
2578        }
2579    }
2580    return gotdata;
2581}
2582
2583static bool update_event(conn *c, const int new_flags) {
2584    assert(c != NULL);
2585
2586    struct event_base *base = c->event.ev_base;
2587    if (c->ev_flags == new_flags)
2588        return true;
2589    if (event_del(&c->event) == -1) return false;
2590    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2591    event_base_set(base, &c->event);
2592    c->ev_flags = new_flags;
2593    if (event_add(&c->event, 0) == -1) return false;
2594    return true;
2595}
2596
2597/*
2598 * Sets whether we are listening for new connections or not.
2599 */
2600void accept_new_conns(const bool do_accept) {
2601    conn *next;
2602
2603    if (! is_listen_thread())
2604        return;
2605
2606    for (next = listen_conn; next; next = next->next) {
2607        if (do_accept) {
2608            update_event(next, EV_READ | EV_PERSIST);
2609            if (listen(next->sfd, 1024) != 0) {
2610                perror("listen");
2611            }
2612        }
2613        else {
2614            update_event(next, 0);
2615            if (listen(next->sfd, 0) != 0) {
2616                perror("listen");
2617            }
2618        }
2619  }
2620}
2621
2622/*
2623 * Transmit the next chunk of data from our list of msgbuf structures.
2624 *
2625 * Returns:
2626 *   TRANSMIT_COMPLETE   All done writing.
2627 *   TRANSMIT_INCOMPLETE More data remaining to write.
2628 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2629 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2630 */
2631static int transmit(conn *c) {
2632    assert(c != NULL);
2633
2634    if (c->msgcurr < c->msgused &&
2635            c->msglist[c->msgcurr].msg_iovlen == 0) {
2636        /* Finished writing the current msg; advance to the next. */
2637        c->msgcurr++;
2638    }
2639    if (c->msgcurr < c->msgused) {
2640        ssize_t res;
2641        struct msghdr *m = &c->msglist[c->msgcurr];
2642
2643        res = sendmsg(c->sfd, m, 0);
2644        if (res > 0) {
2645            STATS_LOCK();
2646            stats.bytes_written += res;
2647            STATS_UNLOCK();
2648
2649            /* We've written some of the data. Remove the completed
2650               iovec entries from the list of pending writes. */
2651            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2652                res -= m->msg_iov->iov_len;
2653                m->msg_iovlen--;
2654                m->msg_iov++;
2655            }
2656
2657            /* Might have written just part of the last iovec entry;
2658               adjust it so the next write will do the rest. */
2659            if (res > 0) {
2660                m->msg_iov->iov_base += res;
2661                m->msg_iov->iov_len -= res;
2662            }
2663            return TRANSMIT_INCOMPLETE;
2664        }
2665        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2666            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2667                if (settings.verbose > 0)
2668                    fprintf(stderr, "Couldn't update event\n");
2669                conn_set_state(c, conn_closing);
2670                return TRANSMIT_HARD_ERROR;
2671            }
2672            return TRANSMIT_SOFT_ERROR;
2673        }
2674        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
2675           we have a real error, on which we close the connection */
2676        if (settings.verbose > 0)
2677            perror("Failed to write, and not due to blocking");
2678
2679        if (IS_UDP(c->protocol))
2680            conn_set_state(c, conn_read);
2681        else
2682            conn_set_state(c, conn_closing);
2683        return TRANSMIT_HARD_ERROR;
2684    } else {
2685        return TRANSMIT_COMPLETE;
2686    }
2687}
2688
2689static void drive_machine(conn *c) {
2690    bool stop = false;
2691    int sfd, flags = 1;
2692    enum conn_states init_state; /* initial state for a new connection */
2693    socklen_t addrlen;
2694    struct sockaddr_storage addr;
2695    int res;
2696
2697    assert(c != NULL);
2698
2699    while (!stop) {
2700
2701        switch(c->state) {
2702        case conn_listening:
2703            addrlen = sizeof(addr);
2704            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2705                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2706                    /* these are transient, so don't log anything */
2707                    stop = true;
2708                } else if (errno == EMFILE) {
2709                    if (settings.verbose > 0)
2710                        fprintf(stderr, "Too many open connections\n");
2711                    accept_new_conns(false);
2712                    stop = true;
2713                } else {
2714                    perror("accept()");
2715                    stop = true;
2716                }
2717                break;
2718            }
2719            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2720                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2721                perror("setting O_NONBLOCK");
2722                close(sfd);
2723                break;
2724            }
2725            init_state = get_init_state(c);
2726
2727            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2728                                     DATA_BUFFER_SIZE, c->protocol);
2729
2730            break;
2731
2732        case conn_negotiate:
2733            if (settings.verbose > 1)
2734                fprintf(stderr, "Negotiating protocol for a new connection\n");
2735            c->rlbytes = 1;
2736            c->ritem = c->rbuf;
2737            c->rcurr = c->rbuf;
2738            c->wcurr = c->wbuf;
2739            conn_set_state(c, conn_nread);
2740            break;
2741
2742        case conn_read:
2743            if (try_read_command(c) != 0) {
2744                continue;
2745            }
2746            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2747                continue;
2748            }
2749            /* we have no command line and no data to read from network */
2750            if (!update_event(c, EV_READ | EV_PERSIST)) {
2751                if (settings.verbose > 0)
2752                    fprintf(stderr, "Couldn't update event\n");
2753                conn_set_state(c, conn_closing);
2754                break;
2755            }
2756            stop = true;
2757            break;
2758
2759        case conn_bin_init: /* Reinitialize a binary connection */
2760            reinit_bin_connection(c);
2761            break;
2762
2763        case conn_nread:
2764            if (c->rlbytes == 0) {
2765                complete_nread(c);
2766                break;
2767            }
2768            /* first check if we have leftovers in the conn_read buffer */
2769            if (c->rbytes > 0) {
2770                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2771                memcpy(c->ritem, c->rcurr, tocopy);
2772                c->ritem += tocopy;
2773                c->rlbytes -= tocopy;
2774                c->rcurr += tocopy;
2775                c->rbytes -= tocopy;
2776                break;
2777            }
2778
2779            /*  now try reading from the socket */
2780            res = read(c->sfd, c->ritem, c->rlbytes);
2781            if (res > 0) {
2782                STATS_LOCK();
2783                stats.bytes_read += res;
2784                STATS_UNLOCK();
2785                c->ritem += res;
2786                c->rlbytes -= res;
2787                break;
2788            }
2789            if (res == 0) { /* end of stream */
2790                conn_set_state(c, conn_closing);
2791                break;
2792            }
2793            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2794                if (!update_event(c, EV_READ | EV_PERSIST)) {
2795                    if (settings.verbose > 0)
2796                        fprintf(stderr, "Couldn't update event\n");
2797                    conn_set_state(c, conn_closing);
2798                    break;
2799                }
2800                stop = true;
2801                break;
2802            }
2803            /* otherwise we have a real error, on which we close the connection */
2804            if (settings.verbose > 0)
2805                fprintf(stderr, "Failed to read, and not due to blocking\n");
2806            conn_set_state(c, conn_closing);
2807            break;
2808
2809        case conn_swallow:
2810            /* we are reading sbytes and throwing them away */
2811            if (c->sbytes == 0) {
2812                conn_set_init_state(c);
2813                break;
2814            }
2815
2816            /* first check if we have leftovers in the conn_read buffer */
2817            if (c->rbytes > 0) {
2818                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2819                c->sbytes -= tocopy;
2820                c->rcurr += tocopy;
2821                c->rbytes -= tocopy;
2822                break;
2823            }
2824
2825            /*  now try reading from the socket */
2826            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2827            if (res > 0) {
2828                STATS_LOCK();
2829                stats.bytes_read += res;
2830                STATS_UNLOCK();
2831                c->sbytes -= res;
2832                break;
2833            }
2834            if (res == 0) { /* end of stream */
2835                conn_set_state(c, conn_closing);
2836                break;
2837            }
2838            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2839                if (!update_event(c, EV_READ | EV_PERSIST)) {
2840                    if (settings.verbose > 0)
2841                        fprintf(stderr, "Couldn't update event\n");
2842                    conn_set_state(c, conn_closing);
2843                    break;
2844                }
2845                stop = true;
2846                break;
2847            }
2848            /* otherwise we have a real error, on which we close the connection */
2849            if (settings.verbose > 0)
2850                fprintf(stderr, "Failed to read, and not due to blocking\n");
2851            conn_set_state(c, conn_closing);
2852            break;
2853
2854        case conn_write:
2855            /*
2856             * We want to write out a simple response. If we haven't already,
2857             * assemble it into a msgbuf list (this will be a single-entry
2858             * list for TCP or a two-entry list for UDP).
2859             */
2860            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2861                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2862                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2863                    if (settings.verbose > 0)
2864                        fprintf(stderr, "Couldn't build response\n");
2865                    conn_set_state(c, conn_closing);
2866                    break;
2867                }
2868            }
2869
2870            /* fall through... */
2871
2872        case conn_mwrite:
2873            switch (transmit(c)) {
2874            case TRANSMIT_COMPLETE:
2875                if (c->state == conn_mwrite) {
2876                    while (c->ileft > 0) {
2877                        item *it = *(c->icurr);
2878                        assert((it->it_flags & ITEM_SLABBED) == 0);
2879                        item_remove(it);
2880                        c->icurr++;
2881                        c->ileft--;
2882                    }
2883                    while (c->suffixleft > 0) {
2884                        char *suffix = *(c->suffixcurr);
2885                        if(suffix_add_to_freelist(suffix)) {
2886                            /* Failed to add to freelist, don't leak */
2887                            free(suffix);
2888                        }
2889                        c->suffixcurr++;
2890                        c->suffixleft--;
2891                    }
2892                    /* XXX:  I don't know why this wasn't the general case */
2893                    if(c->protocol == binary_prot) {
2894                        conn_set_state(c, c->write_and_go);
2895                    } else {
2896                        conn_set_init_state(c);
2897                    }
2898                } else if (c->state == conn_write) {
2899                    if (c->write_and_free) {
2900                        free(c->write_and_free);
2901                        c->write_and_free = 0;
2902                    }
2903                    conn_set_state(c, c->write_and_go);
2904                } else {
2905                    if (settings.verbose > 0)
2906                        fprintf(stderr, "Unexpected state %d\n", c->state);
2907                    conn_set_state(c, conn_closing);
2908                }
2909                break;
2910
2911            case TRANSMIT_INCOMPLETE:
2912            case TRANSMIT_HARD_ERROR:
2913                break;                   /* Continue in state machine. */
2914
2915            case TRANSMIT_SOFT_ERROR:
2916                stop = true;
2917                break;
2918            }
2919            break;
2920
2921        case conn_closing:
2922            if (IS_UDP(c->protocol))
2923                conn_cleanup(c);
2924            else
2925                conn_close(c);
2926            stop = true;
2927            break;
2928        }
2929    }
2930
2931    return;
2932}
2933
2934void event_handler(const int fd, const short which, void *arg) {
2935    conn *c;
2936
2937    c = (conn *)arg;
2938    assert(c != NULL);
2939
2940    c->which = which;
2941
2942    /* sanity */
2943    if (fd != c->sfd) {
2944        if (settings.verbose > 0)
2945            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2946        conn_close(c);
2947        return;
2948    }
2949
2950    drive_machine(c);
2951
2952    /* wait for next event */
2953    return;
2954}
2955
2956static int new_socket(struct addrinfo *ai) {
2957    int sfd;
2958    int flags;
2959
2960    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {