root/branches/binary/server/memcached.c @ 759

Revision 759, 111.1 kB (checked in by dsallings, 21 months ago)

Style fix: horizontal spacing

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63/*
64 * forward declarations
65 */
66static void drive_machine(conn *c);
67static int new_socket(struct addrinfo *ai);
68static int server_socket(const int port, enum protocol prot);
69static int try_read_command(conn *c);
70static int try_read_network(conn *c);
71static int try_read_udp(conn *c);
72static void conn_set_state(conn *c, enum conn_states state);
73
74/* stats */
75static void stats_reset(void);
76static void stats_init(void);
77
78/* defaults */
79static void settings_init(void);
80
81/* event handling, network IO */
82static void event_handler(const int fd, const short which, void *arg);
83static void conn_close(conn *c);
84static void conn_init(void);
85static void accept_new_conns(const bool do_accept);
86static bool update_event(conn *c, const int new_flags);
87static void complete_nread(conn *c);
88static void process_command(conn *c, char *command);
89static int transmit(conn *c);
90static int ensure_iov_space(conn *c);
91static int add_iov(conn *c, const void *buf, int len);
92static int add_msghdr(conn *c);
93
94/* time handling */
95static void set_current_time(void);  /* update the global variable holding
96                              global 32-bit seconds-since-start time
97                              (to avoid 64 bit time_t) */
98
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = NULL;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn = NULL;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.access = 0700;
169    settings.port = 11211;
170    settings.udpport = 0;
171    /* By default this string should be NULL for getaddrinfo() */
172    settings.inter = NULL;
173    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
174    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
175    settings.verbose = 0;
176    settings.oldest_live = 0;
177    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
178    settings.socketpath = NULL;       /* by default, not using a unix socket */
179    settings.managed = false;
180    settings.factor = 1.25;
181    settings.chunk_size = 48;         /* space for a modest key and value */
182#ifdef USE_THREADS
183    settings.num_threads = 4;
184#else
185    settings.num_threads = 1;
186#endif
187    settings.prefix_delimiter = ':';
188    settings.detail_enabled = 0;
189}
190
191/* returns true if a deleted item's delete-locked-time is over, and it
192   should be removed from the namespace */
193static bool item_delete_lock_over (item *it) {
194    assert(it->it_flags & ITEM_DELETED);
195    return (current_time >= it->exptime);
196}
197
198/*
199 * Adds a message header to a connection.
200 *
201 * Returns 0 on success, -1 on out-of-memory.
202 */
203static int add_msghdr(conn *c)
204{
205    struct msghdr *msg;
206
207    assert(c != NULL);
208
209    if (c->msgsize == c->msgused) {
210        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
211        if (! msg)
212            return -1;
213        c->msglist = msg;
214        c->msgsize *= 2;
215    }
216
217    msg = c->msglist + c->msgused;
218
219    /* this wipes msg_iovlen, msg_control, msg_controllen, and
220       msg_flags, the last 3 of which aren't defined on solaris: */
221    memset(msg, 0, sizeof(struct msghdr));
222
223    msg->msg_iov = &c->iov[c->iovused];
224
225    if (c->request_addr_size > 0) {
226        msg->msg_name = &c->request_addr;
227        msg->msg_namelen = c->request_addr_size;
228    }
229
230    c->msgbytes = 0;
231    c->msgused++;
232
233    if (IS_UDP(c->protocol)) {
234        /* Leave room for the UDP header, which we'll fill in later. */
235        return add_iov(c, NULL, UDP_HEADER_SIZE);
236    }
237
238    return 0;
239}
240
241
242/*
243 * Free list management for connections.
244 */
245
246static conn **freeconns;
247static int freetotal;
248static int freecurr;
249
250
251static void conn_init(void) {
252    freetotal = 200;
253    freecurr = 0;
254    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
255        fprintf(stderr, "malloc()\n");
256    }
257    return;
258}
259
260/*
261 * Returns a connection from the freelist, if any. Should call this using
262 * conn_from_freelist() for thread safety.
263 */
264conn *do_conn_from_freelist() {
265    conn *c;
266
267    if (freecurr > 0) {
268        c = freeconns[--freecurr];
269    } else {
270        c = NULL;
271    }
272
273    return c;
274}
275
276/*
277 * Adds a connection to the freelist. 0 = success. Should call this using
278 * conn_add_to_freelist() for thread safety.
279 */
280bool do_conn_add_to_freelist(conn *c) {
281    if (freecurr < freetotal) {
282        freeconns[freecurr++] = c;
283        return false;
284    } else {
285        /* try to enlarge free connections array */
286        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
287        if (new_freeconns) {
288            freetotal *= 2;
289            freeconns = new_freeconns;
290            freeconns[freecurr++] = c;
291            return false;
292        }
293    }
294    return true;
295}
296
297static char *prot_text(enum protocol prot) {
298    char *rv = "unknown";
299    switch(prot) {
300        case ascii_prot:
301            rv = "ascii";
302            break;
303        case binary_prot:
304            rv = "binary";
305            break;
306        case ascii_udp_prot:
307            rv = "ascii-udp";
308            break;
309        case negotiating_prot:
310            rv = "auto-negotiate";
311            break;
312    }
313    return rv;
314}
315
316conn *conn_new(const int sfd, enum conn_states init_state,
317                const int event_flags,
318                const int read_buffer_size, enum protocol prot,
319                struct event_base *base) {
320    conn *c = conn_from_freelist();
321
322    if (NULL == c) {
323        if (!(c = (conn *)malloc(sizeof(conn)))) {
324            fprintf(stderr, "malloc()\n");
325            return NULL;
326        }
327        c->rbuf = c->wbuf = 0;
328        c->ilist = 0;
329        c->suffixlist = 0;
330        c->iov = 0;
331        c->msglist = 0;
332        c->hdrbuf = 0;
333
334        c->rsize = read_buffer_size;
335        c->wsize = DATA_BUFFER_SIZE;
336        c->isize = ITEM_LIST_INITIAL;
337        c->suffixsize = SUFFIX_LIST_INITIAL;
338        c->iovsize = IOV_LIST_INITIAL;
339        c->msgsize = MSG_LIST_INITIAL;
340        c->hdrsize = 0;
341
342        c->rbuf = (char *)malloc((size_t)c->rsize);
343        c->wbuf = (char *)malloc((size_t)c->wsize);
344        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
345        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
346        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
347        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
348
349        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
350                c->msglist == 0 || c->suffixlist == 0) {
351            if (c->rbuf != 0) free(c->rbuf);
352            if (c->wbuf != 0) free(c->wbuf);
353            if (c->ilist != 0) free(c->ilist);
354            if (c->suffixlist != 0) free(c->suffixlist);
355            if (c->iov != 0) free(c->iov);
356            if (c->msglist != 0) free(c->msglist);
357            free(c);
358            fprintf(stderr, "malloc()\n");
359            return NULL;
360        }
361
362        STATS_LOCK();
363        stats.conn_structs++;
364        STATS_UNLOCK();
365    }
366
367    /* unix socket mode doesn't need this, so zeroed out.  but why
368     * is this done for every command?  presumably for UDP
369     * mode.  */
370    if (!settings.socketpath) {
371        c->request_addr_size = sizeof(c->request_addr);
372    } else {
373        c->request_addr_size = 0;
374    }
375
376    if (settings.verbose > 1) {
377        if (init_state == conn_listening) {
378            fprintf(stderr, "<%d server listening (%s)\n", sfd,
379                prot_text(prot));
380        } else if (IS_UDP(prot)) {
381            fprintf(stderr, "<%d server listening (udp)\n", sfd);
382        } else if (prot == binary_prot) {
383            fprintf(stderr, "<%d new binary client connection\n", sfd);
384        } else if (prot == ascii_prot) {
385            fprintf(stderr, "<%d new ascii client connection\n", sfd);
386        } else if (prot == negotiating_prot) {
387            fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd);
388        } else {
389            fprintf(stderr, "<%d new unknown (%d) client connection\n",
390                sfd, prot);
391            abort();
392        }
393    }
394
395    c->sfd = sfd;
396    c->protocol = prot;
397    c->state = init_state;
398    c->rlbytes = 0;
399    c->cmd = -1;
400    c->rbytes = c->wbytes = 0;
401    c->wcurr = c->wbuf;
402    c->rcurr = c->rbuf;
403    c->ritem = 0;
404    c->icurr = c->ilist;
405    c->suffixcurr = c->suffixlist;
406    c->ileft = 0;
407    c->suffixleft = 0;
408    c->iovused = 0;
409    c->msgcurr = 0;
410    c->msgused = 0;
411
412    c->write_and_go = init_state;
413    c->write_and_free = 0;
414    c->item = 0;
415    c->bucket = -1;
416    c->gen = 0;
417
418    c->noreply = false;
419
420    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
421    event_base_set(base, &c->event);
422    c->ev_flags = event_flags;
423
424    if (event_add(&c->event, 0) == -1) {
425        if (conn_add_to_freelist(c)) {
426            conn_free(c);
427        }
428        perror("event_add");
429        return NULL;
430    }
431
432    STATS_LOCK();
433    stats.curr_conns++;
434    stats.total_conns++;
435    STATS_UNLOCK();
436
437    return c;
438}
439
440static void conn_cleanup(conn *c) {
441    assert(c != NULL);
442
443    if (c->item) {
444        item_remove(c->item);
445        c->item = 0;
446    }
447
448    if (c->ileft != 0) {
449        for (; c->ileft > 0; c->ileft--,c->icurr++) {
450            item_remove(*(c->icurr));
451        }
452    }
453
454    if (c->suffixleft != 0) {
455        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
456            if(suffix_add_to_freelist(*(c->suffixcurr))) {
457                free(*(c->suffixcurr));
458            }
459        }
460    }
461
462    if (c->write_and_free) {
463        free(c->write_and_free);
464        c->write_and_free = 0;
465    }
466}
467
468/*
469 * Frees a connection.
470 */
471void conn_free(conn *c) {
472    if (c) {
473        if (c->hdrbuf)
474            free(c->hdrbuf);
475        if (c->msglist)
476            free(c->msglist);
477        if (c->rbuf)
478            free(c->rbuf);
479        if (c->wbuf)
480            free(c->wbuf);
481        if (c->ilist)
482            free(c->ilist);
483        if (c->suffixlist)
484            free(c->suffixlist);
485        if (c->iov)
486            free(c->iov);
487        free(c);
488    }
489}
490
491static void conn_close(conn *c) {
492    assert(c != NULL);
493
494    /* delete the event, the socket and the conn */
495    event_del(&c->event);
496
497    if (settings.verbose > 1)
498        fprintf(stderr, "<%d connection closed.\n", c->sfd);
499
500    close(c->sfd);
501    accept_new_conns(true);
502    conn_cleanup(c);
503
504    /* if the connection has big buffers, just free it */
505    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
506        conn_free(c);
507    }
508
509    STATS_LOCK();
510    stats.curr_conns--;
511    STATS_UNLOCK();
512
513    return;
514}
515
516static enum conn_states get_init_state(conn *c) {
517    int rv = 0;
518    assert(c != NULL);
519
520    switch(c->protocol) {
521        case binary_prot:
522            rv = conn_bin_init;
523            break;
524        case negotiating_prot:
525            rv = conn_negotiate;
526            break;
527        default:
528            rv = conn_read;
529    }
530    return rv;
531}
532
533/* Set the given connection to its initial state.  The initial state will vary
534 * base don protocol type. */
535static void conn_set_init_state(conn *c) {
536    assert(c != NULL);
537
538    conn_set_state(c, get_init_state(c));
539}
540
541/*
542 * Shrinks a connection's buffers if they're too big.  This prevents
543 * periodic large "get" requests from permanently chewing lots of server
544 * memory.
545 *
546 * This should only be called in between requests since it can wipe output
547 * buffers!
548 */
549static void conn_shrink(conn *c) {
550    assert(c != NULL);
551
552    if (IS_UDP(c->protocol))
553        return;
554
555    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
556        char *newbuf;
557
558        if (c->rcurr != c->rbuf)
559            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
560
561        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
562
563        if (newbuf) {
564            c->rbuf = newbuf;
565            c->rsize = DATA_BUFFER_SIZE;
566        }
567        /* TODO check other branch... */
568        c->rcurr = c->rbuf;
569    }
570
571    if (c->isize > ITEM_LIST_HIGHWAT) {
572        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
573        if (newbuf) {
574            c->ilist = newbuf;
575            c->isize = ITEM_LIST_INITIAL;
576        }
577    /* TODO check error condition? */
578    }
579
580    if (c->msgsize > MSG_LIST_HIGHWAT) {
581        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
582        if (newbuf) {
583            c->msglist = newbuf;
584            c->msgsize = MSG_LIST_INITIAL;
585        }
586    /* TODO check error condition? */
587    }
588
589    if (c->iovsize > IOV_LIST_HIGHWAT) {
590        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
591        if (newbuf) {
592            c->iov = newbuf;
593            c->iovsize = IOV_LIST_INITIAL;
594        }
595    /* TODO check return value */
596    }
597}
598
599/*
600 * Sets a connection's current state in the state machine. Any special
601 * processing that needs to happen on certain state transitions can
602 * happen here.
603 */
604static void conn_set_state(conn *c, enum conn_states state) {
605    assert(c != NULL);
606
607    if (state != c->state) {
608        if (state == conn_read) {
609            conn_shrink(c);
610            assoc_move_next_bucket();
611        }
612        c->state = state;
613    }
614}
615
616/*
617 * Free list management for suffix buffers.
618 */
619
620static char **freesuffix;
621static int freesuffixtotal;
622static int freesuffixcurr;
623
624static void suffix_init(void) {
625    freesuffixtotal = 500;
626    freesuffixcurr  = 0;
627
628    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
629    if (freesuffix == NULL) {
630        fprintf(stderr, "malloc()\n");
631    }
632    return;
633}
634
635/*
636 * Returns a suffix buffer from the freelist, if any. Should call this using
637 * suffix_from_freelist() for thread safety.
638 */
639char *do_suffix_from_freelist() {
640    char *s;
641
642    if (freesuffixcurr > 0) {
643        s = freesuffix[--freesuffixcurr];
644    } else {
645        /* If malloc fails, let the logic fall through without spamming
646         * STDERR on the server. */
647        s = malloc( SUFFIX_SIZE );
648    }
649
650    return s;
651}
652
653/*
654 * Adds a connection to the freelist. 0 = success. Should call this using
655 * conn_add_to_freelist() for thread safety.
656 */
657bool do_suffix_add_to_freelist(char *s) {
658    if (freesuffixcurr < freesuffixtotal) {
659        freesuffix[freesuffixcurr++] = s;
660        return false;
661    } else {
662        /* try to enlarge free connections array */
663        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
664        if (new_freesuffix) {
665            freesuffixtotal *= 2;
666            freesuffix = new_freesuffix;
667            freesuffix[freesuffixcurr++] = s;
668            return false;
669        }
670    }
671    return true;
672}
673
674/*
675 * Ensures that there is room for another struct iovec in a connection's
676 * iov list.
677 *
678 * Returns 0 on success, -1 on out-of-memory.
679 */
680static int ensure_iov_space(conn *c) {
681    assert(c != NULL);
682
683    if (c->iovused >= c->iovsize) {
684        int i, iovnum;
685        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
686                                (c->iovsize * 2) * sizeof(struct iovec));
687        if (! new_iov)
688            return -1;
689        c->iov = new_iov;
690        c->iovsize *= 2;
691
692        /* Point all the msghdr structures at the new list. */
693        for (i = 0, iovnum = 0; i < c->msgused; i++) {
694            c->msglist[i].msg_iov = &c->iov[iovnum];
695            iovnum += c->msglist[i].msg_iovlen;
696        }
697    }
698
699    return 0;
700}
701
702
703/*
704 * Adds data to the list of pending data that will be written out to a
705 * connection.
706 *
707 * Returns 0 on success, -1 on out-of-memory.
708 */
709
710static int add_iov(conn *c, const void *buf, int len) {
711    struct msghdr *m;
712    int leftover;
713    bool limit_to_mtu;
714
715    assert(c != NULL);
716
717    do {
718        m = &c->msglist[c->msgused - 1];
719
720        /*
721         * Limit UDP packets, and the first payloads of TCP replies, to
722         * UDP_MAX_PAYLOAD_SIZE bytes.
723         */
724        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
725
726        /* We may need to start a new msghdr if this one is full. */
727        if (m->msg_iovlen == IOV_MAX ||
728            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
729            add_msghdr(c);
730            m = &c->msglist[c->msgused - 1];
731        }
732
733        if (ensure_iov_space(c) != 0)
734            return -1;
735
736        /* If the fragment is too big to fit in the datagram, split it up */
737        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
738            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
739            len -= leftover;
740        } else {
741            leftover = 0;
742        }
743
744        m = &c->msglist[c->msgused - 1];
745        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
746        m->msg_iov[m->msg_iovlen].iov_len = len;
747
748        c->msgbytes += len;
749        c->iovused++;
750        m->msg_iovlen++;
751
752        buf = ((char *)buf) + len;
753        len = leftover;
754    } while (leftover > 0);
755
756    return 0;
757}
758
759
760/*
761 * Constructs a set of UDP headers and attaches them to the outgoing messages.
762 */
763static int build_udp_headers(conn *c) {
764    int i;
765    unsigned char *hdr;
766
767    assert(c != NULL);
768
769    if (c->msgused > c->hdrsize) {
770        void *new_hdrbuf;
771        if (c->hdrbuf)
772            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
773        else
774            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
775        if (! new_hdrbuf)
776            return -1;
777        c->hdrbuf = (unsigned char *)new_hdrbuf;
778        c->hdrsize = c->msgused * 2;
779    }
780
781    hdr = c->hdrbuf;
782    for (i = 0; i < c->msgused; i++) {
783        c->msglist[i].msg_iov[0].iov_base = hdr;
784        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
785        *hdr++ = c->request_id / 256;
786        *hdr++ = c->request_id % 256;
787        *hdr++ = i / 256;
788        *hdr++ = i % 256;
789        *hdr++ = c->msgused / 256;
790        *hdr++ = c->msgused % 256;
791        *hdr++ = 0;
792        *hdr++ = 0;
793        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
794    }
795
796    return 0;
797}
798
799
800static void out_string(conn *c, const char *str) {
801    size_t len;
802
803    assert(c != NULL);
804
805    if (c->noreply) {
806        if (settings.verbose > 1)
807            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
808        c->noreply = false;
809        conn_set_state(c, conn_read);
810        return;
811    }
812
813    if (settings.verbose > 1)
814        fprintf(stderr, ">%d %s\n", c->sfd, str);
815
816    len = strlen(str);
817    if ((len + 2) > c->wsize) {
818        /* ought to be always enough. just fail for simplicity */
819        str = "SERVER_ERROR output line too long";
820        len = strlen(str);
821    }
822
823    memcpy(c->wbuf, str, len);
824    memcpy(c->wbuf + len, "\r\n", 3);
825    c->wbytes = len + 2;
826    c->wcurr = c->wbuf;
827
828    conn_set_state(c, conn_write);
829    c->write_and_go = get_init_state(c);
830    return;
831}
832
833/*
834 * we get here after reading the value in set/add/replace commands. The command
835 * has been stored in c->item_comm, and the item is ready in c->item.
836 */
837static void complete_nread_ascii(conn *c) {
838    assert(c != NULL);
839
840    item *it = c->item;
841    int comm = c->item_comm;
842    int ret;
843
844    STATS_LOCK();
845    stats.set_cmds++;
846    STATS_UNLOCK();
847
848    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
849        out_string(c, "CLIENT_ERROR bad data chunk");
850    } else {
851      ret = store_item(it, comm);
852      if (ret == 1)
853          out_string(c, "STORED");
854      else if(ret == 2)
855          out_string(c, "EXISTS");
856      else if(ret == 3)
857          out_string(c, "NOT_FOUND");
858      else
859          out_string(c, "NOT_STORED");
860    }
861
862    item_remove(c->item);       /* release the c->item reference */
863    c->item = 0;
864}
865
866static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
867    int i = 0;
868    uint32_t *res_header;
869
870    assert(c);
871    assert(body_len >= 0);
872
873    c->msgcurr = 0;
874    c->msgused = 0;
875    c->iovused = 0;
876    if (add_msghdr(c) != 0) {
877        /* XXX:  out_string is inappropriate here */
878        out_string(c, "SERVER_ERROR out of memory");
879        return;
880    }
881
882    res_header = (uint32_t *)c->wbuf;
883
884    res_header[0] = BIN_RES_MAGIC << 24;
885    res_header[0] |= ((0xff & c->cmd) << 16);
886    res_header[0] |= err & 0xffff;
887
888    res_header[1] = hdr_len << 24;
889    /* TODO:  Support datatype */
890    res_header[2] = body_len;
891    res_header[3] = c->opaque;
892
893    if(settings.verbose > 1) {
894        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
895            res_header[0], res_header[1], res_header[2], res_header[3]);
896    }
897
898    for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
899        res_header[i] = htonl(res_header[i]);
900    }
901
902    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
903    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
904}
905
906static void write_bin_error(conn *c, int err, int swallow) {
907    char *errstr = "Unknown error";
908    switch(err) {
909        case ERR_UNKNOWN_CMD:
910            errstr = "Unknown command";
911            break;
912        case ERR_NOT_FOUND:
913            errstr = "Not found";
914            break;
915        case ERR_INVALID_ARGUMENTS:
916            errstr = "Invalid arguments";
917            break;
918        case ERR_EXISTS:
919            errstr = "Data exists for key.";
920            break;
921        case ERR_TOO_LARGE:
922            errstr = "Too large.";
923            break;
924        case ERR_NOT_STORED:
925            errstr = "Not stored.";
926            break;
927        default:
928            errstr = "UNHANDLED ERROR";
929            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
930    }
931    if(settings.verbose > 0) {
932        fprintf(stderr, "Writing an error:  %s\n", errstr);
933    }
934    add_bin_header(c, err, 0, strlen(errstr));
935    add_iov(c, errstr, strlen(errstr));
936
937    conn_set_state(c, conn_mwrite);
938    if(swallow > 0) {
939        c->sbytes = swallow;
940        c->write_and_go = conn_swallow;
941    } else {
942        c->write_and_go = conn_bin_init;
943    }
944}
945
946/* Form and send a response to a command over the binary protocol */
947static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
948    add_bin_header(c, 0, hlen, dlen);
949    if(dlen > 0) {
950        add_iov(c, d, dlen);
951    }
952    conn_set_state(c, conn_mwrite);
953    c->write_and_go = conn_bin_init;
954}
955
956/* Byte swap a 64-bit number */
957static int64_t swap64(int64_t in) {
958#ifdef ENDIAN_LITTLE
959    /* Little endian, flip the bytes around until someone makes a faster/better
960    * way to do this. */
961    int64_t rv = 0;
962    int i = 0;
963     for(i = 0; i<8; i++) {
964        rv = (rv << 8) | (in & 0xff);
965        in >>= 8;
966     }
967    return rv;
968#else
969    /* big-endian machines don't need byte swapping */
970    return in;
971#endif
972}
973
974static void complete_incr_bin(conn *c) {
975    item *it;
976    int64_t delta;
977    uint64_t initial;
978    int32_t exptime;
979    char *key;
980    size_t nkey;
981    int i;
982    uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
983
984    assert(c != NULL);
985
986    key = c->rbuf + BIN_INCR_HDR_LEN;
987    nkey = c->keylen;
988    key[nkey] = 0x00;
989
990    delta = swap64(*((int64_t*)(c->rbuf)));
991    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
992    exptime = ntohl(*((int*)(c->rbuf + 16)));
993
994    if(settings.verbose) {
995        fprintf(stderr, "incr ");
996        for(i = 0; i<nkey; i++) {
997            fprintf(stderr, "%c", key[i]);
998        }
999        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
1000    }
1001
1002    it = item_get(key, nkey);
1003    if (it) {
1004        /* Weird magic in add_delta forces me to pad here */
1005        char tmpbuf[INCR_MAX_STORAGE_LEN];
1006        uint64_t l = 0;
1007        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1008        tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
1009        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1010        *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
1011
1012        write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1013        item_remove(it);         /* release our reference */
1014    } else {
1015        if(exptime >= 0) {
1016            /* Save some room for the response */
1017            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1018            *response_buf = swap64(initial);
1019            it = item_alloc(key, nkey, 0, realtime(exptime),
1020                INCR_MAX_STORAGE_LEN);
1021            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1022
1023            if(store_item(it, NREAD_SET)) {
1024                write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
1025                    INCR_RES_LEN);
1026            } else {
1027                write_bin_error(c, ERR_NOT_STORED, 0);
1028            }
1029            item_remove(it);         /* release our reference */
1030        } else {
1031            write_bin_error(c, ERR_NOT_FOUND, 0);
1032        }
1033    }
1034}
1035
1036static void complete_update_bin(conn *c) {
1037    int eno = -1, ret = 0;
1038    assert(c != NULL);
1039
1040    item *it = c->item;
1041
1042    STATS_LOCK();
1043    stats.set_cmds++;
1044    STATS_UNLOCK();
1045
1046    /* We don't actually receive the trailing two characters in the bin
1047     * protocol, so we're going to just set them here */
1048    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1049    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1050
1051    switch (store_item(it, c->item_comm)) {
1052        case 1:
1053            /* Stored */
1054            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1055            break;
1056        case 2:
1057            write_bin_error(c, ERR_EXISTS, 0);
1058            break;
1059        case 3:
1060            write_bin_error(c, ERR_NOT_FOUND, 0);
1061            break;
1062        default:
1063            if(c->item_comm == NREAD_ADD) {
1064                eno = ERR_EXISTS;
1065            } else if(c->item_comm == NREAD_REPLACE) {
1066                eno = ERR_NOT_FOUND;
1067            } else {
1068                eno = ERR_NOT_STORED;
1069            }
1070            write_bin_error(c, eno, 0);
1071    }
1072
1073    item_remove(c->item);       /* release the c->item reference */
1074    c->item = 0;
1075}
1076
1077static void process_bin_get(conn *c) {
1078    item *it;
1079
1080    it = item_get(c->rbuf, c->keylen);
1081    if (it) {
1082        int *flags;
1083        uint64_t* identifier;
1084
1085        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1086
1087        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place
1088        this is int in far enough to cover the header */
1089        flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1090        *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1091
1092        /* the length has two unnecessary bytes, and then we write four more */
1093        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1094        /* Flags */
1095        add_iov(c, flags, 4);
1096        identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4);
1097        *identifier = swap64((uint32_t)it->cas_id);
1098        add_iov(c, identifier, 8);
1099        /* bytes minus the CRLF */
1100        add_iov(c, ITEM_data(it), it->nbytes - 2);
1101        conn_set_state(c, conn_mwrite);
1102    } else {
1103        if(c->cmd == CMD_GETQ) {
1104            conn_set_state(c, conn_bin_init);
1105        } else {
1106            write_bin_error(c, ERR_NOT_FOUND, 0);
1107        }
1108    }
1109}
1110
1111static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1112    assert(c);
1113    c->substate = next_substate;
1114    c->rlbytes = c->keylen + extra;
1115    assert(c->rsize >= c->rlbytes);
1116    c->ritem = c->rbuf;
1117    conn_set_state(c, conn_nread);
1118}
1119
1120static void dispatch_bin_command(conn *c) {
1121    time_t exptime = 0;
1122    switch(c->cmd) {
1123        case CMD_VERSION:
1124            write_bin_response(c, VERSION, 0, strlen(VERSION));
1125            break;
1126        case CMD_FLUSH:
1127            set_current_time();
1128
1129            settings.oldest_live = current_time - 1;
1130            item_flush_expired();
1131            write_bin_response(c, NULL, 0, 0);
1132            break;
1133        case CMD_NOOP:
1134            write_bin_response(c, NULL, 0, 0);
1135            break;
1136        case CMD_SET:
1137            /* Fallthrough */
1138        case CMD_ADD:
1139            /* Fallthrough */
1140        case CMD_REPLACE:
1141            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1142            break;
1143        case CMD_GETQ:
1144        case CMD_GET:
1145            bin_read_key(c, bin_reading_get_key, 0);
1146            break;
1147        case CMD_DELETE:
1148            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1149            break;
1150        case CMD_INCR:
1151        case CMD_DECR:
1152            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1153            break;
1154        default:
1155            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1156    }
1157}
1158
1159static void process_bin_update(conn *c) {
1160    char *key;
1161    int nkey;
1162    int vlen;
1163    int flags;
1164    int exptime;
1165    item *it;
1166    int comm;
1167    int hdrlen = BIN_SET_HDR_LEN;
1168
1169    assert(c != NULL);
1170
1171    key = c->rbuf + hdrlen;
1172    nkey = c->keylen;
1173    key[nkey] = 0x00;
1174
1175    flags = ntohl(*((int*)(c->rbuf)));
1176    exptime = ntohl(*((int*)(c->rbuf + 4)));
1177    vlen = c->bin_header[2] - (nkey + hdrlen);
1178
1179    if(settings.verbose > 1) {
1180        fprintf(stderr, "Value len is %d\n", vlen);
1181    }
1182
1183    if (settings.detail_enabled) {
1184        stats_prefix_record_set(key);
1185    }
1186
1187    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1188
1189    if (it == 0) {
1190        if (! item_size_ok(nkey, flags, vlen + 2)) {
1191            write_bin_error(c, ERR_TOO_LARGE, vlen);
1192        } else {
1193            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1194        }
1195        /* swallow the data line */
1196        c->write_and_go = conn_swallow;
1197        return;
1198    }
1199
1200    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
1201
1202    switch(c->cmd) {
1203        case CMD_ADD:
1204            c->item_comm = NREAD_ADD;
1205            break;
1206        case CMD_SET:
1207            c->item_comm = NREAD_SET;
1208            break;
1209        case CMD_REPLACE:
1210            c->item_comm = NREAD_REPLACE;
1211            break;
1212        default:
1213            assert(0);
1214    }
1215
1216    if(it->cas_id != 0) {
1217        c->item_comm = NREAD_CAS;
1218    }
1219
1220    c->item = it;
1221    c->ritem = ITEM_data(it);
1222    c->rlbytes = vlen;
1223    conn_set_state(c, conn_nread);
1224    c->substate = bin_read_set_value;
1225}
1226
1227static void process_bin_delete(conn *c) {
1228    char *key;
1229    size_t nkey;
1230    item *it;
1231    time_t exptime = 0;
1232
1233    assert(c != NULL);
1234
1235    exptime = ntohl(*((int*)(c->rbuf)));
1236    key = c->rbuf + 4;
1237    nkey = c->keylen;
1238    key[nkey] = 0x00;
1239
1240    if(settings.verbose) {
1241        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1242    }
1243
1244    if (settings.detail_enabled) {
1245        stats_prefix_record_delete(key);
1246    }
1247
1248    it = item_get(key, nkey);
1249    if (it) {
1250        if (exptime == 0) {
1251            item_unlink(it);
1252            item_remove(it);      /* release our reference */
1253            write_bin_response(c, NULL, 0, 0);
1254        } else {
1255            /* XXX:  This is really lame, but defer_delete returns a string */
1256            char *res = defer_delete(it, exptime);
1257            if(res[0] == 'D') {
1258                write_bin_response(c, NULL, 0, 0);
1259            } else {
1260                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1261            }
1262        }
1263    } else {
1264        write_bin_error(c, ERR_NOT_FOUND, 0);
1265    }
1266}
1267
1268static void complete_nread_binary(conn *c) {
1269    assert(c != NULL);
1270
1271    if(c->cmd < 0) {
1272        /* No command defined.  Figure out what they're trying to say. */
1273        int i = 0;
1274        /* I did a bit of hard-coding around the packet sizes */
1275        assert(BIN_PKT_HDR_WORDS == 3);
1276        for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
1277            c->bin_header[i] = ntohl(c->bin_header[i]);
1278        }
1279        if(settings.verbose) {
1280            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1281                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1282                c->bin_header[3]);
1283        }
1284        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1285            if(settings.verbose) {
1286                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1287            }
1288            conn_set_state(c, conn_closing);
1289            return;
1290        }
1291   
1292        c->msgcurr = 0;
1293        c->msgused = 0;
1294        c->iovused = 0;
1295        if (add_msghdr(c) != 0) {
1296            out_string(c, "SERVER_ERROR out of memory");
1297            return;
1298        }
1299   
1300        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1301        c->keylen = c->bin_header[0] & 0xffff;
1302        c->opaque = c->bin_header[3];
1303        if(settings.verbose > 1) {
1304            fprintf(stderr,
1305                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1306                c->opaque, c->keylen, c->bin_header[2]);
1307        }
1308        dispatch_bin_command(c);
1309    } else {
1310        switch(c->substate) {
1311            case bin_reading_set_header:
1312                process_bin_update(c);
1313                break;
1314            case bin_read_set_value:
1315                complete_update_bin(c);
1316                break;
1317            case bin_reading_get_key:
1318                process_bin_get(c);
1319                break;
1320            case bin_reading_del_header:
1321                process_bin_delete(c);
1322                break;
1323            case bin_reading_incr_header:
1324                complete_incr_bin(c);
1325                break;
1326            default:
1327                fprintf(stderr, "Not handling substate %d\n", c->substate);
1328                assert(0);
1329        }
1330    }
1331}
1332
1333static void reinit_bin_connection(conn *c) {
1334    if (settings.verbose > 1)
1335        fprintf(stderr, "*** Reinitializing binary connection.\n");
1336    c->rlbytes = MIN_BIN_PKT_LENGTH;
1337    c->write_and_go = conn_bin_init;
1338    c->cmd = -1;
1339    c->substate = bin_no_state;
1340    c->rbytes = c->wbytes = 0;
1341    c->ritem = (char*)c->bin_header;
1342    c->rcurr = c->rbuf;
1343    c->wcurr = c->wbuf;
1344    conn_shrink(c);
1345    conn_set_state(c, conn_nread);
1346}
1347
1348/* These do the initial protocol switch.  At this point, we should've read
1349 * exactly one byte, and must treat that byte as the beginning of a command. */
1350static void setup_bin_protocol(conn *c) {
1351    char *loc = (char*)c->bin_header;
1352    if (settings.verbose > 1)
1353        fprintf(stderr, "Negotiated protocol as binary.\n");
1354
1355    c->protocol = binary_prot;
1356    reinit_bin_connection(c);
1357    /* Emulate a read of the first byte */
1358    c->ritem[0] = c->rbuf[0];
1359    c->ritem++;
1360    c->rlbytes--;
1361}
1362
1363static void setup_ascii_protocol(conn *c) {
1364    if (settings.verbose > 1)
1365        fprintf(stderr, "Negotiated protocol as ascii.\n");
1366    c->protocol = ascii_prot;
1367
1368    /* We've already got the first letter of the command, so pretend like we
1369     * Did a single byte read from try_read_command */
1370    c->rcurr = c->rbuf;
1371    c->rbytes = 1;
1372    conn_set_state(c, conn_read);
1373}
1374
1375static void complete_nread(conn *c) {
1376    assert(c != NULL);
1377
1378    if(c->protocol == ascii_prot) {
1379        complete_nread_ascii(c);
1380    } else if(c->protocol == binary_prot) {
1381        complete_nread_binary(c);
1382    } else if(c->protocol == negotiating_prot) {
1383        /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */
1384        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)
1385            setup_bin_protocol(c);
1386        else
1387            setup_ascii_protocol(c);
1388    } else {
1389        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1390    }
1391}
1392
1393/*
1394 * Stores an item in the cache according to the semantics of one of the set
1395 * commands. In threaded mode, this is protected by the cache lock.
1396 *
1397 * Returns true if the item was stored.
1398 */
1399int do_store_item(item *it, int comm) {
1400    char *key = ITEM_key(it);
1401    bool delete_locked = false;
1402    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1403    int stored = 0;
1404
1405    item *new_it = NULL;
1406    int flags;
1407
1408    if (old_it != NULL && comm == NREAD_ADD) {
1409        /* add only adds a nonexistent item, but promote to head of LRU */
1410        do_item_update(old_it);
1411    } else if (!old_it && (comm == NREAD_REPLACE
1412        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1413    {
1414        /* replace only replaces an existing value; don't store */
1415    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1416        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1417    {
1418        /* replace and add can't override delete locks; don't store */
1419    } else if (comm == NREAD_CAS) {
1420        /* validate cas operation */
1421        if (delete_locked)
1422            old_it = do_item_get_nocheck(key, it->nkey);
1423
1424        if(old_it == NULL) {
1425          // LRU expired
1426          stored = 3;
1427        }
1428        else if(it->cas_id == old_it->cas_id) {
1429          // cas validates
1430          do_item_replace(old_it, it);
1431          stored = 1;
1432        } else {
1433          if(settings.verbose > 1) {
1434            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1435                old_it->cas_id, it->cas_id);
1436          }
1437          stored = 2;
1438        }
1439    } else {
1440        /*
1441         * Append - combine new and old record into single one. Here it's
1442         * atomic and thread-safe.
1443         */
1444
1445        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1446
1447            /* we have it and old_it here - alloc memory to hold both */
1448            /* flags was already lost - so recover them from ITEM_suffix(it) */
1449
1450            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1451
1452            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1453
1454            if (new_it == NULL) {
1455                /* SERVER_ERROR out of memory */
1456                return 0;
1457            }
1458
1459            /* copy data from it and old_it to new_it */
1460
1461            if (comm == NREAD_APPEND) {
1462                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1463                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1464            } else {
1465                /* NREAD_PREPEND */
1466                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1467                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1468            }
1469
1470            it = new_it;
1471        }
1472
1473        /* "set" commands can override the delete lock
1474           window... in which case we have to find the old hidden item
1475           that's in the namespace/LRU but wasn't returned by
1476           item_get.... because we need to replace it */
1477        if (delete_locked)
1478            old_it = do_item_get_nocheck(key, it->nkey);
1479
1480        if (old_it != NULL)
1481            do_item_replace(old_it, it);
1482        else
1483            do_item_link(it);
1484
1485        stored = 1;
1486    }
1487
1488    if (old_it != NULL)
1489        do_item_remove(old_it);         /* release our reference */
1490    if (new_it != NULL)
1491        do_item_remove(new_it);
1492
1493    return stored;
1494}
1495
1496typedef struct token_s {
1497    char *value;
1498    size_t length;
1499} token_t;
1500
1501#define COMMAND_TOKEN 0
1502#define SUBCOMMAND_TOKEN 1
1503#define KEY_TOKEN 1
1504#define KEY_MAX_LENGTH 250
1505
1506#define MAX_TOKENS 8
1507
1508/*
1509 * Tokenize the command string by replacing whitespace with '\0' and update
1510 * the token array tokens with pointer to start of each token and length.
1511 * Returns total number of tokens.  The last valid token is the terminal
1512 * token (value points to the first unprocessed character of the string and
1513 * length zero).
1514 *
1515 * Usage example:
1516 *
1517 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1518 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1519 *          ...
1520 *      }
1521 *      ncommand = tokens[ix].value - command;
1522 *      command  = tokens[ix].value;
1523 *   }
1524 */
1525static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1526    char *s, *e;
1527    size_t ntokens = 0;
1528
1529    assert(command != NULL && tokens != NULL && max_tokens > 1);
1530
1531    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1532        if (*e == ' ') {
1533            if (s != e) {
1534                tokens[ntokens].value = s;
1535                tokens[ntokens].length = e - s;
1536                ntokens++;
1537                *e = '\0';
1538            }
1539            s = e + 1;
1540        }
1541        else if (*e == '\0') {
1542            if (s != e) {
1543                tokens[ntokens].value = s;
1544                tokens[ntokens].length = e - s;
1545                ntokens++;
1546            }
1547
1548            break; /* string end */
1549        }
1550    }
1551
1552    /*
1553     * If we scanned the whole string, the terminal value pointer is null,
1554     * otherwise it is the first unprocessed character.
1555     */
1556    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1557    tokens[ntokens].length = 0;
1558    ntokens++;
1559
1560    return ntokens;
1561}
1562
1563/* set up a connection to write a buffer then free it, used for stats */
1564static void write_and_free(conn *c, char *buf, int bytes) {
1565    if (buf) {
1566        c->write_and_free = buf;
1567        c->wcurr = buf;
1568        c->wbytes = bytes;
1569        conn_set_state(c, conn_write);
1570        c->write_and_go = get_init_state(c);
1571    } else {
1572        out_string(c, "SERVER_ERROR out of memory writing stats");
1573    }
1574}
1575
1576static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1577{
1578    int noreply_index = ntokens - 2;
1579
1580    /*
1581      NOTE: this function is not the first place where we are going to
1582      send the reply.  We could send it instead from process_command()
1583      if the request line has wrong number of tokens.  However parsing
1584      malformed line for "noreply" option is not reliable anyway, so
1585      it can't be helped.
1586    */
1587    if (tokens[noreply_index].value
1588        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1589        c->noreply = true;
1590    }
1591}
1592
1593inline static void process_stats_detail(conn *c, const char *command) {
1594    assert(c != NULL);
1595
1596    if (strcmp(command, "on") == 0) {
1597        settings.detail_enabled = 1;
1598        out_string(c, "OK");
1599    }
1600    else if (strcmp(command, "off") == 0) {
1601        settings.detail_enabled = 0;
1602        out_string(c, "OK");
1603    }
1604    else if (strcmp(command, "dump") == 0) {
1605        int len;
1606        char *stats = stats_prefix_dump(&len);
1607        write_and_free(c, stats, len);
1608    }
1609    else {
1610        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1611    }
1612}
1613
1614static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1615    rel_time_t now = current_time;
1616    char *command;
1617    char *subcommand;
1618
1619    assert(c != NULL);
1620
1621    if(ntokens < 2) {
1622        out_string(c, "CLIENT_ERROR bad command line");
1623        return;
1624    }
1625
1626    command = tokens[COMMAND_TOKEN].value;
1627
1628    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1629        char temp[1024];
1630        pid_t pid = getpid();
1631        char *pos = temp;
1632
1633#ifndef WIN32
1634        struct rusage usage;
1635        getrusage(RUSAGE_SELF, &usage);
1636#endif /* !WIN32 */
1637
1638        STATS_LOCK();
1639        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1640        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1641        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1642        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1643        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1644#ifndef WIN32
1645        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1646        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1647#endif /* !WIN32 */
1648        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1649        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1650        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1651        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1652        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1653        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1654        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1655        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1656        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1657        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1658        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1659        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1660        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1661        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1662        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1663        pos += sprintf(pos, "END");
1664        STATS_UNLOCK();
1665        out_string(c, temp);
1666        return;
1667    }
1668
1669    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1670
1671    if (strcmp(subcommand, "reset") == 0) {
1672        stats_reset();
1673        out_string(c, "RESET");
1674        return;
1675    }
1676
1677#ifdef HAVE_MALLOC_H
1678#ifdef HAVE_STRUCT_MALLINFO
1679    if (strcmp(subcommand, "malloc") == 0) {
1680        char temp[512];
1681        struct mallinfo info;
1682        char *pos = temp;
1683
1684        info = mallinfo();
1685        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1686        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1687        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1688        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1689        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1690        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1691        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1692        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1693        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1694        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1695        out_string(c, temp);
1696        return;
1697    }
1698#endif /* HAVE_STRUCT_MALLINFO */
1699#endif /* HAVE_MALLOC_H */
1700
1701#if !defined(WIN32) || !defined(__APPLE__)
1702    if (strcmp(subcommand, "maps") == 0) {
1703        char *wbuf;
1704        int wsize = 8192; /* should be enough */
1705        int fd;
1706        int res;
1707
1708        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1709            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1710            return;
1711        }
1712
1713        fd = open("/proc/self/maps", O_RDONLY);
1714        if (fd == -1) {
1715            out_string(c, "SERVER_ERROR cannot open the maps file");
1716            free(wbuf);
1717            return;
1718        }
1719
1720        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1721        if (res == wsize - 6) {
1722            out_string(c, "SERVER_ERROR buffer overflow");
1723            free(wbuf); close(fd);
1724            return;
1725        }
1726        if (res == 0 || res == -1) {
1727            out_string(c, "SERVER_ERROR can't read the maps file");
1728            free(wbuf); close(fd);
1729            return;
1730        }
1731        memcpy(wbuf + res, "END\r\n", 5);
1732        write_and_free(c, wbuf, res + 5);
1733        close(fd);
1734        return;
1735    }
1736#endif
1737
1738    if (strcmp(subcommand, "cachedump") == 0) {
1739
1740        char *buf;
1741        unsigned int bytes, id, limit = 0;
1742
1743        if(ntokens < 5) {
1744            out_string(c, "CLIENT_ERROR bad command line");
1745            return;
1746        }
1747
1748        id = strtoul(tokens[2].value, NULL, 10);
1749        limit = strtoul(tokens[3].value, NULL, 10);
1750
1751        if(errno == ERANGE) {
1752            out_string(c, "CLIENT_ERROR bad command line format");
1753            return;
1754        }
1755
1756        buf = item_cachedump(id, limit, &bytes);
1757        write_and_free(c, buf, bytes);
1758        return;
1759    }
1760
1761    if (strcmp(subcommand, "slabs") == 0) {
1762        int bytes = 0;
1763        char *buf = slabs_stats(&bytes);
1764        write_and_free(c, buf, bytes);
1765        return;
1766    }
1767
1768    if (strcmp(subcommand, "items") == 0) {
1769        int bytes = 0;
1770        char *buf = item_stats(&bytes);
1771        write_and_free(c, buf, bytes);
1772        return;
1773    }
1774
1775    if (strcmp(subcommand, "detail") == 0) {
1776        if (ntokens < 4)
1777            process_stats_detail(c, "");  /* outputs the error message */
1778        else
1779            process_stats_detail(c, tokens[2].value);
1780        return;
1781    }
1782
1783    if (strcmp(subcommand, "sizes") == 0) {
1784        int bytes = 0;
1785        char *buf = item_stats_sizes(&bytes);
1786        write_and_free(c, buf, bytes);
1787        return;
1788    }
1789
1790    out_string(c, "ERROR");
1791}
1792
1793/* ntokens is overwritten here... shrug.. */
1794static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1795    char *key;
1796    size_t nkey;
1797    int i = 0;
1798    item *it;
1799    token_t *key_token = &tokens[KEY_TOKEN];
1800    char *suffix;
1801    int stats_get_cmds   = 0;
1802    int stats_get_hits   = 0;
1803    int stats_get_misses = 0;
1804    assert(c != NULL);
1805
1806    if (settings.managed) {
1807        int bucket = c->bucket;
1808        if (bucket == -1) {
1809            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1810            return;
1811        }
1812        c->bucket = -1;
1813        if (buckets[bucket] != c->gen) {
1814            out_string(c, "ERROR_NOT_OWNER");
1815            return;
1816        }
1817    }
1818
1819    do {
1820        while(key_token->length != 0) {
1821
1822            key = key_token->value;
1823            nkey = key_token->length;
1824
1825            if(nkey > KEY_MAX_LENGTH) {
1826                STATS_LOCK();
1827                stats.get_cmds   += stats_get_cmds;
1828                stats.get_hits   += stats_get_hits;
1829                stats.get_misses += stats_get_misses;
1830                STATS_UNLOCK();
1831                out_string(c, "CLIENT_ERROR bad command line format");
1832                return;
1833            }
1834
1835            stats_get_cmds++;
1836            it = item_get(key, nkey);
1837            if (settings.detail_enabled) {
1838                stats_prefix_record_get(key, NULL != it);
1839            }
1840            if (it) {
1841                if (i >= c->isize) {
1842                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1843                    if (new_list) {
1844                        c->isize *= 2;
1845                        c->ilist = new_list;
1846                    } else break;
1847                }
1848
1849                /*
1850                 * Construct the response. Each hit adds three elements to the
1851                 * outgoing data list:
1852                 *   "VALUE "
1853                 *   key
1854                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1855                 */
1856
1857                if(return_cas == true)
1858                {
1859                  /* Goofy mid-flight realloc. */
1860                  if (i >= c->suffixsize) {
1861                    char **new_suffix_list = realloc(c->suffixlist,
1862                                           sizeof(char *) * c->suffixsize * 2);
1863                    if (new_suffix_list) {
1864                      c->suffixsize *= 2;
1865                      c->suffixlist  = new_suffix_list;
1866                    } else break;
1867                  }
1868
1869                  suffix = suffix_from_freelist();
1870                  if (suffix == NULL) {
1871                    STATS_LOCK();
1872                    stats.get_cmds   += stats_get_cmds;
1873                    stats.get_hits   += stats_get_hits;
1874                    stats.get_misses += stats_get_misses;
1875                    STATS_UNLOCK();
1876                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1877                    return;
1878                  }
1879                  *(c->suffixlist + i) = suffix;
1880                  sprintf(suffix, " %llu\r\n", it->cas_id);
1881                  if (add_iov(c, "VALUE ", 6) != 0 ||
1882                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1883                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1884                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1885                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1886                      {
1887                          break;
1888                      }
1889                }
1890                else
1891                {
1892                  if (add_iov(c, "VALUE ", 6) != 0 ||
1893                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1894                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1895                      {
1896                          break;
1897                      }
1898                }
1899
1900
1901                if (settings.verbose > 1)
1902                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1903
1904                /* item_get() has incremented it->refcount for us */
1905                stats_get_hits++;
1906                item_update(it);
1907                *(c->ilist + i) = it;
1908                i++;
1909
1910            } else {
1911                stats_get_misses++;
1912            }
1913
1914            key_token++;
1915        }
1916
1917        /*
1918         * If the command string hasn't been fully processed, get the next set
1919         * of tokens.
1920         */
1921        if(key_token->value != NULL) {
1922            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1923            key_token = tokens;
1924        }
1925
1926    } while(key_token->value != NULL);
1927
1928    c->icurr = c->ilist;
1929    c->ileft = i;
1930    if (return_cas) {
1931        c->suffixcurr = c->suffixlist;
1932        c->suffixleft = i;
1933    }
1934
1935    if (settings.verbose > 1)
1936        fprintf(stderr, ">%d END\n", c->sfd);
1937
1938    /*
1939        If the loop was terminated because of out-of-memory, it is not
1940        reliable to add END\r\n to the buffer, because it might not end
1941        in \r\n. So we send SERVER_ERROR instead.
1942    */
1943    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1944        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1945        out_string(c, "SERVER_ERROR out of memory writing get response");
1946    }
1947    else {
1948        conn_set_state(c, conn_mwrite);
1949        c->msgcurr = 0;
1950    }
1951
1952    STATS_LOCK();
1953    stats.get_cmds   += stats_get_cmds;
1954    stats.get_hits   += stats_get_hits;
1955    stats.get_misses += stats_get_misses;
1956    STATS_UNLOCK();
1957
1958    return;
1959}
1960
1961static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1962    char *key;
1963    size_t nkey;
1964    int flags;
1965    time_t exptime;
1966    int vlen, old_vlen;
1967    uint64_t req_cas_id;
1968    item *it, *old_it;
1969
1970    assert(c != NULL);
1971
1972    set_noreply_maybe(c, tokens, ntokens);
1973
1974    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1975        out_string(c, "CLIENT_ERROR bad command line format");
1976        return;
1977    }
1978
1979    key = tokens[KEY_TOKEN].value;
1980    nkey = tokens[KEY_TOKEN].length;
1981
1982    flags = strtoul(tokens[2].value, NULL, 10);
1983    exptime = strtol(tokens[3].value, NULL, 10);
1984    vlen = strtol(tokens[4].value, NULL, 10);
1985
1986    // does cas value exist?
1987    if(handle_cas)
1988    {
1989      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1990    }
1991
1992    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1993        out_string(c, "CLIENT_ERROR bad command line format");
1994        return;
1995    }
1996
1997    if (settings.detail_enabled) {
1998        stats_prefix_record_set(key);
1999    }
2000
2001    if (settings.managed) {
2002        int bucket = c->bucket;
2003        if (bucket == -1) {
2004            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2005            return;
2006        }
2007        c->bucket = -1;
2008        if (buckets[bucket] != c->gen) {
2009            out_string(c, "ERROR_NOT_OWNER");
2010            return;
2011        }
2012    }
2013
2014    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2015
2016    if (it == 0) {
2017        if (! item_size_ok(nkey, flags, vlen + 2))
2018            out_string(c, "SERVER_ERROR object too large for cache");
2019        else
2020            out_string(c, "SERVER_ERROR out of memory storing object");
2021        /* swallow the data line */
2022        c->write_and_go = conn_swallow;
2023        c->sbytes = vlen + 2;
2024        return;
2025    }
2026    if(handle_cas)
2027      it->cas_id = req_cas_id;
2028
2029    c->item = it;
2030    c->ritem = ITEM_data(it);
2031    c->rlbytes = it->nbytes;
2032    c->item_comm = comm;
2033    conn_set_state(c, conn_nread);
2034}
2035
2036static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2037    char temp[sizeof("18446744073709551615")];
2038    item *it;
2039    int64_t delta;
2040    char *key;
2041    size_t nkey;
2042
2043    assert(c != NULL);
2044
2045    set_noreply_maybe(c, tokens, ntokens);
2046
2047    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2048        out_string(c, "CLIENT_ERROR bad command line format");
2049        return;
2050    }
2051
2052    key = tokens[KEY_TOKEN].value;
2053    nkey = tokens[KEY_TOKEN].length;
2054
2055    if (settings.managed) {
2056        int bucket = c->bucket;
2057        if (bucket == -1) {
2058            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2059            return;
2060        }
2061        c->bucket = -1;
2062        if (buckets[bucket] != c->gen) {
2063            out_string(c, "ERROR_NOT_OWNER");
2064            return;
2065        }
2066    }
2067
2068    delta = strtoll(tokens[2].value, NULL, 10);
2069
2070    if(errno == ERANGE) {
2071        out_string(c, "CLIENT_ERROR bad command line format");
2072        return;
2073    }
2074
2075    it = item_get(key, nkey);
2076    if (!it) {
2077        out_string(c, "NOT_FOUND");
2078        return;
2079    }
2080
2081    out_string(c, add_delta(it, incr, delta, temp));
2082    item_remove(it);         /* release our reference */
2083}
2084
2085/*
2086 * adds a delta value to a numeric item.
2087 *
2088 * it    item to adjust
2089 * incr  true to increment value, false to decrement
2090 * delta amount to adjust value by
2091 * buf   buffer for response string
2092 *
2093 * returns a response string to send back to the client.
2094 */
2095char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2096    char *ptr;
2097    int64_t value;
2098    int res;
2099
2100    ptr = ITEM_data(it);
2101    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2102
2103    value = strtoull(ptr, NULL, 10);
2104
2105    if(errno == ERANGE) {
2106        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2107    }
2108
2109    if (incr)
2110        value += delta;
2111    else {
2112        value -= delta;
2113    }
2114    if(value < 0) {
2115        value = 0;
2116    }
2117    sprintf(buf, "%llu", value);
2118    res = strlen(buf);
2119    if (res + 2 > it->nbytes) { /* need to realloc */
2120        item *new_it;
2121        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2122        if (new_it == 0) {
2123            return "SERVER_ERROR out of memory in incr/decr";
2124        }
2125        memcpy(ITEM_data(new_it), buf, res);
2126        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2127        do_item_replace(it, new_it);
2128        do_item_remove(new_it);       /* release our reference */
2129    } else { /* replace in-place */
2130        memcpy(ITEM_data(it), buf, res);
2131        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2132    }
2133
2134    return buf;
2135}
2136
2137static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2138    char *key;
2139    size_t nkey;
2140    item *it;
2141    time_t exptime = 0;
2142
2143    assert(c != NULL);
2144
2145    set_noreply_maybe(c, tokens, ntokens);
2146
2147    if (settings.managed) {
2148        int bucket = c->bucket;
2149        if (bucket == -1) {
2150            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2151            return;
2152        }
2153        c->bucket = -1;
2154        if (buckets[bucket] != c->gen) {
2155            out_string(c, "ERROR_NOT_OWNER");
2156            return;
2157        }
2158    }
2159
2160    key = tokens[KEY_TOKEN].value;
2161    nkey = tokens[KEY_TOKEN].length;
2162
2163    if(nkey > KEY_MAX_LENGTH) {
2164        out_string(c, "CLIENT_ERROR bad command line format");
2165        return;
2166    }
2167
2168    if(ntokens == (c->noreply ? 5 : 4)) {
2169        exptime = strtol(tokens[2].value, NULL, 10);
2170
2171        if(errno == ERANGE) {
2172            out_string(c, "CLIENT_ERROR bad command line format");
2173            return;
2174        }
2175    }
2176
2177    if (settings.detail_enabled) {
2178        stats_prefix_record_delete(key);
2179    }
2180
2181    it = item_get(key, nkey);
2182    if (it) {
2183        if (exptime == 0) {
2184            item_unlink(it);
2185            item_remove(it);      /* release our reference */
2186            out_string(c, "DELETED");
2187        } else {
2188            /* our reference will be transfered to the delete queue */
2189            out_string(c, defer_delete(it, exptime));
2190        }
2191    } else {
2192        out_string(c, "NOT_FOUND");
2193    }
2194}
2195
2196/*
2197 * Adds an item to the deferred-delete list so it can be reaped later.
2198 *
2199 * Returns the result to send to the client.
2200 */
2201char *do_defer_delete(item *it, time_t exptime)
2202{
2203    if (delcurr >= deltotal) {
2204        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2205        if (new_delete) {
2206            todelete = new_delete;
2207            deltotal *= 2;
2208        } else {
2209            /*
2210             * can't delete it immediately, user wants a delay,
2211             * but we ran out of memory for the delete queue
2212             */
2213            item_remove(it);    /* release reference */
2214            return "SERVER_ERROR out of memory expanding delete queue";
2215        }
2216    }
2217
2218    /* use its expiration time as its deletion time now */
2219    it->exptime = realtime(exptime);
2220    it->it_flags |= ITEM_DELETED;
2221    todelete[delcurr++] = it;
2222
2223    return "DELETED";
2224}
2225
2226static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2227    unsigned int level;
2228
2229    assert(c != NULL);
2230
2231    set_noreply_maybe(c, tokens, ntokens);
2232
2233    level = strtoul(tokens[1].value, NULL, 10);
2234    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2235    out_string(c, "OK");
2236    return;
2237}
2238
2239static void process_command(conn *c, char *command) {
2240
2241    token_t tokens[MAX_TOKENS];
2242    size_t ntokens;
2243    int comm;
2244
2245    assert(c != NULL);
2246
2247    if (settings.verbose > 1)
2248        fprintf(stderr, "<%d %s\n", c->sfd, command);
2249
2250    /*
2251     * for commands set/add/replace, we build an item and read the data
2252     * directly into it, then continue in nread_complete().
2253     */
2254
2255    c->msgcurr = 0;
2256    c->msgused = 0;
2257    c->iovused = 0;
2258    if (add_msghdr(c) != 0) {
2259        out_string(c, "SERVER_ERROR out of memory preparing response");
2260        return;
2261    }
2262
2263    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2264    if (ntokens >= 3 &&
2265        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2266         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2267
2268        process_get_command(c, tokens, ntokens, false);
2269
2270    } else if ((ntokens == 6 || ntokens == 7) &&
2271               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2272                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2273                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2274                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2275                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2276
2277        process_update_command(c, tokens, ntokens, comm, false);
2278
2279    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2280
2281        process_update_command(c, tokens, ntokens, comm, true);
2282
2283    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2284
2285        process_arithmetic_command(c, tokens, ntokens, 1);
2286
2287    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2288
2289        process_get_command(c, tokens, ntokens, true);
2290
2291    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2292
2293        process_arithmetic_command(c, tokens, ntokens, 0);
2294
2295    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2296
2297        process_delete_command(c, tokens, ntokens);
2298
2299    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2300        unsigned int bucket, gen;
2301        if (!settings.managed) {
2302            out_string(c, "CLIENT_ERROR not a managed instance");
2303            return;
2304        }
2305
2306        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2307            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2308                out_string(c, "CLIENT_ERROR bucket number out of range");
2309                return;
2310            }
2311            buckets[bucket] = gen;
2312            out_string(c, "OWNED");
2313            return;
2314        } else {
2315            out_string(c, "CLIENT_ERROR bad format");
2316            return;
2317        }
2318
2319    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2320
2321        int bucket;
2322        if (!settings.managed) {
2323            out_string(c, "CLIENT_ERROR not a managed instance");
2324            return;
2325        }
2326        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2327            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2328                out_string(c, "CLIENT_ERROR bucket number out of range");
2329                return;
2330            }
2331            buckets[bucket] = 0;
2332            out_string(c, "DISOWNED");
2333            return;
2334        } else {
2335            out_string(c, "CLIENT_ERROR bad format");
2336            return;
2337        }
2338
2339    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2340        int bucket, gen;
2341        if (!settings.managed) {
2342            out_string(c, "CLIENT_ERROR not a managed instance");
2343            return;
2344        }
2345        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2346            /* we never write anything back, even if input's wrong */
2347            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2348                /* do nothing, bad input */
2349            } else {
2350                c->bucket = bucket;
2351                c->gen = gen;
2352            }
2353            conn_set_init_state(c);
2354            return;
2355        } else {
2356            out_string(c, "CLIENT_ERROR bad format");
2357            return;
2358        }
2359
2360    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2361
2362        process_stat(c, tokens, ntokens);
2363
2364    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2365        time_t exptime = 0;
2366        set_current_time();
2367
2368        set_noreply_maybe(c, tokens, ntokens);
2369
2370        if(ntokens == (c->noreply ? 3 : 2)) {
2371            settings.oldest_live = current_time - 1;
2372            item_flush_expired();
2373            out_string(c, "OK");
2374            return;
2375        }
2376
2377        exptime = strtol(tokens[1].value, NULL, 10);
2378        if(errno == ERANGE) {
2379            out_string(c, "CLIENT_ERROR bad command line format");
2380            return;
2381        }
2382
2383        /*
2384          If exptime is zero realtime() would return zero too, and
2385          realtime(exptime) - 1 would overflow to the max unsigned
2386          value.  So we process exptime == 0 the same way we do when
2387          no delay is given at all.
2388        */
2389        if (exptime > 0)
2390            settings.oldest_live = realtime(exptime) - 1;
2391        else /* exptime == 0 */
2392            settings.oldest_live = current_time - 1;
2393        item_flush_expired();
2394        out_string(c, "OK");
2395        return;
2396
2397    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2398
2399        out_string(c, "VERSION " VERSION);
2400
2401    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2402
2403        conn_set_state(c, conn_closing);
2404
2405    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2406                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2407#ifdef ALLOW_SLABS_REASSIGN
2408
2409        int src, dst, rv;
2410
2411        src = strtol(tokens[2].value, NULL, 10);
2412        dst  = strtol(tokens[3].value, NULL, 10);
2413
2414        if(errno == ERANGE) {
2415            out_string(c, "CLIENT_ERROR bad command line format");
2416            return;
2417        }
2418
2419        rv = slabs_reassign(src, dst);
2420        if (rv == 1) {
2421            out_string(c, "DONE");
2422            return;
2423        }
2424        if (rv == 0) {
2425            out_string(c, "CANT");
2426            return;
2427        }
2428        if (rv == -1) {
2429            out_string(c, "BUSY");
2430            return;
2431        }
2432#else
2433        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2434#endif
2435    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2436        process_verbosity_command(c, tokens, ntokens);
2437    } else {
2438        out_string(c, "ERROR");
2439    }
2440    return;
2441}
2442
2443/*
2444 * if we have a complete line in the buffer, process it.
2445 */
2446static int try_read_command(conn *c) {
2447    char *el, *cont;
2448
2449    assert(c != NULL);
2450    assert(c->rcurr <= (c->rbuf + c->rsize));
2451
2452    if (c->rbytes == 0)
2453        return 0;
2454    el = memchr(c->rcurr, '\n', c->rbytes);
2455    if (!el)
2456        return 0;
2457    cont = el + 1;
2458    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2459        el--;
2460    }
2461    *el = '\0';
2462
2463    assert(cont <= (c->rcurr + c->rbytes));
2464
2465    process_command(c, c->rcurr);
2466
2467    c->rbytes -= (cont - c->rcurr);
2468    c->rcurr = cont;
2469
2470    assert(c->rcurr <= (c->rbuf + c->rsize));
2471
2472    return 1;
2473}
2474
2475/*
2476 * read a UDP request.
2477 * return 0 if there's nothing to read.
2478 */
2479static int try_read_udp(conn *c) {
2480    int res;
2481
2482    assert(c != NULL);
2483
2484    c->request_addr_size = sizeof(c->request_addr);
2485    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2486                   0, &c->request_addr, &c->request_addr_size);
2487    if (res > 8) {
2488        unsigned char *buf = (unsigned char *)c->rbuf;
2489        STATS_LOCK();
2490        stats.bytes_read += res;
2491        STATS_UNLOCK();
2492
2493        /* Beginning of UDP packet is the request ID; save it. */
2494        c->request_id = buf[0] * 256 + buf[1];
2495
2496        /* If this is a multi-packet request, drop it. */
2497        if (buf[4] != 0 || buf[5] != 1) {
2498            out_string(c, "SERVER_ERROR multi-packet request not supported");
2499            return 0;
2500        }
2501
2502        /* Don't care about any of the rest of the header. */
2503        res -= 8;
2504        memmove(c->rbuf, c->rbuf + 8, res);
2505
2506        c->rbytes += res;
2507        c->rcurr = c->rbuf;
2508        return 1;
2509    }
2510    return 0;
2511}
2512
2513/*
2514 * read from network as much as we can, handle buffer overflow and connection
2515 * close.
2516 * before reading, move the remaining incomplete fragment of a command
2517 * (if any) to the beginning of the buffer.
2518 * return 0 if there's nothing to read on the first read.
2519 */
2520static int try_read_network(conn *c) {
2521    int gotdata = 0;
2522    int res;
2523
2524    assert(c != NULL);
2525
2526    if (c->rcurr != c->rbuf) {
2527        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2528            memmove(c->rbuf, c->rcurr, c->rbytes);
2529        c->rcurr = c->rbuf;
2530    }
2531
2532    while (1) {
2533        if (c->rbytes >= c->rsize) {
2534            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2535            if (!new_rbuf) {
2536                if (settings.verbose > 0)
2537                    fprintf(stderr, "Couldn't realloc input buffer\n");
2538                c->rbytes = 0; /* ignore what we read */
2539                out_string(c, "SERVER_ERROR out of memory reading request");
2540                c->write_and_go = conn_closing;
2541                return 1;
2542            }
2543            c->rcurr = c->rbuf = new_rbuf;
2544            c->rsize *= 2;
2545        }
2546
2547        int avail = c->rsize - c->rbytes;
2548        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2549        if (res > 0) {
2550            STATS_LOCK();
2551            stats.bytes_read += res;
2552            STATS_UNLOCK();
2553            gotdata = 1;
2554            c->rbytes += res;
2555            if (res == avail) {
2556                continue;
2557            } else {
2558                break;
2559            }
2560        }
2561        if (res == 0) {
2562            /* connection closed */
2563            conn_set_state(c, conn_closing);
2564            return 1;
2565        }
2566        if (res == -1) {
2567            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2568            /* Should close on unhandled errors. */
2569            conn_set_state(c, conn_closing);
2570            return 1;
2571        }
2572    }
2573    return gotdata;
2574}
2575
2576static bool update_event(conn *c, const int new_flags) {
2577    assert(c != NULL);
2578
2579    struct event_base *base = c->event.ev_base;
2580    if (c->ev_flags == new_flags)
2581        return true;
2582    if (event_del(&c->event) == -1) return false;
2583    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2584    event_base_set(base, &c->event);
2585    c->ev_flags = new_flags;
2586    if (event_add(&c->event, 0) == -1) return false;
2587    return true;
2588}
2589
2590/*
2591 * Sets whether we are listening for new connections or not.
2592 */
2593void accept_new_conns(const bool do_accept) {
2594    conn *next;
2595
2596    if (! is_listen_thread())
2597        return;
2598
2599    for (next = listen_conn; next; next = next->next) {
2600        if (do_accept) {
2601            update_event(next, EV_READ | EV_PERSIST);
2602            if (listen(next->sfd, 1024) != 0) {
2603                perror("listen");
2604            }
2605        }
2606        else {
2607            update_event(next, 0);
2608            if (listen(next->sfd, 0) != 0) {
2609                perror("listen");
2610            }
2611        }
2612  }
2613}
2614
2615/*
2616 * Transmit the next chunk of data from our list of msgbuf structures.
2617 *
2618 * Returns:
2619 *   TRANSMIT_COMPLETE   All done writing.
2620 *   TRANSMIT_INCOMPLETE More data remaining to write.
2621 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2622 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2623 */
2624static int transmit(conn *c) {
2625    assert(c != NULL);
2626
2627    if (c->msgcurr < c->msgused &&
2628            c->msglist[c->msgcurr].msg_iovlen == 0) {
2629        /* Finished writing the current msg; advance to the next. */
2630        c->msgcurr++;
2631    }
2632    if (c->msgcurr < c->msgused) {
2633        ssize_t res;
2634        struct msghdr *m = &c->msglist[c->msgcurr];
2635
2636        res = sendmsg(c->sfd, m, 0);
2637        if (res > 0) {
2638            STATS_LOCK();
2639            stats.bytes_written += res;
2640            STATS_UNLOCK();
2641
2642            /* We've written some of the data. Remove the completed
2643               iovec entries from the list of pending writes. */
2644            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2645                res -= m->msg_iov->iov_len;
2646                m->msg_iovlen--;
2647                m->msg_iov++;
2648            }
2649
2650            /* Might have written just part of the last iovec entry;
2651               adjust it so the next write will do the rest. */
2652            if (res > 0) {
2653                m->msg_iov->iov_base += res;
2654                m->msg_iov->iov_len -= res;
2655            }
2656            return TRANSMIT_INCOMPLETE;
2657        }
2658        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2659            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2660                if (settings.verbose > 0)
2661                    fprintf(stderr, "Couldn't update event\n");
2662                conn_set_state(c, conn_closing);
2663                return TRANSMIT_HARD_ERROR;
2664            }
2665            return TRANSMIT_SOFT_ERROR;
2666        }
2667        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
2668           we have a real error, on which we close the connection */
2669        if (settings.verbose > 0)
2670            perror("Failed to write, and not due to blocking");
2671
2672        if (IS_UDP(c->protocol))
2673            conn_set_state(c, conn_read);
2674        else
2675            conn_set_state(c, conn_closing);
2676        return TRANSMIT_HARD_ERROR;
2677    } else {
2678        return TRANSMIT_COMPLETE;
2679    }
2680}
2681
2682static void drive_machine(conn *c) {
2683    bool stop = false;
2684    int sfd, flags = 1;
2685    enum conn_states init_state; /* initial state for a new connection */
2686    socklen_t addrlen;
2687    struct sockaddr_storage addr;
2688    int res;
2689
2690    assert(c != NULL);
2691
2692    while (!stop) {
2693
2694        switch(c->state) {
2695        case conn_listening:
2696            addrlen = sizeof(addr);
2697            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2698                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2699                    /* these are transient, so don't log anything */
2700                    stop = true;
2701                } else if (errno == EMFILE) {
2702                    if (settings.verbose > 0)
2703                        fprintf(stderr, "Too many open connections\n");
2704                    accept_new_conns(false);
2705                    stop = true;
2706                } else {
2707                    perror("accept()");
2708                    stop = true;
2709                }
2710                break;
2711            }
2712            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2713                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2714                perror("setting O_NONBLOCK");
2715                close(sfd);
2716                break;
2717            }
2718            init_state = get_init_state(c);
2719
2720            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2721                                     DATA_BUFFER_SIZE, c->protocol);
2722
2723            break;
2724
2725        case conn_negotiate:
2726            if (settings.verbose > 1)
2727                fprintf(stderr, "Negotiating protocol for a new connection\n");
2728            c->rlbytes = 1;
2729            c->ritem = c->rbuf;
2730            c->rcurr = c->rbuf;
2731            c->wcurr = c->wbuf;
2732            conn_set_state(c, conn_nread);
2733            break;
2734
2735        case conn_read:
2736            if (try_read_command(c) != 0) {
2737                continue;
2738            }
2739            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2740                continue;
2741            }
2742            /* we have no command line and no data to read from network */
2743            if (!update_event(c, EV_READ | EV_PERSIST)) {
2744                if (settings.verbose > 0)
2745                    fprintf(stderr, "Couldn't update event\n");
2746                conn_set_state(c, conn_closing);
2747                break;
2748            }
2749            stop = true;
2750            break;
2751
2752        case conn_bin_init: /* Reinitialize a binary connection */
2753            reinit_bin_connection(c);
2754            break;
2755
2756        case conn_nread:
2757            if (c->rlbytes == 0) {
2758                complete_nread(c);
2759                break;
2760            }
2761            /* first check if we have leftovers in the conn_read buffer */
2762            if (c->rbytes > 0) {
2763                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2764                memcpy(c->ritem, c->rcurr, tocopy);
2765                c->ritem += tocopy;
2766                c->rlbytes -= tocopy;
2767                c->rcurr += tocopy;
2768                c->rbytes -= tocopy;
2769                break;
2770            }
2771
2772            /*  now try reading from the socket */
2773            res = read(c->sfd, c->ritem, c->rlbytes);
2774            if (res > 0) {
2775                STATS_LOCK();
2776                stats.bytes_read += res;
2777                STATS_UNLOCK();
2778                c->ritem += res;
2779                c->rlbytes -= res;
2780                break;
2781            }
2782            if (res == 0) { /* end of stream */
2783                conn_set_state(c, conn_closing);
2784                break;
2785            }
2786            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2787                if (!update_event(c, EV_READ | EV_PERSIST)) {
2788                    if (settings.verbose > 0)
2789                        fprintf(stderr, "Couldn't update event\n");
2790                    conn_set_state(c, conn_closing);
2791                    break;
2792                }
2793                stop = true;
2794                break;
2795            }
2796            /* otherwise we have a real error, on which we close the connection */
2797            if (settings.verbose > 0)
2798                fprintf(stderr, "Failed to read, and not due to blocking\n");
2799            conn_set_state(c, conn_closing);
2800            break;
2801
2802        case conn_swallow:
2803            /* we are reading sbytes and throwing them away */
2804            if (c->sbytes == 0) {
2805                conn_set_init_state(c);
2806                break;
2807            }
2808
2809            /* first check if we have leftovers in the conn_read buffer */
2810            if (c->rbytes > 0) {
2811                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2812                c->sbytes -= tocopy;
2813                c->rcurr += tocopy;
2814                c->rbytes -= tocopy;
2815                break;
2816            }
2817
2818            /*  now try reading from the socket */
2819            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2820            if (res > 0) {
2821                STATS_LOCK();
2822                stats.bytes_read += res;
2823                STATS_UNLOCK();
2824                c->sbytes -= res;
2825                break;
2826            }
2827            if (res == 0) { /* end of stream */
2828                conn_set_state(c, conn_closing);
2829                break;
2830            }
2831            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2832                if (!update_event(c, EV_READ | EV_PERSIST)) {
2833                    if (settings.verbose > 0)
2834                        fprintf(stderr, "Couldn't update event\n");
2835                    conn_set_state(c, conn_closing);
2836                    break;
2837                }
2838                stop = true;
2839                break;
2840            }
2841            /* otherwise we have a real error, on which we close the connection */
2842            if (settings.verbose > 0)
2843                fprintf(stderr, "Failed to read, and not due to blocking\n");
2844            conn_set_state(c, conn_closing);
2845            break;
2846
2847        case conn_write:
2848            /*
2849             * We want to write out a simple response. If we haven't already,
2850             * assemble it into a msgbuf list (this will be a single-entry
2851             * list for TCP or a two-entry list for UDP).
2852             */
2853            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2854                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2855                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2856                    if (settings.verbose > 0)
2857                        fprintf(stderr, "Couldn't build response\n");
2858                    conn_set_state(c, conn_closing);
2859                    break;
2860                }
2861            }
2862
2863            /* fall through... */
2864
2865        case conn_mwrite:
2866            switch (transmit(c)) {
2867            case TRANSMIT_COMPLETE:
2868                if (c->state == conn_mwrite) {
2869                    while (c->ileft > 0) {
2870                        item *it = *(c->icurr);
2871                        assert((it->it_flags & ITEM_SLABBED) == 0);
2872                        item_remove(it);
2873                        c->icurr++;
2874                        c->ileft--;
2875                    }
2876                    while (c->suffixleft > 0) {
2877                        char *suffix = *(c->suffixcurr);
2878                        if(suffix_add_to_freelist(suffix)) {
2879                            /* Failed to add to freelist, don't leak */
2880                            free(suffix);
2881                        }
2882                        c->suffixcurr++;
2883                        c->suffixleft--;
2884                    }
2885                    /* XXX:  I don't know why this wasn't the general case */
2886                    if(c->protocol == binary_prot) {
2887                        conn_set_state(c, c->write_and_go);
2888                    } else {
2889                        conn_set_init_state(c);
2890                    }
2891                } else if (c->state == conn_write) {
2892                    if (c->write_and_free) {
2893                        free(c->write_and_free);
2894                        c->write_and_free = 0;
2895                    }
2896                    conn_set_state(c, c->write_and_go);
2897                } else {
2898                    if (settings.verbose > 0)
2899                        fprintf(stderr, "Unexpected state %d\n", c->state);
2900                    conn_set_state(c, conn_closing);
2901                }
2902                break;
2903
2904            case TRANSMIT_INCOMPLETE:
2905            case TRANSMIT_HARD_ERROR:
2906                break;                   /* Continue in state machine. */
2907
2908            case TRANSMIT_SOFT_ERROR:
2909                stop = true;
2910                break;
2911            }
2912            break;
2913
2914        case conn_closing:
2915            if (IS_UDP(c->protocol))
2916                conn_cleanup(c);
2917            else
2918                conn_close(c);
2919            stop = true;
2920            break;
2921        }
2922    }
2923
2924    return;
2925}
2926
2927void event_handler(const int fd, const short which, void *arg) {
2928    conn *c;
2929
2930    c = (conn *)arg;
2931    assert(c != NULL);
2932
2933    c->which = which;
2934
2935    /* sanity */
2936    if (fd != c->sfd) {
2937        if (settings.verbose > 0)
2938            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2939        conn_close(c);
2940        return;
2941    }
2942
2943    drive_machine(c);
2944
2945    /* wait for next event */
2946    return;
2947}
2948
2949static int new_socket(struct addrinfo *ai) {
2950    int sfd;
2951    int flags;
2952
2953    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2954        perror("socket()");
2955        return -1;
2956    }
2957
2958    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2959