root/branches/binary/server/memcached.c @ 775

Revision 775, 111.4 kB (checked in by dsallings, 20 months ago)

Syscall optimizations from Trond Norbye.

From Trond:
I have been using the last two days to test the modifications I have done to
the binary protocol, and I have not been able to find any new bugs. From my
testing the binary protocol now use the same amount of system calls as the
textual protocol.

I have tried to use a common state machine, and branch the execution path as
late as possible.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63#ifndef HAVE_DAEMON
64extern int daemon(int nochdir, int noclose);
65#endif
66
67/*
68 * forward declarations
69 */
70static void drive_machine(conn *c);
71static int new_socket(struct addrinfo *ai);
72static int server_socket(const int port, enum protocol prot);
73static int try_read_command(conn *c);
74static int try_read_network(conn *c);
75static int try_read_udp(conn *c);
76static void conn_set_state(conn *c, enum conn_states state);
77
78/* stats */
79static void stats_reset(void);
80static void stats_init(void);
81
82/* defaults */
83static void settings_init(void);
84
85/* event handling, network IO */
86static void event_handler(const int fd, const short which, void *arg);
87static void conn_close(conn *c);
88static void conn_init(void);
89static void accept_new_conns(const bool do_accept);
90static bool update_event(conn *c, const int new_flags);
91static void complete_nread(conn *c);
92static void process_command(conn *c, char *command);
93static int transmit(conn *c);
94static int ensure_iov_space(conn *c);
95static int add_iov(conn *c, const void *buf, int len);
96static int add_msghdr(conn *c);
97
98/* time handling */
99static void set_current_time(void);  /* update the global variable holding
100                              global 32-bit seconds-since-start time
101                              (to avoid 64 bit time_t) */
102
103static void conn_free(conn *c);
104
105/** exported globals **/
106struct stats stats;
107struct settings settings;
108
109/** file scope variables **/
110static item **todelete = NULL;
111static int delcurr;
112static int deltotal;
113static conn *listen_conn = NULL;
114static struct event_base *main_base;
115
116#define TRANSMIT_COMPLETE   0
117#define TRANSMIT_INCOMPLETE 1
118#define TRANSMIT_SOFT_ERROR 2
119#define TRANSMIT_HARD_ERROR 3
120
121static int *buckets = 0; /* bucket->generation array for a managed instance */
122
123#define REALTIME_MAXDELTA 60*60*24*30
124/*
125 * given time value that's either unix time or delta from current unix time, return
126 * unix time. Use the fact that delta can't exceed one month (and real time value can't
127 * be that low).
128 */
129static rel_time_t realtime(const time_t exptime) {
130    /* no. of seconds in 30 days - largest possible delta exptime */
131
132    if (exptime == 0) return 0; /* 0 means never expire */
133
134    if (exptime > REALTIME_MAXDELTA) {
135        /* if item expiration is at/before the server started, give it an
136           expiration time of 1 second after the server started.
137           (because 0 means don't expire).  without this, we'd
138           underflow and wrap around to some large value way in the
139           future, effectively making items expiring in the past
140           really expiring never */
141        if (exptime <= stats.started)
142            return (rel_time_t)1;
143        return (rel_time_t)(exptime - stats.started);
144    } else {
145        return (rel_time_t)(exptime + current_time);
146    }
147}
148
149static void stats_init(void) {
150    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
151    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
152    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
153
154    /* make the time we started always be 2 seconds before we really
155       did, so time(0) - time.started is never zero.  if so, things
156       like 'settings.oldest_live' which act as booleans as well as
157       values are now false in boolean context... */
158    stats.started = time(0) - 2;
159    stats_prefix_init();
160}
161
162static void stats_reset(void) {
163    STATS_LOCK();
164    stats.total_items = stats.total_conns = 0;
165    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
166    stats.bytes_read = stats.bytes_written = 0;
167    stats_prefix_clear();
168    STATS_UNLOCK();
169}
170
171static void settings_init(void) {
172    settings.access = 0700;
173    settings.port = 11211;
174    settings.udpport = 0;
175    /* By default this string should be NULL for getaddrinfo() */
176    settings.inter = NULL;
177    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
178    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
179    settings.verbose = 0;
180    settings.oldest_live = 0;
181    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
182    settings.socketpath = NULL;       /* by default, not using a unix socket */
183    settings.managed = false;
184    settings.factor = 1.25;
185    settings.chunk_size = 48;         /* space for a modest key and value */
186#ifdef USE_THREADS
187    settings.num_threads = 4;
188#else
189    settings.num_threads = 1;
190#endif
191    settings.prefix_delimiter = ':';
192    settings.detail_enabled = 0;
193}
194
195/* returns true if a deleted item's delete-locked-time is over, and it
196   should be removed from the namespace */
197static bool item_delete_lock_over (item *it) {
198    assert(it->it_flags & ITEM_DELETED);
199    return (current_time >= it->exptime);
200}
201
202/*
203 * Adds a message header to a connection.
204 *
205 * Returns 0 on success, -1 on out-of-memory.
206 */
207static int add_msghdr(conn *c)
208{
209    struct msghdr *msg;
210
211    assert(c != NULL);
212
213    if (c->msgsize == c->msgused) {
214        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
215        if (! msg)
216            return -1;
217        c->msglist = msg;
218        c->msgsize *= 2;
219    }
220
221    msg = c->msglist + c->msgused;
222
223    /* this wipes msg_iovlen, msg_control, msg_controllen, and
224       msg_flags, the last 3 of which aren't defined on solaris: */
225    memset(msg, 0, sizeof(struct msghdr));
226
227    msg->msg_iov = &c->iov[c->iovused];
228
229    if (c->request_addr_size > 0) {
230        msg->msg_name = &c->request_addr;
231        msg->msg_namelen = c->request_addr_size;
232    }
233
234    c->msgbytes = 0;
235    c->msgused++;
236
237    if (IS_UDP(c->protocol)) {
238        /* Leave room for the UDP header, which we'll fill in later. */
239        return add_iov(c, NULL, UDP_HEADER_SIZE);
240    }
241
242    return 0;
243}
244
245
246/*
247 * Free list management for connections.
248 */
249
250static conn **freeconns;
251static int freetotal;
252static int freecurr;
253
254
255static void conn_init(void) {
256    freetotal = 200;
257    freecurr = 0;
258    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
259        fprintf(stderr, "malloc()\n");
260    }
261    return;
262}
263
264/*
265 * Returns a connection from the freelist, if any. Should call this using
266 * conn_from_freelist() for thread safety.
267 */
268conn *do_conn_from_freelist() {
269    conn *c;
270
271    if (freecurr > 0) {
272        c = freeconns[--freecurr];
273    } else {
274        c = NULL;
275    }
276
277    return c;
278}
279
280/*
281 * Adds a connection to the freelist. 0 = success. Should call this using
282 * conn_add_to_freelist() for thread safety.
283 */
284bool do_conn_add_to_freelist(conn *c) {
285    if (freecurr < freetotal) {
286        freeconns[freecurr++] = c;
287        return false;
288    } else {
289        /* try to enlarge free connections array */
290        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
291        if (new_freeconns) {
292            freetotal *= 2;
293            freeconns = new_freeconns;
294            freeconns[freecurr++] = c;
295            return false;
296        }
297    }
298    return true;
299}
300
301static const char *prot_text(enum protocol prot) {
302    char *rv = "unknown";
303    switch(prot) {
304        case ascii_prot:
305            rv = "ascii";
306            break;
307        case binary_prot:
308            rv = "binary";
309            break;
310        case ascii_udp_prot:
311            rv = "ascii-udp";
312            break;
313        case negotiating_prot:
314            rv = "auto-negotiate";
315            break;
316    }
317    return rv;
318}
319
320conn *conn_new(const int sfd, enum conn_states init_state,
321                const int event_flags,
322                const int read_buffer_size, enum protocol prot,
323                struct event_base *base) {
324    conn *c = conn_from_freelist();
325
326    if (NULL == c) {
327        if (!(c = (conn *)malloc(sizeof(conn)))) {
328            fprintf(stderr, "malloc()\n");
329            return NULL;
330        }
331        c->rbuf = c->wbuf = 0;
332        c->ilist = 0;
333        c->suffixlist = 0;
334        c->iov = 0;
335        c->msglist = 0;
336        c->hdrbuf = 0;
337
338        c->rsize = read_buffer_size;
339        c->wsize = DATA_BUFFER_SIZE;
340        c->isize = ITEM_LIST_INITIAL;
341        c->suffixsize = SUFFIX_LIST_INITIAL;
342        c->iovsize = IOV_LIST_INITIAL;
343        c->msgsize = MSG_LIST_INITIAL;
344        c->hdrsize = 0;
345
346        c->rbuf = (char *)malloc((size_t)c->rsize);
347        c->wbuf = (char *)malloc((size_t)c->wsize);
348        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
349        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
350        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
351        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
352
353        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
354                c->msglist == 0 || c->suffixlist == 0) {
355            if (c->rbuf != 0) free(c->rbuf);
356            if (c->wbuf != 0) free(c->wbuf);
357            if (c->ilist != 0) free(c->ilist);
358            if (c->suffixlist != 0) free(c->suffixlist);
359            if (c->iov != 0) free(c->iov);
360            if (c->msglist != 0) free(c->msglist);
361            free(c);
362            fprintf(stderr, "malloc()\n");
363            return NULL;
364        }
365
366        STATS_LOCK();
367        stats.conn_structs++;
368        STATS_UNLOCK();
369    }
370
371    /* unix socket mode doesn't need this, so zeroed out.  but why
372     * is this done for every command?  presumably for UDP
373     * mode.  */
374    if (!settings.socketpath) {
375        c->request_addr_size = sizeof(c->request_addr);
376    } else {
377        c->request_addr_size = 0;
378    }
379
380    if (settings.verbose > 1) {
381        if (init_state == conn_listening) {
382            fprintf(stderr, "<%d server listening (%s)\n", sfd,
383                prot_text(prot));
384        } else if (IS_UDP(prot)) {
385            fprintf(stderr, "<%d server listening (udp)\n", sfd);
386        } else if (prot == negotiating_prot) {
387            fprintf(stderr, "<%d new auto-negotiating client connection\n",
388                    sfd);
389        } else {
390            fprintf(stderr, "<%d new unknown (%d) client connection\n",
391                sfd, prot);
392            abort();
393        }
394    }
395
396    c->sfd = sfd;
397    c->protocol = prot;
398    c->state = init_state;
399    c->rlbytes = 0;
400    c->cmd = -1;
401    c->rbytes = c->wbytes = 0;
402    c->wcurr = c->wbuf;
403    c->rcurr = c->rbuf;
404    c->ritem = 0;
405    c->icurr = c->ilist;
406    c->suffixcurr = c->suffixlist;
407    c->ileft = 0;
408    c->suffixleft = 0;
409    c->iovused = 0;
410    c->msgcurr = 0;
411    c->msgused = 0;
412
413    c->write_and_go = init_state;
414    c->write_and_free = 0;
415    c->item = 0;
416    c->bucket = -1;
417    c->gen = 0;
418
419    c->noreply = false;
420
421    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
422    event_base_set(base, &c->event);
423    c->ev_flags = event_flags;
424
425    if (event_add(&c->event, 0) == -1) {
426        if (conn_add_to_freelist(c)) {
427            conn_free(c);
428        }
429        perror("event_add");
430        return NULL;
431    }
432
433    STATS_LOCK();
434    stats.curr_conns++;
435    stats.total_conns++;
436    STATS_UNLOCK();
437
438    return c;
439}
440
441static void conn_cleanup(conn *c) {
442    assert(c != NULL);
443
444    if (c->item) {
445        item_remove(c->item);
446        c->item = 0;
447    }
448
449    if (c->ileft != 0) {
450        for (; c->ileft > 0; c->ileft--,c->icurr++) {
451            item_remove(*(c->icurr));
452        }
453    }
454
455    if (c->suffixleft != 0) {
456        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
457            if(suffix_add_to_freelist(*(c->suffixcurr))) {
458                free(*(c->suffixcurr));
459            }
460        }
461    }
462
463    if (c->write_and_free) {
464        free(c->write_and_free);
465        c->write_and_free = 0;
466    }
467}
468
469/*
470 * Frees a connection.
471 */
472void conn_free(conn *c) {
473    if (c) {
474        if (c->hdrbuf)
475            free(c->hdrbuf);
476        if (c->msglist)
477            free(c->msglist);
478        if (c->rbuf)
479            free(c->rbuf);
480        if (c->wbuf)
481            free(c->wbuf);
482        if (c->ilist)
483            free(c->ilist);
484        if (c->suffixlist)
485            free(c->suffixlist);
486        if (c->iov)
487            free(c->iov);
488        free(c);
489    }
490}
491
492static void conn_close(conn *c) {
493    assert(c != NULL);
494
495    /* delete the event, the socket and the conn */
496    event_del(&c->event);
497
498    if (settings.verbose > 1)
499        fprintf(stderr, "<%d connection closed.\n", c->sfd);
500
501    close(c->sfd);
502    accept_new_conns(true);
503    conn_cleanup(c);
504
505    /* if the connection has big buffers, just free it */
506    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
507        conn_free(c);
508    }
509
510    STATS_LOCK();
511    stats.curr_conns--;
512    STATS_UNLOCK();
513
514    return;
515}
516
517/*
518 * Shrinks a connection's buffers if they're too big.  This prevents
519 * periodic large "get" requests from permanently chewing lots of server
520 * memory.
521 *
522 * This should only be called in between requests since it can wipe output
523 * buffers!
524 */
525static void conn_shrink(conn *c) {
526    assert(c != NULL);
527
528    if (IS_UDP(c->protocol))
529        return;
530
531    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
532        char *newbuf;
533
534        if (c->rcurr != c->rbuf)
535            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
536
537        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
538
539        if (newbuf) {
540            c->rbuf = newbuf;
541            c->rsize = DATA_BUFFER_SIZE;
542        }
543        /* TODO check other branch... */
544        c->rcurr = c->rbuf;
545    }
546
547    if (c->isize > ITEM_LIST_HIGHWAT) {
548        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
549        if (newbuf) {
550            c->ilist = newbuf;
551            c->isize = ITEM_LIST_INITIAL;
552        }
553    /* TODO check error condition? */
554    }
555
556    if (c->msgsize > MSG_LIST_HIGHWAT) {
557        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
558        if (newbuf) {
559            c->msglist = newbuf;
560            c->msgsize = MSG_LIST_INITIAL;
561        }
562    /* TODO check error condition? */
563    }
564
565    if (c->iovsize > IOV_LIST_HIGHWAT) {
566        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
567        if (newbuf) {
568            c->iov = newbuf;
569            c->iovsize = IOV_LIST_INITIAL;
570        }
571    /* TODO check return value */
572    }
573}
574
575/**
576 * Convert a state name to a human readable form.
577 */
578static const char *state_text(enum conn_states state) {
579    const char* const statenames[] = { "conn_listening",
580                                       "conn_new_cmd",
581                                       "conn_waiting",
582                                       "conn_read",
583                                       "conn_parse_cmd",
584                                       "conn_write",
585                                       "conn_nread",
586                                       "conn_swallow",
587                                       "conn_closing",
588                                       "conn_mwrite" };
589    return statenames[state];
590}
591
592/*
593 * Sets a connection's current state in the state machine. Any special
594 * processing that needs to happen on certain state transitions can
595 * happen here.
596 */
597static void conn_set_state(conn *c, enum conn_states state) {
598    assert(c != NULL);
599    assert(state >= conn_listening && state < conn_max_state);
600
601    if (state != c->state) {
602        if (settings.verbose > 2) {
603            fprintf(stderr, "%d: going from %s to %s\n",
604                    c->sfd, state_text(c->state),
605                    state_text(state));
606        }
607
608        c->state = state;
609    }
610}
611
612/*
613 * Free list management for suffix buffers.
614 */
615
616static char **freesuffix;
617static int freesuffixtotal;
618static int freesuffixcurr;
619
620static void suffix_init(void) {
621    freesuffixtotal = 500;
622    freesuffixcurr  = 0;
623
624    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
625    if (freesuffix == NULL) {
626        fprintf(stderr, "malloc()\n");
627    }
628    return;
629}
630
631/*
632 * Returns a suffix buffer from the freelist, if any. Should call this using
633 * suffix_from_freelist() for thread safety.
634 */
635char *do_suffix_from_freelist() {
636    char *s;
637
638    if (freesuffixcurr > 0) {
639        s = freesuffix[--freesuffixcurr];
640    } else {
641        /* If malloc fails, let the logic fall through without spamming
642         * STDERR on the server. */
643        s = malloc( SUFFIX_SIZE );
644    }
645
646    return s;
647}
648
649/*
650 * Adds a connection to the freelist. 0 = success. Should call this using
651 * conn_add_to_freelist() for thread safety.
652 */
653bool do_suffix_add_to_freelist(char *s) {
654    if (freesuffixcurr < freesuffixtotal) {
655        freesuffix[freesuffixcurr++] = s;
656        return false;
657    } else {
658        /* try to enlarge free connections array */
659        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
660        if (new_freesuffix) {
661            freesuffixtotal *= 2;
662            freesuffix = new_freesuffix;
663            freesuffix[freesuffixcurr++] = s;
664            return false;
665        }
666    }
667    return true;
668}
669
670/*
671 * Ensures that there is room for another struct iovec in a connection's
672 * iov list.
673 *
674 * Returns 0 on success, -1 on out-of-memory.
675 */
676static int ensure_iov_space(conn *c) {
677    assert(c != NULL);
678
679    if (c->iovused >= c->iovsize) {
680        int i, iovnum;
681        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
682                                (c->iovsize * 2) * sizeof(struct iovec));
683        if (! new_iov)
684            return -1;
685        c->iov = new_iov;
686        c->iovsize *= 2;
687
688        /* Point all the msghdr structures at the new list. */
689        for (i = 0, iovnum = 0; i < c->msgused; i++) {
690            c->msglist[i].msg_iov = &c->iov[iovnum];
691            iovnum += c->msglist[i].msg_iovlen;
692        }
693    }
694
695    return 0;
696}
697
698
699/*
700 * Adds data to the list of pending data that will be written out to a
701 * connection.
702 *
703 * Returns 0 on success, -1 on out-of-memory.
704 */
705
706static int add_iov(conn *c, const void *buf, int len) {
707    struct msghdr *m;
708    int leftover;
709    bool limit_to_mtu;
710
711    assert(c != NULL);
712
713    do {
714        m = &c->msglist[c->msgused - 1];
715
716        /*
717         * Limit UDP packets, and the first payloads of TCP replies, to
718         * UDP_MAX_PAYLOAD_SIZE bytes.
719         */
720        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
721
722        /* We may need to start a new msghdr if this one is full. */
723        if (m->msg_iovlen == IOV_MAX ||
724            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
725            add_msghdr(c);
726            m = &c->msglist[c->msgused - 1];
727        }
728
729        if (ensure_iov_space(c) != 0)
730            return -1;
731
732        /* If the fragment is too big to fit in the datagram, split it up */
733        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
734            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
735            len -= leftover;
736        } else {
737            leftover = 0;
738        }
739
740        m = &c->msglist[c->msgused - 1];
741        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
742        m->msg_iov[m->msg_iovlen].iov_len = len;
743
744        c->msgbytes += len;
745        c->iovused++;
746        m->msg_iovlen++;
747
748        buf = ((char *)buf) + len;
749        len = leftover;
750    } while (leftover > 0);
751
752    return 0;
753}
754
755
756/*
757 * Constructs a set of UDP headers and attaches them to the outgoing messages.
758 */
759static int build_udp_headers(conn *c) {
760    int i;
761    unsigned char *hdr;
762
763    assert(c != NULL);
764
765    if (c->msgused > c->hdrsize) {
766        void *new_hdrbuf;
767        if (c->hdrbuf)
768            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
769        else
770            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
771        if (! new_hdrbuf)
772            return -1;
773        c->hdrbuf = (unsigned char *)new_hdrbuf;
774        c->hdrsize = c->msgused * 2;
775    }
776
777    hdr = c->hdrbuf;
778    for (i = 0; i < c->msgused; i++) {
779        c->msglist[i].msg_iov[0].iov_base = hdr;
780        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
781        *hdr++ = c->request_id / 256;
782        *hdr++ = c->request_id % 256;
783        *hdr++ = i / 256;
784        *hdr++ = i % 256;
785        *hdr++ = c->msgused / 256;
786        *hdr++ = c->msgused % 256;
787        *hdr++ = 0;
788        *hdr++ = 0;
789        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
790    }
791
792    return 0;
793}
794
795
796static void out_string(conn *c, const char *str) {
797    size_t len;
798
799    assert(c != NULL);
800
801    if (c->noreply) {
802        if (settings.verbose > 1)
803            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
804        c->noreply = false;
805        conn_set_state(c, conn_new_cmd);
806        return;
807    }
808
809    if (settings.verbose > 1)
810        fprintf(stderr, ">%d %s\n", c->sfd, str);
811
812    len = strlen(str);
813    if ((len + 2) > c->wsize) {
814        /* ought to be always enough. just fail for simplicity */
815        str = "SERVER_ERROR output line too long";
816        len = strlen(str);
817    }
818
819    memcpy(c->wbuf, str, len);
820    memcpy(c->wbuf + len, "\r\n", 3);
821    c->wbytes = len + 2;
822    c->wcurr = c->wbuf;
823
824    conn_set_state(c, conn_write);
825    c->write_and_go = conn_new_cmd;
826    return;
827}
828
829/*
830 * we get here after reading the value in set/add/replace commands. The command
831 * has been stored in c->item_comm, and the item is ready in c->item.
832 */
833static void complete_nread_ascii(conn *c) {
834    assert(c != NULL);
835
836    item *it = c->item;
837    int comm = c->item_comm;
838    int ret;
839
840    STATS_LOCK();
841    stats.set_cmds++;
842    STATS_UNLOCK();
843
844    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
845        out_string(c, "CLIENT_ERROR bad data chunk");
846    } else {
847      ret = store_item(it, comm);
848      if (ret == 1)
849          out_string(c, "STORED");
850      else if(ret == 2)
851          out_string(c, "EXISTS");
852      else if(ret == 3)
853          out_string(c, "NOT_FOUND");
854      else
855          out_string(c, "NOT_STORED");
856    }
857
858    item_remove(c->item);       /* release the c->item reference */
859    c->item = 0;
860}
861
862static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
863    int i = 0;
864    uint32_t *res_header;
865
866    assert(c);
867    assert(body_len >= 0);
868
869    c->msgcurr = 0;
870    c->msgused = 0;
871    c->iovused = 0;
872    if (add_msghdr(c) != 0) {
873        /* XXX:  out_string is inappropriate here */
874        out_string(c, "SERVER_ERROR out of memory");
875        return;
876    }
877
878    res_header = (uint32_t *)c->wbuf;
879
880    res_header[0] = ((uint32_t)BIN_RES_MAGIC) << 24;
881    res_header[0] |= ((0xff & c->cmd) << 16);
882    res_header[0] |= err & 0xffff;
883
884    res_header[1] = hdr_len << 24;
885    /* TODO:  Support datatype */
886    res_header[2] = body_len;
887    res_header[3] = c->opaque;
888
889    if(settings.verbose > 1) {
890        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
891            res_header[0], res_header[1], res_header[2], res_header[3]);
892    }
893
894    for(i = 0; i<BIN_PKT_HDR_WORDS; i++) {
895        res_header[i] = htonl(res_header[i]);
896    }
897
898    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
899    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
900}
901
902static void write_bin_error(conn *c, int err, int swallow) {
903    const char *errstr = "Unknown error";
904    switch(err) {
905        case ERR_OUT_OF_MEMORY:
906            errstr = "Out of memory";
907            break;
908        case ERR_UNKNOWN_CMD:
909            errstr = "Unknown command";
910            break;
911        case ERR_NOT_FOUND:
912            errstr = "Not found";
913            break;
914        case ERR_INVALID_ARGUMENTS:
915            errstr = "Invalid arguments";
916            break;
917        case ERR_EXISTS:
918            errstr = "Data exists for key.";
919            break;
920        case ERR_TOO_LARGE:
921            errstr = "Too large.";
922            break;
923        case ERR_NOT_STORED:
924            errstr = "Not stored.";
925            break;
926        default:
927            errstr = "UNHANDLED ERROR";
928            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
929    }
930    if(settings.verbose > 0) {
931        fprintf(stderr, "Writing an error:  %s\n", errstr);
932    }
933    add_bin_header(c, err, 0, strlen(errstr));
934    add_iov(c, errstr, strlen(errstr));
935
936    conn_set_state(c, conn_mwrite);
937    if(swallow > 0) {
938        c->sbytes = swallow;
939        c->write_and_go = conn_swallow;
940    } else {
941        c->write_and_go = conn_new_cmd;
942    }
943}
944
945/* Form and send a response to a command over the binary protocol */
946static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
947    add_bin_header(c, 0, hlen, dlen);
948    if(dlen > 0) {
949        add_iov(c, d, dlen);
950    }
951    conn_set_state(c, conn_mwrite);
952    c->write_and_go = conn_new_cmd;
953}
954
955/* Byte swap a 64-bit number */
956static int64_t swap64(int64_t in) {
957#ifdef ENDIAN_LITTLE
958    /* Little endian, flip the bytes around until someone makes a faster/better
959    * way to do this. */
960    int64_t rv = 0;
961    int i = 0;
962     for(i = 0; i<8; i++) {
963        rv = (rv << 8) | (in & 0xff);
964        in >>= 8;
965     }
966    return rv;
967#else
968    /* big-endian machines don't need byte swapping */
969    return in;
970#endif
971}
972
973static void complete_incr_bin(conn *c) {
974    item *it;
975    int64_t delta;
976    uint64_t initial;
977    int32_t exptime;
978    char *key;
979    size_t nkey;
980    int i;
981    uint64_t *response_buf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
982
983    assert(c != NULL);
984
985    key = c->rbuf + BIN_INCR_HDR_LEN;
986    nkey = c->keylen;
987    key[nkey] = 0x00;
988
989    delta = swap64(*((int64_t*)(c->rbuf)));
990    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
991    exptime = ntohl(*((int*)(c->rbuf + 16)));
992
993    if(settings.verbose) {
994        fprintf(stderr, "incr ");
995        for(i = 0; i<nkey; i++) {
996            fprintf(stderr, "%c", key[i]);
997        }
998        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
999    }
1000
1001    it = item_get(key, nkey);
1002    if (it) {
1003        /* Weird magic in add_delta forces me to pad here */
1004        char tmpbuf[INCR_MAX_STORAGE_LEN];
1005        uint64_t l = 0;
1006        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1007        tmpbuf[INCR_MAX_STORAGE_LEN] = 0x00;
1008        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1009        *response_buf = swap64(strtoull(tmpbuf, NULL, 10));
1010
1011        write_bin_response(c, response_buf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1012        item_remove(it);         /* release our reference */
1013    } else {
1014        if(exptime >= 0) {
1015            /* Save some room for the response */
1016            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1017            *response_buf = swap64(initial);
1018            it = item_alloc(key, nkey, 0, realtime(exptime),
1019                INCR_MAX_STORAGE_LEN);
1020            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1021
1022            if(store_item(it, NREAD_SET)) {
1023                write_bin_response(c, response_buf, BIN_INCR_HDR_LEN,
1024                    INCR_RES_LEN);
1025            } else {
1026                write_bin_error(c, ERR_NOT_STORED, 0);
1027            }
1028            item_remove(it);         /* release our reference */
1029        } else {
1030            write_bin_error(c, ERR_NOT_FOUND, 0);
1031        }
1032    }
1033}
1034
1035static void complete_update_bin(conn *c) {
1036    int eno = -1, ret = 0;
1037    assert(c != NULL);
1038
1039    item *it = c->item;
1040
1041    STATS_LOCK();
1042    stats.set_cmds++;
1043    STATS_UNLOCK();
1044
1045    /* We don't actually receive the trailing two characters in the bin
1046     * protocol, so we're going to just set them here */
1047    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1048    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1049
1050    switch (store_item(it, c->item_comm)) {
1051        case 1:
1052            /* Stored */
1053            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1054            break;
1055        case 2:
1056            write_bin_error(c, ERR_EXISTS, 0);
1057            break;
1058        case 3:
1059            write_bin_error(c, ERR_NOT_FOUND, 0);
1060            break;
1061        default:
1062            if(c->item_comm == NREAD_ADD) {
1063                eno = ERR_EXISTS;
1064            } else if(c->item_comm == NREAD_REPLACE) {
1065                eno = ERR_NOT_FOUND;
1066            } else {
1067                eno = ERR_NOT_STORED;
1068            }
1069            write_bin_error(c, eno, 0);
1070    }
1071
1072    item_remove(c->item);       /* release the c->item reference */
1073    c->item = 0;
1074}
1075
1076static void process_bin_get(conn *c) {
1077    item *it;
1078
1079    it = item_get(c->rbuf, c->keylen);
1080    if (it) {
1081        int *flags;
1082        uint64_t* identifier;
1083
1084        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1085
1086        /* the length has two unnecessary bytes, and then we write four more */
1087        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1088
1089        /* Add the "extras" field: CAS-id followed by flags. The cas is a 64-
1090           bit datatype and require alignment to 8-byte boundaries on some
1091           architechtures. Verify that the size of the packet header is of
1092           the correct size (if not the following code generates SIGBUS on
1093           sparc hardware).
1094        */
1095        assert(MIN_BIN_PKT_LENGTH % 8 == 0);
1096        identifier = (uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1097        *identifier = swap64(it->cas_id);
1098        add_iov(c, identifier, 8);
1099
1100        /* Add the flags */
1101        flags = (int*)(c->wbuf + MIN_BIN_PKT_LENGTH + 8);
1102        *flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1103        add_iov(c, flags, 4);
1104
1105        /* Add the data minus the CRLF */
1106        add_iov(c, ITEM_data(it), it->nbytes - 2);
1107        conn_set_state(c, conn_mwrite);
1108    } else {
1109        if(c->cmd == CMD_GETQ) {
1110            conn_set_state(c, conn_new_cmd);
1111        } else {
1112            write_bin_error(c, ERR_NOT_FOUND, 0);
1113        }
1114    }
1115}
1116
1117static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1118    assert(c);
1119    c->substate = next_substate;
1120    c->rlbytes = c->keylen + extra;
1121    assert(c->rsize >= c->rlbytes);
1122    c->ritem = c->rbuf;
1123    conn_set_state(c, conn_nread);
1124}
1125
1126static void dispatch_bin_command(conn *c) {
1127    time_t exptime = 0;
1128    switch(c->cmd) {
1129        case CMD_VERSION:
1130            write_bin_response(c, VERSION, 0, strlen(VERSION));
1131            break;
1132        case CMD_FLUSH:
1133            set_current_time();
1134
1135            settings.oldest_live = current_time - 1;
1136            item_flush_expired();
1137            write_bin_response(c, NULL, 0, 0);
1138            break;
1139        case CMD_NOOP:
1140            write_bin_response(c, NULL, 0, 0);
1141            break;
1142        case CMD_SET:
1143            /* Fallthrough */
1144        case CMD_ADD:
1145            /* Fallthrough */
1146        case CMD_REPLACE:
1147            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1148            break;
1149        case CMD_GETQ:
1150        case CMD_GET:
1151            bin_read_key(c, bin_reading_get_key, 0);
1152            break;
1153        case CMD_DELETE:
1154            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1155            break;
1156        case CMD_INCR:
1157        case CMD_DECR:
1158            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1159            break;
1160        default:
1161            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1162    }
1163}
1164
1165static void process_bin_update(conn *c) {
1166    char *key;
1167    int nkey;
1168    int vlen;
1169    int flags;
1170    int exptime;
1171    item *it;
1172    int comm;
1173    int hdrlen = BIN_SET_HDR_LEN;
1174
1175    assert(c != NULL);
1176
1177    key = c->rbuf + hdrlen;
1178    nkey = c->keylen;
1179    key[nkey] = 0x00;
1180
1181    flags = ntohl(*((int*)(c->rbuf + 8)));
1182    exptime = ntohl(*((int*)(c->rbuf + 12)));
1183    vlen = c->bin_header[2] - (nkey + hdrlen);
1184
1185    if(settings.verbose > 1) {
1186        fprintf(stderr, "Value len is %d\n", vlen);
1187    }
1188
1189    if (settings.detail_enabled) {
1190        stats_prefix_record_set(key);
1191    }
1192
1193    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1194
1195    if (it == 0) {
1196        if (! item_size_ok(nkey, flags, vlen + 2)) {
1197            write_bin_error(c, ERR_TOO_LARGE, vlen);
1198        } else {
1199            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1200        }
1201        /* swallow the data line */
1202        c->write_and_go = conn_swallow;
1203        return;
1204    }
1205
1206    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf)));
1207
1208    switch(c->cmd) {
1209        case CMD_ADD:
1210            c->item_comm = NREAD_ADD;
1211            break;
1212        case CMD_SET:
1213            c->item_comm = NREAD_SET;
1214            break;
1215        case CMD_REPLACE:
1216            c->item_comm = NREAD_REPLACE;
1217            break;
1218        default:
1219            assert(0);
1220    }
1221
1222    if(it->cas_id != 0) {
1223        c->item_comm = NREAD_CAS;
1224    }
1225
1226    c->item = it;
1227    c->ritem = ITEM_data(it);
1228    c->rlbytes = vlen;
1229    conn_set_state(c, conn_nread);
1230    c->substate = bin_read_set_value;
1231}
1232
1233static void process_bin_delete(conn *c) {
1234    char *key;
1235    size_t nkey;
1236    item *it;
1237    time_t exptime = 0;
1238
1239    assert(c != NULL);
1240
1241    exptime = ntohl(*((int*)(c->rbuf)));
1242    key = c->rbuf + 4;
1243    nkey = c->keylen;
1244    key[nkey] = 0x00;
1245
1246    if(settings.verbose) {
1247        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1248    }
1249
1250    if (settings.detail_enabled) {
1251        stats_prefix_record_delete(key);
1252    }
1253
1254    it = item_get(key, nkey);
1255    if (it) {
1256        if (exptime == 0) {
1257            item_unlink(it);
1258            item_remove(it);      /* release our reference */
1259            write_bin_response(c, NULL, 0, 0);
1260        } else {
1261            /* XXX:  This is really lame, but defer_delete returns a string */
1262            char *res = defer_delete(it, exptime);
1263            if(res[0] == 'D') {
1264                write_bin_response(c, NULL, 0, 0);
1265            } else {
1266                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1267            }
1268        }
1269    } else {
1270        write_bin_error(c, ERR_NOT_FOUND, 0);
1271    }
1272}
1273
1274static void complete_nread_binary(conn *c) {
1275    assert(c != NULL);
1276    assert(c->cmd >= 0);
1277
1278    switch(c->substate) {
1279    case bin_reading_set_header:
1280        process_bin_update(c);
1281        break;
1282    case bin_read_set_value:
1283        complete_update_bin(c);
1284        break;
1285    case bin_reading_get_key:
1286        process_bin_get(c);
1287        break;
1288    case bin_reading_del_header:
1289        process_bin_delete(c);
1290        break;
1291    case bin_reading_incr_header:
1292        complete_incr_bin(c);
1293        break;
1294    default:
1295        fprintf(stderr, "Not handling substate %d\n", c->substate);
1296        assert(0);
1297    }
1298}
1299
1300static void reset_cmd_handler(conn *c) {
1301    c->cmd = -1;
1302    c->substate = bin_no_state;
1303    conn_shrink(c);
1304    if (c->rbytes > 0) {
1305        conn_set_state(c, conn_parse_cmd);
1306    } else {
1307        conn_set_state(c, conn_waiting);
1308    }
1309}
1310
1311static void complete_nread(conn *c) {
1312    assert(c != NULL);
1313    assert(c->protocol == ascii_prot || c->protocol == binary_prot);
1314
1315    if(c->protocol == ascii_prot) {
1316        complete_nread_ascii(c);
1317    } else if (c->protocol == binary_prot) {
1318        complete_nread_binary(c);
1319    }
1320}
1321
1322/*
1323 * Stores an item in the cache according to the semantics of one of the set
1324 * commands. In threaded mode, this is protected by the cache lock.
1325 *
1326 * Returns true if the item was stored.
1327 */
1328int do_store_item(item *it, int comm) {
1329    char *key = ITEM_key(it);
1330    bool delete_locked = false;
1331    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1332    int stored = 0;
1333
1334    item *new_it = NULL;
1335    int flags;
1336
1337    if (old_it != NULL && comm == NREAD_ADD) {
1338        /* add only adds a nonexistent item, but promote to head of LRU */
1339        do_item_update(old_it);
1340    } else if (!old_it && (comm == NREAD_REPLACE
1341        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1342    {
1343        /* replace only replaces an existing value; don't store */
1344    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1345        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1346    {
1347        /* replace and add can't override delete locks; don't store */
1348    } else if (comm == NREAD_CAS) {
1349        /* validate cas operation */
1350        if (delete_locked)
1351            old_it = do_item_get_nocheck(key, it->nkey);
1352
1353        if(old_it == NULL) {
1354          // LRU expired
1355          stored = 3;
1356        }
1357        else if(it->cas_id == old_it->cas_id) {
1358          // cas validates
1359          do_item_replace(old_it, it);
1360          stored = 1;
1361        } else {
1362          if(settings.verbose > 1) {
1363            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1364                old_it->cas_id, it->cas_id);
1365          }
1366          stored = 2;
1367        }
1368    } else {
1369        /*
1370         * Append - combine new and old record into single one. Here it's
1371         * atomic and thread-safe.
1372         */
1373
1374        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1375
1376            /* we have it and old_it here - alloc memory to hold both */
1377            /* flags was already lost - so recover them from ITEM_suffix(it) */
1378
1379            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1380
1381            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1382
1383            if (new_it == NULL) {
1384                /* SERVER_ERROR out of memory */
1385                return 0;
1386            }
1387
1388            /* copy data from it and old_it to new_it */
1389
1390            if (comm == NREAD_APPEND) {
1391                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1392                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1393            } else {
1394                /* NREAD_PREPEND */
1395                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1396                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1397            }
1398
1399            it = new_it;
1400        }
1401
1402        /* "set" commands can override the delete lock
1403           window... in which case we have to find the old hidden item
1404           that's in the namespace/LRU but wasn't returned by
1405           item_get.... because we need to replace it */
1406        if (delete_locked)
1407            old_it = do_item_get_nocheck(key, it->nkey);
1408
1409        if (old_it != NULL)
1410            do_item_replace(old_it, it);
1411        else
1412            do_item_link(it);
1413
1414        stored = 1;
1415    }
1416
1417    if (old_it != NULL)
1418        do_item_remove(old_it);         /* release our reference */
1419    if (new_it != NULL)
1420        do_item_remove(new_it);
1421
1422    return stored;
1423}
1424
1425typedef struct token_s {
1426    char *value;
1427    size_t length;
1428} token_t;
1429
1430#define COMMAND_TOKEN 0
1431#define SUBCOMMAND_TOKEN 1
1432#define KEY_TOKEN 1
1433#define KEY_MAX_LENGTH 250
1434
1435#define MAX_TOKENS 8
1436
1437/*
1438 * Tokenize the command string by replacing whitespace with '\0' and update
1439 * the token array tokens with pointer to start of each token and length.
1440 * Returns total number of tokens.  The last valid token is the terminal
1441 * token (value points to the first unprocessed character of the string and
1442 * length zero).
1443 *
1444 * Usage example:
1445 *
1446 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1447 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1448 *          ...
1449 *      }
1450 *      ncommand = tokens[ix].value - command;
1451 *      command  = tokens[ix].value;
1452 *   }
1453 */
1454static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1455    char *s, *e;
1456    size_t ntokens = 0;
1457
1458    assert(command != NULL && tokens != NULL && max_tokens > 1);
1459
1460    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1461        if (*e == ' ') {
1462            if (s != e) {
1463                tokens[ntokens].value = s;
1464                tokens[ntokens].length = e - s;
1465                ntokens++;
1466                *e = '\0';
1467            }
1468            s = e + 1;
1469        }
1470        else if (*e == '\0') {
1471            if (s != e) {
1472                tokens[ntokens].value = s;
1473                tokens[ntokens].length = e - s;
1474                ntokens++;
1475            }
1476
1477            break; /* string end */
1478        }
1479    }
1480
1481    /*
1482     * If we scanned the whole string, the terminal value pointer is null,
1483     * otherwise it is the first unprocessed character.
1484     */
1485    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1486    tokens[ntokens].length = 0;
1487    ntokens++;
1488
1489    return ntokens;
1490}
1491
1492/* set up a connection to write a buffer then free it, used for stats */
1493static void write_and_free(conn *c, char *buf, int bytes) {
1494    if (buf) {
1495        c->write_and_free = buf;
1496        c->wcurr = buf;
1497        c->wbytes = bytes;
1498        conn_set_state(c, conn_write);
1499        c->write_and_go = conn_new_cmd;
1500    } else {
1501        out_string(c, "SERVER_ERROR out of memory writing stats");
1502    }
1503}
1504
1505static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1506{
1507    int noreply_index = ntokens - 2;
1508
1509    /*
1510      NOTE: this function is not the first place where we are going to
1511      send the reply.  We could send it instead from process_command()
1512      if the request line has wrong number of tokens.  However parsing
1513      malformed line for "noreply" option is not reliable anyway, so
1514      it can't be helped.
1515    */
1516    if (tokens[noreply_index].value
1517        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1518        c->noreply = true;
1519    }
1520}
1521
1522inline static void process_stats_detail(conn *c, const char *command) {
1523    assert(c != NULL);
1524
1525    if (strcmp(command, "on") == 0) {
1526        settings.detail_enabled = 1;
1527        out_string(c, "OK");
1528    }
1529    else if (strcmp(command, "off") == 0) {
1530        settings.detail_enabled = 0;
1531        out_string(c, "OK");
1532    }
1533    else if (strcmp(command, "dump") == 0) {
1534        int len;
1535        char *stats = stats_prefix_dump(&len);
1536        write_and_free(c, stats, len);
1537    }
1538    else {
1539        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1540    }
1541}
1542
1543static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1544    rel_time_t now = current_time;
1545    char *command;
1546    char *subcommand;
1547
1548    assert(c != NULL);
1549
1550    if(ntokens < 2) {
1551        out_string(c, "CLIENT_ERROR bad command line");
1552        return;
1553    }
1554
1555    command = tokens[COMMAND_TOKEN].value;
1556
1557    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1558        char temp[1024];
1559        pid_t pid = getpid();
1560        char *pos = temp;
1561
1562#ifndef WIN32
1563        struct rusage usage;
1564        getrusage(RUSAGE_SELF, &usage);
1565#endif /* !WIN32 */
1566
1567        STATS_LOCK();
1568        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1569        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1570        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1571        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1572        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1573#ifndef WIN32
1574        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1575        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1576#endif /* !WIN32 */
1577        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1578        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1579        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1580        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1581        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1582        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1583        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1584        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1585        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1586        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1587        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1588        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1589        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1590        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1591        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1592        pos += sprintf(pos, "END");
1593        STATS_UNLOCK();
1594        out_string(c, temp);
1595        return;
1596    }
1597
1598    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1599
1600    if (strcmp(subcommand, "reset") == 0) {
1601        stats_reset();
1602        out_string(c, "RESET");
1603        return;
1604    }
1605
1606#ifdef HAVE_MALLOC_H
1607#ifdef HAVE_STRUCT_MALLINFO
1608    if (strcmp(subcommand, "malloc") == 0) {
1609        char temp[512];
1610        struct mallinfo info;
1611        char *pos = temp;
1612
1613        info = mallinfo();
1614        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1615        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1616        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1617        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1618        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1619        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1620        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1621        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1622        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1623        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1624        out_string(c, temp);
1625        return;
1626    }
1627#endif /* HAVE_STRUCT_MALLINFO */
1628#endif /* HAVE_MALLOC_H */
1629
1630#if !defined(WIN32) || !defined(__APPLE__)
1631    if (strcmp(subcommand, "maps") == 0) {
1632        char *wbuf;
1633        int wsize = 8192; /* should be enough */
1634        int fd;
1635        int res;
1636
1637        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1638            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1639            return;
1640        }
1641
1642        fd = open("/proc/self/maps", O_RDONLY);
1643        if (fd == -1) {
1644            out_string(c, "SERVER_ERROR cannot open the maps file");
1645            free(wbuf);
1646            return;
1647        }
1648
1649        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1650        if (res == wsize - 6) {
1651            out_string(c, "SERVER_ERROR buffer overflow");
1652            free(wbuf); close(fd);
1653            return;
1654        }
1655        if (res == 0 || res == -1) {
1656            out_string(c, "SERVER_ERROR can't read the maps file");
1657            free(wbuf); close(fd);
1658            return;
1659        }
1660        memcpy(wbuf + res, "END\r\n", 5);
1661        write_and_free(c, wbuf, res + 5);
1662        close(fd);
1663        return;
1664    }
1665#endif
1666
1667    if (strcmp(subcommand, "cachedump") == 0) {
1668
1669        char *buf;
1670        unsigned int bytes, id, limit = 0;
1671
1672        if(ntokens < 5) {
1673            out_string(c, "CLIENT_ERROR bad command line");
1674            return;
1675        }
1676
1677        id = strtoul(tokens[2].value, NULL, 10);
1678        limit = strtoul(tokens[3].value, NULL, 10);
1679
1680        if(errno == ERANGE) {
1681            out_string(c, "CLIENT_ERROR bad command line format");
1682            return;
1683        }
1684
1685        buf = item_cachedump(id, limit, &bytes);
1686        write_and_free(c, buf, bytes);
1687        return;
1688    }
1689
1690    if (strcmp(subcommand, "slabs") == 0) {
1691        int bytes = 0;
1692        char *buf = slabs_stats(&bytes);
1693        write_and_free(c, buf, bytes);
1694        return;
1695    }
1696
1697    if (strcmp(subcommand, "items") == 0) {
1698        int bytes = 0;
1699        char *buf = item_stats(&bytes);
1700        write_and_free(c, buf, bytes);
1701        return;
1702    }
1703
1704    if (strcmp(subcommand, "detail") == 0) {
1705        if (ntokens < 4)
1706            process_stats_detail(c, "");  /* outputs the error message */
1707        else
1708            process_stats_detail(c, tokens[2].value);
1709        return;
1710    }
1711
1712    if (strcmp(subcommand, "sizes") == 0) {
1713        int bytes = 0;
1714        char *buf = item_stats_sizes(&bytes);
1715        write_and_free(c, buf, bytes);
1716        return;
1717    }
1718
1719    out_string(c, "ERROR");
1720}
1721
1722/* ntokens is overwritten here... shrug.. */
1723static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1724    char *key;
1725    size_t nkey;
1726    int i = 0;
1727    item *it;
1728    token_t *key_token = &tokens[KEY_TOKEN];
1729    char *suffix;
1730    int stats_get_cmds   = 0;
1731    int stats_get_hits   = 0;
1732    int stats_get_misses = 0;
1733    assert(c != NULL);
1734
1735    if (settings.managed) {
1736        int bucket = c->bucket;
1737        if (bucket == -1) {
1738            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1739            return;
1740        }
1741        c->bucket = -1;
1742        if (buckets[bucket] != c->gen) {
1743            out_string(c, "ERROR_NOT_OWNER");
1744            return;
1745        }
1746    }
1747
1748    do {
1749        while(key_token->length != 0) {
1750
1751            key = key_token->value;
1752            nkey = key_token->length;
1753
1754            if(nkey > KEY_MAX_LENGTH) {
1755                STATS_LOCK();
1756                stats.get_cmds   += stats_get_cmds;
1757                stats.get_hits   += stats_get_hits;
1758                stats.get_misses += stats_get_misses;
1759                STATS_UNLOCK();
1760                out_string(c, "CLIENT_ERROR bad command line format");
1761                return;
1762            }
1763
1764            stats_get_cmds++;
1765            it = item_get(key, nkey);
1766            if (settings.detail_enabled) {
1767                stats_prefix_record_get(key, NULL != it);
1768            }
1769            if (it) {
1770                if (i >= c->isize) {
1771                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1772                    if (new_list) {
1773                        c->isize *= 2;
1774                        c->ilist = new_list;
1775                    } else break;
1776                }
1777
1778                /*
1779                 * Construct the response. Each hit adds three elements to the
1780                 * outgoing data list:
1781                 *   "VALUE "
1782                 *   key
1783                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1784                 */
1785
1786                if (return_cas)
1787                {
1788                  /* Goofy mid-flight realloc. */
1789                  if (i >= c->suffixsize) {
1790                    char **new_suffix_list = realloc(c->suffixlist,
1791                                           sizeof(char *) * c->suffixsize * 2);
1792                    if (new_suffix_list) {
1793                      c->suffixsize *= 2;
1794                      c->suffixlist  = new_suffix_list;
1795                    } else break;
1796                  }
1797
1798                  suffix = suffix_from_freelist();
1799                  if (suffix == NULL) {
1800                    STATS_LOCK();
1801                    stats.get_cmds   += stats_get_cmds;
1802                    stats.get_hits   += stats_get_hits;
1803                    stats.get_misses += stats_get_misses;
1804                    STATS_UNLOCK();
1805                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1806                    return;
1807                  }
1808                  *(c->suffixlist + i) = suffix;
1809                  sprintf(suffix, " %llu\r\n", it->cas_id);
1810                  if (add_iov(c, "VALUE ", 6) != 0 ||
1811                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1812                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1813                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1814                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1815                      {
1816                          break;
1817                      }
1818                }
1819                else
1820                {
1821                  if (add_iov(c, "VALUE ", 6) != 0 ||
1822                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1823                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1824                      {
1825                          break;
1826                      }
1827                }
1828
1829
1830                if (settings.verbose > 1)
1831                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1832
1833                /* item_get() has incremented it->refcount for us */
1834                stats_get_hits++;
1835                item_update(it);
1836                *(c->ilist + i) = it;
1837                i++;
1838
1839            } else {
1840                stats_get_misses++;
1841            }
1842
1843            key_token++;
1844        }
1845
1846        /*
1847         * If the command string hasn't been fully processed, get the next set
1848         * of tokens.
1849         */
1850        if(key_token->value != NULL) {
1851            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1852            key_token = tokens;
1853        }
1854
1855    } while(key_token->value != NULL);
1856
1857    c->icurr = c->ilist;
1858    c->ileft = i;
1859    if (return_cas) {
1860        c->suffixcurr = c->suffixlist;
1861        c->suffixleft = i;
1862    }
1863
1864    if (settings.verbose > 1)
1865        fprintf(stderr, ">%d END\n", c->sfd);
1866
1867    /*
1868        If the loop was terminated because of out-of-memory, it is not
1869        reliable to add END\r\n to the buffer, because it might not end
1870        in \r\n. So we send SERVER_ERROR instead.
1871    */
1872    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1873        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1874        out_string(c, "SERVER_ERROR out of memory writing get response");
1875    }
1876    else {
1877        conn_set_state(c, conn_mwrite);
1878        c->msgcurr = 0;
1879    }
1880
1881    STATS_LOCK();
1882    stats.get_cmds   += stats_get_cmds;
1883    stats.get_hits   += stats_get_hits;
1884    stats.get_misses += stats_get_misses;
1885    STATS_UNLOCK();
1886
1887    return;
1888}
1889
1890static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1891    char *key;
1892    size_t nkey;
1893    int flags;
1894    time_t exptime;
1895    int vlen, old_vlen;
1896    uint64_t req_cas_id;
1897    item *it, *old_it;
1898
1899    assert(c != NULL);
1900
1901    set_noreply_maybe(c, tokens, ntokens);
1902
1903    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1904        out_string(c, "CLIENT_ERROR bad command line format");
1905        return;
1906    }
1907
1908    key = tokens[KEY_TOKEN].value;
1909    nkey = tokens[KEY_TOKEN].length;
1910
1911    flags = strtoul(tokens[2].value, NULL, 10);
1912    exptime = strtol(tokens[3].value, NULL, 10);
1913    vlen = strtol(tokens[4].value, NULL, 10);
1914
1915    // does cas value exist?
1916    if(handle_cas)
1917    {
1918      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1919    }
1920
1921    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1922        out_string(c, "CLIENT_ERROR bad command line format");
1923        return;
1924    }
1925
1926    if (settings.detail_enabled) {
1927        stats_prefix_record_set(key);
1928    }
1929
1930    if (settings.managed) {
1931        int bucket = c->bucket;
1932        if (bucket == -1) {
1933            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1934            return;
1935        }
1936        c->bucket = -1;
1937        if (buckets[bucket] != c->gen) {
1938            out_string(c, "ERROR_NOT_OWNER");
1939            return;
1940        }
1941    }
1942
1943    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1944
1945    if (it == 0) {
1946        if (! item_size_ok(nkey, flags, vlen + 2))
1947            out_string(c, "SERVER_ERROR object too large for cache");
1948        else
1949            out_string(c, "SERVER_ERROR out of memory storing object");
1950        /* swallow the data line */
1951        c->write_and_go = conn_swallow;
1952        c->sbytes = vlen + 2;
1953        return;
1954    }
1955    if(handle_cas)
1956      it->cas_id = req_cas_id;
1957
1958    c->item = it;
1959    c->ritem = ITEM_data(it);
1960    c->rlbytes = it->nbytes;
1961    c->item_comm = comm;
1962    conn_set_state(c, conn_nread);
1963}
1964
1965static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1966    char temp[sizeof("18446744073709551615")];
1967    item *it;
1968    int64_t delta;
1969    char *key;
1970    size_t nkey;
1971
1972    assert(c != NULL);
1973
1974    set_noreply_maybe(c, tokens, ntokens);
1975
1976    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1977        out_string(c, "CLIENT_ERROR bad command line format");
1978        return;
1979    }
1980
1981    key = tokens[KEY_TOKEN].value;
1982    nkey = tokens[KEY_TOKEN].length;
1983
1984    if (settings.managed) {
1985        int bucket = c->bucket;
1986        if (bucket == -1) {
1987            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1988            return;
1989        }
1990        c->bucket = -1;
1991        if (buckets[bucket] != c->gen) {
1992            out_string(c, "ERROR_NOT_OWNER");
1993            return;
1994        }
1995    }
1996
1997    delta = strtoll(tokens[2].value, NULL, 10);
1998
1999    if(errno == ERANGE) {
2000        out_string(c, "CLIENT_ERROR bad command line format");
2001        return;
2002    }
2003
2004    it = item_get(key, nkey);
2005    if (!it) {
2006        out_string(c, "NOT_FOUND");
2007        return;
2008    }
2009
2010    out_string(c, add_delta(it, incr, delta, temp));
2011    item_remove(it);         /* release our reference */
2012}
2013
2014/*
2015 * adds a delta value to a numeric item.
2016 *
2017 * it    item to adjust
2018 * incr  true to increment value, false to decrement
2019 * delta amount to adjust value by
2020 * buf   buffer for response string
2021 *
2022 * returns a response string to send back to the client.
2023 */
2024char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2025    char *ptr;
2026    int64_t value;
2027    int res;
2028
2029    ptr = ITEM_data(it);
2030    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2031
2032    value = strtoull(ptr, NULL, 10);
2033
2034    if(errno == ERANGE) {
2035        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2036    }
2037
2038    if (incr)
2039        value += delta;
2040    else {
2041        value -= delta;
2042    }
2043    if(value < 0) {
2044        value = 0;
2045    }
2046    sprintf(buf, "%llu", value);
2047    res = strlen(buf);
2048    if (res + 2 > it->nbytes) { /* need to realloc */
2049        item *new_it;
2050        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2051        if (new_it == 0) {
2052            return "SERVER_ERROR out of memory in incr/decr";
2053        }
2054        memcpy(ITEM_data(new_it), buf, res);
2055        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2056        do_item_replace(it, new_it);
2057        do_item_remove(new_it);       /* release our reference */
2058    } else { /* replace in-place */
2059        memcpy(ITEM_data(it), buf, res);
2060        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2061    }
2062
2063    return buf;
2064}
2065
2066static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2067    char *key;
2068    size_t nkey;
2069    item *it;
2070    time_t exptime = 0;
2071
2072    assert(c != NULL);
2073
2074    set_noreply_maybe(c, tokens, ntokens);
2075
2076    if (settings.managed) {
2077        int bucket = c->bucket;
2078        if (bucket == -1) {
2079            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2080            return;
2081        }
2082        c->bucket = -1;
2083        if (buckets[bucket] != c->gen) {
2084            out_string(c, "ERROR_NOT_OWNER");
2085            return;
2086        }
2087    }
2088
2089    key = tokens[KEY_TOKEN].value;
2090    nkey = tokens[KEY_TOKEN].length;
2091
2092    if(nkey > KEY_MAX_LENGTH) {
2093        out_string(c, "CLIENT_ERROR bad command line format");
2094        return;
2095    }
2096
2097    if(ntokens == (c->noreply ? 5 : 4)) {
2098        exptime = strtol(tokens[2].value, NULL, 10);
2099
2100        if(errno == ERANGE) {
2101            out_string(c, "CLIENT_ERROR bad command line format");
2102            return;
2103        }
2104    }
2105
2106    if (settings.detail_enabled) {
2107        stats_prefix_record_delete(key);
2108    }
2109
2110    it = item_get(key, nkey);
2111    if (it) {
2112        if (exptime == 0) {
2113            item_unlink(it);
2114            item_remove(it);      /* release our reference */
2115            out_string(c, "DELETED");
2116        } else {
2117            /* our reference will be transfered to the delete queue */
2118            out_string(c, defer_delete(it, exptime));
2119        }
2120    } else {
2121        out_string(c, "NOT_FOUND");
2122    }
2123}
2124
2125/*
2126 * Adds an item to the deferred-delete list so it can be reaped later.
2127 *
2128 * Returns the result to send to the client.
2129 */
2130char *do_defer_delete(item *it, time_t exptime)
2131{
2132    if (delcurr >= deltotal) {
2133        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2134        if (new_delete) {
2135            todelete = new_delete;
2136            deltotal *= 2;
2137        } else {
2138            /*
2139             * can't delete it immediately, user wants a delay,
2140             * but we ran out of memory for the delete queue
2141             */
2142            item_remove(it);    /* release reference */
2143            return "SERVER_ERROR out of memory expanding delete queue";
2144        }
2145    }
2146
2147    /* use its expiration time as its deletion time now */
2148    it->exptime = realtime(exptime);
2149    it->it_flags |= ITEM_DELETED;
2150    todelete[delcurr++] = it;
2151
2152    return "DELETED";
2153}
2154
2155static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2156    unsigned int level;
2157
2158    assert(c != NULL);
2159
2160    set_noreply_maybe(c, tokens, ntokens);
2161
2162    level = strtoul(tokens[1].value, NULL, 10);
2163    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2164    out_string(c, "OK");
2165    return;
2166}
2167
2168static void process_command(conn *c, char *command) {
2169
2170    token_t tokens[MAX_TOKENS];
2171    size_t ntokens;
2172    int comm;
2173
2174    assert(c != NULL);
2175
2176    if (settings.verbose > 1)
2177        fprintf(stderr, "<%d %s\n", c->sfd, command);
2178
2179    /*
2180     * for commands set/add/replace, we build an item and read the data
2181     * directly into it, then continue in nread_complete().
2182     */
2183
2184    c->msgcurr = 0;
2185    c->msgused = 0;
2186    c->iovused = 0;
2187    if (add_msghdr(c) != 0) {
2188        out_string(c, "SERVER_ERROR out of memory preparing response");
2189        return;
2190    }
2191
2192    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2193    if (ntokens >= 3 &&
2194        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2195         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2196
2197        process_get_command(c, tokens, ntokens, false);
2198
2199    } else if ((ntokens == 6 || ntokens == 7) &&
2200               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2201                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2202                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2203                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2204                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2205
2206        process_update_command(c, tokens, ntokens, comm, false);
2207
2208    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2209
2210        process_update_command(c, tokens, ntokens, comm, true);
2211
2212    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2213
2214        process_arithmetic_command(c, tokens, ntokens, 1);
2215
2216    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2217
2218        process_get_command(c, tokens, ntokens, true);
2219
2220    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2221
2222        process_arithmetic_command(c, tokens, ntokens, 0);
2223
2224    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2225
2226        process_delete_command(c, tokens, ntokens);
2227
2228    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2229        unsigned int bucket, gen;
2230        if (!settings.managed) {
2231            out_string(c, "CLIENT_ERROR not a managed instance");
2232            return;
2233        }
2234
2235        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2236            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2237                out_string(c, "CLIENT_ERROR bucket number out of range");
2238                return;
2239            }
2240            buckets[bucket] = gen;
2241            out_string(c, "OWNED");
2242            return;
2243        } else {
2244            out_string(c, "CLIENT_ERROR bad format");
2245            return;
2246        }
2247
2248    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2249
2250        int bucket;
2251        if (!settings.managed) {
2252            out_string(c, "CLIENT_ERROR not a managed instance");
2253            return;
2254        }
2255        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2256            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2257                out_string(c, "CLIENT_ERROR bucket number out of range");
2258                return;
2259            }
2260            buckets[bucket] = 0;
2261            out_string(c, "DISOWNED");
2262            return;
2263        } else {
2264            out_string(c, "CLIENT_ERROR bad format");
2265            return;
2266        }
2267
2268    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2269        int bucket, gen;
2270        if (!settings.managed) {
2271            out_string(c, "CLIENT_ERROR not a managed instance");
2272            return;
2273        }
2274        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2275            /* we never write anything back, even if input's wrong */
2276            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2277                /* do nothing, bad input */
2278            } else {
2279                c->bucket = bucket;
2280                c->gen = gen;
2281            }
2282            conn_set_state(c, conn_new_cmd);
2283            return;
2284        } else {
2285            out_string(c, "CLIENT_ERROR bad format");
2286            return;
2287        }
2288
2289    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2290
2291        process_stat(c, tokens, ntokens);
2292
2293    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2294        time_t exptime = 0;
2295        set_current_time();
2296
2297        set_noreply_maybe(c, tokens, ntokens);
2298
2299        if(ntokens == (c->noreply ? 3 : 2)) {
2300            settings.oldest_live = current_time - 1;
2301            item_flush_expired();
2302            out_string(c, "OK");
2303            return;
2304        }
2305
2306        exptime = strtol(tokens[1].value, NULL, 10);
2307        if(errno == ERANGE) {
2308            out_string(c, "CLIENT_ERROR bad command line format");
2309            return;
2310        }
2311
2312        /*
2313          If exptime is zero realtime() would return zero too, and
2314          realtime(exptime) - 1 would overflow to the max unsigned
2315          value.  So we process exptime == 0 the same way we do when
2316          no delay is given at all.
2317        */
2318        if (exptime > 0)
2319            settings.oldest_live = realtime(exptime) - 1;
2320        else /* exptime == 0 */
2321            settings.oldest_live = current_time - 1;
2322        item_flush_expired();
2323        out_string(c, "OK");
2324        return;
2325
2326    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2327
2328        out_string(c, "VERSION " VERSION);
2329
2330    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2331
2332        conn_set_state(c, conn_closing);
2333
2334    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2335                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2336#ifdef ALLOW_SLABS_REASSIGN
2337
2338        int src, dst, rv;
2339
2340        src = strtol(tokens[2].value, NULL, 10);
2341        dst  = strtol(tokens[3].value, NULL, 10);
2342
2343        if(errno == ERANGE) {
2344            out_string(c, "CLIENT_ERROR bad command line format");
2345            return;
2346        }
2347
2348        rv = slabs_reassign(src, dst);
2349        if (rv == 1) {
2350            out_string(c, "DONE");
2351            return;
2352        }
2353        if (rv == 0) {
2354            out_string(c, "CANT");
2355            return;
2356        }
2357        if (rv == -1) {
2358            out_string(c, "BUSY");
2359            return;
2360        }
2361#else
2362        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2363#endif
2364    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2365        process_verbosity_command(c, tokens, ntokens);
2366    } else {
2367        out_string(c, "ERROR");
2368    }
2369    return;
2370}
2371
2372/*
2373 * if we have a complete line in the buffer, process it.
2374 */
2375static int try_read_command(conn *c) {
2376    assert(c != NULL);
2377    assert(c->rcurr <= (c->rbuf + c->rsize));
2378    assert(c->rbytes > 0);
2379
2380    if (c->protocol == negotiating_prot)  {
2381        if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC) {
2382            c->protocol = binary_prot;
2383        } else {
2384            c->protocol = ascii_prot;
2385        }
2386
2387        if (settings.verbose) {
2388            fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
2389                    prot_text(c->protocol));
2390        }
2391    }
2392
2393    if (c->protocol == binary_prot) {
2394        /* Do we have the complete packet header? */
2395        if (c->rbytes < MIN_BIN_PKT_LENGTH) {
2396            /* need more data! */
2397            return 0;
2398        } else {
2399            int i = 0;
2400            memcpy(c->bin_header, c->rcurr, sizeof(c->bin_header));
2401            assert(BIN_PKT_HDR_WORDS == 4);
2402            for (i = 0; i<BIN_PKT_HDR_WORDS; i++) {
2403                c->bin_header[i] = ntohl(c->bin_header[i]);
2404            }
2405
2406            if (settings.verbose) {
2407                fprintf(stderr,
2408                        "Read binary protocol data:  %08x %08x %08x %08x\n",
2409                        c->bin_header[0], c->bin_header[1], c->bin_header[2],
2410                        c->bin_header[3]);
2411            }
2412
2413            if ((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
2414                if (settings.verbose) {
2415                    fprintf(stderr, "Invalid magic:  %x\n",
2416                            c->bin_header[0] >> 24);
2417                }
2418                conn_set_state(c, conn_closing);
2419                return 0;
2420            }
2421
2422            c->msgcurr = 0;
2423            c->msgused = 0;
2424            c->iovused = 0;
2425            if (add_msghdr(c) != 0) {
2426                out_string(c, "SERVER_ERROR out of memory");
2427                return 0;
2428            }
2429
2430            c->cmd = (c->bin_header[0] >> 16) & 0xff;
2431            c->keylen = c->bin_header[0] & 0xffff;
2432            c->opaque = c->bin_header[3];
2433            if (settings.verbose > 1) {
2434                fprintf(stderr,
2435                        "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n",
2436                        c->cmd, c->opaque, c->keylen, c->bin_header[2]);
2437            }
2438
2439            dispatch_bin_command(c);
2440
2441            c->rbytes -= MIN_BIN_PKT_LENGTH;
2442            c->rcurr += MIN_BIN_PKT_LENGTH;
2443        }
2444    } else {
2445        char *el, *cont;
2446
2447        if (c->rbytes == 0)
2448            return 0;
2449        el = memchr(c->rcurr, '\n', c->rbytes);
2450        if (!el)
2451            return 0;
2452        cont = el + 1;
2453        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2454            el--;
2455        }
2456        *el = '\0';
2457
2458        assert(cont <= (c->rcurr + c->rbytes));
2459
2460        process_command(c, c->rcurr);
2461
2462        c->rbytes -= (cont - c->rcurr);
2463        c->rcurr = cont;
2464
2465        assert(c->rcurr <= (c->rbuf + c->rsize));
2466    }
2467
2468    return 1;
2469}
2470
2471/*
2472 * read a UDP request.
2473 * return 0 if there's nothing to read.
2474 */
2475static int try_read_udp(conn *c) {
2476    int res;
2477
2478    assert(c != NULL);
2479
2480    c->request_addr_size = sizeof(c->request_addr);
2481    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2482                   0, &c->request_addr, &c->request_addr_size);
2483    if (res > 8) {
2484        unsigned char *buf = (unsigned char *)c->rbuf;
2485        STATS_LOCK();
2486        stats.bytes_read += res;
2487        STATS_UNLOCK();
2488
2489        /* Beginning of UDP packet is the request ID; save it. */
2490        c->request_id = buf[0] * 256 + buf[1];
2491
2492        /* If this is a multi-packet request, drop it. */
2493        if (buf[4] != 0 || buf[5] != 1) {
2494            out_string(c, "SERVER_ERROR multi-packet request not supported");
2495            return 0;
2496        }
2497
2498        /* Don't care about any of the rest of the header. */
2499        res -= 8;
2500        memmove(c->rbuf, c->rbuf + 8, res);
2501
2502        c->rbytes += res;
2503        c->rcurr = c->rbuf;
2504        return 1;
2505    }
2506    return 0;
2507}
2508
2509/*
2510 * read from network as much as we can, handle buffer overflow and connection
2511 * close.
2512 * before reading, move the remaining incomplete fragment of a command
2513 * (if any) to the beginning of the buffer.
2514 * @return 1 data received
2515 *         0 no data received
2516 *        -1 an error occured (on the socket) (or client closed connection)
2517 *        -2 memory error (failed to allocate more memory)
2518 */
2519static int try_read_network(conn *c) {
2520    int gotdata = 0;
2521    int res;
2522
2523    assert(c != NULL);
2524
2525    if (c->rcurr != c->rbuf) {
2526        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2527            memmove(c->rbuf, c->rcurr, c->rbytes);
2528        c->rcurr = c->rbuf;
2529    }
2530
2531    while (1) {
2532        if (c->rbytes >= c->rsize) {
2533            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2534            if (!new_rbuf) {
2535                if (settings.verbose > 0)
2536                    fprintf(stderr, "Couldn't realloc input buffer\n");
2537                c->rbytes = 0; /* ignore what we read */
2538                out_string(c, "SERVER_ERROR out of memory reading request");
2539                c->write_and_go = conn_closing;
2540                return -2;
2541            }
2542            c->rcurr = c->rbuf = new_rbuf;
2543            c->rsize *= 2;
2544        }
2545
2546        int avail = c->rsize - c->rbytes;
2547        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2548        if (res > 0) {
2549            STATS_LOCK();
2550            stats.bytes_read += res;
2551            STATS_UNLOCK();
2552            gotdata = 1;
2553            c->rbytes += res;
2554            if (res == avail) {
2555                continue;
2556            } else {
2557                break;
2558            }
2559        }
2560        if (res == 0) {
2561            return -1;
2562        }
2563        if (res == -1) {
2564            if (errno == EAGAIN || errno == EWOULDBLOCK) {
2565                break;
2566            }
2567            return -1;
2568        }
2569    }
2570    return gotdata;
2571}
2572
2573static bool update_event(conn *c, const int new_flags) {
2574    assert(c != NULL);
2575
2576    struct event_base *base = c->event.ev_base;
2577    if (c->ev_flags == new_flags)
2578        return true;
2579    if (event_del(&c->event) == -1) return false;
2580    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2581    event_base_set(base, &c->event);
2582    c->ev_flags = new_flags;
2583    if (event_add(&c->event, 0) == -1) return false;
2584    return true;
2585}
2586
2587/*
2588 * Sets whether we are listening for new connections or not.
2589 */
2590void accept_new_conns(const bool do_accept) {
2591    conn *next;
2592
2593    if (! is_listen_thread())
2594        return;
2595
2596    for (next = listen_conn; next; next = next->next) {
2597        if (do_accept) {
2598            update_event(next, EV_READ | EV_PERSIST);
2599            if (listen(next->sfd, 1024) != 0) {
2600                perror("listen");
2601            }
2602        }
2603        else {
2604            update_event(next, 0);
2605            if (listen(next->sfd, 0) != 0) {
2606                perror("listen");
2607            }
2608        }
2609  }
2610}
2611
2612/*
2613 * Transmit the next chunk of data from our list of msgbuf structures.
2614 *
2615 * Returns:
2616 *   TRANSMIT_COMPLETE   All done writing.
2617 *   TRANSMIT_INCOMPLETE More data remaining to write.
2618 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2619 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2620 */
2621static int transmit(conn *c) {
2622    assert(c != NULL);
2623
2624    if (c->msgcurr < c->msgused &&
2625            c->msglist[c->msgcurr].msg_iovlen == 0) {
2626        /* Finished writing the current msg; advance to the next. */
2627        c->msgcurr++;
2628    }
2629    if (c->msgcurr < c->msgused) {
2630        ssize_t res;
2631        struct msghdr *m = &c->msglist[c->msgcurr];
2632
2633        res = sendmsg(c->sfd, m, 0);
2634        if (res > 0) {
2635            STATS_LOCK();
2636            stats.bytes_written += res;
2637            STATS_UNLOCK();
2638
2639            /* We've written some of the data. Remove the completed
2640               iovec entries from the list of pending writes. */
2641            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2642                res -= m->msg_iov->iov_len;
2643                m->msg_iovlen--;
2644                m->msg_iov++;
2645            }
2646
2647            /* Might have written just part of the last iovec entry;
2648               adjust it so the next write will do the rest. */
2649            if (res > 0) {
2650                m->msg_iov->iov_base += res;
2651                m->msg_iov->iov_len -= res;
2652            }
2653            return TRANSMIT_INCOMPLETE;
2654        }
2655        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2656            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2657                if (settings.verbose > 0)
2658                    fprintf(stderr, "Couldn't update event\n");
2659                conn_set_state(c, conn_closing);
2660                return TRANSMIT_HARD_ERROR;
2661            }
2662            return TRANSMIT_SOFT_ERROR;
2663        }
2664        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
2665           we have a real error, on which we close the connection */
2666        if (settings.verbose > 0)
2667            perror("Failed to write, and not due to blocking");
2668
2669        if (IS_UDP(c->protocol))
2670            conn_set_state(c, conn_read);
2671        else
2672            conn_set_state(c, conn_closing);
2673        return TRANSMIT_HARD_ERROR;
2674    } else {
2675        return TRANSMIT_COMPLETE;
2676    }
2677}
2678
2679static void drive_machine(conn *c) {
2680    bool stop = false;
2681    int sfd, flags = 1;
2682    socklen_t addrlen;
2683    struct sockaddr_storage addr;
2684    int res;
2685
2686    assert(c != NULL);
2687
2688    while (!stop) {
2689
2690        switch(c->state) {
2691        case conn_listening:
2692            addrlen = sizeof(addr);
2693            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2694                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2695                    /* these are transient, so don't log anything */
2696                    stop = true;
2697                } else if (errno == EMFILE) {
2698                    if (settings.verbose > 0)
2699                        fprintf(stderr, "Too many open connections\n");
2700                    accept_new_conns(false);
2701                    stop = true;
2702                } else {
2703                    perror("accept()");
2704                    stop = true;
2705                }
2706                break;
2707            }
2708            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2709                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2710                perror("setting O_NONBLOCK");
2711                close(sfd);
2712                break;
2713            }
2714
2715            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
2716                                     DATA_BUFFER_SIZE, c->protocol);
2717            stop = true;
2718            break;
2719
2720        case conn_waiting:
2721            if (!update_event(c, EV_READ | EV_PERSIST)) {
2722                if (settings.verbose > 0)
2723                    fprintf(stderr, "Couldn't update event\n");
2724                conn_set_state(c, conn_closing);
2725                break;
2726            }
2727
2728            conn_set_state(c, conn_read);
2729            stop = true;
2730            break;
2731
2732        case conn_read:
2733            res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c);
2734            switch (res) {
2735            case 0 :
2736                conn_set_state(c, conn_waiting);
2737                break;
2738            case 1:
2739                conn_set_state(c, conn_parse_cmd);
2740                break;
2741            case -1:
2742                conn_set_state(c, conn_closing);
2743                break;
2744            case -2: /* Failed to allocate more memory */
2745                /* State already set by try_read_network */
2746                break;
2747            }
2748            break;
2749
2750        case conn_parse_cmd :
2751            if (try_read_command(c) == 0) {
2752                /* wee need more data! */
2753                conn_set_state(c, conn_waiting);
2754            }
2755
2756            break;
2757
2758        case conn_new_cmd:
2759            reset_cmd_handler(c);
2760            assoc_move_next_bucket();
2761            break;
2762
2763        case conn_nread:
2764            if (c->rlbytes == 0) {
2765                complete_nread(c);
2766                break;
2767            }
2768            /* first check if we have leftovers in the conn_read buffer */
2769            if (c->rbytes > 0) {
2770                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2771                memmove(c->ritem, c->rcurr, tocopy);
2772                c->ritem += tocopy;
2773                c->rlbytes -= tocopy;
2774                c->rcurr += tocopy;
2775                c->rbytes -= tocopy;
2776                if (c->rlbytes == 0) {
2777                    break;
2778                }
2779            }
2780
2781            /*  now try reading from the socket */
2782            res = read(c->sfd, c->ritem, c->rlbytes);
2783            if (res > 0) {
2784                STATS_LOCK();
2785                stats.bytes_read += res;
2786                STATS_UNLOCK();
2787                c->ritem += res;
2788                c->rlbytes -= res;
2789                break;
2790            }
2791            if (res == 0) { /* end of stream */
2792                conn_set_state(c, conn_closing);
2793                break;
2794            }
2795            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2796                if (!update_event(c, EV_READ | EV_PERSIST)) {
2797                    if (settings.verbose > 0)
2798                        fprintf(stderr, "Couldn't update event\n");
2799                    conn_set_state(c, conn_closing);
2800                    break;
2801                }
2802                stop = true;
2803                break;
2804            }
2805            /* otherwise we have a real error, on which we close the connection */
2806            if (settings.verbose > 0)
2807                fprintf(stderr, "Failed to read, and not due to blocking\n");
2808            conn_set_state(c, conn_closing);
2809            break;
2810
2811        case conn_swallow:
2812            /* we are reading sbytes and throwing them away */
2813            if (c->sbytes == 0) {
2814                conn_set_state(c, conn_new_cmd);
2815                break;
2816            }
2817
2818            /* first check if we have leftovers in the conn_read buffer */
2819            if (c->rbytes > 0) {
2820                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2821                c->sbytes -= tocopy;
2822                c->rcurr += tocopy;
2823                c->rbytes -= tocopy;
2824                break;
2825            }
2826
2827            /*  now try reading from the socket */
2828            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2829            if (res > 0) {
2830                STATS_LOCK();
2831                stats.bytes_read += res;
2832                STATS_UNLOCK();
2833                c->sbytes -= res;
2834                break;
2835            }
2836            if (res == 0) { /* end of stream */
2837                conn_set_state(c, conn_closing);
2838                break;
2839            }
2840            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2841                if (!update_event(c, EV_READ | EV_PERSIST)) {
2842                    if (settings.verbose > 0)
2843                        fprintf(stderr, "Couldn't update event\n");
2844                    conn_set_state(c, conn_closing);
2845                    break;
2846                }
2847                stop = true;
2848                break;
2849            }
2850            /* otherwise we have a real error, on which we close the connection */
2851            if (settings.verbose > 0)
2852                fprintf(stderr, "Failed to read, and not due to blocking\n");
2853            conn_set_state(c, conn_closing);
2854            break;
2855
2856        case conn_write:
2857            /*
2858             * We want to write out a simple response. If we haven't already,
2859             * assemble it into a msgbuf list (this will be a single-entry
2860             * list for TCP or a two-entry list for UDP).
2861             */
2862            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2863                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2864                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2865                    if (settings.verbose > 0)
2866                        fprintf(stderr, "Couldn't build response\n");
2867                    conn_set_state(c, conn_closing);
2868                    break;
2869                }
2870            }
2871
2872            /* fall through... */
2873
2874        case conn_mwrite:
2875            switch (transmit(c)) {
2876            case TRANSMIT_COMPLETE:
2877                if (c->state == conn_mwrite) {
2878                    while (c->ileft > 0) {
2879                        item *it = *(c->icurr);
2880                        assert((it->it_flags & ITEM_SLABBED) == 0);
2881                        item_remove(it);
2882                        c->icurr++;
2883                        c->ileft--;
2884                    }
2885                    while (c->suffixleft > 0) {
2886                        char *suffix = *(c->suffixcurr);
2887                        if(suffix_add_to_freelist(suffix)) {
2888                            /* Failed to add to freelist, don't leak */
2889                            free(suffix);
2890                        }
2891                        c->suffixcurr++;
2892                        c->suffixleft--;
2893                    }
2894                    /* XXX:  I don't know why this wasn't the general case */
2895                    if(c->protocol == binary_prot) {
2896                        conn_set_state(c, c->write_and_go);
2897                    } else {
2898                        conn_set_state(c, conn_new_cmd);
2899                    }
2900                } else if (c->state == conn_write) {
2901                    if (c->write_and_free) {
2902                        free(c->write_and_free);
2903                        c->write_and_free = 0;
2904                    }
2905                    conn_set_state(c, c->write_and_go);
2906                } else {
2907                    if (settings.verbose > 0)
2908                        fprintf(stderr, "Unexpected state %d\n", c->state);
2909                    conn_set_state(c, conn_closing);
2910                }
2911                break;
2912
2913            case TRANSMIT_INCOMPLETE:
2914            case TRANSMIT_HARD_ERROR:
2915                break;                   /* Continue in state machine. */
2916
2917            case TRANSMIT_SOFT_ERROR:
2918                stop = true;
2919                break;
2920            }
2921            break;
2922
2923        case conn_closing:
2924            if (IS_UDP(c->protocol))
2925                conn_cleanup(c);
2926            else
2927                conn_close(c);
2928            stop = true;
2929            break;
2930        }
2931    }
2932
2933    return;
2934}
2935
2936void event_handler(const int fd, const short which, void *arg) {
2937    conn *c;
2938
2939    c = (conn *)arg;
2940    assert(c != NULL);
2941
2942    c->which = which;
2943
2944    /* sanity */
2945    if (fd != c->sfd) {
2946        if (settings.verbose > 0)
2947            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2948        conn_close(c);
2949        return;
2950    }
2951
2952    drive_machine(c);
2953
2954    /* wait for next event */
2955    return;
2956}
2957
2958static int new_socket(struct addrinfo *ai) {
2959    int sfd;
2960    int flags;
2961
2962    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2963        perror("socket()");
2964        return -1;
2965    }
2966