root/branches/binary/server/memcached.c @ 731

Revision 731, 111.4 kB (checked in by dsallings, 21 months ago)

Merge commit 'trunk' into lbinary r729 (server_socket refactoring)

Conflicts:

server/memcached.c

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25#include <ctype.h>
26
27/* some POSIX systems need the following definition
28 * to get mlockall flags out of sys/mman.h.  */
29#ifndef _P1003_1B_VISIBLE
30#define _P1003_1B_VISIBLE
31#endif
32/* need this to get IOV_MAX on some platforms. */
33#ifndef __need_IOV_MAX
34#define __need_IOV_MAX
35#endif
36#include <pwd.h>
37#include <sys/mman.h>
38#include <fcntl.h>
39#include <netinet/tcp.h>
40#include <arpa/inet.h>
41#include <errno.h>
42#include <stdlib.h>
43#include <stdio.h>
44#include <string.h>
45#include <time.h>
46#include <assert.h>
47#include <limits.h>
48
49#ifdef HAVE_MALLOC_H
50/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
51#ifndef __OpenBSD__
52#include <malloc.h>
53#endif
54#endif
55
56/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
57#ifndef IOV_MAX
58#if defined(__FreeBSD__) || defined(__APPLE__)
59# define IOV_MAX 1024
60#endif
61#endif
62
63/*
64 * forward declarations
65 */
66static void drive_machine(conn *c);
67static int new_socket(struct addrinfo *ai);
68static int server_socket(const int port, const int prot);
69static int try_read_command(conn *c);
70static int try_read_network(conn *c);
71static int try_read_udp(conn *c);
72static void conn_set_state(conn *, int);
73
74/* stats */
75static void stats_reset(void);
76static void stats_init(void);
77
78/* defaults */
79static void settings_init(void);
80
81/* event handling, network IO */
82static void event_handler(const int fd, const short which, void *arg);
83static void conn_close(conn *c);
84static void conn_init(void);
85static void accept_new_conns(const bool do_accept);
86static bool update_event(conn *c, const int new_flags);
87static void complete_nread(conn *c);
88static void process_command(conn *c, char *command);
89static int transmit(conn *c);
90static int ensure_iov_space(conn *c);
91static int add_iov(conn *c, const void *buf, int len);
92static int add_msghdr(conn *c);
93
94/* time handling */
95static void set_current_time(void);  /* update the global variable holding
96                              global 32-bit seconds-since-start time
97                              (to avoid 64 bit time_t) */
98
99static void conn_free(conn *c);
100
101/** exported globals **/
102struct stats stats;
103struct settings settings;
104
105/** file scope variables **/
106static item **todelete = NULL;
107static int delcurr;
108static int deltotal;
109static conn *listen_conn = NULL;
110static struct event_base *main_base;
111
112#define TRANSMIT_COMPLETE   0
113#define TRANSMIT_INCOMPLETE 1
114#define TRANSMIT_SOFT_ERROR 2
115#define TRANSMIT_HARD_ERROR 3
116
117static int *buckets = 0; /* bucket->generation array for a managed instance */
118
119#define REALTIME_MAXDELTA 60*60*24*30
120/*
121 * given time value that's either unix time or delta from current unix time, return
122 * unix time. Use the fact that delta can't exceed one month (and real time value can't
123 * be that low).
124 */
125static rel_time_t realtime(const time_t exptime) {
126    /* no. of seconds in 30 days - largest possible delta exptime */
127
128    if (exptime == 0) return 0; /* 0 means never expire */
129
130    if (exptime > REALTIME_MAXDELTA) {
131        /* if item expiration is at/before the server started, give it an
132           expiration time of 1 second after the server started.
133           (because 0 means don't expire).  without this, we'd
134           underflow and wrap around to some large value way in the
135           future, effectively making items expiring in the past
136           really expiring never */
137        if (exptime <= stats.started)
138            return (rel_time_t)1;
139        return (rel_time_t)(exptime - stats.started);
140    } else {
141        return (rel_time_t)(exptime + current_time);
142    }
143}
144
145static void stats_init(void) {
146    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
147    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
148    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
149
150    /* make the time we started always be 2 seconds before we really
151       did, so time(0) - time.started is never zero.  if so, things
152       like 'settings.oldest_live' which act as booleans as well as
153       values are now false in boolean context... */
154    stats.started = time(0) - 2;
155    stats_prefix_init();
156}
157
158static void stats_reset(void) {
159    STATS_LOCK();
160    stats.total_items = stats.total_conns = 0;
161    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
162    stats.bytes_read = stats.bytes_written = 0;
163    stats_prefix_clear();
164    STATS_UNLOCK();
165}
166
167static void settings_init(void) {
168    settings.access=0700;
169    settings.port = 11211;
170    settings.udpport = 0;
171    /* By default this string should be NULL for getaddrinfo() */
172    settings.inter = NULL;
173    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
174    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
175    settings.verbose = 0;
176    settings.oldest_live = 0;
177    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
178    settings.socketpath = NULL;       /* by default, not using a unix socket */
179    settings.managed = false;
180    settings.factor = 1.25;
181    settings.chunk_size = 48;         /* space for a modest key and value */
182#ifdef USE_THREADS
183    settings.num_threads = 4;
184#else
185    settings.num_threads = 1;
186#endif
187    settings.prefix_delimiter = ':';
188    settings.detail_enabled = 0;
189}
190
191/* returns true if a deleted item's delete-locked-time is over, and it
192   should be removed from the namespace */
193static bool item_delete_lock_over (item *it) {
194    assert(it->it_flags & ITEM_DELETED);
195    return (current_time >= it->exptime);
196}
197
198/*
199 * Adds a message header to a connection.
200 *
201 * Returns 0 on success, -1 on out-of-memory.
202 */
203static int add_msghdr(conn *c)
204{
205    struct msghdr *msg;
206
207    assert(c != NULL);
208
209    if (c->msgsize == c->msgused) {
210        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
211        if (! msg)
212            return -1;
213        c->msglist = msg;
214        c->msgsize *= 2;
215    }
216
217    msg = c->msglist + c->msgused;
218
219    /* this wipes msg_iovlen, msg_control, msg_controllen, and
220       msg_flags, the last 3 of which aren't defined on solaris: */
221    memset(msg, 0, sizeof(struct msghdr));
222
223    msg->msg_iov = &c->iov[c->iovused];
224
225    if (c->request_addr_size > 0) {
226        msg->msg_name = &c->request_addr;
227        msg->msg_namelen = c->request_addr_size;
228    }
229
230    c->msgbytes = 0;
231    c->msgused++;
232
233    if (IS_UDP(c->protocol)) {
234        /* Leave room for the UDP header, which we'll fill in later. */
235        return add_iov(c, NULL, UDP_HEADER_SIZE);
236    }
237
238    return 0;
239}
240
241
242/*
243 * Free list management for connections.
244 */
245
246static conn **freeconns;
247static int freetotal;
248static int freecurr;
249
250
251static void conn_init(void) {
252    freetotal = 200;
253    freecurr = 0;
254    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
255        fprintf(stderr, "malloc()\n");
256    }
257    return;
258}
259
260/*
261 * Returns a connection from the freelist, if any. Should call this using
262 * conn_from_freelist() for thread safety.
263 */
264conn *do_conn_from_freelist() {
265    conn *c;
266
267    if (freecurr > 0) {
268        c = freeconns[--freecurr];
269    } else {
270        c = NULL;
271    }
272
273    return c;
274}
275
276/*
277 * Adds a connection to the freelist. 0 = success. Should call this using
278 * conn_add_to_freelist() for thread safety.
279 */
280bool do_conn_add_to_freelist(conn *c) {
281    if (freecurr < freetotal) {
282        freeconns[freecurr++] = c;
283        return false;
284    } else {
285        /* try to enlarge free connections array */
286        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
287        if (new_freeconns) {
288            freetotal *= 2;
289            freeconns = new_freeconns;
290            freeconns[freecurr++] = c;
291            return false;
292        }
293    }
294    return true;
295}
296
297static char *prot_text(const int prot) {
298    char *rv="unknown";
299    switch(prot) {
300        case ascii_prot:
301            rv="ascii";
302            break;
303        case binary_prot:
304            rv="binary";
305            break;
306        case ascii_udp_prot:
307            rv="ascii-udp";
308            break;
309    }
310    return rv;
311}
312
313conn *conn_new(const int sfd, const int init_state, const int event_flags,
314                const int read_buffer_size, const int prot,
315                struct event_base *base) {
316    conn *c = conn_from_freelist();
317
318    if (NULL == c) {
319        if (!(c = (conn *)malloc(sizeof(conn)))) {
320            fprintf(stderr, "malloc()\n");
321            return NULL;
322        }
323        c->rbuf = c->wbuf = 0;
324        c->ilist = 0;
325        c->suffixlist = 0;
326        c->iov = 0;
327        c->msglist = 0;
328        c->hdrbuf = 0;
329
330        c->rsize = read_buffer_size;
331        c->wsize = DATA_BUFFER_SIZE;
332        c->isize = ITEM_LIST_INITIAL;
333        c->suffixsize = SUFFIX_LIST_INITIAL;
334        c->iovsize = IOV_LIST_INITIAL;
335        c->msgsize = MSG_LIST_INITIAL;
336        c->hdrsize = 0;
337
338        c->rbuf = (char *)malloc((size_t)c->rsize);
339        c->wbuf = (char *)malloc((size_t)c->wsize);
340        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
341        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
342        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
343        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
344
345        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
346                c->msglist == 0 || c->suffixlist == 0) {
347            if (c->rbuf != 0) free(c->rbuf);
348            if (c->wbuf != 0) free(c->wbuf);
349            if (c->ilist !=0) free(c->ilist);
350            if (c->suffixlist != 0) free(c->suffixlist);
351            if (c->iov != 0) free(c->iov);
352            if (c->msglist != 0) free(c->msglist);
353            free(c);
354            fprintf(stderr, "malloc()\n");
355            return NULL;
356        }
357
358        STATS_LOCK();
359        stats.conn_structs++;
360        STATS_UNLOCK();
361    }
362
363    /* unix socket mode doesn't need this, so zeroed out.  but why
364     * is this done for every command?  presumably for UDP
365     * mode.  */
366    if (!settings.socketpath) {
367        c->request_addr_size = sizeof(c->request_addr);
368    } else {
369        c->request_addr_size = 0;
370    }
371
372    if (settings.verbose > 1) {
373        if (init_state == conn_listening) {
374            fprintf(stderr, "<%d server listening (%s)\n", sfd,
375                prot_text(prot));
376        } else if (IS_UDP(prot)) {
377            fprintf(stderr, "<%d server listening (udp)\n", sfd);
378        } else if (prot == binary_prot) {
379            fprintf(stderr, "<%d new binary client connection\n", sfd);
380        } else if (prot == ascii_prot) {
381            fprintf(stderr, "<%d new ascii client connection\n", sfd);
382        } else {
383            fprintf(stderr, "<%d new unknown (%d) client connection\n",
384                sfd, prot);
385            abort();
386        }
387    }
388
389    c->sfd = sfd;
390    c->protocol = prot;
391    c->state = init_state;
392    c->rlbytes = 0;
393    c->cmd = -1;
394    c->rbytes = c->wbytes = 0;
395    c->wcurr = c->wbuf;
396    c->rcurr = c->rbuf;
397    c->ritem = 0;
398    c->icurr = c->ilist;
399    c->suffixcurr = c->suffixlist;
400    c->ileft = 0;
401    c->suffixleft = 0;
402    c->iovused = 0;
403    c->msgcurr = 0;
404    c->msgused = 0;
405
406    c->write_and_go = init_state;
407    c->write_and_free = 0;
408    c->item = 0;
409    c->bucket = -1;
410    c->gen = 0;
411
412    c->noreply = false;
413
414    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
415    event_base_set(base, &c->event);
416    c->ev_flags = event_flags;
417
418    if (event_add(&c->event, 0) == -1) {
419        if (conn_add_to_freelist(c)) {
420            conn_free(c);
421        }
422        perror("event_add");
423        return NULL;
424    }
425
426    STATS_LOCK();
427    stats.curr_conns++;
428    stats.total_conns++;
429    STATS_UNLOCK();
430
431    return c;
432}
433
434static void conn_cleanup(conn *c) {
435    assert(c != NULL);
436
437    if (c->item) {
438        item_remove(c->item);
439        c->item = 0;
440    }
441
442    if (c->ileft != 0) {
443        for (; c->ileft > 0; c->ileft--,c->icurr++) {
444            item_remove(*(c->icurr));
445        }
446    }
447
448    if (c->suffixleft != 0) {
449        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
450            if(suffix_add_to_freelist(*(c->suffixcurr))) {
451                free(*(c->suffixcurr));
452            }
453        }
454    }
455
456    if (c->write_and_free) {
457        free(c->write_and_free);
458        c->write_and_free = 0;
459    }
460}
461
462/*
463 * Frees a connection.
464 */
465void conn_free(conn *c) {
466    if (c) {
467        if (c->hdrbuf)
468            free(c->hdrbuf);
469        if (c->msglist)
470            free(c->msglist);
471        if (c->rbuf)
472            free(c->rbuf);
473        if (c->wbuf)
474            free(c->wbuf);
475        if (c->ilist)
476            free(c->ilist);
477        if (c->suffixlist)
478            free(c->suffixlist);
479        if (c->iov)
480            free(c->iov);
481        free(c);
482    }
483}
484
485static void conn_close(conn *c) {
486    assert(c != NULL);
487
488    /* delete the event, the socket and the conn */
489    event_del(&c->event);
490
491    if (settings.verbose > 1)
492        fprintf(stderr, "<%d connection closed.\n", c->sfd);
493
494    close(c->sfd);
495    accept_new_conns(true);
496    conn_cleanup(c);
497
498    /* if the connection has big buffers, just free it */
499    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
500        conn_free(c);
501    }
502
503    STATS_LOCK();
504    stats.curr_conns--;
505    STATS_UNLOCK();
506
507    return;
508}
509
510static int get_init_state(conn *c) {
511    int rv=0;
512    assert(c != NULL);
513
514    switch(c->protocol) {
515        case binary_prot:
516            rv=conn_bin_init;
517            break;
518        default:
519            rv=conn_read;
520    }
521    return rv;
522}
523
524/* Set the given connection to its initial state.  The initial state will vary
525 * base don protocol type. */
526static void conn_set_init_state(conn *c) {
527    assert(c != NULL);
528
529    conn_set_state(c, get_init_state(c));
530}
531
532/*
533 * Shrinks a connection's buffers if they're too big.  This prevents
534 * periodic large "get" requests from permanently chewing lots of server
535 * memory.
536 *
537 * This should only be called in between requests since it can wipe output
538 * buffers!
539 */
540static void conn_shrink(conn *c) {
541    assert(c != NULL);
542
543    if (IS_UDP(c->protocol))
544        return;
545
546    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
547        char *newbuf;
548
549        if (c->rcurr != c->rbuf)
550            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
551
552        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
553
554        if (newbuf) {
555            c->rbuf = newbuf;
556            c->rsize = DATA_BUFFER_SIZE;
557        }
558        /* TODO check other branch... */
559        c->rcurr = c->rbuf;
560    }
561
562    if (c->isize > ITEM_LIST_HIGHWAT) {
563        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
564        if (newbuf) {
565            c->ilist = newbuf;
566            c->isize = ITEM_LIST_INITIAL;
567        }
568    /* TODO check error condition? */
569    }
570
571    if (c->msgsize > MSG_LIST_HIGHWAT) {
572        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
573        if (newbuf) {
574            c->msglist = newbuf;
575            c->msgsize = MSG_LIST_INITIAL;
576        }
577    /* TODO check error condition? */
578    }
579
580    if (c->iovsize > IOV_LIST_HIGHWAT) {
581        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
582        if (newbuf) {
583            c->iov = newbuf;
584            c->iovsize = IOV_LIST_INITIAL;
585        }
586    /* TODO check return value */
587    }
588}
589
590/*
591 * Sets a connection's current state in the state machine. Any special
592 * processing that needs to happen on certain state transitions can
593 * happen here.
594 */
595static void conn_set_state(conn *c, int state) {
596    assert(c != NULL);
597
598    if (state != c->state) {
599        if (state == conn_read) {
600            conn_shrink(c);
601            assoc_move_next_bucket();
602        }
603        c->state = state;
604    }
605}
606
607/*
608 * Free list management for suffix buffers.
609 */
610
611static char **freesuffix;
612static int freesuffixtotal;
613static int freesuffixcurr;
614
615static void suffix_init(void) {
616    freesuffixtotal = 500;
617    freesuffixcurr  = 0;
618
619    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
620    if (freesuffix == NULL) {
621        fprintf(stderr, "malloc()\n");
622    }
623    return;
624}
625
626/*
627 * Returns a suffix buffer from the freelist, if any. Should call this using
628 * suffix_from_freelist() for thread safety.
629 */
630char *do_suffix_from_freelist() {
631    char *s;
632
633    if (freesuffixcurr > 0) {
634        s = freesuffix[--freesuffixcurr];
635    } else {
636        /* If malloc fails, let the logic fall through without spamming
637         * STDERR on the server. */
638        s = malloc( SUFFIX_SIZE );
639    }
640
641    return s;
642}
643
644/*
645 * Adds a connection to the freelist. 0 = success. Should call this using
646 * conn_add_to_freelist() for thread safety.
647 */
648bool do_suffix_add_to_freelist(char *s) {
649    if (freesuffixcurr < freesuffixtotal) {
650        freesuffix[freesuffixcurr++] = s;
651        return false;
652    } else {
653        /* try to enlarge free connections array */
654        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
655        if (new_freesuffix) {
656            freesuffixtotal *= 2;
657            freesuffix = new_freesuffix;
658            freesuffix[freesuffixcurr++] = s;
659            return false;
660        }
661    }
662    return true;
663}
664
665/*
666 * Ensures that there is room for another struct iovec in a connection's
667 * iov list.
668 *
669 * Returns 0 on success, -1 on out-of-memory.
670 */
671static int ensure_iov_space(conn *c) {
672    assert(c != NULL);
673
674    if (c->iovused >= c->iovsize) {
675        int i, iovnum;
676        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
677                                (c->iovsize * 2) * sizeof(struct iovec));
678        if (! new_iov)
679            return -1;
680        c->iov = new_iov;
681        c->iovsize *= 2;
682
683        /* Point all the msghdr structures at the new list. */
684        for (i = 0, iovnum = 0; i < c->msgused; i++) {
685            c->msglist[i].msg_iov = &c->iov[iovnum];
686            iovnum += c->msglist[i].msg_iovlen;
687        }
688    }
689
690    return 0;
691}
692
693
694/*
695 * Adds data to the list of pending data that will be written out to a
696 * connection.
697 *
698 * Returns 0 on success, -1 on out-of-memory.
699 */
700
701static int add_iov(conn *c, const void *buf, int len) {
702    struct msghdr *m;
703    int leftover;
704    bool limit_to_mtu;
705
706    assert(c != NULL);
707
708    do {
709        m = &c->msglist[c->msgused - 1];
710
711        /*
712         * Limit UDP packets, and the first payloads of TCP replies, to
713         * UDP_MAX_PAYLOAD_SIZE bytes.
714         */
715        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
716
717        /* We may need to start a new msghdr if this one is full. */
718        if (m->msg_iovlen == IOV_MAX ||
719            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
720            add_msghdr(c);
721            m = &c->msglist[c->msgused - 1];
722        }
723
724        if (ensure_iov_space(c) != 0)
725            return -1;
726
727        /* If the fragment is too big to fit in the datagram, split it up */
728        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
729            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
730            len -= leftover;
731        } else {
732            leftover = 0;
733        }
734
735        m = &c->msglist[c->msgused - 1];
736        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
737        m->msg_iov[m->msg_iovlen].iov_len = len;
738
739        c->msgbytes += len;
740        c->iovused++;
741        m->msg_iovlen++;
742
743        buf = ((char *)buf) + len;
744        len = leftover;
745    } while (leftover > 0);
746
747    return 0;
748}
749
750
751/*
752 * Constructs a set of UDP headers and attaches them to the outgoing messages.
753 */
754static int build_udp_headers(conn *c) {
755    int i;
756    unsigned char *hdr;
757
758    assert(c != NULL);
759
760    if (c->msgused > c->hdrsize) {
761        void *new_hdrbuf;
762        if (c->hdrbuf)
763            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
764        else
765            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
766        if (! new_hdrbuf)
767            return -1;
768        c->hdrbuf = (unsigned char *)new_hdrbuf;
769        c->hdrsize = c->msgused * 2;
770    }
771
772    hdr = c->hdrbuf;
773    for (i = 0; i < c->msgused; i++) {
774        c->msglist[i].msg_iov[0].iov_base = hdr;
775        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
776        *hdr++ = c->request_id / 256;
777        *hdr++ = c->request_id % 256;
778        *hdr++ = i / 256;
779        *hdr++ = i % 256;
780        *hdr++ = c->msgused / 256;
781        *hdr++ = c->msgused % 256;
782        *hdr++ = 0;
783        *hdr++ = 0;
784        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
785    }
786
787    return 0;
788}
789
790
791static void out_string(conn *c, const char *str) {
792    size_t len;
793
794    assert(c != NULL);
795
796    if (c->noreply) {
797        if (settings.verbose > 1)
798            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
799        c->noreply = false;
800        conn_set_state(c, conn_read);
801        return;
802    }
803
804    if (settings.verbose > 1)
805        fprintf(stderr, ">%d %s\n", c->sfd, str);
806
807    len = strlen(str);
808    if ((len + 2) > c->wsize) {
809        /* ought to be always enough. just fail for simplicity */
810        str = "SERVER_ERROR output line too long";
811        len = strlen(str);
812    }
813
814    memcpy(c->wbuf, str, len);
815    memcpy(c->wbuf + len, "\r\n", 3);
816    c->wbytes = len + 2;
817    c->wcurr = c->wbuf;
818
819    conn_set_state(c, conn_write);
820    c->write_and_go = get_init_state(c);
821    return;
822}
823
824/*
825 * we get here after reading the value in set/add/replace commands. The command
826 * has been stored in c->item_comm, and the item is ready in c->item.
827 */
828static void complete_nread_ascii(conn *c) {
829    assert(c != NULL);
830
831    item *it = c->item;
832    int comm = c->item_comm;
833    int ret;
834
835    STATS_LOCK();
836    stats.set_cmds++;
837    STATS_UNLOCK();
838
839    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
840        out_string(c, "CLIENT_ERROR bad data chunk");
841    } else {
842      ret = store_item(it, comm);
843      if (ret == 1)
844          out_string(c, "STORED");
845      else if(ret == 2)
846          out_string(c, "EXISTS");
847      else if(ret == 3)
848          out_string(c, "NOT_FOUND");
849      else
850          out_string(c, "NOT_STORED");
851    }
852
853    item_remove(c->item);       /* release the c->item reference */
854    c->item = 0;
855}
856
857static void add_bin_header(conn *c, int err, int hdr_len, int body_len) {
858    int i=0;
859    uint32_t res_header[BIN_PKT_HDR_WORDS];
860
861    assert(c);
862    assert(body_len >= 0);
863
864    c->msgcurr = 0;
865    c->msgused = 0;
866    c->iovused = 0;
867    if (add_msghdr(c) != 0) {
868        /* XXX:  out_string is inappropriate here */
869        out_string(c, "SERVER_ERROR out of memory");
870        return;
871    }
872
873    res_header[0] = BIN_RES_MAGIC << 24;
874    res_header[0] |= ((0xff & c->cmd) << 16);
875    res_header[0] |= err & 0xffff;
876
877    res_header[1] = hdr_len << 24;
878    /* TODO:  Support datatype */
879    res_header[2] = body_len;
880    res_header[3] = c->opaque;
881
882    if(settings.verbose > 1) {
883        fprintf(stderr, "Writing bin response:  %08x %08x %08x %08x\n",
884            res_header[0], res_header[1], res_header[2], res_header[3]);
885    }
886
887    for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
888        res_header[i] = htonl(res_header[i]);
889    }
890
891    assert(c->wsize >= MIN_BIN_PKT_LENGTH);
892    memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH);
893    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH);
894}
895
896static void write_bin_error(conn *c, int err, int swallow) {
897    char *errstr="Unknown error";
898    switch(err) {
899        case ERR_UNKNOWN_CMD:
900            errstr="Unknown command";
901            break;
902        case ERR_NOT_FOUND:
903            errstr="Not found";
904            break;
905        case ERR_INVALID_ARGUMENTS:
906            errstr="Invalid arguments";
907            break;
908        case ERR_EXISTS:
909            errstr="Data exists for key.";
910            break;
911        case ERR_TOO_LARGE:
912            errstr="Too large.";
913            break;
914        case ERR_NOT_STORED:
915            errstr="Not stored.";
916            break;
917        default:
918            errstr="UNHANDLED ERROR";
919            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err);
920    }
921    if(settings.verbose > 0) {
922        fprintf(stderr, "Writing an error:  %s\n", errstr);
923    }
924    add_bin_header(c, err, 0, strlen(errstr));
925    add_iov(c, errstr, strlen(errstr));
926
927    conn_set_state(c, conn_mwrite);
928    if(swallow > 0) {
929        c->sbytes=swallow;
930        c->write_and_go = conn_swallow;
931    } else {
932        c->write_and_go = conn_bin_init;
933    }
934}
935
936/* Form and send a response to a command over the binary protocol */
937static void write_bin_response(conn *c, void *d, int hlen, int dlen) {
938    add_bin_header(c, 0, hlen, dlen);
939    if(dlen > 0) {
940        add_iov(c, d, dlen);
941    }
942    conn_set_state(c, conn_mwrite);
943    c->write_and_go = conn_bin_init;
944}
945
946/* Byte swap a 64-bit number */
947static int64_t swap64(int64_t in) {
948#ifdef ENDIAN_LITTLE
949    /* Little endian, flip the bytes around until someone makes a faster/better
950    * way to do this. */
951    int64_t rv=0;
952    int i=0;
953     for(i=0; i<8; i++) {
954        rv = (rv << 8) | (in & 0xff);
955        in >>= 8;
956     }
957    return rv;
958#else
959    /* big-endian machines don't need byte swapping */
960    return in;
961#endif
962}
963
964static void complete_incr_bin(conn *c) {
965    item *it;
966    int64_t delta;
967    uint64_t initial;
968    int32_t exptime;
969    char *key;
970    size_t nkey;
971    int i;
972    uint64_t *responseBuf = (uint64_t*) c->wbuf + BIN_INCR_HDR_LEN;
973
974    assert(c != NULL);
975
976    key=c->rbuf + BIN_INCR_HDR_LEN;
977    nkey=c->keylen;
978    key[nkey]=0x00;
979
980    delta = swap64(*((int64_t*)(c->rbuf)));
981    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
982    exptime = ntohl(*((int*)(c->rbuf + 16)));
983
984    if(settings.verbose) {
985        fprintf(stderr, "incr ");
986        for(i=0; i<nkey; i++) {
987            fprintf(stderr, "%c", key[i]);
988        }
989        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime);
990    }
991
992    /* XXX:  Not sure what to do with these yet
993    if (settings.managed) {
994        int bucket = c->bucket;
995        if (bucket == -1) {
996            out_string(c, "CLIENT_ERROR no BG data in managed mode");
997            return;
998        }
999        c->bucket = -1;
1000        if (buckets[bucket] != c->gen) {
1001            out_string(c, "ERROR_NOT_OWNER");
1002            return;
1003        }
1004    }
1005    */
1006
1007    it = item_get(key, nkey);
1008    if (it) {
1009        /* Weird magic in add_delta forces me to pad here */
1010        char tmpbuf[INCR_MAX_STORAGE_LEN];
1011        uint64_t l=0;
1012        memset(tmpbuf, ' ', INCR_MAX_STORAGE_LEN);
1013        tmpbuf[INCR_MAX_STORAGE_LEN]=0x00;
1014        add_delta(it, c->cmd == CMD_INCR, delta, tmpbuf);
1015        *responseBuf=swap64(strtoull(tmpbuf, NULL, 10));
1016
1017        write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN, INCR_RES_LEN);
1018        item_remove(it);         /* release our reference */
1019    } else {
1020        if(exptime >= 0) {
1021            /* Save some room for the response */
1022            assert(c->wsize > BIN_INCR_HDR_LEN + BIN_DEL_HDR_LEN);
1023            *responseBuf=swap64(initial);
1024            it = item_alloc(key, nkey, 0, realtime(exptime),
1025                INCR_MAX_STORAGE_LEN);
1026            snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu", initial);
1027
1028            if(store_item(it, NREAD_SET)) {
1029                write_bin_response(c, responseBuf, BIN_INCR_HDR_LEN,
1030                    INCR_RES_LEN);
1031            } else {
1032                write_bin_error(c, ERR_NOT_STORED, 0);
1033            }
1034            item_remove(it);         /* release our reference */
1035        } else {
1036            write_bin_error(c, ERR_NOT_FOUND, 0);
1037        }
1038    }
1039}
1040
1041static void complete_update_bin(conn *c) {
1042    int eno=-1, ret=0;
1043    assert(c != NULL);
1044
1045    item *it = c->item;
1046
1047    STATS_LOCK();
1048    stats.set_cmds++;
1049    STATS_UNLOCK();
1050
1051    /* We don't actually receive the trailing two characters in the bin
1052     * protocol, so we're going to just set them here */
1053    *(ITEM_data(it) + it->nbytes - 2) = '\r';
1054    *(ITEM_data(it) + it->nbytes - 1) = '\n';
1055
1056    switch (store_item(it, c->item_comm)) {
1057        case 1:
1058            /* Stored */
1059            write_bin_response(c, NULL, BIN_SET_HDR_LEN, 0);
1060            break;
1061        case 2:
1062            write_bin_error(c, ERR_EXISTS, 0);
1063            break;
1064        case 3:
1065            write_bin_error(c, ERR_NOT_FOUND, 0);
1066            break;
1067        default:
1068            if(c->item_comm == NREAD_ADD) {
1069                eno=ERR_EXISTS;
1070            } else if(c->item_comm == NREAD_REPLACE) {
1071                eno=ERR_NOT_FOUND;
1072            } else {
1073                eno=ERR_NOT_STORED;
1074            }
1075            write_bin_error(c, eno, 0);
1076    }
1077
1078    item_remove(c->item);       /* release the c->item reference */
1079    c->item = 0;
1080}
1081
1082static void process_bin_get(conn *c) {
1083    item *it;
1084
1085    it = item_get(c->rbuf, c->keylen);
1086    if (it) {
1087        int *flags;
1088        uint64_t* identifier;
1089
1090        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4);
1091
1092        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place
1093        this is int in far enough to cover the header */
1094        flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH);
1095        *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10));
1096
1097        /* the length has two unnecessary bytes, and then we write four more */
1098        add_bin_header(c, 0, GET_RES_HDR_LEN, it->nbytes - 2 + GET_RES_HDR_LEN);
1099        /* Flags */
1100        add_iov(c, flags, 4);
1101        identifier=(uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4);
1102        *identifier=swap64((uint32_t)it->cas_id);
1103        add_iov(c, identifier, 8);
1104        /* bytes minus the CRLF */
1105        add_iov(c, ITEM_data(it), it->nbytes - 2);
1106        conn_set_state(c, conn_mwrite);
1107    } else {
1108        if(c->cmd == CMD_GETQ) {
1109            conn_set_state(c, conn_bin_init);
1110        } else {
1111            write_bin_error(c, ERR_NOT_FOUND, 0);
1112        }
1113    }
1114}
1115
1116static void bin_read_key(conn *c, int next_substate, int extra) {
1117    assert(c);
1118    c->substate = next_substate;
1119    c->rlbytes = c->keylen + extra;
1120    assert(c->rsize >= c->rlbytes);
1121    c->ritem = c->rbuf;
1122    conn_set_state(c, conn_nread);
1123}
1124
1125static void dispatch_bin_command(conn *c) {
1126    time_t exptime = 0;
1127    switch(c->cmd) {
1128        case CMD_VERSION:
1129            write_bin_response(c, VERSION, 0, strlen(VERSION));
1130            break;
1131        case CMD_FLUSH:
1132            set_current_time();
1133
1134            settings.oldest_live = current_time - 1;
1135            item_flush_expired();
1136            write_bin_response(c, NULL, 0, 0);
1137            break;
1138        case CMD_NOOP:
1139            write_bin_response(c, NULL, 0, 0);
1140            break;
1141        case CMD_SET:
1142            /* Fallthrough */
1143        case CMD_ADD:
1144            /* Fallthrough */
1145        case CMD_REPLACE:
1146            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN);
1147            break;
1148        case CMD_GETQ:
1149        case CMD_GET:
1150            bin_read_key(c, bin_reading_get_key, 0);
1151            break;
1152        case CMD_DELETE:
1153            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN);
1154            break;
1155        case CMD_INCR:
1156        case CMD_DECR:
1157            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN);
1158            break;
1159        default:
1160            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]);
1161    }
1162}
1163
1164static void process_bin_update(conn *c) {
1165    char *key;
1166    int nkey;
1167    int vlen;
1168    int flags;
1169    int exptime;
1170    item *it;
1171    int comm;
1172    int hdrlen=BIN_SET_HDR_LEN;
1173
1174    assert(c != NULL);
1175
1176    key=c->rbuf + hdrlen;
1177    nkey=c->keylen;
1178    key[nkey]=0x00;
1179
1180    flags = ntohl(*((int*)(c->rbuf)));
1181    exptime = ntohl(*((int*)(c->rbuf + 4)));
1182    vlen = c->bin_header[2] - (nkey + hdrlen);
1183
1184    if(settings.verbose) {
1185        fprintf(stderr, "Value len is %d\n", vlen);
1186    }
1187
1188    if (settings.detail_enabled) {
1189        stats_prefix_record_set(key);
1190    }
1191
1192    /* Not sure what to do with this.
1193    if (settings.managed) {
1194        int bucket = c->bucket;
1195        if (bucket == -1) {
1196            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1197            return;
1198        }
1199        c->bucket = -1;
1200        if (buckets[bucket] != c->gen) {
1201            out_string(c, "ERROR_NOT_OWNER");
1202            return;
1203        }
1204    }
1205    */
1206
1207    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1208
1209    if (it == 0) {
1210        if (! item_size_ok(nkey, flags, vlen + 2)) {
1211            write_bin_error(c, ERR_TOO_LARGE, vlen);
1212        } else {
1213            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen);
1214        }
1215        /* swallow the data line */
1216        c->write_and_go = conn_swallow;
1217        return;
1218    }
1219
1220    it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8)));
1221
1222    switch(c->cmd) {
1223        case CMD_ADD:
1224            c->item_comm = NREAD_ADD;
1225            break;
1226        case CMD_SET:
1227            c->item_comm = NREAD_SET;
1228            break;
1229        case CMD_REPLACE:
1230            c->item_comm = NREAD_REPLACE;
1231            break;
1232        default:
1233            assert(0);
1234    }
1235
1236    if(it->cas_id != 0) {
1237        c->item_comm = NREAD_CAS;
1238    }
1239
1240    c->item = it;
1241    c->ritem = ITEM_data(it);
1242    c->rlbytes = vlen;
1243    conn_set_state(c, conn_nread);
1244    c->substate = bin_read_set_value;
1245}
1246
1247static void process_bin_delete(conn *c) {
1248    char *key;
1249    size_t nkey;
1250    item *it;
1251    time_t exptime = 0;
1252
1253    assert(c != NULL);
1254
1255    /* XXX:  I don't know what to do with this yet
1256    if (settings.managed) {
1257        int bucket = c->bucket;
1258        if (bucket == -1) {
1259            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1260            return;
1261        }
1262        c->bucket = -1;
1263        if (buckets[bucket] != c->gen) {
1264            out_string(c, "ERROR_NOT_OWNER");
1265            return;
1266        }
1267    }
1268    */
1269
1270    exptime = ntohl(*((int*)(c->rbuf)));
1271    key = c->rbuf + 4;
1272    nkey = c->keylen;
1273    key[nkey]=0x00;
1274
1275    if(settings.verbose) {
1276        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime);
1277    }
1278
1279    if (settings.detail_enabled) {
1280        stats_prefix_record_delete(key);
1281    }
1282
1283    it = item_get(key, nkey);
1284    if (it) {
1285        if (exptime == 0) {
1286            item_unlink(it);
1287            item_remove(it);      /* release our reference */
1288            write_bin_response(c, NULL, 0, 0);
1289        } else {
1290            /* XXX:  This is really lame, but defer_delete returns a string */
1291            char *res=defer_delete(it, exptime);
1292            if(res[0] == 'D') {
1293                write_bin_response(c, NULL, 0, 0);
1294            } else {
1295                write_bin_error(c, ERR_OUT_OF_MEMORY, 0);
1296            }
1297        }
1298    } else {
1299        write_bin_error(c, ERR_NOT_FOUND, 0);
1300    }
1301}
1302
1303static void complete_nread_binary(conn *c) {
1304    assert(c != NULL);
1305
1306    if(c->cmd < 0) {
1307        /* No command defined.  Figure out what they're trying to say. */
1308        int i=0;
1309        /* I did a bit of hard-coding around the packet sizes */
1310        assert(BIN_PKT_HDR_WORDS == 3);
1311        for(i=0; i<BIN_PKT_HDR_WORDS; i++) {
1312            c->bin_header[i] = ntohl(c->bin_header[i]);
1313        }
1314        if(settings.verbose) {
1315            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x %08x\n",
1316                c->bin_header[0], c->bin_header[1], c->bin_header[2],
1317                c->bin_header[3]);
1318        }
1319        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) {
1320            if(settings.verbose) {
1321                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24);
1322            }
1323            conn_set_state(c, conn_closing);
1324            return;
1325        }
1326   
1327        c->msgcurr = 0;
1328        c->msgused = 0;
1329        c->iovused = 0;
1330        if (add_msghdr(c) != 0) {
1331            out_string(c, "SERVER_ERROR out of memory");
1332            return;
1333        }
1334   
1335        c->cmd = (c->bin_header[0] >> 16) & 0xff;
1336        c->keylen = c->bin_header[0] & 0xffff;
1337        c->opaque = c->bin_header[3];
1338        if(settings.verbose > 1) {
1339            fprintf(stderr,
1340                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd,
1341                c->opaque, c->keylen, c->bin_header[2]);
1342        }
1343        dispatch_bin_command(c);
1344    } else {
1345        switch(c->substate) {
1346            case bin_reading_set_header:
1347                process_bin_update(c);
1348                break;
1349            case bin_read_set_value:
1350                complete_update_bin(c);
1351                break;
1352            case bin_reading_get_key:
1353                process_bin_get(c);
1354                break;
1355            case bin_reading_del_header:
1356                process_bin_delete(c);
1357                break;
1358            case bin_reading_incr_header:
1359                complete_incr_bin(c);
1360                break;
1361            default:
1362                fprintf(stderr, "Not handling substate %d\n", c->substate);
1363                assert(0);
1364        }
1365    }
1366}
1367
1368static void complete_nread(conn *c) {
1369    assert(c != NULL);
1370
1371    if(c->protocol == ascii_prot) {
1372        complete_nread_ascii(c);
1373    } else if(c->protocol == binary_prot) {
1374        complete_nread_binary(c);
1375    } else {
1376        assert(0); /* XXX:  Invalid case.  Should probably do more here. */
1377    }
1378}
1379
1380/*
1381 * Stores an item in the cache according to the semantics of one of the set
1382 * commands. In threaded mode, this is protected by the cache lock.
1383 *
1384 * Returns true if the item was stored.
1385 */
1386int do_store_item(item *it, int comm) {
1387    char *key = ITEM_key(it);
1388    bool delete_locked = false;
1389    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
1390    int stored = 0;
1391
1392    item *new_it = NULL;
1393    int flags;
1394
1395    if (old_it != NULL && comm == NREAD_ADD) {
1396        /* add only adds a nonexistent item, but promote to head of LRU */
1397        do_item_update(old_it);
1398    } else if (!old_it && (comm == NREAD_REPLACE
1399        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1400    {
1401        /* replace only replaces an existing value; don't store */
1402    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
1403        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
1404    {
1405        /* replace and add can't override delete locks; don't store */
1406    } else if (comm == NREAD_CAS) {
1407        /* validate cas operation */
1408        if (delete_locked)
1409            old_it = do_item_get_nocheck(key, it->nkey);
1410
1411        if(old_it == NULL) {
1412          // LRU expired
1413          stored = 3;
1414        }
1415        else if(it->cas_id == old_it->cas_id) {
1416          // cas validates
1417          do_item_replace(old_it, it);
1418          stored = 1;
1419        } else {
1420          if(settings.verbose > 1) {
1421            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
1422                old_it->cas_id, it->cas_id);
1423          }
1424          stored = 2;
1425        }
1426    } else {
1427        /*
1428         * Append - combine new and old record into single one. Here it's
1429         * atomic and thread-safe.
1430         */
1431
1432        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
1433
1434            /* we have it and old_it here - alloc memory to hold both */
1435            /* flags was already lost - so recover them from ITEM_suffix(it) */
1436
1437            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
1438
1439            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
1440
1441            if (new_it == NULL) {
1442                /* SERVER_ERROR out of memory */
1443                return 0;
1444            }
1445
1446            /* copy data from it and old_it to new_it */
1447
1448            if (comm == NREAD_APPEND) {
1449                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
1450                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
1451            } else {
1452                /* NREAD_PREPEND */
1453                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
1454                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
1455            }
1456
1457            it = new_it;
1458        }
1459
1460        /* "set" commands can override the delete lock
1461           window... in which case we have to find the old hidden item
1462           that's in the namespace/LRU but wasn't returned by
1463           item_get.... because we need to replace it */
1464        if (delete_locked)
1465            old_it = do_item_get_nocheck(key, it->nkey);
1466
1467        if (old_it != NULL)
1468            do_item_replace(old_it, it);
1469        else
1470            do_item_link(it);
1471
1472        stored = 1;
1473    }
1474
1475    if (old_it != NULL)
1476        do_item_remove(old_it);         /* release our reference */
1477    if (new_it != NULL)
1478        do_item_remove(new_it);
1479
1480    return stored;
1481}
1482
1483typedef struct token_s {
1484    char *value;
1485    size_t length;
1486} token_t;
1487
1488#define COMMAND_TOKEN 0
1489#define SUBCOMMAND_TOKEN 1
1490#define KEY_TOKEN 1
1491#define KEY_MAX_LENGTH 250
1492
1493#define MAX_TOKENS 8
1494
1495/*
1496 * Tokenize the command string by replacing whitespace with '\0' and update
1497 * the token array tokens with pointer to start of each token and length.
1498 * Returns total number of tokens.  The last valid token is the terminal
1499 * token (value points to the first unprocessed character of the string and
1500 * length zero).
1501 *
1502 * Usage example:
1503 *
1504 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1505 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1506 *          ...
1507 *      }
1508 *      ncommand = tokens[ix].value - command;
1509 *      command  = tokens[ix].value;
1510 *   }
1511 */
1512static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
1513    char *s, *e;
1514    size_t ntokens = 0;
1515
1516    assert(command != NULL && tokens != NULL && max_tokens > 1);
1517
1518    for (s = e = command; ntokens < max_tokens - 1; ++e) {
1519        if (*e == ' ') {
1520            if (s != e) {
1521                tokens[ntokens].value = s;
1522                tokens[ntokens].length = e - s;
1523                ntokens++;
1524                *e = '\0';
1525            }
1526            s = e + 1;
1527        }
1528        else if (*e == '\0') {
1529            if (s != e) {
1530                tokens[ntokens].value = s;
1531                tokens[ntokens].length = e - s;
1532                ntokens++;
1533            }
1534
1535            break; /* string end */
1536        }
1537    }
1538
1539    /*
1540     * If we scanned the whole string, the terminal value pointer is null,
1541     * otherwise it is the first unprocessed character.
1542     */
1543    tokens[ntokens].value =  *e == '\0' ? NULL : e;
1544    tokens[ntokens].length = 0;
1545    ntokens++;
1546
1547    return ntokens;
1548}
1549
1550/* set up a connection to write a buffer then free it, used for stats */
1551static void write_and_free(conn *c, char *buf, int bytes) {
1552    if (buf) {
1553        c->write_and_free = buf;
1554        c->wcurr = buf;
1555        c->wbytes = bytes;
1556        conn_set_state(c, conn_write);
1557        c->write_and_go = get_init_state(c);
1558    } else {
1559        out_string(c, "SERVER_ERROR out of memory");
1560    }
1561}
1562
1563static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
1564{
1565    int noreply_index = ntokens - 2;
1566
1567    /*
1568      NOTE: this function is not the first place where we are going to
1569      send the reply.  We could send it instead from process_command()
1570      if the request line has wrong number of tokens.  However parsing
1571      malformed line for "noreply" option is not reliable anyway, so
1572      it can't be helped.
1573    */
1574    if (tokens[noreply_index].value
1575        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
1576        c->noreply = true;
1577    }
1578}
1579
1580inline static void process_stats_detail(conn *c, const char *command) {
1581    assert(c != NULL);
1582
1583    if (strcmp(command, "on") == 0) {
1584        settings.detail_enabled = 1;
1585        out_string(c, "OK");
1586    }
1587    else if (strcmp(command, "off") == 0) {
1588        settings.detail_enabled = 0;
1589        out_string(c, "OK");
1590    }
1591    else if (strcmp(command, "dump") == 0) {
1592        int len;
1593        char *stats = stats_prefix_dump(&len);
1594        write_and_free(c, stats, len);
1595    }
1596    else {
1597        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1598    }
1599}
1600
1601static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1602    rel_time_t now = current_time;
1603    char *command;
1604    char *subcommand;
1605
1606    assert(c != NULL);
1607
1608    if(ntokens < 2) {
1609        out_string(c, "CLIENT_ERROR bad command line");
1610        return;
1611    }
1612
1613    command = tokens[COMMAND_TOKEN].value;
1614
1615    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1616        char temp[1024];
1617        pid_t pid = getpid();
1618        char *pos = temp;
1619
1620#ifndef WIN32
1621        struct rusage usage;
1622        getrusage(RUSAGE_SELF, &usage);
1623#endif /* !WIN32 */
1624
1625        STATS_LOCK();
1626        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1627        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1628        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1629        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1630        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1631#ifndef WIN32
1632        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1633        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1634#endif /* !WIN32 */
1635        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1636        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1637        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1638        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1639        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1640        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1641        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1642        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1643        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1644        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1645        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1646        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1647        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1648        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1649        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1650        pos += sprintf(pos, "END");
1651        STATS_UNLOCK();
1652        out_string(c, temp);
1653        return;
1654    }
1655
1656    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1657
1658    if (strcmp(subcommand, "reset") == 0) {
1659        stats_reset();
1660        out_string(c, "RESET");
1661        return;
1662    }
1663
1664#ifdef HAVE_MALLOC_H
1665#ifdef HAVE_STRUCT_MALLINFO
1666    if (strcmp(subcommand, "malloc") == 0) {
1667        char temp[512];
1668        struct mallinfo info;
1669        char *pos = temp;
1670
1671        info = mallinfo();
1672        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1673        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1674        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1675        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1676        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1677        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1678        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1679        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1680        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1681        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1682        out_string(c, temp);
1683        return;
1684    }
1685#endif /* HAVE_STRUCT_MALLINFO */
1686#endif /* HAVE_MALLOC_H */
1687
1688#if !defined(WIN32) || !defined(__APPLE__)
1689    if (strcmp(subcommand, "maps") == 0) {
1690        char *wbuf;
1691        int wsize = 8192; /* should be enough */
1692        int fd;
1693        int res;
1694
1695        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1696            out_string(c, "SERVER_ERROR out of memory");
1697            return;
1698        }
1699
1700        fd = open("/proc/self/maps", O_RDONLY);
1701        if (fd == -1) {
1702            out_string(c, "SERVER_ERROR cannot open the maps file");
1703            free(wbuf);
1704            return;
1705        }
1706
1707        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1708        if (res == wsize - 6) {
1709            out_string(c, "SERVER_ERROR buffer overflow");
1710            free(wbuf); close(fd);
1711            return;
1712        }
1713        if (res == 0 || res == -1) {
1714            out_string(c, "SERVER_ERROR can't read the maps file");
1715            free(wbuf); close(fd);
1716            return;
1717        }
1718        memcpy(wbuf + res, "END\r\n", 5);
1719        write_and_free(c, wbuf, res + 5);
1720        close(fd);
1721        return;
1722    }
1723#endif
1724
1725    if (strcmp(subcommand, "cachedump") == 0) {
1726
1727        char *buf;
1728        unsigned int bytes, id, limit = 0;
1729
1730        if(ntokens < 5) {
1731            out_string(c, "CLIENT_ERROR bad command line");
1732            return;
1733        }
1734
1735        id = strtoul(tokens[2].value, NULL, 10);
1736        limit = strtoul(tokens[3].value, NULL, 10);
1737
1738        if(errno == ERANGE) {
1739            out_string(c, "CLIENT_ERROR bad command line format");
1740            return;
1741        }
1742
1743        buf = item_cachedump(id, limit, &bytes);
1744        write_and_free(c, buf, bytes);
1745        return;
1746    }
1747
1748    if (strcmp(subcommand, "slabs") == 0) {
1749        int bytes = 0;
1750        char *buf = slabs_stats(&bytes);
1751        write_and_free(c, buf, bytes);
1752        return;
1753    }
1754
1755    if (strcmp(subcommand, "items") == 0) {
1756        int bytes = 0;
1757        char *buf = item_stats(&bytes);
1758        write_and_free(c, buf, bytes);
1759        return;
1760    }
1761
1762    if (strcmp(subcommand, "detail") == 0) {
1763        if (ntokens < 4)
1764            process_stats_detail(c, "");  /* outputs the error message */
1765        else
1766            process_stats_detail(c, tokens[2].value);
1767        return;
1768    }
1769
1770    if (strcmp(subcommand, "sizes") == 0) {
1771        int bytes = 0;
1772        char *buf = item_stats_sizes(&bytes);
1773        write_and_free(c, buf, bytes);
1774        return;
1775    }
1776
1777    out_string(c, "ERROR");
1778}
1779
1780/* ntokens is overwritten here... shrug.. */
1781static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1782    char *key;
1783    size_t nkey;
1784    int i = 0;
1785    item *it;
1786    token_t *key_token = &tokens[KEY_TOKEN];
1787    char *suffix;
1788    int stats_get_cmds   = 0;
1789    int stats_get_hits   = 0;
1790    int stats_get_misses = 0;
1791    assert(c != NULL);
1792
1793    if (settings.managed) {
1794        int bucket = c->bucket;
1795        if (bucket == -1) {
1796            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1797            return;
1798        }
1799        c->bucket = -1;
1800        if (buckets[bucket] != c->gen) {
1801            out_string(c, "ERROR_NOT_OWNER");
1802            return;
1803        }
1804    }
1805
1806    do {
1807        while(key_token->length != 0) {
1808
1809            key = key_token->value;
1810            nkey = key_token->length;
1811
1812            if(nkey > KEY_MAX_LENGTH) {
1813                STATS_LOCK();
1814                stats.get_cmds   += stats_get_cmds;
1815                stats.get_hits   += stats_get_hits;
1816                stats.get_misses += stats_get_misses;
1817                STATS_UNLOCK();
1818                out_string(c, "CLIENT_ERROR bad command line format");
1819                return;
1820            }
1821
1822            stats_get_cmds++;
1823            it = item_get(key, nkey);
1824            if (settings.detail_enabled) {
1825                stats_prefix_record_get(key, NULL != it);
1826            }
1827            if (it) {
1828                if (i >= c->isize) {
1829                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1830                    if (new_list) {
1831                        c->isize *= 2;
1832                        c->ilist = new_list;
1833                    } else break;
1834                }
1835
1836                /*
1837                 * Construct the response. Each hit adds three elements to the
1838                 * outgoing data list:
1839                 *   "VALUE "
1840                 *   key
1841                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1842                 */
1843
1844                if(return_cas == true)
1845                {
1846                  /* Goofy mid-flight realloc. */
1847                  if (i >= c->suffixsize) {
1848                    char **new_suffix_list = realloc(c->suffixlist,
1849                                           sizeof(char *) * c->suffixsize * 2);
1850                    if (new_suffix_list) {
1851                      c->suffixsize *= 2;
1852                      c->suffixlist  = new_suffix_list;
1853                    } else break;
1854                  }
1855
1856                  suffix = suffix_from_freelist();
1857                  if (suffix == NULL) {
1858                    STATS_LOCK();
1859                    stats.get_cmds   += stats_get_cmds;
1860                    stats.get_hits   += stats_get_hits;
1861                    stats.get_misses += stats_get_misses;
1862                    STATS_UNLOCK();
1863                    out_string(c, "SERVER_ERROR out of memory");
1864                    return;
1865                  }
1866                  *(c->suffixlist + i) = suffix;
1867                  sprintf(suffix, " %llu\r\n", it->cas_id);
1868                  if (add_iov(c, "VALUE ", 6) != 0 ||
1869                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1870                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1871                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1872                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1873                      {
1874                          break;
1875                      }
1876                }
1877                else
1878                {
1879                  if (add_iov(c, "VALUE ", 6) != 0 ||
1880                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1881                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1882                      {
1883                          break;
1884                      }
1885                }
1886
1887
1888                if (settings.verbose > 1)
1889                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1890
1891                /* item_get() has incremented it->refcount for us */
1892                stats_get_hits++;
1893                item_update(it);
1894                *(c->ilist + i) = it;
1895                i++;
1896
1897            } else {
1898                stats_get_misses++;
1899            }
1900
1901            key_token++;
1902        }
1903
1904        /*
1905         * If the command string hasn't been fully processed, get the next set
1906         * of tokens.
1907         */
1908        if(key_token->value != NULL) {
1909            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1910            key_token = tokens;
1911        }
1912
1913    } while(key_token->value != NULL);
1914
1915    c->icurr = c->ilist;
1916    c->ileft = i;
1917    if (return_cas) {
1918        c->suffixcurr = c->suffixlist;
1919        c->suffixleft = i;
1920    }
1921
1922    if (settings.verbose > 1)
1923        fprintf(stderr, ">%d END\n", c->sfd);
1924
1925    /*
1926        If the loop was terminated because of out-of-memory, it is not
1927        reliable to add END\r\n to the buffer, because it might not end
1928        in \r\n. So we send SERVER_ERROR instead.
1929    */
1930    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1931        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
1932        out_string(c, "SERVER_ERROR out of memory");
1933    }
1934    else {
1935        conn_set_state(c, conn_mwrite);
1936        c->msgcurr = 0;
1937    }
1938
1939    STATS_LOCK();
1940    stats.get_cmds   += stats_get_cmds;
1941    stats.get_hits   += stats_get_hits;
1942    stats.get_misses += stats_get_misses;
1943    STATS_UNLOCK();
1944
1945    return;
1946}
1947
1948static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1949    char *key;
1950    size_t nkey;
1951    int flags;
1952    time_t exptime;
1953    int vlen, old_vlen;
1954    uint64_t req_cas_id;
1955    item *it, *old_it;
1956
1957    assert(c != NULL);
1958
1959    set_noreply_maybe(c, tokens, ntokens);
1960
1961    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1962        out_string(c, "CLIENT_ERROR bad command line format");
1963        return;
1964    }
1965
1966    key = tokens[KEY_TOKEN].value;
1967    nkey = tokens[KEY_TOKEN].length;
1968
1969    flags = strtoul(tokens[2].value, NULL, 10);
1970    exptime = strtol(tokens[3].value, NULL, 10);
1971    vlen = strtol(tokens[4].value, NULL, 10);
1972
1973    // does cas value exist?
1974    if(handle_cas)
1975    {
1976      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1977    }
1978
1979    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1980        out_string(c, "CLIENT_ERROR bad command line format");
1981        return;
1982    }
1983
1984    if (settings.detail_enabled) {
1985        stats_prefix_record_set(key);
1986    }
1987
1988    if (settings.managed) {
1989        int bucket = c->bucket;
1990        if (bucket == -1) {
1991            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1992            return;
1993        }
1994        c->bucket = -1;
1995        if (buckets[bucket] != c->gen) {
1996            out_string(c, "ERROR_NOT_OWNER");
1997            return;
1998        }
1999    }
2000
2001    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
2002
2003    if (it == 0) {
2004        if (! item_size_ok(nkey, flags, vlen + 2))
2005            out_string(c, "SERVER_ERROR object too large for cache");
2006        else
2007            out_string(c, "SERVER_ERROR out of memory");
2008        /* swallow the data line */
2009        c->write_and_go = conn_swallow;
2010        c->sbytes = vlen + 2;
2011        return;
2012    }
2013    if(handle_cas)
2014      it->cas_id = req_cas_id;
2015
2016    c->item = it;
2017    c->ritem = ITEM_data(it);
2018    c->rlbytes = it->nbytes;
2019    c->item_comm = comm;
2020    conn_set_state(c, conn_nread);
2021}
2022
2023static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2024    char temp[sizeof("18446744073709551615")];
2025    item *it;
2026    int64_t delta;
2027    char *key;
2028    size_t nkey;
2029
2030    assert(c != NULL);
2031
2032    set_noreply_maybe(c, tokens, ntokens);
2033
2034    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2035        out_string(c, "CLIENT_ERROR bad command line format");
2036        return;
2037    }
2038
2039    key = tokens[KEY_TOKEN].value;
2040    nkey = tokens[KEY_TOKEN].length;
2041
2042    if (settings.managed) {
2043        int bucket = c->bucket;
2044        if (bucket == -1) {
2045            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2046            return;
2047        }
2048        c->bucket = -1;
2049        if (buckets[bucket] != c->gen) {
2050            out_string(c, "ERROR_NOT_OWNER");
2051            return;
2052        }
2053    }
2054
2055    delta = strtoll(tokens[2].value, NULL, 10);
2056
2057    if(errno == ERANGE) {
2058        out_string(c, "CLIENT_ERROR bad command line format");
2059        return;
2060    }
2061
2062    it = item_get(key, nkey);
2063    if (!it) {
2064        out_string(c, "NOT_FOUND");
2065        return;
2066    }
2067
2068    out_string(c, add_delta(it, incr, delta, temp));
2069    item_remove(it);         /* release our reference */
2070}
2071
2072/*
2073 * adds a delta value to a numeric item.
2074 *
2075 * it    item to adjust
2076 * incr  true to increment value, false to decrement
2077 * delta amount to adjust value by
2078 * buf   buffer for response string
2079 *
2080 * returns a response string to send back to the client.
2081 */
2082char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
2083    char *ptr;
2084    int64_t value;
2085    int res;
2086
2087    ptr = ITEM_data(it);
2088    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
2089
2090    value = strtoull(ptr, NULL, 10);
2091
2092    if(errno == ERANGE) {
2093        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
2094    }
2095
2096    if (incr)
2097        value += delta;
2098    else {
2099        value -= delta;
2100    }
2101    if(value < 0) {
2102        value=0;
2103    }
2104    sprintf(buf, "%llu", value);
2105    res = strlen(buf);
2106    if (res + 2 > it->nbytes) { /* need to realloc */
2107        item *new_it;
2108        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
2109        if (new_it == 0) {
2110            return "SERVER_ERROR out of memory";
2111        }
2112        memcpy(ITEM_data(new_it), buf, res);
2113        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
2114        do_item_replace(it, new_it);
2115        do_item_remove(new_it);       /* release our reference */
2116    } else { /* replace in-place */
2117        memcpy(ITEM_data(it), buf, res);
2118        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2119    }
2120
2121    return buf;
2122}
2123
2124static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
2125    char *key;
2126    size_t nkey;
2127    item *it;
2128    time_t exptime = 0;
2129
2130    assert(c != NULL);
2131
2132    set_noreply_maybe(c, tokens, ntokens);
2133
2134    if (settings.managed) {
2135        int bucket = c->bucket;
2136        if (bucket == -1) {
2137            out_string(c, "CLIENT_ERROR no BG data in managed mode");
2138            return;
2139        }
2140        c->bucket = -1;
2141        if (buckets[bucket] != c->gen) {
2142            out_string(c, "ERROR_NOT_OWNER");
2143            return;
2144        }
2145    }
2146
2147    key = tokens[KEY_TOKEN].value;
2148    nkey = tokens[KEY_TOKEN].length;
2149
2150    if(nkey > KEY_MAX_LENGTH) {
2151        out_string(c, "CLIENT_ERROR bad command line format");
2152        return;
2153    }
2154
2155    if(ntokens == (c->noreply ? 5 : 4)) {
2156        exptime = strtol(tokens[2].value, NULL, 10);
2157
2158        if(errno == ERANGE) {
2159            out_string(c, "CLIENT_ERROR bad command line format");
2160            return;
2161        }
2162    }
2163
2164    if (settings.detail_enabled) {
2165        stats_prefix_record_delete(key);
2166    }
2167
2168    it = item_get(key, nkey);
2169    if (it) {
2170        if (exptime == 0) {
2171            item_unlink(it);
2172            item_remove(it);      /* release our reference */
2173            out_string(c, "DELETED");
2174        } else {
2175            /* our reference will be transfered to the delete queue */
2176            out_string(c, defer_delete(it, exptime));
2177        }
2178    } else {
2179        out_string(c, "NOT_FOUND");
2180    }
2181}
2182
2183/*
2184 * Adds an item to the deferred-delete list so it can be reaped later.
2185 *
2186 * Returns the result to send to the client.
2187 */
2188char *do_defer_delete(item *it, time_t exptime)
2189{
2190    if (delcurr >= deltotal) {
2191        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
2192        if (new_delete) {
2193            todelete = new_delete;
2194            deltotal *= 2;
2195        } else {
2196            /*
2197             * can't delete it immediately, user wants a delay,
2198             * but we ran out of memory for the delete queue
2199             */
2200            item_remove(it);    /* release reference */
2201            return "SERVER_ERROR out of memory";
2202        }
2203    }
2204
2205    /* use its expiration time as its deletion time now */
2206    it->exptime = realtime(exptime);
2207    it->it_flags |= ITEM_DELETED;
2208    todelete[delcurr++] = it;
2209
2210    return "DELETED";
2211}
2212
2213static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
2214    unsigned int level;
2215
2216    assert(c != NULL);
2217
2218    set_noreply_maybe(c, tokens, ntokens);
2219
2220    level = strtoul(tokens[1].value, NULL, 10);
2221    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
2222    out_string(c, "OK");
2223    return;
2224}
2225
2226static void process_command(conn *c, char *command) {
2227
2228    token_t tokens[MAX_TOKENS];
2229    size_t ntokens;
2230    int comm;
2231
2232    assert(c != NULL);
2233
2234    if (settings.verbose > 1)
2235        fprintf(stderr, "<%d %s\n", c->sfd, command);
2236
2237    /*
2238     * for commands set/add/replace, we build an item and read the data
2239     * directly into it, then continue in nread_complete().
2240     */
2241
2242    c->msgcurr = 0;
2243    c->msgused = 0;
2244    c->iovused = 0;
2245    if (add_msghdr(c) != 0) {
2246        out_string(c, "SERVER_ERROR out of memory");
2247        return;
2248    }
2249
2250    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
2251    if (ntokens >= 3 &&
2252        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
2253         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
2254
2255        process_get_command(c, tokens, ntokens, false);
2256
2257    } else if ((ntokens == 6 || ntokens == 7) &&
2258               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
2259                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
2260                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
2261                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
2262                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
2263
2264        process_update_command(c, tokens, ntokens, comm, false);
2265
2266    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
2267
2268        process_update_command(c, tokens, ntokens, comm, true);
2269
2270    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
2271
2272        process_arithmetic_command(c, tokens, ntokens, 1);
2273
2274    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
2275
2276        process_get_command(c, tokens, ntokens, true);
2277
2278    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
2279
2280        process_arithmetic_command(c, tokens, ntokens, 0);
2281
2282    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
2283
2284        process_delete_command(c, tokens, ntokens);
2285
2286    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
2287        unsigned int bucket, gen;
2288        if (!settings.managed) {
2289            out_string(c, "CLIENT_ERROR not a managed instance");
2290            return;
2291        }
2292
2293        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
2294            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2295                out_string(c, "CLIENT_ERROR bucket number out of range");
2296                return;
2297            }
2298            buckets[bucket] = gen;
2299            out_string(c, "OWNED");
2300            return;
2301        } else {
2302            out_string(c, "CLIENT_ERROR bad format");
2303            return;
2304        }
2305
2306    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
2307
2308        int bucket;
2309        if (!settings.managed) {
2310            out_string(c, "CLIENT_ERROR not a managed instance");
2311            return;
2312        }
2313        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
2314            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
2315                out_string(c, "CLIENT_ERROR bucket number out of range");
2316                return;
2317            }
2318            buckets[bucket] = 0;
2319            out_string(c, "DISOWNED");
2320            return;
2321        } else {
2322            out_string(c, "CLIENT_ERROR bad format");
2323            return;
2324        }
2325
2326    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
2327        int bucket, gen;
2328        if (!settings.managed) {
2329            out_string(c, "CLIENT_ERROR not a managed instance");
2330            return;
2331        }
2332        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
2333            /* we never write anything back, even if input's wrong */
2334            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
2335                /* do nothing, bad input */
2336            } else {
2337                c->bucket = bucket;
2338                c->gen = gen;
2339            }
2340            conn_set_init_state(c);
2341            return;
2342        } else {
2343            out_string(c, "CLIENT_ERROR bad format");
2344            return;
2345        }
2346
2347    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
2348
2349        process_stat(c, tokens, ntokens);
2350
2351    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
2352        time_t exptime = 0;
2353        set_current_time();
2354
2355        set_noreply_maybe(c, tokens, ntokens);
2356
2357        if(ntokens == (c->noreply ? 3 : 2)) {
2358            settings.oldest_live = current_time - 1;
2359            item_flush_expired();
2360            out_string(c, "OK");
2361            return;
2362        }
2363
2364        exptime = strtol(tokens[1].value, NULL, 10);
2365        if(errno == ERANGE) {
2366            out_string(c, "CLIENT_ERROR bad command line format");
2367            return;
2368        }
2369
2370        /*
2371          If exptime is zero realtime() would return zero too, and
2372          realtime(exptime) - 1 would overflow to the max unsigned
2373          value.  So we process exptime == 0 the same way we do when
2374          no delay is given at all.
2375        */
2376        if (exptime > 0)
2377            settings.oldest_live = realtime(exptime) - 1;
2378        else /* exptime == 0 */
2379            settings.oldest_live = current_time - 1;
2380        item_flush_expired();
2381        out_string(c, "OK");
2382        return;
2383
2384    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
2385
2386        out_string(c, "VERSION " VERSION);
2387
2388    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
2389
2390        conn_set_state(c, conn_closing);
2391
2392    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
2393                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
2394#ifdef ALLOW_SLABS_REASSIGN
2395
2396        int src, dst, rv;
2397
2398        src = strtol(tokens[2].value, NULL, 10);
2399        dst  = strtol(tokens[3].value, NULL, 10);
2400
2401        if(errno == ERANGE) {
2402            out_string(c, "CLIENT_ERROR bad command line format");
2403            return;
2404        }
2405
2406        rv = slabs_reassign(src, dst);
2407        if (rv == 1) {
2408            out_string(c, "DONE");
2409            return;
2410        }
2411        if (rv == 0) {
2412            out_string(c, "CANT");
2413            return;
2414        }
2415        if (rv == -1) {
2416            out_string(c, "BUSY");
2417            return;
2418        }
2419#else
2420        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
2421#endif
2422    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
2423        process_verbosity_command(c, tokens, ntokens);
2424    } else {
2425        out_string(c, "ERROR");
2426    }
2427    return;
2428}
2429
2430/*
2431 * if we have a complete line in the buffer, process it.
2432 */
2433static int try_read_command(conn *c) {
2434    char *el, *cont;
2435
2436    assert(c != NULL);
2437    assert(c->rcurr <= (c->rbuf + c->rsize));
2438
2439    if (c->rbytes == 0)
2440        return 0;
2441    el = memchr(c->rcurr, '\n', c->rbytes);
2442    if (!el)
2443        return 0;
2444    cont = el + 1;
2445    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
2446        el--;
2447    }
2448    *el = '\0';
2449
2450    assert(cont <= (c->rcurr + c->rbytes));
2451
2452    process_command(c, c->rcurr);
2453
2454    c->rbytes -= (cont - c->rcurr);
2455    c->rcurr = cont;
2456
2457    assert(c->rcurr <= (c->rbuf + c->rsize));
2458
2459    return 1;
2460}
2461
2462/*
2463 * read a UDP request.
2464 * return 0 if there's nothing to read.
2465 */
2466static int try_read_udp(conn *c) {
2467    int res;
2468
2469    assert(c != NULL);
2470
2471    c->request_addr_size = sizeof(c->request_addr);
2472    res = recvfrom(c->sfd, c->rbuf, c->rsize,
2473                   0, &c->request_addr, &c->request_addr_size);
2474    if (res > 8) {
2475        unsigned char *buf = (unsigned char *)c->rbuf;
2476        STATS_LOCK();
2477        stats.bytes_read += res;
2478        STATS_UNLOCK();
2479
2480        /* Beginning of UDP packet is the request ID; save it. */
2481        c->request_id = buf[0] * 256 + buf[1];
2482
2483        /* If this is a multi-packet request, drop it. */
2484        if (buf[4] != 0 || buf[5] != 1) {
2485            out_string(c, "SERVER_ERROR multi-packet request not supported");
2486            return 0;
2487        }
2488
2489        /* Don't care about any of the rest of the header. */
2490        res -= 8;
2491        memmove(c->rbuf, c->rbuf + 8, res);
2492
2493        c->rbytes += res;
2494        c->rcurr = c->rbuf;
2495        return 1;
2496    }
2497    return 0;
2498}
2499
2500/*
2501 * read from network as much as we can, handle buffer overflow and connection
2502 * close.
2503 * before reading, move the remaining incomplete fragment of a command
2504 * (if any) to the beginning of the buffer.
2505 * return 0 if there's nothing to read on the first read.
2506 */
2507static int try_read_network(conn *c) {
2508    int gotdata = 0;
2509    int res;
2510
2511    assert(c != NULL);
2512
2513    if (c->rcurr != c->rbuf) {
2514        if (c->rbytes != 0) /* otherwise there's nothing to copy */
2515            memmove(c->rbuf, c->rcurr, c->rbytes);
2516        c->rcurr = c->rbuf;
2517    }
2518
2519    while (1) {
2520        if (c->rbytes >= c->rsize) {
2521            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2522            if (!new_rbuf) {
2523                if (settings.verbose > 0)
2524                    fprintf(stderr, "Couldn't realloc input buffer\n");
2525                c->rbytes = 0; /* ignore what we read */
2526                out_string(c, "SERVER_ERROR out of memory");
2527                c->write_and_go = conn_closing;
2528                return 1;
2529            }
2530            c->rcurr = c->rbuf = new_rbuf;
2531            c->rsize *= 2;
2532        }
2533
2534        int avail = c->rsize - c->rbytes;
2535        res = read(c->sfd, c->rbuf + c->rbytes, avail);
2536        if (res > 0) {
2537            STATS_LOCK();
2538            stats.bytes_read += res;
2539            STATS_UNLOCK();
2540            gotdata = 1;
2541            c->rbytes += res;
2542            if (res == avail) {
2543                continue;
2544            } else {
2545                break;
2546            }
2547        }
2548        if (res == 0) {
2549            /* connection closed */
2550            conn_set_state(c, conn_closing);
2551            return 1;
2552        }
2553        if (res == -1) {
2554            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
2555            /* Should close on unhandled errors. */
2556            conn_set_state(c, conn_closing);
2557            return 1;
2558        }
2559    }
2560    return gotdata;
2561}
2562
2563static bool update_event(conn *c, const int new_flags) {
2564    assert(c != NULL);
2565
2566    struct event_base *base = c->event.ev_base;
2567    if (c->ev_flags == new_flags)
2568        return true;
2569    if (event_del(&c->event) == -1) return false;
2570    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2571    event_base_set(base, &c->event);
2572    c->ev_flags = new_flags;
2573    if (event_add(&c->event, 0) == -1) return false;
2574    return true;
2575}
2576
2577/*
2578 * Sets whether we are listening for new connections or not.
2579 */
2580void accept_new_conns(const bool do_accept) {
2581    conn *next;
2582
2583    if (! is_listen_thread())
2584        return;
2585
2586    for (next = listen_conn; next; next = next->next) {
2587        if (do_accept) {
2588            update_event(next, EV_READ | EV_PERSIST);
2589            if (listen(next->sfd, 1024) != 0) {
2590                perror("listen");
2591            }
2592        }
2593        else {
2594            update_event(next, 0);
2595            if (listen(next->sfd, 0) != 0) {
2596                perror("listen");
2597            }
2598        }
2599  }
2600}
2601
2602/*
2603 * Transmit the next chunk of data from our list of msgbuf structures.
2604 *
2605 * Returns:
2606 *   TRANSMIT_COMPLETE   All done writing.
2607 *   TRANSMIT_INCOMPLETE More data remaining to write.
2608 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2609 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2610 */
2611static int transmit(conn *c) {
2612    assert(c != NULL);
2613
2614    if (c->msgcurr < c->msgused &&
2615            c->msglist[c->msgcurr].msg_iovlen == 0) {
2616        /* Finished writing the current msg; advance to the next. */
2617        c->msgcurr++;
2618    }
2619    if (c->msgcurr < c->msgused) {
2620        ssize_t res;
2621        struct msghdr *m = &c->msglist[c->msgcurr];
2622
2623        res = sendmsg(c->sfd, m, 0);
2624        if (res > 0) {
2625            STATS_LOCK();
2626            stats.bytes_written += res;
2627            STATS_UNLOCK();
2628
2629            /* We've written some of the data. Remove the completed
2630               iovec entries from the list of pending writes. */
2631            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2632                res -= m->msg_iov->iov_len;
2633                m->msg_iovlen--;
2634                m->msg_iov++;
2635            }
2636
2637            /* Might have written just part of the last iovec entry;
2638               adjust it so the next write will do the rest. */
2639            if (res > 0) {
2640                m->msg_iov->iov_base += res;
2641                m->msg_iov->iov_len -= res;
2642            }
2643            return TRANSMIT_INCOMPLETE;
2644        }
2645        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2646            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2647                if (settings.verbose > 0)
2648                    fprintf(stderr, "Couldn't update event\n");
2649                conn_set_state(c, conn_closing);
2650                return TRANSMIT_HARD_ERROR;
2651            }
2652            return TRANSMIT_SOFT_ERROR;
2653        }
2654        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2655           we have a real error, on which we close the connection */
2656        if (settings.verbose > 0)
2657            perror("Failed to write, and not due to blocking");
2658
2659        if (IS_UDP(c->protocol))
2660            conn_set_state(c, conn_read);
2661        else
2662            conn_set_state(c, conn_closing);
2663        return TRANSMIT_HARD_ERROR;
2664    } else {
2665        return TRANSMIT_COMPLETE;
2666    }
2667}
2668
2669static void drive_machine(conn *c) {
2670    bool stop = false;
2671    int sfd, flags = 1;
2672    int init_state; /* initial state for a new connection */
2673    socklen_t addrlen;
2674    struct sockaddr_storage addr;
2675    int res;
2676
2677    assert(c != NULL);
2678
2679    while (!stop) {
2680
2681        switch(c->state) {
2682        case conn_listening:
2683            addrlen = sizeof(addr);
2684            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2685                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2686                    /* these are transient, so don't log anything */
2687                    stop = true;
2688                } else if (errno == EMFILE) {
2689                    if (settings.verbose > 0)
2690                        fprintf(stderr, "Too many open connections\n");
2691                    accept_new_conns(false);
2692                    stop = true;
2693                } else {
2694                    perror("accept()");
2695                    stop = true;
2696                }
2697                break;
2698            }
2699            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2700                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2701                perror("setting O_NONBLOCK");
2702                close(sfd);
2703                break;
2704            }
2705            init_state = get_init_state(c);
2706
2707            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST,
2708                                     DATA_BUFFER_SIZE, c->protocol);
2709
2710            break;
2711
2712        case conn_read:
2713            if (try_read_command(c) != 0) {
2714                continue;
2715            }
2716            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) {
2717                continue;
2718            }
2719            /* we have no command line and no data to read from network */
2720            if (!update_event(c, EV_READ | EV_PERSIST)) {
2721                if (settings.verbose > 0)
2722                    fprintf(stderr, "Couldn't update event\n");
2723                conn_set_state(c, conn_closing);
2724                break;
2725            }
2726            stop = true;
2727            break;
2728
2729        case conn_bin_init: /* Reinitialize a binary connection */
2730            c->rlbytes = MIN_BIN_PKT_LENGTH;
2731            c->write_and_go = conn_bin_init;
2732            c->cmd = -1;
2733            c->substate = bin_no_state;
2734            c->rbytes = c->wbytes = 0;
2735            c->wcurr = c->wbuf;
2736            c->rcurr = c->rbuf;
2737            c->ritem = (char*)c->bin_header;
2738            conn_set_state(c, conn_nread);
2739            conn_shrink(c);
2740            break;
2741
2742        case conn_nread:
2743            if (c->rlbytes == 0) {
2744                complete_nread(c);
2745                break;
2746            }
2747            /* first check if we have leftovers in the conn_read buffer */
2748            if (c->rbytes > 0) {
2749                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2750                memcpy(c->ritem, c->rcurr, tocopy);
2751                c->ritem += tocopy;
2752                c->rlbytes -= tocopy;
2753                c->rcurr += tocopy;
2754                c->rbytes -= tocopy;
2755                break;
2756            }
2757
2758            /*  now try reading from the socket */
2759            res = read(c->sfd, c->ritem, c->rlbytes);
2760            if (res > 0) {
2761                STATS_LOCK();
2762                stats.bytes_read += res;
2763                STATS_UNLOCK();
2764                c->ritem += res;
2765                c->rlbytes -= res;
2766                break;
2767            }
2768            if (res == 0) { /* end of stream */
2769                conn_set_state(c, conn_closing);
2770                break;
2771            }
2772            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2773                if (!update_event(c, EV_READ | EV_PERSIST)) {
2774                    if (settings.verbose > 0)
2775                        fprintf(stderr, "Couldn't update event\n");
2776                    conn_set_state(c, conn_closing);
2777                    break;
2778                }
2779                stop = true;
2780                break;
2781            }
2782            /* otherwise we have a real error, on which we close the connection */
2783            if (settings.verbose > 0)
2784                fprintf(stderr, "Failed to read, and not due to blocking\n");
2785            conn_set_state(c, conn_closing);
2786            break;
2787
2788        case conn_swallow:
2789            /* we are reading sbytes and throwing them away */
2790            if (c->sbytes == 0) {
2791                conn_set_init_state(c);
2792                break;
2793            }
2794
2795            /* first check if we have leftovers in the conn_read buffer */
2796            if (c->rbytes > 0) {
2797                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2798                c->sbytes -= tocopy;
2799                c->rcurr += tocopy;
2800                c->rbytes -= tocopy;
2801                break;
2802            }
2803
2804            /*  now try reading from the socket */
2805            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2806            if (res > 0) {
2807                STATS_LOCK();
2808                stats.bytes_read += res;
2809                STATS_UNLOCK();
2810                c->sbytes -= res;
2811                break;
2812            }
2813            if (res == 0) { /* end of stream */
2814                conn_set_state(c, conn_closing);
2815                break;
2816            }
2817            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2818                if (!update_event(c, EV_READ | EV_PERSIST)) {
2819                    if (settings.verbose > 0)
2820                        fprintf(stderr, "Couldn't update event\n");
2821                    conn_set_state(c, conn_closing);
2822                    break;
2823                }
2824                stop = true;
2825                break;
2826            }
2827            /* otherwise we have a real error, on which we close the connection */
2828            if (settings.verbose > 0)
2829                fprintf(stderr, "Failed to read, and not due to blocking\n");
2830            conn_set_state(c, conn_closing);
2831            break;
2832
2833        case conn_write:
2834            /*
2835             * We want to write out a simple response. If we haven't already,
2836             * assemble it into a msgbuf list (this will be a single-entry
2837             * list for TCP or a two-entry list for UDP).
2838             */
2839            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
2840                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2841                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
2842                    if (settings.verbose > 0)
2843                        fprintf(stderr, "Couldn't build response\n");
2844                    conn_set_state(c, conn_closing);
2845                    break;
2846                }
2847            }
2848
2849            /* fall through... */
2850
2851        case conn_mwrite:
2852            switch (transmit(c)) {
2853            case TRANSMIT_COMPLETE:
2854                if (c->state == conn_mwrite) {
2855                    while (c->ileft > 0) {
2856                        item *it = *(c->icurr);
2857                        assert((it->it_flags & ITEM_SLABBED) == 0);
2858                        item_remove(it);
2859                        c->icurr++;
2860                        c->ileft--;
2861                    }
2862                    while (c->suffixleft > 0) {
2863                        char *suffix = *(c->suffixcurr);
2864                        if(suffix_add_to_freelist(suffix)) {
2865                            /* Failed to add to freelist, don't leak */
2866                            free(suffix);
2867                        }
2868                        c->suffixcurr++;
2869                        c->suffixleft--;
2870                    }
2871                    /* XXX:  I don't know why this wasn't the general case */
2872                    if(c->protocol == binary_prot) {
2873                        conn_set_state(c, c->write_and_go);
2874                    } else {
2875                        conn_set_init_state(c);
2876                    }
2877                } else if (c->state == conn_write) {
2878                    if (c->write_and_free) {
2879                        free(c->write_and_free);
2880                        c->write_and_free = 0;
2881                    }
2882                    conn_set_state(c, c->write_and_go);
2883                } else {
2884                    if (settings.verbose > 0)
2885                        fprintf(stderr, "Unexpected state %d\n", c->state);
2886                    conn_set_state(c, conn_closing);
2887                }
2888                break;
2889
2890            case TRANSMIT_INCOMPLETE:
2891            case TRANSMIT_HARD_ERROR:
2892                break;                   /* Continue in state machine. */
2893
2894            case TRANSMIT_SOFT_ERROR:
2895                stop = true;
2896                break;
2897            }
2898            break;
2899
2900        case conn_closing:
2901            if (IS_UDP(c->protocol))
2902                conn_cleanup(c);
2903            else
2904                conn_close(c);
2905            stop = true;
2906            break;
2907        }
2908    }
2909
2910    return;
2911}
2912
2913void event_handler(const int fd, const short which, void *arg) {
2914    conn *c;
2915
2916    c = (conn *)arg;
2917    assert(c != NULL);
2918
2919    c->which = which;
2920
2921    /* sanity */
2922    if (fd != c->sfd) {
2923        if (settings.verbose > 0)
2924            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2925        conn_close(c);
2926        return;
2927    }
2928
2929    drive_machine(c);
2930
2931    /* wait for next event */
2932    return;
2933}
2934
2935static int new_socket(struct addrinfo *ai) {
2936    int sfd;
2937    int flags;
2938
2939    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2940        perror("socket()");
2941        return -1;
2942    }
2943
2944    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2945        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2946        perror("setting O_NONBLOCK");
2947        close(sfd);
2948        return -1;
2949    }
2950    return sfd;
2951}
2952
2953
2954/*
2955 * Sets a socket's send buffer size to the maximum allowed by the system.
2956 */
2957static void maximize_sndbuf(const int sfd) {
2958    socklen_t intsize = sizeof(int);
2959    int last_good = 0;
2960    int min, max, avg;
2961    int old_size;
2962
2963    /* Start with the default size. */
2964    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2965        if (settings.verbose > 0)
2966            perror("getsockopt(SO_SNDBUF)");
2967        return;
2968    }
2969
2970    /* Binary-search for the real maximum. */
2971    min = old_size;
2972    max = MAX_SENDBUF_SIZE;
2973
2974    while (min <= max) {
2975        avg = ((unsigned int)(min + max)) / 2;
2976        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *