root/trunk/server/memcached.c @ 736

Revision 736, 93.5 kB (checked in by dormando, 21 months ago)

Give 'SERVER_ERROR out of memory' errors more context.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__) || defined(__APPLE__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(struct addrinfo *ai);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97static void conn_free(conn *c);
98
99/** exported globals **/
100struct stats stats;
101struct settings settings;
102
103/** file scope variables **/
104static item **todelete = NULL;
105static int delcurr;
106static int deltotal;
107static conn *listen_conn = NULL;
108static struct event_base *main_base;
109
110#define TRANSMIT_COMPLETE   0
111#define TRANSMIT_INCOMPLETE 1
112#define TRANSMIT_SOFT_ERROR 2
113#define TRANSMIT_HARD_ERROR 3
114
115static int *buckets = 0; /* bucket->generation array for a managed instance */
116
117#define REALTIME_MAXDELTA 60*60*24*30
118/*
119 * given time value that's either unix time or delta from current unix time, return
120 * unix time. Use the fact that delta can't exceed one month (and real time value can't
121 * be that low).
122 */
123static rel_time_t realtime(const time_t exptime) {
124    /* no. of seconds in 30 days - largest possible delta exptime */
125
126    if (exptime == 0) return 0; /* 0 means never expire */
127
128    if (exptime > REALTIME_MAXDELTA) {
129        /* if item expiration is at/before the server started, give it an
130           expiration time of 1 second after the server started.
131           (because 0 means don't expire).  without this, we'd
132           underflow and wrap around to some large value way in the
133           future, effectively making items expiring in the past
134           really expiring never */
135        if (exptime <= stats.started)
136            return (rel_time_t)1;
137        return (rel_time_t)(exptime - stats.started);
138    } else {
139        return (rel_time_t)(exptime + current_time);
140    }
141}
142
143static void stats_init(void) {
144    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
145    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
146    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
147
148    /* make the time we started always be 2 seconds before we really
149       did, so time(0) - time.started is never zero.  if so, things
150       like 'settings.oldest_live' which act as booleans as well as
151       values are now false in boolean context... */
152    stats.started = time(0) - 2;
153    stats_prefix_init();
154}
155
156static void stats_reset(void) {
157    STATS_LOCK();
158    stats.total_items = stats.total_conns = 0;
159    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
160    stats.bytes_read = stats.bytes_written = 0;
161    stats_prefix_clear();
162    STATS_UNLOCK();
163}
164
165static void settings_init(void) {
166    settings.access=0700;
167    settings.port = 11211;
168    settings.udpport = 0;
169    /* By default this string should be NULL for getaddrinfo() */
170    settings.inter = NULL;
171    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
172    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
173    settings.verbose = 0;
174    settings.oldest_live = 0;
175    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
176    settings.socketpath = NULL;       /* by default, not using a unix socket */
177    settings.managed = false;
178    settings.factor = 1.25;
179    settings.chunk_size = 48;         /* space for a modest key and value */
180#ifdef USE_THREADS
181    settings.num_threads = 4;
182#else
183    settings.num_threads = 1;
184#endif
185    settings.prefix_delimiter = ':';
186    settings.detail_enabled = 0;
187}
188
189/* returns true if a deleted item's delete-locked-time is over, and it
190   should be removed from the namespace */
191static bool item_delete_lock_over (item *it) {
192    assert(it->it_flags & ITEM_DELETED);
193    return (current_time >= it->exptime);
194}
195
196/*
197 * Adds a message header to a connection.
198 *
199 * Returns 0 on success, -1 on out-of-memory.
200 */
201static int add_msghdr(conn *c)
202{
203    struct msghdr *msg;
204
205    assert(c != NULL);
206
207    if (c->msgsize == c->msgused) {
208        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
209        if (! msg)
210            return -1;
211        c->msglist = msg;
212        c->msgsize *= 2;
213    }
214
215    msg = c->msglist + c->msgused;
216
217    /* this wipes msg_iovlen, msg_control, msg_controllen, and
218       msg_flags, the last 3 of which aren't defined on solaris: */
219    memset(msg, 0, sizeof(struct msghdr));
220
221    msg->msg_iov = &c->iov[c->iovused];
222
223    if (c->request_addr_size > 0) {
224        msg->msg_name = &c->request_addr;
225        msg->msg_namelen = c->request_addr_size;
226    }
227
228    c->msgbytes = 0;
229    c->msgused++;
230
231    if (c->udp) {
232        /* Leave room for the UDP header, which we'll fill in later. */
233        return add_iov(c, NULL, UDP_HEADER_SIZE);
234    }
235
236    return 0;
237}
238
239
240/*
241 * Free list management for connections.
242 */
243
244static conn **freeconns;
245static int freetotal;
246static int freecurr;
247
248
249static void conn_init(void) {
250    freetotal = 200;
251    freecurr = 0;
252    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
253        fprintf(stderr, "malloc()\n");
254    }
255    return;
256}
257
258/*
259 * Returns a connection from the freelist, if any. Should call this using
260 * conn_from_freelist() for thread safety.
261 */
262conn *do_conn_from_freelist() {
263    conn *c;
264
265    if (freecurr > 0) {
266        c = freeconns[--freecurr];
267    } else {
268        c = NULL;
269    }
270
271    return c;
272}
273
274/*
275 * Adds a connection to the freelist. 0 = success. Should call this using
276 * conn_add_to_freelist() for thread safety.
277 */
278bool do_conn_add_to_freelist(conn *c) {
279    if (freecurr < freetotal) {
280        freeconns[freecurr++] = c;
281        return false;
282    } else {
283        /* try to enlarge free connections array */
284        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
285        if (new_freeconns) {
286            freetotal *= 2;
287            freeconns = new_freeconns;
288            freeconns[freecurr++] = c;
289            return false;
290        }
291    }
292    return true;
293}
294
295conn *conn_new(const int sfd, const int init_state, const int event_flags,
296                const int read_buffer_size, const bool is_udp, struct event_base *base) {
297    conn *c = conn_from_freelist();
298
299    if (NULL == c) {
300        if (!(c = (conn *)malloc(sizeof(conn)))) {
301            fprintf(stderr, "malloc()\n");
302            return NULL;
303        }
304        c->rbuf = c->wbuf = 0;
305        c->ilist = 0;
306        c->suffixlist = 0;
307        c->iov = 0;
308        c->msglist = 0;
309        c->hdrbuf = 0;
310
311        c->rsize = read_buffer_size;
312        c->wsize = DATA_BUFFER_SIZE;
313        c->isize = ITEM_LIST_INITIAL;
314        c->suffixsize = SUFFIX_LIST_INITIAL;
315        c->iovsize = IOV_LIST_INITIAL;
316        c->msgsize = MSG_LIST_INITIAL;
317        c->hdrsize = 0;
318
319        c->rbuf = (char *)malloc((size_t)c->rsize);
320        c->wbuf = (char *)malloc((size_t)c->wsize);
321        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
322        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
323        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
324        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
325
326        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
327                c->msglist == 0 || c->suffixlist == 0) {
328            if (c->rbuf != 0) free(c->rbuf);
329            if (c->wbuf != 0) free(c->wbuf);
330            if (c->ilist !=0) free(c->ilist);
331            if (c->suffixlist != 0) free(c->suffixlist);
332            if (c->iov != 0) free(c->iov);
333            if (c->msglist != 0) free(c->msglist);
334            free(c);
335            fprintf(stderr, "malloc()\n");
336            return NULL;
337        }
338
339        STATS_LOCK();
340        stats.conn_structs++;
341        STATS_UNLOCK();
342    }
343
344    if (settings.verbose > 1) {
345        if (init_state == conn_listening)
346            fprintf(stderr, "<%d server listening\n", sfd);
347        else if (is_udp)
348            fprintf(stderr, "<%d server listening (udp)\n", sfd);
349        else
350            fprintf(stderr, "<%d new client connection\n", sfd);
351    }
352
353    c->sfd = sfd;
354    c->udp = is_udp;
355    c->state = init_state;
356    c->rlbytes = 0;
357    c->rbytes = c->wbytes = 0;
358    c->wcurr = c->wbuf;
359    c->rcurr = c->rbuf;
360    c->ritem = 0;
361    c->icurr = c->ilist;
362    c->suffixcurr = c->suffixlist;
363    c->ileft = 0;
364    c->suffixleft = 0;
365    c->iovused = 0;
366    c->msgcurr = 0;
367    c->msgused = 0;
368
369    c->write_and_go = conn_read;
370    c->write_and_free = 0;
371    c->item = 0;
372    c->bucket = -1;
373    c->gen = 0;
374
375    c->noreply = false;
376
377    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
378    event_base_set(base, &c->event);
379    c->ev_flags = event_flags;
380
381    if (event_add(&c->event, 0) == -1) {
382        if (conn_add_to_freelist(c)) {
383            conn_free(c);
384        }
385        perror("event_add");
386        return NULL;
387    }
388
389    STATS_LOCK();
390    stats.curr_conns++;
391    stats.total_conns++;
392    STATS_UNLOCK();
393
394    return c;
395}
396
397static void conn_cleanup(conn *c) {
398    assert(c != NULL);
399
400    if (c->item) {
401        item_remove(c->item);
402        c->item = 0;
403    }
404
405    if (c->ileft != 0) {
406        for (; c->ileft > 0; c->ileft--,c->icurr++) {
407            item_remove(*(c->icurr));
408        }
409    }
410
411    if (c->suffixleft != 0) {
412        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
413            if(suffix_add_to_freelist(*(c->suffixcurr))) {
414                free(*(c->suffixcurr));
415            }
416        }
417    }
418
419    if (c->write_and_free) {
420        free(c->write_and_free);
421        c->write_and_free = 0;
422    }
423}
424
425/*
426 * Frees a connection.
427 */
428void conn_free(conn *c) {
429    if (c) {
430        if (c->hdrbuf)
431            free(c->hdrbuf);
432        if (c->msglist)
433            free(c->msglist);
434        if (c->rbuf)
435            free(c->rbuf);
436        if (c->wbuf)
437            free(c->wbuf);
438        if (c->ilist)
439            free(c->ilist);
440        if (c->suffixlist)
441            free(c->suffixlist);
442        if (c->iov)
443            free(c->iov);
444        free(c);
445    }
446}
447
448static void conn_close(conn *c) {
449    assert(c != NULL);
450
451    /* delete the event, the socket and the conn */
452    event_del(&c->event);
453
454    if (settings.verbose > 1)
455        fprintf(stderr, "<%d connection closed.\n", c->sfd);
456
457    close(c->sfd);
458    accept_new_conns(true);
459    conn_cleanup(c);
460
461    /* if the connection has big buffers, just free it */
462    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
463        conn_free(c);
464    }
465
466    STATS_LOCK();
467    stats.curr_conns--;
468    STATS_UNLOCK();
469
470    return;
471}
472
473
474/*
475 * Shrinks a connection's buffers if they're too big.  This prevents
476 * periodic large "get" requests from permanently chewing lots of server
477 * memory.
478 *
479 * This should only be called in between requests since it can wipe output
480 * buffers!
481 */
482static void conn_shrink(conn *c) {
483    assert(c != NULL);
484
485    if (c->udp)
486        return;
487
488    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
489        char *newbuf;
490
491        if (c->rcurr != c->rbuf)
492            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
493
494        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
495
496        if (newbuf) {
497            c->rbuf = newbuf;
498            c->rsize = DATA_BUFFER_SIZE;
499        }
500        /* TODO check other branch... */
501        c->rcurr = c->rbuf;
502    }
503
504    if (c->isize > ITEM_LIST_HIGHWAT) {
505        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
506        if (newbuf) {
507            c->ilist = newbuf;
508            c->isize = ITEM_LIST_INITIAL;
509        }
510    /* TODO check error condition? */
511    }
512
513    if (c->msgsize > MSG_LIST_HIGHWAT) {
514        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
515        if (newbuf) {
516            c->msglist = newbuf;
517            c->msgsize = MSG_LIST_INITIAL;
518        }
519    /* TODO check error condition? */
520    }
521
522    if (c->iovsize > IOV_LIST_HIGHWAT) {
523        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
524        if (newbuf) {
525            c->iov = newbuf;
526            c->iovsize = IOV_LIST_INITIAL;
527        }
528    /* TODO check return value */
529    }
530}
531
532/*
533 * Sets a connection's current state in the state machine. Any special
534 * processing that needs to happen on certain state transitions can
535 * happen here.
536 */
537static void conn_set_state(conn *c, int state) {
538    assert(c != NULL);
539
540    if (state != c->state) {
541        if (state == conn_read) {
542            conn_shrink(c);
543            assoc_move_next_bucket();
544        }
545        c->state = state;
546    }
547}
548
549/*
550 * Free list management for suffix buffers.
551 */
552
553static char **freesuffix;
554static int freesuffixtotal;
555static int freesuffixcurr;
556
557static void suffix_init(void) {
558    freesuffixtotal = 500;
559    freesuffixcurr  = 0;
560
561    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
562    if (freesuffix == NULL) {
563        fprintf(stderr, "malloc()\n");
564    }
565    return;
566}
567
568/*
569 * Returns a suffix buffer from the freelist, if any. Should call this using
570 * suffix_from_freelist() for thread safety.
571 */
572char *do_suffix_from_freelist() {
573    char *s;
574
575    if (freesuffixcurr > 0) {
576        s = freesuffix[--freesuffixcurr];
577    } else {
578        /* If malloc fails, let the logic fall through without spamming
579         * STDERR on the server. */
580        s = malloc( SUFFIX_SIZE );
581    }
582
583    return s;
584}
585
586/*
587 * Adds a connection to the freelist. 0 = success. Should call this using
588 * conn_add_to_freelist() for thread safety.
589 */
590bool do_suffix_add_to_freelist(char *s) {
591    if (freesuffixcurr < freesuffixtotal) {
592        freesuffix[freesuffixcurr++] = s;
593        return false;
594    } else {
595        /* try to enlarge free connections array */
596        char **new_freesuffix = realloc(freesuffix, freesuffixtotal * 2);
597        if (new_freesuffix) {
598            freesuffixtotal *= 2;
599            freesuffix = new_freesuffix;
600            freesuffix[freesuffixcurr++] = s;
601            return false;
602        }
603    }
604    return true;
605}
606
607/*
608 * Ensures that there is room for another struct iovec in a connection's
609 * iov list.
610 *
611 * Returns 0 on success, -1 on out-of-memory.
612 */
613static int ensure_iov_space(conn *c) {
614    assert(c != NULL);
615
616    if (c->iovused >= c->iovsize) {
617        int i, iovnum;
618        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
619                                (c->iovsize * 2) * sizeof(struct iovec));
620        if (! new_iov)
621            return -1;
622        c->iov = new_iov;
623        c->iovsize *= 2;
624
625        /* Point all the msghdr structures at the new list. */
626        for (i = 0, iovnum = 0; i < c->msgused; i++) {
627            c->msglist[i].msg_iov = &c->iov[iovnum];
628            iovnum += c->msglist[i].msg_iovlen;
629        }
630    }
631
632    return 0;
633}
634
635
636/*
637 * Adds data to the list of pending data that will be written out to a
638 * connection.
639 *
640 * Returns 0 on success, -1 on out-of-memory.
641 */
642
643static int add_iov(conn *c, const void *buf, int len) {
644    struct msghdr *m;
645    int leftover;
646    bool limit_to_mtu;
647
648    assert(c != NULL);
649
650    do {
651        m = &c->msglist[c->msgused - 1];
652
653        /*
654         * Limit UDP packets, and the first payloads of TCP replies, to
655         * UDP_MAX_PAYLOAD_SIZE bytes.
656         */
657        limit_to_mtu = c->udp || (1 == c->msgused);
658
659        /* We may need to start a new msghdr if this one is full. */
660        if (m->msg_iovlen == IOV_MAX ||
661            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
662            add_msghdr(c);
663            m = &c->msglist[c->msgused - 1];
664        }
665
666        if (ensure_iov_space(c) != 0)
667            return -1;
668
669        /* If the fragment is too big to fit in the datagram, split it up */
670        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
671            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
672            len -= leftover;
673        } else {
674            leftover = 0;
675        }
676
677        m = &c->msglist[c->msgused - 1];
678        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
679        m->msg_iov[m->msg_iovlen].iov_len = len;
680
681        c->msgbytes += len;
682        c->iovused++;
683        m->msg_iovlen++;
684
685        buf = ((char *)buf) + len;
686        len = leftover;
687    } while (leftover > 0);
688
689    return 0;
690}
691
692
693/*
694 * Constructs a set of UDP headers and attaches them to the outgoing messages.
695 */
696static int build_udp_headers(conn *c) {
697    int i;
698    unsigned char *hdr;
699
700    assert(c != NULL);
701
702    if (c->msgused > c->hdrsize) {
703        void *new_hdrbuf;
704        if (c->hdrbuf)
705            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
706        else
707            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
708        if (! new_hdrbuf)
709            return -1;
710        c->hdrbuf = (unsigned char *)new_hdrbuf;
711        c->hdrsize = c->msgused * 2;
712    }
713
714    hdr = c->hdrbuf;
715    for (i = 0; i < c->msgused; i++) {
716        c->msglist[i].msg_iov[0].iov_base = hdr;
717        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
718        *hdr++ = c->request_id / 256;
719        *hdr++ = c->request_id % 256;
720        *hdr++ = i / 256;
721        *hdr++ = i % 256;
722        *hdr++ = c->msgused / 256;
723        *hdr++ = c->msgused % 256;
724        *hdr++ = 0;
725        *hdr++ = 0;
726        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
727    }
728
729    return 0;
730}
731
732
733static void out_string(conn *c, const char *str) {
734    size_t len;
735
736    assert(c != NULL);
737
738    if (c->noreply) {
739        if (settings.verbose > 1)
740            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
741        c->noreply = false;
742        conn_set_state(c, conn_read);
743        return;
744    }
745
746    if (settings.verbose > 1)
747        fprintf(stderr, ">%d %s\n", c->sfd, str);
748
749    len = strlen(str);
750    if ((len + 2) > c->wsize) {
751        /* ought to be always enough. just fail for simplicity */
752        str = "SERVER_ERROR output line too long";
753        len = strlen(str);
754    }
755
756    memcpy(c->wbuf, str, len);
757    memcpy(c->wbuf + len, "\r\n", 3);
758    c->wbytes = len + 2;
759    c->wcurr = c->wbuf;
760
761    conn_set_state(c, conn_write);
762    c->write_and_go = conn_read;
763    return;
764}
765
766/*
767 * we get here after reading the value in set/add/replace commands. The command
768 * has been stored in c->item_comm, and the item is ready in c->item.
769 */
770
771static void complete_nread(conn *c) {
772    assert(c != NULL);
773
774    item *it = c->item;
775    int comm = c->item_comm;
776    int ret;
777
778    STATS_LOCK();
779    stats.set_cmds++;
780    STATS_UNLOCK();
781
782    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
783        out_string(c, "CLIENT_ERROR bad data chunk");
784    } else {
785      ret = store_item(it, comm);
786      if (ret == 1)
787          out_string(c, "STORED");
788      else if(ret == 2)
789          out_string(c, "EXISTS");
790      else if(ret == 3)
791          out_string(c, "NOT_FOUND");
792      else
793          out_string(c, "NOT_STORED");
794    }
795
796    item_remove(c->item);       /* release the c->item reference */
797    c->item = 0;
798}
799
800/*
801 * Stores an item in the cache according to the semantics of one of the set
802 * commands. In threaded mode, this is protected by the cache lock.
803 *
804 * Returns true if the item was stored.
805 */
806int do_store_item(item *it, int comm) {
807    char *key = ITEM_key(it);
808    bool delete_locked = false;
809    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
810    int stored = 0;
811
812    item *new_it = NULL;
813    int flags;
814
815    if (old_it != NULL && comm == NREAD_ADD) {
816        /* add only adds a nonexistent item, but promote to head of LRU */
817        do_item_update(old_it);
818    } else if (!old_it && (comm == NREAD_REPLACE
819        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
820    {
821        /* replace only replaces an existing value; don't store */
822    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
823        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
824    {
825        /* replace and add can't override delete locks; don't store */
826    } else if (comm == NREAD_CAS) {
827        /* validate cas operation */
828        if (delete_locked)
829            old_it = do_item_get_nocheck(key, it->nkey);
830
831        if(old_it == NULL) {
832          // LRU expired
833          stored = 3;
834        }
835        else if(it->cas_id == old_it->cas_id) {
836          // cas validates
837          do_item_replace(old_it, it);
838          stored = 1;
839        }
840        else
841        {
842          stored = 2;
843        }
844    } else {
845        /*
846         * Append - combine new and old record into single one. Here it's
847         * atomic and thread-safe.
848         */
849
850        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
851
852            /* we have it and old_it here - alloc memory to hold both */
853            /* flags was already lost - so recover them from ITEM_suffix(it) */
854
855            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
856
857            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
858
859            if (new_it == NULL) {
860                /* SERVER_ERROR out of memory */
861                return 0;
862            }
863
864            /* copy data from it and old_it to new_it */
865
866            if (comm == NREAD_APPEND) {
867                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
868                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
869            } else {
870                /* NREAD_PREPEND */
871                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
872                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
873            }
874
875            it = new_it;
876        }
877
878        /* "set" commands can override the delete lock
879           window... in which case we have to find the old hidden item
880           that's in the namespace/LRU but wasn't returned by
881           item_get.... because we need to replace it */
882        if (delete_locked)
883            old_it = do_item_get_nocheck(key, it->nkey);
884
885        if (old_it != NULL)
886            do_item_replace(old_it, it);
887        else
888            do_item_link(it);
889
890        stored = 1;
891    }
892
893    if (old_it != NULL)
894        do_item_remove(old_it);         /* release our reference */
895    if (new_it != NULL)
896        do_item_remove(new_it);
897
898    return stored;
899}
900
901typedef struct token_s {
902    char *value;
903    size_t length;
904} token_t;
905
906#define COMMAND_TOKEN 0
907#define SUBCOMMAND_TOKEN 1
908#define KEY_TOKEN 1
909#define KEY_MAX_LENGTH 250
910
911#define MAX_TOKENS 8
912
913/*
914 * Tokenize the command string by replacing whitespace with '\0' and update
915 * the token array tokens with pointer to start of each token and length.
916 * Returns total number of tokens.  The last valid token is the terminal
917 * token (value points to the first unprocessed character of the string and
918 * length zero).
919 *
920 * Usage example:
921 *
922 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
923 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
924 *          ...
925 *      }
926 *      ncommand = tokens[ix].value - command;
927 *      command  = tokens[ix].value;
928 *   }
929 */
930static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
931    char *s, *e;
932    size_t ntokens = 0;
933
934    assert(command != NULL && tokens != NULL && max_tokens > 1);
935
936    for (s = e = command; ntokens < max_tokens - 1; ++e) {
937        if (*e == ' ') {
938            if (s != e) {
939                tokens[ntokens].value = s;
940                tokens[ntokens].length = e - s;
941                ntokens++;
942                *e = '\0';
943            }
944            s = e + 1;
945        }
946        else if (*e == '\0') {
947            if (s != e) {
948                tokens[ntokens].value = s;
949                tokens[ntokens].length = e - s;
950                ntokens++;
951            }
952
953            break; /* string end */
954        }
955    }
956
957    /*
958     * If we scanned the whole string, the terminal value pointer is null,
959     * otherwise it is the first unprocessed character.
960     */
961    tokens[ntokens].value =  *e == '\0' ? NULL : e;
962    tokens[ntokens].length = 0;
963    ntokens++;
964
965    return ntokens;
966}
967
968/* set up a connection to write a buffer then free it, used for stats */
969static void write_and_free(conn *c, char *buf, int bytes) {
970    if (buf) {
971        c->write_and_free = buf;
972        c->wcurr = buf;
973        c->wbytes = bytes;
974        conn_set_state(c, conn_write);
975        c->write_and_go = conn_read;
976    } else {
977        out_string(c, "SERVER_ERROR out of memory writing stats");
978    }
979}
980
981static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
982{
983    int noreply_index = ntokens - 2;
984
985    /*
986      NOTE: this function is not the first place where we are going to
987      send the reply.  We could send it instead from process_command()
988      if the request line has wrong number of tokens.  However parsing
989      malformed line for "noreply" option is not reliable anyway, so
990      it can't be helped.
991    */
992    if (tokens[noreply_index].value
993        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
994        c->noreply = true;
995    }
996}
997
998inline static void process_stats_detail(conn *c, const char *command) {
999    assert(c != NULL);
1000
1001    if (strcmp(command, "on") == 0) {
1002        settings.detail_enabled = 1;
1003        out_string(c, "OK");
1004    }
1005    else if (strcmp(command, "off") == 0) {
1006        settings.detail_enabled = 0;
1007        out_string(c, "OK");
1008    }
1009    else if (strcmp(command, "dump") == 0) {
1010        int len;
1011        char *stats = stats_prefix_dump(&len);
1012        write_and_free(c, stats, len);
1013    }
1014    else {
1015        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1016    }
1017}
1018
1019static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1020    rel_time_t now = current_time;
1021    char *command;
1022    char *subcommand;
1023
1024    assert(c != NULL);
1025
1026    if(ntokens < 2) {
1027        out_string(c, "CLIENT_ERROR bad command line");
1028        return;
1029    }
1030
1031    command = tokens[COMMAND_TOKEN].value;
1032
1033    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1034        char temp[1024];
1035        pid_t pid = getpid();
1036        char *pos = temp;
1037
1038#ifndef WIN32
1039        struct rusage usage;
1040        getrusage(RUSAGE_SELF, &usage);
1041#endif /* !WIN32 */
1042
1043        STATS_LOCK();
1044        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1045        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1046        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1047        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1048        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1049#ifndef WIN32
1050        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1051        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1052#endif /* !WIN32 */
1053        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1054        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1055        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1056        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1057        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1058        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1059        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1060        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1061        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1062        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1063        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1064        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1065        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1066        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1067        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1068        pos += sprintf(pos, "END");
1069        STATS_UNLOCK();
1070        out_string(c, temp);
1071        return;
1072    }
1073
1074    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1075
1076    if (strcmp(subcommand, "reset") == 0) {
1077        stats_reset();
1078        out_string(c, "RESET");
1079        return;
1080    }
1081
1082#ifdef HAVE_MALLOC_H
1083#ifdef HAVE_STRUCT_MALLINFO
1084    if (strcmp(subcommand, "malloc") == 0) {
1085        char temp[512];
1086        struct mallinfo info;
1087        char *pos = temp;
1088
1089        info = mallinfo();
1090        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1091        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1092        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1093        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1094        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1095        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1096        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1097        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1098        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1099        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1100        out_string(c, temp);
1101        return;
1102    }
1103#endif /* HAVE_STRUCT_MALLINFO */
1104#endif /* HAVE_MALLOC_H */
1105
1106#if !defined(WIN32) || !defined(__APPLE__)
1107    if (strcmp(subcommand, "maps") == 0) {
1108        char *wbuf;
1109        int wsize = 8192; /* should be enough */
1110        int fd;
1111        int res;
1112
1113        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1114            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1115            return;
1116        }
1117
1118        fd = open("/proc/self/maps", O_RDONLY);
1119        if (fd == -1) {
1120            out_string(c, "SERVER_ERROR cannot open the maps file");
1121            free(wbuf);
1122            return;
1123        }
1124
1125        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1126        if (res == wsize - 6) {
1127            out_string(c, "SERVER_ERROR buffer overflow");
1128            free(wbuf); close(fd);
1129            return;
1130        }
1131        if (res == 0 || res == -1) {
1132            out_string(c, "SERVER_ERROR can't read the maps file");
1133            free(wbuf); close(fd);
1134            return;
1135        }
1136        memcpy(wbuf + res, "END\r\n", 5);
1137        write_and_free(c, wbuf, res + 5);
1138        close(fd);
1139        return;
1140    }
1141#endif
1142
1143    if (strcmp(subcommand, "cachedump") == 0) {
1144
1145        char *buf;
1146        unsigned int bytes, id, limit = 0;
1147
1148        if(ntokens < 5) {
1149            out_string(c, "CLIENT_ERROR bad command line");
1150            return;
1151        }
1152
1153        id = strtoul(tokens[2].value, NULL, 10);
1154        limit = strtoul(tokens[3].value, NULL, 10);
1155
1156        if(errno == ERANGE) {
1157            out_string(c, "CLIENT_ERROR bad command line format");
1158            return;
1159        }
1160
1161        buf = item_cachedump(id, limit, &bytes);
1162        write_and_free(c, buf, bytes);
1163        return;
1164    }
1165
1166    if (strcmp(subcommand, "slabs") == 0) {
1167        int bytes = 0;
1168        char *buf = slabs_stats(&bytes);
1169        write_and_free(c, buf, bytes);
1170        return;
1171    }
1172
1173    if (strcmp(subcommand, "items") == 0) {
1174        int bytes = 0;
1175        char *buf = item_stats(&bytes);
1176        write_and_free(c, buf, bytes);
1177        return;
1178    }
1179
1180    if (strcmp(subcommand, "detail") == 0) {
1181        if (ntokens < 4)
1182            process_stats_detail(c, "");  /* outputs the error message */
1183        else
1184            process_stats_detail(c, tokens[2].value);
1185        return;
1186    }
1187
1188    if (strcmp(subcommand, "sizes") == 0) {
1189        int bytes = 0;
1190        char *buf = item_stats_sizes(&bytes);
1191        write_and_free(c, buf, bytes);
1192        return;
1193    }
1194
1195    out_string(c, "ERROR");
1196}
1197
1198/* ntokens is overwritten here... shrug.. */
1199static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1200    char *key;
1201    size_t nkey;
1202    int i = 0;
1203    item *it;
1204    token_t *key_token = &tokens[KEY_TOKEN];
1205    char *suffix;
1206    int stats_get_cmds   = 0;
1207    int stats_get_hits   = 0;
1208    int stats_get_misses = 0;
1209    assert(c != NULL);
1210
1211    if (settings.managed) {
1212        int bucket = c->bucket;
1213        if (bucket == -1) {
1214            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1215            return;
1216        }
1217        c->bucket = -1;
1218        if (buckets[bucket] != c->gen) {
1219            out_string(c, "ERROR_NOT_OWNER");
1220            return;
1221        }
1222    }
1223
1224    do {
1225        while(key_token->length != 0) {
1226
1227            key = key_token->value;
1228            nkey = key_token->length;
1229
1230            if(nkey > KEY_MAX_LENGTH) {
1231                STATS_LOCK();
1232                stats.get_cmds   += stats_get_cmds;
1233                stats.get_hits   += stats_get_hits;
1234                stats.get_misses += stats_get_misses;
1235                STATS_UNLOCK();
1236                out_string(c, "CLIENT_ERROR bad command line format");
1237                return;
1238            }
1239
1240            stats_get_cmds++;
1241            it = item_get(key, nkey);
1242            if (settings.detail_enabled) {
1243                stats_prefix_record_get(key, NULL != it);
1244            }
1245            if (it) {
1246                if (i >= c->isize) {
1247                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1248                    if (new_list) {
1249                        c->isize *= 2;
1250                        c->ilist = new_list;
1251                    } else break;
1252                }
1253
1254                /*
1255                 * Construct the response. Each hit adds three elements to the
1256                 * outgoing data list:
1257                 *   "VALUE "
1258                 *   key
1259                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1260                 */
1261
1262                if(return_cas == true)
1263                {
1264                  /* Goofy mid-flight realloc. */
1265                  if (i >= c->suffixsize) {
1266                    char **new_suffix_list = realloc(c->suffixlist,
1267                                           sizeof(char *) * c->suffixsize * 2);
1268                    if (new_suffix_list) {
1269                      c->suffixsize *= 2;
1270                      c->suffixlist  = new_suffix_list;
1271                    } else break;
1272                  }
1273
1274                  suffix = suffix_from_freelist();
1275                  if (suffix == NULL) {
1276                    STATS_LOCK();
1277                    stats.get_cmds   += stats_get_cmds;
1278                    stats.get_hits   += stats_get_hits;
1279                    stats.get_misses += stats_get_misses;
1280                    STATS_UNLOCK();
1281                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1282                    return;
1283                  }
1284                  *(c->suffixlist + i) = suffix;
1285                  sprintf(suffix, " %llu\r\n", it->cas_id);
1286                  if (add_iov(c, "VALUE ", 6) != 0 ||
1287                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1288                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1289                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1290                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1291                      {
1292                          break;
1293                      }
1294                }
1295                else
1296                {
1297                  if (add_iov(c, "VALUE ", 6) != 0 ||
1298                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1299                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1300                      {
1301                          break;
1302                      }
1303                }
1304
1305
1306                if (settings.verbose > 1)
1307                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1308
1309                /* item_get() has incremented it->refcount for us */
1310                stats_get_hits++;
1311                item_update(it);
1312                *(c->ilist + i) = it;
1313                i++;
1314
1315            } else {
1316                stats_get_misses++;
1317            }
1318
1319            key_token++;
1320        }
1321
1322        /*
1323         * If the command string hasn't been fully processed, get the next set
1324         * of tokens.
1325         */
1326        if(key_token->value != NULL) {
1327            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1328            key_token = tokens;
1329        }
1330
1331    } while(key_token->value != NULL);
1332
1333    c->icurr = c->ilist;
1334    c->ileft = i;
1335    if (return_cas) {
1336        c->suffixcurr = c->suffixlist;
1337        c->suffixleft = i;
1338    }
1339
1340    if (settings.verbose > 1)
1341        fprintf(stderr, ">%d END\n", c->sfd);
1342
1343    /*
1344        If the loop was terminated because of out-of-memory, it is not
1345        reliable to add END\r\n to the buffer, because it might not end
1346        in \r\n. So we send SERVER_ERROR instead.
1347    */
1348    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1349        || (c->udp && build_udp_headers(c) != 0)) {
1350        out_string(c, "SERVER_ERROR out of memory writing get response");
1351    }
1352    else {
1353        conn_set_state(c, conn_mwrite);
1354        c->msgcurr = 0;
1355    }
1356
1357    STATS_LOCK();
1358    stats.get_cmds   += stats_get_cmds;
1359    stats.get_hits   += stats_get_hits;
1360    stats.get_misses += stats_get_misses;
1361    STATS_UNLOCK();
1362
1363    return;
1364}
1365
1366static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1367    char *key;
1368    size_t nkey;
1369    int flags;
1370    time_t exptime;
1371    int vlen, old_vlen;
1372    uint64_t req_cas_id;
1373    item *it, *old_it;
1374
1375    assert(c != NULL);
1376
1377    set_noreply_maybe(c, tokens, ntokens);
1378
1379    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1380        out_string(c, "CLIENT_ERROR bad command line format");
1381        return;
1382    }
1383
1384    key = tokens[KEY_TOKEN].value;
1385    nkey = tokens[KEY_TOKEN].length;
1386
1387    flags = strtoul(tokens[2].value, NULL, 10);
1388    exptime = strtol(tokens[3].value, NULL, 10);
1389    vlen = strtol(tokens[4].value, NULL, 10);
1390
1391    // does cas value exist?
1392    if(handle_cas)
1393    {
1394      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1395    }
1396
1397    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1398        out_string(c, "CLIENT_ERROR bad command line format");
1399        return;
1400    }
1401
1402    if (settings.detail_enabled) {
1403        stats_prefix_record_set(key);
1404    }
1405
1406    if (settings.managed) {
1407        int bucket = c->bucket;
1408        if (bucket == -1) {
1409            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1410            return;
1411        }
1412        c->bucket = -1;
1413        if (buckets[bucket] != c->gen) {
1414            out_string(c, "ERROR_NOT_OWNER");
1415            return;
1416        }
1417    }
1418
1419    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1420
1421    if (it == 0) {
1422        if (! item_size_ok(nkey, flags, vlen + 2))
1423            out_string(c, "SERVER_ERROR object too large for cache");
1424        else
1425            out_string(c, "SERVER_ERROR out of memory storing object");
1426        /* swallow the data line */
1427        c->write_and_go = conn_swallow;
1428        c->sbytes = vlen + 2;
1429        return;
1430    }
1431    if(handle_cas)
1432      it->cas_id = req_cas_id;
1433
1434    c->item = it;
1435    c->ritem = ITEM_data(it);
1436    c->rlbytes = it->nbytes;
1437    c->item_comm = comm;
1438    conn_set_state(c, conn_nread);
1439}
1440
1441static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1442    char temp[sizeof("18446744073709551615")];
1443    item *it;
1444    int64_t delta;
1445    char *key;
1446    size_t nkey;
1447
1448    assert(c != NULL);
1449
1450    set_noreply_maybe(c, tokens, ntokens);
1451
1452    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1453        out_string(c, "CLIENT_ERROR bad command line format");
1454        return;
1455    }
1456
1457    key = tokens[KEY_TOKEN].value;
1458    nkey = tokens[KEY_TOKEN].length;
1459
1460    if (settings.managed) {
1461        int bucket = c->bucket;
1462        if (bucket == -1) {
1463            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1464            return;
1465        }
1466        c->bucket = -1;
1467        if (buckets[bucket] != c->gen) {
1468            out_string(c, "ERROR_NOT_OWNER");
1469            return;
1470        }
1471    }
1472
1473    delta = strtoll(tokens[2].value, NULL, 10);
1474
1475    if(errno == ERANGE) {
1476        out_string(c, "CLIENT_ERROR bad command line format");
1477        return;
1478    }
1479
1480    it = item_get(key, nkey);
1481    if (!it) {
1482        out_string(c, "NOT_FOUND");
1483        return;
1484    }
1485
1486    out_string(c, add_delta(it, incr, delta, temp));
1487    item_remove(it);         /* release our reference */
1488}
1489
1490/*
1491 * adds a delta value to a numeric item.
1492 *
1493 * it    item to adjust
1494 * incr  true to increment value, false to decrement
1495 * delta amount to adjust value by
1496 * buf   buffer for response string
1497 *
1498 * returns a response string to send back to the client.
1499 */
1500char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
1501    char *ptr;
1502    int64_t value;
1503    int res;
1504
1505    ptr = ITEM_data(it);
1506    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1507
1508    value = strtoull(ptr, NULL, 10);
1509
1510    if(errno == ERANGE) {
1511        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1512    }
1513
1514    if (incr)
1515        value += delta;
1516    else {
1517        if (delta >= value) value = 0;
1518        else value -= delta;
1519    }
1520    sprintf(buf, "%llu", value);
1521    res = strlen(buf);
1522    if (res + 2 > it->nbytes) { /* need to realloc */
1523        item *new_it;
1524        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1525        if (new_it == 0) {
1526            return "SERVER_ERROR out of memory in incr/decr";
1527        }
1528        memcpy(ITEM_data(new_it), buf, res);
1529        memcpy(ITEM_data(new_it) + res, "\r\n", 3);
1530        do_item_replace(it, new_it);
1531        do_item_remove(new_it);       /* release our reference */
1532    } else { /* replace in-place */
1533        memcpy(ITEM_data(it), buf, res);
1534        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1535    }
1536
1537    return buf;
1538}
1539
1540static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1541    char *key;
1542    size_t nkey;
1543    item *it;
1544    time_t exptime = 0;
1545
1546    assert(c != NULL);
1547
1548    set_noreply_maybe(c, tokens, ntokens);
1549
1550    if (settings.managed) {
1551        int bucket = c->bucket;
1552        if (bucket == -1) {
1553            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1554            return;
1555        }
1556        c->bucket = -1;
1557        if (buckets[bucket] != c->gen) {
1558            out_string(c, "ERROR_NOT_OWNER");
1559            return;
1560        }
1561    }
1562
1563    key = tokens[KEY_TOKEN].value;
1564    nkey = tokens[KEY_TOKEN].length;
1565
1566    if(nkey > KEY_MAX_LENGTH) {
1567        out_string(c, "CLIENT_ERROR bad command line format");
1568        return;
1569    }
1570
1571    if(ntokens == (c->noreply ? 5 : 4)) {
1572        exptime = strtol(tokens[2].value, NULL, 10);
1573
1574        if(errno == ERANGE) {
1575            out_string(c, "CLIENT_ERROR bad command line format");
1576            return;
1577        }
1578    }
1579
1580    if (settings.detail_enabled) {
1581        stats_prefix_record_delete(key);
1582    }
1583
1584    it = item_get(key, nkey);
1585    if (it) {
1586        if (exptime == 0) {
1587            item_unlink(it);
1588            item_remove(it);      /* release our reference */
1589            out_string(c, "DELETED");
1590        } else {
1591            /* our reference will be transfered to the delete queue */
1592            out_string(c, defer_delete(it, exptime));
1593        }
1594    } else {
1595        out_string(c, "NOT_FOUND");
1596    }
1597}
1598
1599/*
1600 * Adds an item to the deferred-delete list so it can be reaped later.
1601 *
1602 * Returns the result to send to the client.
1603 */
1604char *do_defer_delete(item *it, time_t exptime)
1605{
1606    if (delcurr >= deltotal) {
1607        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1608        if (new_delete) {
1609            todelete = new_delete;
1610            deltotal *= 2;
1611        } else {
1612            /*
1613             * can't delete it immediately, user wants a delay,
1614             * but we ran out of memory for the delete queue
1615             */
1616            item_remove(it);    /* release reference */
1617            return "SERVER_ERROR out of memory expanding delete queue";
1618        }
1619    }
1620
1621    /* use its expiration time as its deletion time now */
1622    it->exptime = realtime(exptime);
1623    it->it_flags |= ITEM_DELETED;
1624    todelete[delcurr++] = it;
1625
1626    return "DELETED";
1627}
1628
1629static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1630    unsigned int level;
1631
1632    assert(c != NULL);
1633
1634    set_noreply_maybe(c, tokens, ntokens);
1635
1636    level = strtoul(tokens[1].value, NULL, 10);
1637    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1638    out_string(c, "OK");
1639    return;
1640}
1641
1642static void process_command(conn *c, char *command) {
1643
1644    token_t tokens[MAX_TOKENS];
1645    size_t ntokens;
1646    int comm;
1647
1648    assert(c != NULL);
1649
1650    if (settings.verbose > 1)
1651        fprintf(stderr, "<%d %s\n", c->sfd, command);
1652
1653    /*
1654     * for commands set/add/replace, we build an item and read the data
1655     * directly into it, then continue in nread_complete().
1656     */
1657
1658    c->msgcurr = 0;
1659    c->msgused = 0;
1660    c->iovused = 0;
1661    if (add_msghdr(c) != 0) {
1662        out_string(c, "SERVER_ERROR out of memory preparing response");
1663        return;
1664    }
1665
1666    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1667    if (ntokens >= 3 &&
1668        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1669         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1670
1671        process_get_command(c, tokens, ntokens, false);
1672
1673    } else if ((ntokens == 6 || ntokens == 7) &&
1674               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1675                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1676                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
1677                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
1678                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
1679
1680        process_update_command(c, tokens, ntokens, comm, false);
1681
1682    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
1683
1684        process_update_command(c, tokens, ntokens, comm, true);
1685
1686    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1687
1688        process_arithmetic_command(c, tokens, ntokens, 1);
1689
1690    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
1691
1692        process_get_command(c, tokens, ntokens, true);
1693
1694    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1695
1696        process_arithmetic_command(c, tokens, ntokens, 0);
1697
1698    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1699
1700        process_delete_command(c, tokens, ntokens);
1701
1702    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1703        unsigned int bucket, gen;
1704        if (!settings.managed) {
1705            out_string(c, "CLIENT_ERROR not a managed instance");
1706            return;
1707        }
1708
1709        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1710            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1711                out_string(c, "CLIENT_ERROR bucket number out of range");
1712                return;
1713            }
1714            buckets[bucket] = gen;
1715            out_string(c, "OWNED");
1716            return;
1717        } else {
1718            out_string(c, "CLIENT_ERROR bad format");
1719            return;
1720        }
1721
1722    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1723
1724        int bucket;
1725        if (!settings.managed) {
1726            out_string(c, "CLIENT_ERROR not a managed instance");
1727            return;
1728        }
1729        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1730            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1731                out_string(c, "CLIENT_ERROR bucket number out of range");
1732                return;
1733            }
1734            buckets[bucket] = 0;
1735            out_string(c, "DISOWNED");
1736            return;
1737        } else {
1738            out_string(c, "CLIENT_ERROR bad format");
1739            return;
1740        }
1741
1742    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1743        int bucket, gen;
1744        if (!settings.managed) {
1745            out_string(c, "CLIENT_ERROR not a managed instance");
1746            return;
1747        }
1748        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1749            /* we never write anything back, even if input's wrong */
1750            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1751                /* do nothing, bad input */
1752            } else {
1753                c->bucket = bucket;
1754                c->gen = gen;
1755            }
1756            conn_set_state(c, conn_read);
1757            return;
1758        } else {
1759            out_string(c, "CLIENT_ERROR bad format");
1760            return;
1761        }
1762
1763    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1764
1765        process_stat(c, tokens, ntokens);
1766
1767    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1768        time_t exptime = 0;
1769        set_current_time();
1770
1771        set_noreply_maybe(c, tokens, ntokens);
1772
1773        if(ntokens == (c->noreply ? 3 : 2)) {
1774            settings.oldest_live = current_time - 1;
1775            item_flush_expired();
1776            out_string(c, "OK");
1777            return;
1778        }
1779
1780        exptime = strtol(tokens[1].value, NULL, 10);
1781        if(errno == ERANGE) {
1782            out_string(c, "CLIENT_ERROR bad command line format");
1783            return;
1784        }
1785
1786        /*
1787          If exptime is zero realtime() would return zero too, and
1788          realtime(exptime) - 1 would overflow to the max unsigned
1789          value.  So we process exptime == 0 the same way we do when
1790          no delay is given at all.
1791        */
1792        if (exptime > 0)
1793            settings.oldest_live = realtime(exptime) - 1;
1794        else /* exptime == 0 */
1795            settings.oldest_live = current_time - 1;
1796        item_flush_expired();
1797        out_string(c, "OK");
1798        return;
1799
1800    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1801
1802        out_string(c, "VERSION " VERSION);
1803
1804    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1805
1806        conn_set_state(c, conn_closing);
1807
1808    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1809                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1810#ifdef ALLOW_SLABS_REASSIGN
1811
1812        int src, dst, rv;
1813
1814        src = strtol(tokens[2].value, NULL, 10);
1815        dst  = strtol(tokens[3].value, NULL, 10);
1816
1817        if(errno == ERANGE) {
1818            out_string(c, "CLIENT_ERROR bad command line format");
1819            return;
1820        }
1821
1822        rv = slabs_reassign(src, dst);
1823        if (rv == 1) {
1824            out_string(c, "DONE");
1825            return;
1826        }
1827        if (rv == 0) {
1828            out_string(c, "CANT");
1829            return;
1830        }
1831        if (rv == -1) {
1832            out_string(c, "BUSY");
1833            return;
1834        }
1835#else
1836        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1837#endif
1838    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1839        process_verbosity_command(c, tokens, ntokens);
1840    } else {
1841        out_string(c, "ERROR");
1842    }
1843    return;
1844}
1845
1846/*
1847 * if we have a complete line in the buffer, process it.
1848 */
1849static int try_read_command(conn *c) {
1850    char *el, *cont;
1851
1852    assert(c != NULL);
1853    assert(c->rcurr <= (c->rbuf + c->rsize));
1854
1855    if (c->rbytes == 0)
1856        return 0;
1857    el = memchr(c->rcurr, '\n', c->rbytes);
1858    if (!el)
1859        return 0;
1860    cont = el + 1;
1861    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1862        el--;
1863    }
1864    *el = '\0';
1865
1866    assert(cont <= (c->rcurr + c->rbytes));
1867
1868    process_command(c, c->rcurr);
1869
1870    c->rbytes -= (cont - c->rcurr);
1871    c->rcurr = cont;
1872
1873    assert(c->rcurr <= (c->rbuf + c->rsize));
1874
1875    return 1;
1876}
1877
1878/*
1879 * read a UDP request.
1880 * return 0 if there's nothing to read.
1881 */
1882static int try_read_udp(conn *c) {
1883    int res;
1884
1885    assert(c != NULL);
1886
1887    c->request_addr_size = sizeof(c->request_addr);
1888    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1889                   0, &c->request_addr, &c->request_addr_size);
1890    if (res > 8) {
1891        unsigned char *buf = (unsigned char *)c->rbuf;
1892        STATS_LOCK();
1893        stats.bytes_read += res;
1894        STATS_UNLOCK();
1895
1896        /* Beginning of UDP packet is the request ID; save it. */
1897        c->request_id = buf[0] * 256 + buf[1];
1898
1899        /* If this is a multi-packet request, drop it. */
1900        if (buf[4] != 0 || buf[5] != 1) {
1901            out_string(c, "SERVER_ERROR multi-packet request not supported");
1902            return 0;
1903        }
1904
1905        /* Don't care about any of the rest of the header. */
1906        res -= 8;
1907        memmove(c->rbuf, c->rbuf + 8, res);
1908
1909        c->rbytes += res;
1910        c->rcurr = c->rbuf;
1911        return 1;
1912    }
1913    return 0;
1914}
1915
1916/*
1917 * read from network as much as we can, handle buffer overflow and connection
1918 * close.
1919 * before reading, move the remaining incomplete fragment of a command
1920 * (if any) to the beginning of the buffer.
1921 * return 0 if there's nothing to read on the first read.
1922 */
1923static int try_read_network(conn *c) {
1924    int gotdata = 0;
1925    int res;
1926
1927    assert(c != NULL);
1928
1929    if (c->rcurr != c->rbuf) {
1930        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1931            memmove(c->rbuf, c->rcurr, c->rbytes);
1932        c->rcurr = c->rbuf;
1933    }
1934
1935    while (1) {
1936        if (c->rbytes >= c->rsize) {
1937            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1938            if (!new_rbuf) {
1939                if (settings.verbose > 0)
1940                    fprintf(stderr, "Couldn't realloc input buffer\n");
1941                c->rbytes = 0; /* ignore what we read */
1942                out_string(c, "SERVER_ERROR out of memory reading request");
1943                c->write_and_go = conn_closing;
1944                return 1;
1945            }
1946            c->rcurr = c->rbuf = new_rbuf;
1947            c->rsize *= 2;
1948        }
1949
1950        /* unix socket mode doesn't need this, so zeroed out.  but why
1951         * is this done for every command?  presumably for UDP
1952         * mode.  */
1953        if (!settings.socketpath) {
1954            c->request_addr_size = sizeof(c->request_addr);
1955        } else {
1956            c->request_addr_size = 0;
1957        }
1958
1959        int avail = c->rsize - c->rbytes;
1960        res = read(c->sfd, c->rbuf + c->rbytes, avail);
1961        if (res > 0) {
1962            STATS_LOCK();
1963            stats.bytes_read += res;
1964            STATS_UNLOCK();
1965            gotdata = 1;
1966            c->rbytes += res;
1967            if (res == avail) {
1968                continue;
1969            } else {
1970                break;
1971            }
1972        }
1973        if (res == 0) {
1974            /* connection closed */
1975            conn_set_state(c, conn_closing);
1976            return 1;
1977        }
1978        if (res == -1) {
1979            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1980            /* Should close on unhandled errors. */
1981            conn_set_state(c, conn_closing);
1982            return 1;
1983        }
1984    }
1985    return gotdata;
1986}
1987
1988static bool update_event(conn *c, const int new_flags) {
1989    assert(c != NULL);
1990
1991    struct event_base *base = c->event.ev_base;
1992    if (c->ev_flags == new_flags)
1993        return true;
1994    if (event_del(&c->event) == -1) return false;
1995    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1996    event_base_set(base, &c->event);
1997    c->ev_flags = new_flags;
1998    if (event_add(&c->event, 0) == -1) return false;
1999    return true;
2000}
2001
2002/*
2003 * Sets whether we are listening for new connections or not.
2004 */
2005void accept_new_conns(const bool do_accept) {
2006    conn *next;
2007
2008    if (! is_listen_thread())
2009        return;
2010
2011    for (next = listen_conn; next; next = next->next) {
2012        if (do_accept) {
2013            update_event(next, EV_READ | EV_PERSIST);
2014            if (listen(next->sfd, 1024) != 0) {
2015                perror("listen");
2016            }
2017        }
2018        else {
2019            update_event(next, 0);
2020            if (listen(next->sfd, 0) != 0) {
2021                perror("listen");
2022            }
2023        }
2024  }
2025}
2026
2027
2028/*
2029 * Transmit the next chunk of data from our list of msgbuf structures.
2030 *
2031 * Returns:
2032 *   TRANSMIT_COMPLETE   All done writing.
2033 *   TRANSMIT_INCOMPLETE More data remaining to write.
2034 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2035 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2036 */
2037static int transmit(conn *c) {
2038    assert(c != NULL);
2039
2040    if (c->msgcurr < c->msgused &&
2041            c->msglist[c->msgcurr].msg_iovlen == 0) {
2042        /* Finished writing the current msg; advance to the next. */
2043        c->msgcurr++;
2044    }
2045    if (c->msgcurr < c->msgused) {
2046        ssize_t res;
2047        struct msghdr *m = &c->msglist[c->msgcurr];
2048
2049        res = sendmsg(c->sfd, m, 0);
2050        if (res > 0) {
2051            STATS_LOCK();
2052            stats.bytes_written += res;
2053            STATS_UNLOCK();
2054
2055            /* We've written some of the data. Remove the completed
2056               iovec entries from the list of pending writes. */
2057            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2058                res -= m->msg_iov->iov_len;
2059                m->msg_iovlen--;
2060                m->msg_iov++;
2061            }
2062
2063            /* Might have written just part of the last iovec entry;
2064               adjust it so the next write will do the rest. */
2065            if (res > 0) {
2066                m->msg_iov->iov_base += res;
2067                m->msg_iov->iov_len -= res;
2068            }
2069            return TRANSMIT_INCOMPLETE;
2070        }
2071        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2072            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2073                if (settings.verbose > 0)
2074                    fprintf(stderr, "Couldn't update event\n");
2075                conn_set_state(c, conn_closing);
2076                return TRANSMIT_HARD_ERROR;
2077            }
2078            return TRANSMIT_SOFT_ERROR;
2079        }
2080        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2081           we have a real error, on which we close the connection */
2082        if (settings.verbose > 0)
2083            perror("Failed to write, and not due to blocking");
2084
2085        if (c->udp)
2086            conn_set_state(c, conn_read);
2087        else
2088            conn_set_state(c, conn_closing);
2089        return TRANSMIT_HARD_ERROR;
2090    } else {
2091        return TRANSMIT_COMPLETE;
2092    }
2093}
2094
2095static void drive_machine(conn *c) {
2096    bool stop = false;
2097    int sfd, flags = 1;
2098    socklen_t addrlen;
2099    struct sockaddr_storage addr;
2100    int res;
2101
2102    assert(c != NULL);
2103
2104    while (!stop) {
2105
2106        switch(c->state) {
2107        case conn_listening:
2108            addrlen = sizeof(addr);
2109            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2110                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2111                    /* these are transient, so don't log anything */
2112                    stop = true;
2113                } else if (errno == EMFILE) {
2114                    if (settings.verbose > 0)
2115                        fprintf(stderr, "Too many open connections\n");
2116                    accept_new_conns(false);
2117                    stop = true;
2118                } else {
2119                    perror("accept()");
2120                    stop = true;
2121                }
2122                break;
2123            }
2124            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2125                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2126                perror("setting O_NONBLOCK");
2127                close(sfd);
2128                break;
2129            }
2130            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
2131                                     DATA_BUFFER_SIZE, false);
2132            break;
2133
2134        case conn_read:
2135            if (try_read_command(c) != 0) {
2136                continue;
2137            }
2138            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
2139                continue;
2140            }
2141            /* we have no command line and no data to read from network */
2142            if (!update_event(c, EV_READ | EV_PERSIST)) {
2143                if (settings.verbose > 0)
2144                    fprintf(stderr, "Couldn't update event\n");
2145                conn_set_state(c, conn_closing);
2146                break;
2147            }
2148            stop = true;
2149            break;
2150
2151        case conn_nread:
2152            /* we are reading rlbytes into ritem; */
2153            if (c->rlbytes == 0) {
2154                complete_nread(c);
2155                break;
2156            }
2157            /* first check if we have leftovers in the conn_read buffer */
2158            if (c->rbytes > 0) {
2159                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2160                memcpy(c->ritem, c->rcurr, tocopy);
2161                c->ritem += tocopy;
2162                c->rlbytes -= tocopy;
2163                c->rcurr += tocopy;
2164                c->rbytes -= tocopy;
2165                break;
2166            }
2167
2168            /*  now try reading from the socket */
2169            res = read(c->sfd, c->ritem, c->rlbytes);
2170            if (res > 0) {
2171                STATS_LOCK();
2172                stats.bytes_read += res;
2173                STATS_UNLOCK();
2174                c->ritem += res;
2175                c->rlbytes -= res;
2176                break;
2177            }
2178            if (res == 0) { /* end of stream */
2179                conn_set_state(c, conn_closing);
2180                break;
2181            }
2182            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2183                if (!update_event(c, EV_READ | EV_PERSIST)) {
2184                    if (settings.verbose > 0)
2185                        fprintf(stderr, "Couldn't update event\n");
2186                    conn_set_state(c, conn_closing);
2187                    break;
2188                }
2189                stop = true;
2190                break;
2191            }
2192            /* otherwise we have a real error, on which we close the connection */
2193            if (settings.verbose > 0)
2194                fprintf(stderr, "Failed to read, and not due to blocking\n");
2195            conn_set_state(c, conn_closing);
2196            break;
2197
2198        case conn_swallow:
2199            /* we are reading sbytes and throwing them away */
2200            if (c->sbytes == 0) {
2201                conn_set_state(c, conn_read);
2202                break;
2203            }
2204
2205            /* first check if we have leftovers in the conn_read buffer */
2206            if (c->rbytes > 0) {
2207                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2208                c->sbytes -= tocopy;
2209                c->rcurr += tocopy;
2210                c->rbytes -= tocopy;
2211                break;
2212            }
2213
2214            /*  now try reading from the socket */
2215            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2216            if (res > 0) {
2217                STATS_LOCK();
2218                stats.bytes_read += res;
2219                STATS_UNLOCK();
2220                c->sbytes -= res;
2221                break;
2222            }
2223            if (res == 0) { /* end of stream */
2224                conn_set_state(c, conn_closing);
2225                break;
2226            }
2227            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2228                if (!update_event(c, EV_READ | EV_PERSIST)) {
2229                    if (settings.verbose > 0)
2230                        fprintf(stderr, "Couldn't update event\n");
2231                    conn_set_state(c, conn_closing);
2232                    break;
2233                }
2234                stop = true;
2235                break;
2236            }
2237            /* otherwise we have a real error, on which we close the connection */
2238            if (settings.verbose > 0)
2239                fprintf(stderr, "Failed to read, and not due to blocking\n");
2240            conn_set_state(c, conn_closing);
2241            break;
2242
2243        case conn_write:
2244            /*
2245             * We want to write out a simple response. If we haven't already,
2246             * assemble it into a msgbuf list (this will be a single-entry
2247             * list for TCP or a two-entry list for UDP).
2248             */
2249            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
2250                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2251                    (c->udp && build_udp_headers(c) != 0)) {
2252                    if (settings.verbose > 0)
2253                        fprintf(stderr, "Couldn't build response\n");
2254                    conn_set_state(c, conn_closing);
2255                    break;
2256                }
2257            }
2258
2259            /* fall through... */
2260
2261        case conn_mwrite:
2262            switch (transmit(c)) {
2263            case TRANSMIT_COMPLETE:
2264                if (c->state == conn_mwrite) {
2265                    while (c->ileft > 0) {
2266                        item *it = *(c->icurr);
2267                        assert((it->it_flags & ITEM_SLABBED) == 0);
2268                        item_remove(it);
2269                        c->icurr++;
2270                        c->ileft--;
2271                    }
2272                    while (c->suffixleft > 0) {
2273                        char *suffix = *(c->suffixcurr);
2274                        if(suffix_add_to_freelist(suffix)) {
2275                            /* Failed to add to freelist, don't leak */
2276                            free(suffix);
2277                        }
2278                        c->suffixcurr++;
2279                        c->suffixleft--;
2280                    }
2281                    conn_set_state(c, conn_read);
2282                } else if (c->state == conn_write) {
2283                    if (c->write_and_free) {
2284                        free(c->write_and_free);
2285                        c->write_and_free = 0;
2286                    }
2287                    conn_set_state(c, c->write_and_go);
2288                } else {
2289                    if (settings.verbose > 0)
2290                        fprintf(stderr, "Unexpected state %d\n", c->state);
2291                    conn_set_state(c, conn_closing);
2292                }
2293                break;
2294
2295            case TRANSMIT_INCOMPLETE:
2296            case TRANSMIT_HARD_ERROR:
2297                break;                   /* Continue in state machine. */
2298
2299            case TRANSMIT_SOFT_ERROR:
2300                stop = true;
2301                break;
2302            }
2303            break;
2304
2305        case conn_closing:
2306            if (c->udp)
2307                conn_cleanup(c);
2308            else
2309                conn_close(c);
2310            stop = true;
2311            break;
2312        }
2313    }
2314
2315    return;
2316}
2317
2318void event_handler(const int fd, const short which, void *arg) {
2319    conn *c;
2320
2321    c = (conn *)arg;
2322    assert(c != NULL);
2323
2324    c->which = which;
2325
2326    /* sanity */
2327    if (fd != c->sfd) {
2328        if (settings.verbose > 0)
2329            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2330        conn_close(c);
2331        return;
2332    }
2333
2334    drive_machine(c);
2335
2336    /* wait for next event */
2337    return;
2338}
2339
2340static int new_socket(struct addrinfo *ai) {
2341    int sfd;
2342    int flags;
2343
2344    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2345        perror("socket()");
2346        return -1;
2347    }
2348
2349    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2350        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2351        perror("setting O_NONBLOCK");
2352        close(sfd);
2353        return -1;
2354    }
2355    return sfd;
2356}
2357
2358
2359/*
2360 * Sets a socket's send buffer size to the maximum allowed by the system.
2361 */
2362static void maximize_sndbuf(const int sfd) {
2363    socklen_t intsize = sizeof(int);
2364    int last_good = 0;
2365    int min, max, avg;
2366    int old_size;
2367
2368    /* Start with the default size. */
2369    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2370        if (settings.verbose > 0)
2371            perror("getsockopt(SO_SNDBUF)");
2372        return;
2373    }
2374
2375    /* Binary-search for the real maximum. */
2376    min = old_size;
2377    max = MAX_SENDBUF_SIZE;
2378
2379    while (min <= max) {
2380        avg = ((unsigned int)(min + max)) / 2;
2381        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2382            last_good = avg;
2383            min = avg + 1;
2384        } else {
2385            max = avg - 1;
2386        }
2387    }
2388
2389    if (settings.verbose > 1)
2390        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2391}
2392
2393static int server_socket(const int port, const bool is_udp) {
2394    int sfd;
2395    struct linger ling = {0, 0};
2396    struct addrinfo *ai;
2397    struct addrinfo *next;
2398    struct addrinfo hints;
2399    char port_buf[NI_MAXSERV];
2400    int error;
2401    int success = 0;
2402
2403    int flags =1;
2404
2405    /*
2406     * the memset call clears nonstandard fields in some impementations
2407     * that otherwise mess things up.
2408     */
2409    memset(&hints, 0, sizeof (hints));
2410    hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
2411    if (is_udp)
2412    {
2413        hints.ai_protocol = IPPROTO_UDP;
2414        hints.ai_socktype = SOCK_DGRAM;
2415        hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
2416    } else {
2417        hints.ai_family = AF_UNSPEC;
2418        hints.ai_protocol = IPPROTO_TCP;
2419        hints.ai_socktype = SOCK_STREAM;
2420    }
2421
2422    snprintf(port_buf, NI_MAXSERV, "%d", port);
2423    error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
2424    if (error != 0) {
2425      if (error != EAI_SYSTEM)
2426        fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
2427      else
2428        perror("getaddrinfo()");
2429
2430      return 1;
2431    }
2432
2433    for (next= ai; next; next= next->ai_next) {
2434        conn *listen_conn_add;
2435        if ((sfd = new_socket(next)) == -1) {
2436            freeaddrinfo(ai);
2437            return 1;
2438        }
2439
2440        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2441        if (is_udp) {
2442            maximize_sndbuf(sfd);
2443        } else {
2444            setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2445            setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2446            setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2447        }
2448
2449        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
2450            if (errno != EADDRINUSE) {
2451                perror("bind()");
2452                close(sfd);
2453                freeaddrinfo(ai);
2454                return 1;
2455            }
2456            close(sfd);
2457            continue;
2458        } else {
2459          success++;
2460          if (!is_udp && listen(sfd, 1024) == -1) {
2461              perror("listen()");
2462              close(sfd);
2463              freeaddrinfo(ai);
2464              return 1;
2465          }
2466      }
2467
2468      if (is_udp)
2469      {
2470        int c;
2471
2472        for (c = 0; c < settings.num_threads; c++) {
2473            /* this is guaranteed to hit all threads because we round-robin */
2474            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
2475                              UDP_READ_BUFFER_SIZE, 1);
2476        }
2477      } else {
2478        if (!(listen_conn_add = conn_new(sfd, conn_listening,
2479                                         EV_READ | EV_PERSIST, 1, false, main_base))) {
2480            fprintf(stderr, "failed to create listening connection\n");
2481            exit(EXIT_FAILURE);
2482        }
2483
2484        listen_conn_add->next = listen_conn;
2485        listen_conn = listen_conn_add;
2486      }
2487    }
2488
2489    freeaddrinfo(ai);
2490
2491    /* Return zero iff we detected no errors in starting up connections */
2492    return success == 0;
2493}
2494
2495static int new_socket_unix(void) {
2496    int sfd;
2497    int flags;
2498
2499    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2500        perror("socket()");
2501        return -1;
2502    }
2503
2504    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2505        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2506        perror("setting O_NONBLOCK");
2507        close(sfd);
2508        return -1;
2509    }
2510    return sfd;
2511}
2512
2513static int server_socket_unix(const char *path, int access_mask) {
2514    int sfd;
2515    struct linger ling = {0, 0};
2516    struct sockaddr_un addr;
2517    struct stat tstat;
2518    int flags =1;
2519    int old_umask;
2520
2521    if (!path) {
2522        return 1;
2523    }
2524
2525    if ((sfd = new_socket_unix()) == -1) {
2526        return 1;
2527    }
2528
2529    /*
2530     * Clean up a previous socket file if we left it around
2531     */
2532    if (lstat(path, &tstat) == 0) {
2533        if (S_ISSOCK(tstat.st_mode))
2534            unlink(path);
2535    }
2536
2537    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2538    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2539    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2540
2541    /*
2542     * the memset call clears nonstandard fields in some impementations
2543     * that otherwise mess things up.
2544     */
2545    memset(&addr, 0, sizeof(addr));
2546
2547    addr.sun_family = AF_UNIX;
2548    strcpy(addr.sun_path, path);
2549    old_umask=umask( ~(access_mask&0777));
2550    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2551        perror("bind()");
2552        close(sfd);
2553        umask(old_umask);
2554        return 1;
2555    }
2556    umask(old_umask);
2557    if (listen(sfd, 1024) == -1) {
2558        perror("listen()");
2559        close(sfd);
2560        return 1;
2561    }
2562    if (!(listen_conn = conn_new(sfd, conn_listening,
2563                                     EV_READ | EV_PERSIST, 1, false, main_base))) {
2564        fprintf(stderr, "failed to create listening connection\n");
2565        exit(EXIT_FAILURE);
2566    }
2567
2568    return 0;
2569}
2570
2571/*
2572 * We keep the current time of day in a global variable that's updated by a
2573 * timer event. This saves us a bunch of time() system calls (we really only
2574 * need to get the time once a second, whereas there can be tens of thousands
2575 * of requests a second) and allows us to use server-start-relative timestamps
2576 * rather than absolute UNIX timestamps, a space savings on systems where
2577 * sizeof(time_t) > sizeof(unsigned int).
2578 */
2579volatile rel_time_t current_time;
2580static struct event clockevent;
2581
2582/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2583static void set_current_time(void) {
2584    struct timeval timer;
2585
2586    gettimeofday(&timer, NULL);
2587    current_time = (rel_time_t) (timer.tv_sec - stats.started);
2588}
2589
2590static void clock_handler(const int fd, const short which, void *arg) {
2591    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2592    static bool initialized = false;
2593
2594    if (initialized) {
2595        /* only delete the event if it's actually there. */
2596        evtimer_del(&clockevent);
2597    } else {
2598        initialized = true;
2599    }
2600
2601    evtimer_set(&clockevent, clock_handler, 0);
2602    event_base_set(main_base, &clockevent);
2603    evtimer_add(&clockevent, &t);
2604
2605    set_current_time();
2606}
2607
2608static struct event deleteevent;
2609
2610static void delete_handler(const int fd, const short which, void *arg) {
2611    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2612    static bool initialized = false;
2613
2614    if (initialized) {
2615        /* some versions of libevent don't like deleting events that don't exist,
2616           so only delete once we know this event has been added. */
2617        evtimer_del(&deleteevent);
2618    } else {
2619        initialized = true;
2620    }
2621
2622    evtimer_set(&deleteevent, delete_handler, 0);
2623    event_base_set(main_base, &deleteevent);
2624    evtimer_add(&deleteevent, &t);
2625    run_deferred_deletes();
2626}
2627
2628/* Call run_deferred_deletes instead of this. */
2629void do_run_deferred_deletes(void)
2630{
2631    int i, j = 0;
2632
2633    for (i = 0; i < delcurr; i++) {
2634        item *it = todelete[i];
2635        if (item_delete_lock_over(it)) {
2636            assert(it->refcount > 0);
2637            it->it_flags &= ~ITEM_DELETED;
2638            do_item_unlink(it);
2639            do_item_remove(it);
2640        } else {
2641            todelete[j++] = it;
2642        }
2643    }
2644    delcurr = j;
2645}
2646
2647static void usage(void) {
2648    printf(PACKAGE " " VERSION "\n");
2649    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2650           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2651           "-s <file>     unix socket path to listen on (disables network support)\n"
2652           "-a <mask>     access mask for unix socket, in octal (default 0700)\n"
2653           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2654           "-d            run as a daemon\n"
2655           "-r            maximize core file limit\n"
2656           "-u <username> assume identity of <username> (only when run as root)\n"
2657           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2658           "-M            return error on memory exhausted (rather than removing items)\n"
2659           "-c <num>      max simultaneous connections, default is 1024\n"
2660           "-k            lock down all paged memory.  Note that there is a\n"
2661           "              limit on how much memory you may lock.  Trying to\n"
2662           "              allocate more than that would fail, so be sure you\n"
2663           "              set the limit correctly for the user you started\n"
2664           "              the daemon with (not for -u <username> user;\n"
2665           "              under sh this is done with 'ulimit -S -l NUM_KB').\n"
2666           "-v            verbose (print errors/warnings while in event loop)\n"
2667           "-vv           very verbose (also print client commands/reponses)\n"
2668           "-h            print this help and exit\n"
2669           "-i            print memcached and libevent license\n"
2670           "-b            run a managed instanced (mnemonic: buckets)\n"
2671           "-P <file>     save PID in <file>, only used with -d option\n"
2672           "-f <factor>   chunk size growth factor, default 1.25\n"
2673           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n"
2674
2675#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2676           "-L            Try to use large memory pages (if available). Increasing\n"
2677           "              the memory page size could reduce the number of TLB misses\n"
2678           "              and improve the performance. In order to get large pages\n"
2679           "              from the OS, memcached will allocate the total item-cache\n"
2680           "              in one large chunk.\n"
2681#endif
2682           );
2683
2684#ifdef USE_THREADS
2685    printf("-t <num>      number of threads to use, default 4\n");
2686#endif
2687    return;
2688}
2689
2690static void usage_license(void) {
2691    printf(PACKAGE " " VERSION "\n\n");
2692    printf(
2693    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2694    "All rights reserved.\n"
2695    "\n"
2696    "Redistribution and use in source and binary forms, with or without\n"
2697    "modification, are permitted provided that the following conditions are\n"
2698    "met:\n"
2699    "\n"
2700    "    * Redistributions of source code must retain the above copyright\n"
2701    "notice, this list of conditions and the following disclaimer.\n"
2702    "\n"
2703    "    * Redistributions in binary form must reproduce the above\n"
2704    "copyright notice, this list of conditions and the following disclaimer\n"
2705    "in the documentation and/or other materials provided with the\n"
2706    "distribution.\n"
2707    "\n"
2708    "    * Neither the name of the Danga Interactive nor the names of its\n"
2709    "contributors may be used to endorse or promote products derived from\n"
2710    "this software without specific prior written permission.\n"
2711    "\n"
2712    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2713    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2714    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2715    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2716    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2717    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2718    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2719    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2720    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2721    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2722    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2723    "\n"
2724    "\n"
2725    "This product includes software developed by Niels Provos.\n"
2726    "\n"
2727    "[ libevent ]\n"
2728    "\n"
2729    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2730    "All rights reserved.\n"
2731    "\n"
2732    "Redistribution and use in source and binary forms, with or without\n"
2733    "modification, are permitted provided that the following conditions\n"
2734    "are met:\n"
2735    "1. Redistributions of source code must retain the above copyright\n"
2736    "   notice, this list of conditions and the following disclaimer.\n"
2737    "2. Redistributions in binary form must reproduce the above copyright\n"
2738    "   notice, this list of conditions and the following disclaimer in the\n"
2739    "   documentation and/or other materials provided with the distribution.\n"
2740    "3. All advertising materials mentioning features or use of this software\n"
2741    "   must display the following acknowledgement:\n"
2742    "      This product includes software developed by Niels Provos.\n"
2743    "4. The name of the author may not be used to endorse or promote products\n"
2744    "   derived from this software without specific prior written permission.\n"
2745    "\n"
2746    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2747    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2748    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2749    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2750    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2751    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2752    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2753    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2754    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2755    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2756    );
2757
2758    return;
2759}
2760
2761static void save_pid(const pid_t pid, const char *pid_file) {
2762    FILE *fp;
2763    if (pid_file == NULL)
2764        return;
2765
2766    if ((fp = fopen(pid_file, "w")) == NULL) {
2767        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2768        return;
2769    }
2770
2771    fprintf(fp,"%ld\n", (long)pid);
2772    if (fclose(fp) == -1) {
2773        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2774        return;
2775    }
2776}
2777
2778static void remove_pidfile(const char *pid_file) {
2779  if (pid_file == NULL)
2780      return;
2781
2782  if (unlink(pid_file) != 0) {
2783      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2784  }
2785
2786}
2787
2788
2789static void sig_handler(const int sig) {
2790    printf("SIGINT handled.\n");
2791    exit(EXIT_SUCCESS);
2792}
2793
2794#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2795/*
2796 * On systems that supports multiple page sizes we may reduce the
2797 * number of TLB-misses by using the biggest available page size
2798 */
2799int enable_large_pages(void) {
2800    int ret = -1;
2801    size_t sizes[32];
2802    int avail = getpagesizes(sizes, 32);
2803    if (avail != -1) {
2804        size_t max = sizes[0];
2805        struct memcntl_mha arg = {0};
2806        int ii;
2807
2808        for (ii = 1; ii < avail; ++ii) {
2809            if (max < sizes[ii]) {
2810                max = sizes[ii];
2811            }
2812        }
2813
2814        arg.mha_flags   = 0;
2815        arg.mha_pagesize = max;
2816        arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
2817
2818        if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
2819            fprintf(stderr, "Failed to set large pages: %s\n",
2820                    strerror(errno));
2821            fprintf(stderr, "Will use default page size\n");
2822        } else {
2823            ret = 0;
2824        }
2825    } else {
2826        fprintf(stderr, "Failed to get supported pagesizes: %s\n",
2827                strerror(errno));
2828        fprintf(stderr, "Will use default page size\n");
2829    }
2830
2831    return ret;
2832}
2833#endif
2834
2835int main (int argc, char **argv) {
2836    int c;
2837    int x;
2838    bool lock_memory = false;
2839    bool daemonize = false;
2840    bool preallocate = false;
2841    int maxcore = 0;
2842    char *username = NULL;
2843    char *pid_file = NULL;
2844    struct passwd *pw;
2845    struct sigaction sa;
2846    struct rlimit rlim;
2847    /* listening socket */
2848    static int *l_socket = NULL;
2849
2850    /* udp socket */
2851    static int *u_socket = NULL;
2852    static int u_socket_count = 0;
2853
2854    /* handle SIGINT */
2855    signal(SIGINT, sig_handler);
2856
2857    /* init settings */
2858    settings_init();
2859
2860    /* set stderr non-buffering (for running under, say, daemontools) */
2861    setbuf(stderr, NULL);
2862
2863    /* process arguments */
2864    while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:L")) != -1) {
2865        switch (c) {
2866        case 'a':
2867            /* access for unix domain socket, as octal mask (like chmod)*/
2868            settings.access= strtol(optarg,NULL,8);
2869            break;
2870
2871        case 'U':
2872            settings.udpport = atoi(optarg);
2873            break;
2874        case 'b':
2875            settings.managed = true;
2876            break;
2877        case 'p':
2878            settings.port = atoi(optarg);
2879            break;
2880        case 's':
2881            settings.socketpath = optarg;
2882            break;
2883        case 'm':
2884            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2885            break;
2886        case 'M':
2887            settings.evict_to_free = 0;
2888            break;
2889        case 'c':
2890            settings.maxconns = atoi(optarg);
2891            break;
2892        case 'h':
2893            usage();
2894            exit(EXIT_SUCCESS);
2895        case 'i':
2896            usage_license();
2897            exit(EXIT_SUCCESS);
2898        case 'k':
2899            lock_memory = true;
2900            break;
2901        case 'v':
2902            settings.verbose++;
2903            break;
2904        case 'l':
2905            settings.inter= strdup(optarg);
2906            break;
2907        case 'd':
2908            daemonize = true;
2909            break;
2910        case 'r':
2911            maxcore = 1;
2912            break;
2913        case 'u':
2914            username = optarg;
2915            break;
2916        case 'P':
2917            pid_file = optarg;
2918            break;
2919        case 'f':
2920            settings.factor = atof(optarg);
2921            if (settings.factor <= 1.0) {
2922                fprintf(stderr, "Factor must be greater than 1\n");
2923                return 1;
2924            }
2925            break;
2926        case 'n':
2927            settings.chunk_size = atoi(optarg);
2928            if (settings.chunk_size == 0) {
2929                fprintf(stderr, "Chunk size must be greater than 0\n");
2930                return 1;
2931            }
2932            break;
2933        case 't':
2934            settings.num_threads = atoi(optarg);
2935            if (settings.num_threads == 0) {
2936                fprintf(stderr, "Number of threads must be greater than 0\n");
2937                return 1;
2938            }
2939            break;
2940        case 'D':
2941            if (! optarg || ! optarg[0]) {
2942                fprintf(stderr, "No delimiter specified\n");
2943                return 1;
2944            }
2945            settings.prefix_delimiter = optarg[0];
2946            settings.detail_enabled = 1;
2947            break;
2948#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2949        case 'L' :
2950            if (enable_large_pages() == 0) {
2951                preallocate = true;
2952            }
2953            break;
2954#endif
2955        default:
2956            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2957            return 1;
2958        }
2959    }
2960
2961    if (maxcore != 0) {
2962        struct rlimit rlim_new;
2963        /*
2964         * First try raising to infinity; if that fails, try bringing
2965