root/trunk/server/memcached.c @ 787

Revision 787, 93.6 kB (checked in by dormando, 17 months ago)

Allocate new conn structures with calloc.

Janusz Dziemidowicz reported conn->next was sometimes not initialized.
This would have been the case for any client connection, or any
listener connection that wasn't tcp.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the BSD license.  See
10 *  the LICENSE file for full text.
11 *
12 *  Authors:
13 *      Anatoly Vorobey <mellon@pobox.com>
14 *      Brad Fitzpatrick <brad@danga.com>
15std *
16 *  $Id$
17 */
18#include "memcached.h"
19#include <sys/stat.h>
20#include <sys/socket.h>
21#include <sys/un.h>
22#include <signal.h>
23#include <sys/resource.h>
24#include <sys/uio.h>
25
26/* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h.  */
28#ifndef _P1003_1B_VISIBLE
29#define _P1003_1B_VISIBLE
30#endif
31/* need this to get IOV_MAX on some platforms. */
32#ifndef __need_IOV_MAX
33#define __need_IOV_MAX
34#endif
35#include <pwd.h>
36#include <sys/mman.h>
37#include <fcntl.h>
38#include <netinet/tcp.h>
39#include <arpa/inet.h>
40#include <errno.h>
41#include <stdlib.h>
42#include <stdio.h>
43#include <string.h>
44#include <time.h>
45#include <assert.h>
46#include <limits.h>
47
48#ifdef HAVE_MALLOC_H
49/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
50#ifndef __OpenBSD__
51#include <malloc.h>
52#endif
53#endif
54
55/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
56#ifndef IOV_MAX
57#if defined(__FreeBSD__) || defined(__APPLE__)
58# define IOV_MAX 1024
59#endif
60#endif
61
62/*
63 * forward declarations
64 */
65static void drive_machine(conn *c);
66static int new_socket(struct addrinfo *ai);
67static int server_socket(const int port, const bool is_udp);
68static int try_read_command(conn *c);
69static int try_read_network(conn *c);
70static int try_read_udp(conn *c);
71
72/* stats */
73static void stats_reset(void);
74static void stats_init(void);
75
76/* defaults */
77static void settings_init(void);
78
79/* event handling, network IO */
80static void event_handler(const int fd, const short which, void *arg);
81static void conn_close(conn *c);
82static void conn_init(void);
83static void accept_new_conns(const bool do_accept);
84static bool update_event(conn *c, const int new_flags);
85static void complete_nread(conn *c);
86static void process_command(conn *c, char *command);
87static int transmit(conn *c);
88static int ensure_iov_space(conn *c);
89static int add_iov(conn *c, const void *buf, int len);
90static int add_msghdr(conn *c);
91
92/* time handling */
93static void set_current_time(void);  /* update the global variable holding
94                              global 32-bit seconds-since-start time
95                              (to avoid 64 bit time_t) */
96
97static void conn_free(conn *c);
98
99/** exported globals **/
100struct stats stats;
101struct settings settings;
102
103/** file scope variables **/
104static item **todelete = NULL;
105static int delcurr;
106static int deltotal;
107static conn *listen_conn = NULL;
108static struct event_base *main_base;
109
110#define TRANSMIT_COMPLETE   0
111#define TRANSMIT_INCOMPLETE 1
112#define TRANSMIT_SOFT_ERROR 2
113#define TRANSMIT_HARD_ERROR 3
114
115static int *buckets = 0; /* bucket->generation array for a managed instance */
116
117#define REALTIME_MAXDELTA 60*60*24*30
118/*
119 * given time value that's either unix time or delta from current unix time, return
120 * unix time. Use the fact that delta can't exceed one month (and real time value can't
121 * be that low).
122 */
123static rel_time_t realtime(const time_t exptime) {
124    /* no. of seconds in 30 days - largest possible delta exptime */
125
126    if (exptime == 0) return 0; /* 0 means never expire */
127
128    if (exptime > REALTIME_MAXDELTA) {
129        /* if item expiration is at/before the server started, give it an
130           expiration time of 1 second after the server started.
131           (because 0 means don't expire).  without this, we'd
132           underflow and wrap around to some large value way in the
133           future, effectively making items expiring in the past
134           really expiring never */
135        if (exptime <= stats.started)
136            return (rel_time_t)1;
137        return (rel_time_t)(exptime - stats.started);
138    } else {
139        return (rel_time_t)(exptime + current_time);
140    }
141}
142
143static void stats_init(void) {
144    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
145    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
146    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
147
148    /* make the time we started always be 2 seconds before we really
149       did, so time(0) - time.started is never zero.  if so, things
150       like 'settings.oldest_live' which act as booleans as well as
151       values are now false in boolean context... */
152    stats.started = time(0) - 2;
153    stats_prefix_init();
154}
155
156static void stats_reset(void) {
157    STATS_LOCK();
158    stats.total_items = stats.total_conns = 0;
159    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
160    stats.bytes_read = stats.bytes_written = 0;
161    stats_prefix_clear();
162    STATS_UNLOCK();
163}
164
165static void settings_init(void) {
166    settings.access=0700;
167    settings.port = 11211;
168    settings.udpport = 0;
169    /* By default this string should be NULL for getaddrinfo() */
170    settings.inter = NULL;
171    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
172    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
173    settings.verbose = 0;
174    settings.oldest_live = 0;
175    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
176    settings.socketpath = NULL;       /* by default, not using a unix socket */
177    settings.managed = false;
178    settings.factor = 1.25;
179    settings.chunk_size = 48;         /* space for a modest key and value */
180#ifdef USE_THREADS
181    settings.num_threads = 4;
182#else
183    settings.num_threads = 1;
184#endif
185    settings.prefix_delimiter = ':';
186    settings.detail_enabled = 0;
187}
188
189/* returns true if a deleted item's delete-locked-time is over, and it
190   should be removed from the namespace */
191static bool item_delete_lock_over (item *it) {
192    assert(it->it_flags & ITEM_DELETED);
193    return (current_time >= it->exptime);
194}
195
196/*
197 * Adds a message header to a connection.
198 *
199 * Returns 0 on success, -1 on out-of-memory.
200 */
201static int add_msghdr(conn *c)
202{
203    struct msghdr *msg;
204
205    assert(c != NULL);
206
207    if (c->msgsize == c->msgused) {
208        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
209        if (! msg)
210            return -1;
211        c->msglist = msg;
212        c->msgsize *= 2;
213    }
214
215    msg = c->msglist + c->msgused;
216
217    /* this wipes msg_iovlen, msg_control, msg_controllen, and
218       msg_flags, the last 3 of which aren't defined on solaris: */
219    memset(msg, 0, sizeof(struct msghdr));
220
221    msg->msg_iov = &c->iov[c->iovused];
222
223    if (c->request_addr_size > 0) {
224        msg->msg_name = &c->request_addr;
225        msg->msg_namelen = c->request_addr_size;
226    }
227
228    c->msgbytes = 0;
229    c->msgused++;
230
231    if (c->udp) {
232        /* Leave room for the UDP header, which we'll fill in later. */
233        return add_iov(c, NULL, UDP_HEADER_SIZE);
234    }
235
236    return 0;
237}
238
239
240/*
241 * Free list management for connections.
242 */
243
244static conn **freeconns;
245static int freetotal;
246static int freecurr;
247
248
249static void conn_init(void) {
250    freetotal = 200;
251    freecurr = 0;
252    if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) {
253        fprintf(stderr, "malloc()\n");
254    }
255    return;
256}
257
258/*
259 * Returns a connection from the freelist, if any. Should call this using
260 * conn_from_freelist() for thread safety.
261 */
262conn *do_conn_from_freelist() {
263    conn *c;
264
265    if (freecurr > 0) {
266        c = freeconns[--freecurr];
267    } else {
268        c = NULL;
269    }
270
271    return c;
272}
273
274/*
275 * Adds a connection to the freelist. 0 = success. Should call this using
276 * conn_add_to_freelist() for thread safety.
277 */
278bool do_conn_add_to_freelist(conn *c) {
279    if (freecurr < freetotal) {
280        freeconns[freecurr++] = c;
281        return false;
282    } else {
283        /* try to enlarge free connections array */
284        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
285        if (new_freeconns) {
286            freetotal *= 2;
287            freeconns = new_freeconns;
288            freeconns[freecurr++] = c;
289            return false;
290        }
291    }
292    return true;
293}
294
295conn *conn_new(const int sfd, const int init_state, const int event_flags,
296                const int read_buffer_size, const bool is_udp, struct event_base *base) {
297    conn *c = conn_from_freelist();
298
299    if (NULL == c) {
300        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
301            fprintf(stderr, "calloc()\n");
302            return NULL;
303        }
304        c->rbuf = c->wbuf = 0;
305        c->ilist = 0;
306        c->suffixlist = 0;
307        c->iov = 0;
308        c->msglist = 0;
309        c->hdrbuf = 0;
310
311        c->rsize = read_buffer_size;
312        c->wsize = DATA_BUFFER_SIZE;
313        c->isize = ITEM_LIST_INITIAL;
314        c->suffixsize = SUFFIX_LIST_INITIAL;
315        c->iovsize = IOV_LIST_INITIAL;
316        c->msgsize = MSG_LIST_INITIAL;
317        c->hdrsize = 0;
318
319        c->rbuf = (char *)malloc((size_t)c->rsize);
320        c->wbuf = (char *)malloc((size_t)c->wsize);
321        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
322        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
323        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
324        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
325
326        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
327                c->msglist == 0 || c->suffixlist == 0) {
328            if (c->rbuf != 0) free(c->rbuf);
329            if (c->wbuf != 0) free(c->wbuf);
330            if (c->ilist !=0) free(c->ilist);
331            if (c->suffixlist != 0) free(c->suffixlist);
332            if (c->iov != 0) free(c->iov);
333            if (c->msglist != 0) free(c->msglist);
334            free(c);
335            fprintf(stderr, "malloc()\n");
336            return NULL;
337        }
338
339        STATS_LOCK();
340        stats.conn_structs++;
341        STATS_UNLOCK();
342    }
343
344    if (settings.verbose > 1) {
345        if (init_state == conn_listening)
346            fprintf(stderr, "<%d server listening\n", sfd);
347        else if (is_udp)
348            fprintf(stderr, "<%d server listening (udp)\n", sfd);
349        else
350            fprintf(stderr, "<%d new client connection\n", sfd);
351    }
352
353    c->sfd = sfd;
354    c->udp = is_udp;
355    c->state = init_state;
356    c->rlbytes = 0;
357    c->rbytes = c->wbytes = 0;
358    c->wcurr = c->wbuf;
359    c->rcurr = c->rbuf;
360    c->ritem = 0;
361    c->icurr = c->ilist;
362    c->suffixcurr = c->suffixlist;
363    c->ileft = 0;
364    c->suffixleft = 0;
365    c->iovused = 0;
366    c->msgcurr = 0;
367    c->msgused = 0;
368
369    c->write_and_go = conn_read;
370    c->write_and_free = 0;
371    c->item = 0;
372    c->bucket = -1;
373    c->gen = 0;
374
375    c->noreply = false;
376
377    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
378    event_base_set(base, &c->event);
379    c->ev_flags = event_flags;
380
381    if (event_add(&c->event, 0) == -1) {
382        if (conn_add_to_freelist(c)) {
383            conn_free(c);
384        }
385        perror("event_add");
386        return NULL;
387    }
388
389    STATS_LOCK();
390    stats.curr_conns++;
391    stats.total_conns++;
392    STATS_UNLOCK();
393
394    return c;
395}
396
397static void conn_cleanup(conn *c) {
398    assert(c != NULL);
399
400    if (c->item) {
401        item_remove(c->item);
402        c->item = 0;
403    }
404
405    if (c->ileft != 0) {
406        for (; c->ileft > 0; c->ileft--,c->icurr++) {
407            item_remove(*(c->icurr));
408        }
409    }
410
411    if (c->suffixleft != 0) {
412        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
413            if(suffix_add_to_freelist(*(c->suffixcurr))) {
414                free(*(c->suffixcurr));
415            }
416        }
417    }
418
419    if (c->write_and_free) {
420        free(c->write_and_free);
421        c->write_and_free = 0;
422    }
423}
424
425/*
426 * Frees a connection.
427 */
428void conn_free(conn *c) {
429    if (c) {
430        if (c->hdrbuf)
431            free(c->hdrbuf);
432        if (c->msglist)
433            free(c->msglist);
434        if (c->rbuf)
435            free(c->rbuf);
436        if (c->wbuf)
437            free(c->wbuf);
438        if (c->ilist)
439            free(c->ilist);
440        if (c->suffixlist)
441            free(c->suffixlist);
442        if (c->iov)
443            free(c->iov);
444        free(c);
445    }
446}
447
448static void conn_close(conn *c) {
449    assert(c != NULL);
450
451    /* delete the event, the socket and the conn */
452    event_del(&c->event);
453
454    if (settings.verbose > 1)
455        fprintf(stderr, "<%d connection closed.\n", c->sfd);
456
457    close(c->sfd);
458    accept_new_conns(true);
459    conn_cleanup(c);
460
461    /* if the connection has big buffers, just free it */
462    if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
463        conn_free(c);
464    }
465
466    STATS_LOCK();
467    stats.curr_conns--;
468    STATS_UNLOCK();
469
470    return;
471}
472
473
474/*
475 * Shrinks a connection's buffers if they're too big.  This prevents
476 * periodic large "get" requests from permanently chewing lots of server
477 * memory.
478 *
479 * This should only be called in between requests since it can wipe output
480 * buffers!
481 */
482static void conn_shrink(conn *c) {
483    assert(c != NULL);
484
485    if (c->udp)
486        return;
487
488    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
489        char *newbuf;
490
491        if (c->rcurr != c->rbuf)
492            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
493
494        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
495
496        if (newbuf) {
497            c->rbuf = newbuf;
498            c->rsize = DATA_BUFFER_SIZE;
499        }
500        /* TODO check other branch... */
501        c->rcurr = c->rbuf;
502    }
503
504    if (c->isize > ITEM_LIST_HIGHWAT) {
505        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
506        if (newbuf) {
507            c->ilist = newbuf;
508            c->isize = ITEM_LIST_INITIAL;
509        }
510    /* TODO check error condition? */
511    }
512
513    if (c->msgsize > MSG_LIST_HIGHWAT) {
514        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
515        if (newbuf) {
516            c->msglist = newbuf;
517            c->msgsize = MSG_LIST_INITIAL;
518        }
519    /* TODO check error condition? */
520    }
521
522    if (c->iovsize > IOV_LIST_HIGHWAT) {
523        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
524        if (newbuf) {
525            c->iov = newbuf;
526            c->iovsize = IOV_LIST_INITIAL;
527        }
528    /* TODO check return value */
529    }
530}
531
532/*
533 * Sets a connection's current state in the state machine. Any special
534 * processing that needs to happen on certain state transitions can
535 * happen here.
536 */
537static void conn_set_state(conn *c, int state) {
538    assert(c != NULL);
539
540    if (state != c->state) {
541        if (state == conn_read) {
542            conn_shrink(c);
543            assoc_move_next_bucket();
544        }
545        c->state = state;
546    }
547}
548
549/*
550 * Free list management for suffix buffers.
551 */
552
553static char **freesuffix;
554static int freesuffixtotal;
555static int freesuffixcurr;
556
557static void suffix_init(void) {
558    freesuffixtotal = 500;
559    freesuffixcurr  = 0;
560
561    freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal );
562    if (freesuffix == NULL) {
563        fprintf(stderr, "malloc()\n");
564    }
565    return;
566}
567
568/*
569 * Returns a suffix buffer from the freelist, if any. Should call this using
570 * suffix_from_freelist() for thread safety.
571 */
572char *do_suffix_from_freelist() {
573    char *s;
574
575    if (freesuffixcurr > 0) {
576        s = freesuffix[--freesuffixcurr];
577    } else {
578        /* If malloc fails, let the logic fall through without spamming
579         * STDERR on the server. */
580        s = malloc( SUFFIX_SIZE );
581    }
582
583    return s;
584}
585
586/*
587 * Adds a connection to the freelist. 0 = success. Should call this using
588 * conn_add_to_freelist() for thread safety.
589 */
590bool do_suffix_add_to_freelist(char *s) {
591    if (freesuffixcurr < freesuffixtotal) {
592        freesuffix[freesuffixcurr++] = s;
593        return false;
594    } else {
595        /* try to enlarge free connections array */
596        char **new_freesuffix = realloc(freesuffix,
597            sizeof(char *) * freesuffixtotal * 2);
598        if (new_freesuffix) {
599            freesuffixtotal *= 2;
600            freesuffix = new_freesuffix;
601            freesuffix[freesuffixcurr++] = s;
602            return false;
603        }
604    }
605    return true;
606}
607
608/*
609 * Ensures that there is room for another struct iovec in a connection's
610 * iov list.
611 *
612 * Returns 0 on success, -1 on out-of-memory.
613 */
614static int ensure_iov_space(conn *c) {
615    assert(c != NULL);
616
617    if (c->iovused >= c->iovsize) {
618        int i, iovnum;
619        struct iovec *new_iov = (struct iovec *)realloc(c->iov,
620                                (c->iovsize * 2) * sizeof(struct iovec));
621        if (! new_iov)
622            return -1;
623        c->iov = new_iov;
624        c->iovsize *= 2;
625
626        /* Point all the msghdr structures at the new list. */
627        for (i = 0, iovnum = 0; i < c->msgused; i++) {
628            c->msglist[i].msg_iov = &c->iov[iovnum];
629            iovnum += c->msglist[i].msg_iovlen;
630        }
631    }
632
633    return 0;
634}
635
636
637/*
638 * Adds data to the list of pending data that will be written out to a
639 * connection.
640 *
641 * Returns 0 on success, -1 on out-of-memory.
642 */
643
644static int add_iov(conn *c, const void *buf, int len) {
645    struct msghdr *m;
646    int leftover;
647    bool limit_to_mtu;
648
649    assert(c != NULL);
650
651    do {
652        m = &c->msglist[c->msgused - 1];
653
654        /*
655         * Limit UDP packets, and the first payloads of TCP replies, to
656         * UDP_MAX_PAYLOAD_SIZE bytes.
657         */
658        limit_to_mtu = c->udp || (1 == c->msgused);
659
660        /* We may need to start a new msghdr if this one is full. */
661        if (m->msg_iovlen == IOV_MAX ||
662            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
663            add_msghdr(c);
664            m = &c->msglist[c->msgused - 1];
665        }
666
667        if (ensure_iov_space(c) != 0)
668            return -1;
669
670        /* If the fragment is too big to fit in the datagram, split it up */
671        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
672            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
673            len -= leftover;
674        } else {
675            leftover = 0;
676        }
677
678        m = &c->msglist[c->msgused - 1];
679        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
680        m->msg_iov[m->msg_iovlen].iov_len = len;
681
682        c->msgbytes += len;
683        c->iovused++;
684        m->msg_iovlen++;
685
686        buf = ((char *)buf) + len;
687        len = leftover;
688    } while (leftover > 0);
689
690    return 0;
691}
692
693
694/*
695 * Constructs a set of UDP headers and attaches them to the outgoing messages.
696 */
697static int build_udp_headers(conn *c) {
698    int i;
699    unsigned char *hdr;
700
701    assert(c != NULL);
702
703    if (c->msgused > c->hdrsize) {
704        void *new_hdrbuf;
705        if (c->hdrbuf)
706            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
707        else
708            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
709        if (! new_hdrbuf)
710            return -1;
711        c->hdrbuf = (unsigned char *)new_hdrbuf;
712        c->hdrsize = c->msgused * 2;
713    }
714
715    hdr = c->hdrbuf;
716    for (i = 0; i < c->msgused; i++) {
717        c->msglist[i].msg_iov[0].iov_base = hdr;
718        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
719        *hdr++ = c->request_id / 256;
720        *hdr++ = c->request_id % 256;
721        *hdr++ = i / 256;
722        *hdr++ = i % 256;
723        *hdr++ = c->msgused / 256;
724        *hdr++ = c->msgused % 256;
725        *hdr++ = 0;
726        *hdr++ = 0;
727        assert((void *) hdr == (void *)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
728    }
729
730    return 0;
731}
732
733
734static void out_string(conn *c, const char *str) {
735    size_t len;
736
737    assert(c != NULL);
738
739    if (c->noreply) {
740        if (settings.verbose > 1)
741            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
742        c->noreply = false;
743        conn_set_state(c, conn_read);
744        return;
745    }
746
747    if (settings.verbose > 1)
748        fprintf(stderr, ">%d %s\n", c->sfd, str);
749
750    len = strlen(str);
751    if ((len + 2) > c->wsize) {
752        /* ought to be always enough. just fail for simplicity */
753        str = "SERVER_ERROR output line too long";
754        len = strlen(str);
755    }
756
757    memcpy(c->wbuf, str, len);
758    memcpy(c->wbuf + len, "\r\n", 3);
759    c->wbytes = len + 2;
760    c->wcurr = c->wbuf;
761
762    conn_set_state(c, conn_write);
763    c->write_and_go = conn_read;
764    return;
765}
766
767/*
768 * we get here after reading the value in set/add/replace commands. The command
769 * has been stored in c->item_comm, and the item is ready in c->item.
770 */
771
772static void complete_nread(conn *c) {
773    assert(c != NULL);
774
775    item *it = c->item;
776    int comm = c->item_comm;
777    int ret;
778
779    STATS_LOCK();
780    stats.set_cmds++;
781    STATS_UNLOCK();
782
783    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
784        out_string(c, "CLIENT_ERROR bad data chunk");
785    } else {
786      ret = store_item(it, comm);
787      if (ret == 1)
788          out_string(c, "STORED");
789      else if(ret == 2)
790          out_string(c, "EXISTS");
791      else if(ret == 3)
792          out_string(c, "NOT_FOUND");
793      else
794          out_string(c, "NOT_STORED");
795    }
796
797    item_remove(c->item);       /* release the c->item reference */
798    c->item = 0;
799}
800
801/*
802 * Stores an item in the cache according to the semantics of one of the set
803 * commands. In threaded mode, this is protected by the cache lock.
804 *
805 * Returns true if the item was stored.
806 */
807int do_store_item(item *it, int comm) {
808    char *key = ITEM_key(it);
809    bool delete_locked = false;
810    item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
811    int stored = 0;
812
813    item *new_it = NULL;
814    int flags;
815
816    if (old_it != NULL && comm == NREAD_ADD) {
817        /* add only adds a nonexistent item, but promote to head of LRU */
818        do_item_update(old_it);
819    } else if (!old_it && (comm == NREAD_REPLACE
820        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
821    {
822        /* replace only replaces an existing value; don't store */
823    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
824        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
825    {
826        /* replace and add can't override delete locks; don't store */
827    } else if (comm == NREAD_CAS) {
828        /* validate cas operation */
829        if (delete_locked)
830            old_it = do_item_get_nocheck(key, it->nkey);
831
832        if(old_it == NULL) {
833          // LRU expired
834          stored = 3;
835        }
836        else if(it->cas_id == old_it->cas_id) {
837          // cas validates
838          do_item_replace(old_it, it);
839          stored = 1;
840        }
841        else
842        {
843          stored = 2;
844        }
845    } else {
846        /*
847         * Append - combine new and old record into single one. Here it's
848         * atomic and thread-safe.
849         */
850
851        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
852
853            /* we have it and old_it here - alloc memory to hold both */
854            /* flags was already lost - so recover them from ITEM_suffix(it) */
855
856            flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
857
858            new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
859
860            if (new_it == NULL) {
861                /* SERVER_ERROR out of memory */
862                return 0;
863            }
864
865            /* copy data from it and old_it to new_it */
866
867            if (comm == NREAD_APPEND) {
868                memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
869                memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
870            } else {
871                /* NREAD_PREPEND */
872                memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
873                memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
874            }
875
876            it = new_it;
877        }
878
879        /* "set" commands can override the delete lock
880           window... in which case we have to find the old hidden item
881           that's in the namespace/LRU but wasn't returned by
882           item_get.... because we need to replace it */
883        if (delete_locked)
884            old_it = do_item_get_nocheck(key, it->nkey);
885
886        if (old_it != NULL)
887            do_item_replace(old_it, it);
888        else
889            do_item_link(it);
890
891        stored = 1;
892    }
893
894    if (old_it != NULL)
895        do_item_remove(old_it);         /* release our reference */
896    if (new_it != NULL)
897        do_item_remove(new_it);
898
899    return stored;
900}
901
902typedef struct token_s {
903    char *value;
904    size_t length;
905} token_t;
906
907#define COMMAND_TOKEN 0
908#define SUBCOMMAND_TOKEN 1
909#define KEY_TOKEN 1
910#define KEY_MAX_LENGTH 250
911
912#define MAX_TOKENS 8
913
914/*
915 * Tokenize the command string by replacing whitespace with '\0' and update
916 * the token array tokens with pointer to start of each token and length.
917 * Returns total number of tokens.  The last valid token is the terminal
918 * token (value points to the first unprocessed character of the string and
919 * length zero).
920 *
921 * Usage example:
922 *
923 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
924 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
925 *          ...
926 *      }
927 *      ncommand = tokens[ix].value - command;
928 *      command  = tokens[ix].value;
929 *   }
930 */
931static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
932    char *s, *e;
933    size_t ntokens = 0;
934
935    assert(command != NULL && tokens != NULL && max_tokens > 1);
936
937    for (s = e = command; ntokens < max_tokens - 1; ++e) {
938        if (*e == ' ') {
939            if (s != e) {
940                tokens[ntokens].value = s;
941                tokens[ntokens].length = e - s;
942                ntokens++;
943                *e = '\0';
944            }
945            s = e + 1;
946        }
947        else if (*e == '\0') {
948            if (s != e) {
949                tokens[ntokens].value = s;
950                tokens[ntokens].length = e - s;
951                ntokens++;
952            }
953
954            break; /* string end */
955        }
956    }
957
958    /*
959     * If we scanned the whole string, the terminal value pointer is null,
960     * otherwise it is the first unprocessed character.
961     */
962    tokens[ntokens].value =  *e == '\0' ? NULL : e;
963    tokens[ntokens].length = 0;
964    ntokens++;
965
966    return ntokens;
967}
968
969/* set up a connection to write a buffer then free it, used for stats */
970static void write_and_free(conn *c, char *buf, int bytes) {
971    if (buf) {
972        c->write_and_free = buf;
973        c->wcurr = buf;
974        c->wbytes = bytes;
975        conn_set_state(c, conn_write);
976        c->write_and_go = conn_read;
977    } else {
978        out_string(c, "SERVER_ERROR out of memory writing stats");
979    }
980}
981
982static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
983{
984    int noreply_index = ntokens - 2;
985
986    /*
987      NOTE: this function is not the first place where we are going to
988      send the reply.  We could send it instead from process_command()
989      if the request line has wrong number of tokens.  However parsing
990      malformed line for "noreply" option is not reliable anyway, so
991      it can't be helped.
992    */
993    if (tokens[noreply_index].value
994        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
995        c->noreply = true;
996    }
997}
998
999inline static void process_stats_detail(conn *c, const char *command) {
1000    assert(c != NULL);
1001
1002    if (strcmp(command, "on") == 0) {
1003        settings.detail_enabled = 1;
1004        out_string(c, "OK");
1005    }
1006    else if (strcmp(command, "off") == 0) {
1007        settings.detail_enabled = 0;
1008        out_string(c, "OK");
1009    }
1010    else if (strcmp(command, "dump") == 0) {
1011        int len;
1012        char *stats = stats_prefix_dump(&len);
1013        write_and_free(c, stats, len);
1014    }
1015    else {
1016        out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
1017    }
1018}
1019
1020static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
1021    rel_time_t now = current_time;
1022    char *command;
1023    char *subcommand;
1024
1025    assert(c != NULL);
1026
1027    if(ntokens < 2) {
1028        out_string(c, "CLIENT_ERROR bad command line");
1029        return;
1030    }
1031
1032    command = tokens[COMMAND_TOKEN].value;
1033
1034    if (ntokens == 2 && strcmp(command, "stats") == 0) {
1035        char temp[1024];
1036        pid_t pid = getpid();
1037        char *pos = temp;
1038
1039#ifndef WIN32
1040        struct rusage usage;
1041        getrusage(RUSAGE_SELF, &usage);
1042#endif /* !WIN32 */
1043
1044        STATS_LOCK();
1045        pos += sprintf(pos, "STAT pid %u\r\n", pid);
1046        pos += sprintf(pos, "STAT uptime %u\r\n", now);
1047        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
1048        pos += sprintf(pos, "STAT version " VERSION "\r\n");
1049        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void *));
1050#ifndef WIN32
1051        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
1052        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
1053#endif /* !WIN32 */
1054        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
1055        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
1056        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
1057        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
1058        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
1059        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
1060        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
1061        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
1062        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
1063        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
1064        pos += sprintf(pos, "STAT evictions %llu\r\n", stats.evictions);
1065        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
1066        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
1067        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (uint64_t) settings.maxbytes);
1068        pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
1069        pos += sprintf(pos, "END");
1070        STATS_UNLOCK();
1071        out_string(c, temp);
1072        return;
1073    }
1074
1075    subcommand = tokens[SUBCOMMAND_TOKEN].value;
1076
1077    if (strcmp(subcommand, "reset") == 0) {
1078        stats_reset();
1079        out_string(c, "RESET");
1080        return;
1081    }
1082
1083#ifdef HAVE_MALLOC_H
1084#ifdef HAVE_STRUCT_MALLINFO
1085    if (strcmp(subcommand, "malloc") == 0) {
1086        char temp[512];
1087        struct mallinfo info;
1088        char *pos = temp;
1089
1090        info = mallinfo();
1091        pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
1092        pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
1093        pos += sprintf(pos, "STAT fastbin_blocks %d\r\n", info.smblks);
1094        pos += sprintf(pos, "STAT mmapped_regions %d\r\n", info.hblks);
1095        pos += sprintf(pos, "STAT mmapped_space %d\r\n", info.hblkhd);
1096        pos += sprintf(pos, "STAT max_total_alloc %d\r\n", info.usmblks);
1097        pos += sprintf(pos, "STAT fastbin_space %d\r\n", info.fsmblks);
1098        pos += sprintf(pos, "STAT total_alloc %d\r\n", info.uordblks);
1099        pos += sprintf(pos, "STAT total_free %d\r\n", info.fordblks);
1100        pos += sprintf(pos, "STAT releasable_space %d\r\nEND", info.keepcost);
1101        out_string(c, temp);
1102        return;
1103    }
1104#endif /* HAVE_STRUCT_MALLINFO */
1105#endif /* HAVE_MALLOC_H */
1106
1107#if !defined(WIN32) || !defined(__APPLE__)
1108    if (strcmp(subcommand, "maps") == 0) {
1109        char *wbuf;
1110        int wsize = 8192; /* should be enough */
1111        int fd;
1112        int res;
1113
1114        if ((wbuf = (char *)malloc(wsize)) == NULL) {
1115            out_string(c, "SERVER_ERROR out of memory writing stats maps");
1116            return;
1117        }
1118
1119        fd = open("/proc/self/maps", O_RDONLY);
1120        if (fd == -1) {
1121            out_string(c, "SERVER_ERROR cannot open the maps file");
1122            free(wbuf);
1123            return;
1124        }
1125
1126        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
1127        if (res == wsize - 6) {
1128            out_string(c, "SERVER_ERROR buffer overflow");
1129            free(wbuf); close(fd);
1130            return;
1131        }
1132        if (res == 0 || res == -1) {
1133            out_string(c, "SERVER_ERROR can't read the maps file");
1134            free(wbuf); close(fd);
1135            return;
1136        }
1137        memcpy(wbuf + res, "END\r\n", 5);
1138        write_and_free(c, wbuf, res + 5);
1139        close(fd);
1140        return;
1141    }
1142#endif
1143
1144    if (strcmp(subcommand, "cachedump") == 0) {
1145
1146        char *buf;
1147        unsigned int bytes, id, limit = 0;
1148
1149        if(ntokens < 5) {
1150            out_string(c, "CLIENT_ERROR bad command line");
1151            return;
1152        }
1153
1154        id = strtoul(tokens[2].value, NULL, 10);
1155        limit = strtoul(tokens[3].value, NULL, 10);
1156
1157        if(errno == ERANGE) {
1158            out_string(c, "CLIENT_ERROR bad command line format");
1159            return;
1160        }
1161
1162        buf = item_cachedump(id, limit, &bytes);
1163        write_and_free(c, buf, bytes);
1164        return;
1165    }
1166
1167    if (strcmp(subcommand, "slabs") == 0) {
1168        int bytes = 0;
1169        char *buf = slabs_stats(&bytes);
1170        write_and_free(c, buf, bytes);
1171        return;
1172    }
1173
1174    if (strcmp(subcommand, "items") == 0) {
1175        int bytes = 0;
1176        char *buf = item_stats(&bytes);
1177        write_and_free(c, buf, bytes);
1178        return;
1179    }
1180
1181    if (strcmp(subcommand, "detail") == 0) {
1182        if (ntokens < 4)
1183            process_stats_detail(c, "");  /* outputs the error message */
1184        else
1185            process_stats_detail(c, tokens[2].value);
1186        return;
1187    }
1188
1189    if (strcmp(subcommand, "sizes") == 0) {
1190        int bytes = 0;
1191        char *buf = item_stats_sizes(&bytes);
1192        write_and_free(c, buf, bytes);
1193        return;
1194    }
1195
1196    out_string(c, "ERROR");
1197}
1198
1199/* ntokens is overwritten here... shrug.. */
1200static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
1201    char *key;
1202    size_t nkey;
1203    int i = 0;
1204    item *it;
1205    token_t *key_token = &tokens[KEY_TOKEN];
1206    char *suffix;
1207    int stats_get_cmds   = 0;
1208    int stats_get_hits   = 0;
1209    int stats_get_misses = 0;
1210    assert(c != NULL);
1211
1212    if (settings.managed) {
1213        int bucket = c->bucket;
1214        if (bucket == -1) {
1215            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1216            return;
1217        }
1218        c->bucket = -1;
1219        if (buckets[bucket] != c->gen) {
1220            out_string(c, "ERROR_NOT_OWNER");
1221            return;
1222        }
1223    }
1224
1225    do {
1226        while(key_token->length != 0) {
1227
1228            key = key_token->value;
1229            nkey = key_token->length;
1230
1231            if(nkey > KEY_MAX_LENGTH) {
1232                STATS_LOCK();
1233                stats.get_cmds   += stats_get_cmds;
1234                stats.get_hits   += stats_get_hits;
1235                stats.get_misses += stats_get_misses;
1236                STATS_UNLOCK();
1237                out_string(c, "CLIENT_ERROR bad command line format");
1238                return;
1239            }
1240
1241            stats_get_cmds++;
1242            it = item_get(key, nkey);
1243            if (settings.detail_enabled) {
1244                stats_prefix_record_get(key, NULL != it);
1245            }
1246            if (it) {
1247                if (i >= c->isize) {
1248                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
1249                    if (new_list) {
1250                        c->isize *= 2;
1251                        c->ilist = new_list;
1252                    } else break;
1253                }
1254
1255                /*
1256                 * Construct the response. Each hit adds three elements to the
1257                 * outgoing data list:
1258                 *   "VALUE "
1259                 *   key
1260                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
1261                 */
1262
1263                if(return_cas == true)
1264                {
1265                  /* Goofy mid-flight realloc. */
1266                  if (i >= c->suffixsize) {
1267                    char **new_suffix_list = realloc(c->suffixlist,
1268                                           sizeof(char *) * c->suffixsize * 2);
1269                    if (new_suffix_list) {
1270                      c->suffixsize *= 2;
1271                      c->suffixlist  = new_suffix_list;
1272                    } else break;
1273                  }
1274
1275                  suffix = suffix_from_freelist();
1276                  if (suffix == NULL) {
1277                    STATS_LOCK();
1278                    stats.get_cmds   += stats_get_cmds;
1279                    stats.get_hits   += stats_get_hits;
1280                    stats.get_misses += stats_get_misses;
1281                    STATS_UNLOCK();
1282                    out_string(c, "SERVER_ERROR out of memory making CAS suffix");
1283                    return;
1284                  }
1285                  *(c->suffixlist + i) = suffix;
1286                  sprintf(suffix, " %llu\r\n", it->cas_id);
1287                  if (add_iov(c, "VALUE ", 6) != 0 ||
1288                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1289                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
1290                      add_iov(c, suffix, strlen(suffix)) != 0 ||
1291                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
1292                      {
1293                          break;
1294                      }
1295                }
1296                else
1297                {
1298                  if (add_iov(c, "VALUE ", 6) != 0 ||
1299                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
1300                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
1301                      {
1302                          break;
1303                      }
1304                }
1305
1306
1307                if (settings.verbose > 1)
1308                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1309
1310                /* item_get() has incremented it->refcount for us */
1311                stats_get_hits++;
1312                item_update(it);
1313                *(c->ilist + i) = it;
1314                i++;
1315
1316            } else {
1317                stats_get_misses++;
1318            }
1319
1320            key_token++;
1321        }
1322
1323        /*
1324         * If the command string hasn't been fully processed, get the next set
1325         * of tokens.
1326         */
1327        if(key_token->value != NULL) {
1328            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
1329            key_token = tokens;
1330        }
1331
1332    } while(key_token->value != NULL);
1333
1334    c->icurr = c->ilist;
1335    c->ileft = i;
1336    if (return_cas) {
1337        c->suffixcurr = c->suffixlist;
1338        c->suffixleft = i;
1339    }
1340
1341    if (settings.verbose > 1)
1342        fprintf(stderr, ">%d END\n", c->sfd);
1343
1344    /*
1345        If the loop was terminated because of out-of-memory, it is not
1346        reliable to add END\r\n to the buffer, because it might not end
1347        in \r\n. So we send SERVER_ERROR instead.
1348    */
1349    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
1350        || (c->udp && build_udp_headers(c) != 0)) {
1351        out_string(c, "SERVER_ERROR out of memory writing get response");
1352    }
1353    else {
1354        conn_set_state(c, conn_mwrite);
1355        c->msgcurr = 0;
1356    }
1357
1358    STATS_LOCK();
1359    stats.get_cmds   += stats_get_cmds;
1360    stats.get_hits   += stats_get_hits;
1361    stats.get_misses += stats_get_misses;
1362    STATS_UNLOCK();
1363
1364    return;
1365}
1366
1367static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
1368    char *key;
1369    size_t nkey;
1370    int flags;
1371    time_t exptime;
1372    int vlen, old_vlen;
1373    uint64_t req_cas_id;
1374    item *it, *old_it;
1375
1376    assert(c != NULL);
1377
1378    set_noreply_maybe(c, tokens, ntokens);
1379
1380    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1381        out_string(c, "CLIENT_ERROR bad command line format");
1382        return;
1383    }
1384
1385    key = tokens[KEY_TOKEN].value;
1386    nkey = tokens[KEY_TOKEN].length;
1387
1388    flags = strtoul(tokens[2].value, NULL, 10);
1389    exptime = strtol(tokens[3].value, NULL, 10);
1390    vlen = strtol(tokens[4].value, NULL, 10);
1391
1392    // does cas value exist?
1393    if(handle_cas)
1394    {
1395      req_cas_id = strtoull(tokens[5].value, NULL, 10);
1396    }
1397
1398    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1399        out_string(c, "CLIENT_ERROR bad command line format");
1400        return;
1401    }
1402
1403    if (settings.detail_enabled) {
1404        stats_prefix_record_set(key);
1405    }
1406
1407    if (settings.managed) {
1408        int bucket = c->bucket;
1409        if (bucket == -1) {
1410            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1411            return;
1412        }
1413        c->bucket = -1;
1414        if (buckets[bucket] != c->gen) {
1415            out_string(c, "ERROR_NOT_OWNER");
1416            return;
1417        }
1418    }
1419
1420    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1421
1422    if (it == 0) {
1423        if (! item_size_ok(nkey, flags, vlen + 2))
1424            out_string(c, "SERVER_ERROR object too large for cache");
1425        else
1426            out_string(c, "SERVER_ERROR out of memory storing object");
1427        /* swallow the data line */
1428        c->write_and_go = conn_swallow;
1429        c->sbytes = vlen + 2;
1430        return;
1431    }
1432    if(handle_cas)
1433      it->cas_id = req_cas_id;
1434
1435    c->item = it;
1436    c->ritem = ITEM_data(it);
1437    c->rlbytes = it->nbytes;
1438    c->item_comm = comm;
1439    conn_set_state(c, conn_nread);
1440}
1441
1442static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
1443    char temp[sizeof("18446744073709551615")];
1444    item *it;
1445    int64_t delta;
1446    char *key;
1447    size_t nkey;
1448
1449    assert(c != NULL);
1450
1451    set_noreply_maybe(c, tokens, ntokens);
1452
1453    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1454        out_string(c, "CLIENT_ERROR bad command line format");
1455        return;
1456    }
1457
1458    key = tokens[KEY_TOKEN].value;
1459    nkey = tokens[KEY_TOKEN].length;
1460
1461    if (settings.managed) {
1462        int bucket = c->bucket;
1463        if (bucket == -1) {
1464            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1465            return;
1466        }
1467        c->bucket = -1;
1468        if (buckets[bucket] != c->gen) {
1469            out_string(c, "ERROR_NOT_OWNER");
1470            return;
1471        }
1472    }
1473
1474    delta = strtoll(tokens[2].value, NULL, 10);
1475
1476    if(errno == ERANGE) {
1477        out_string(c, "CLIENT_ERROR bad command line format");
1478        return;
1479    }
1480
1481    it = item_get(key, nkey);
1482    if (!it) {
1483        out_string(c, "NOT_FOUND");
1484        return;
1485    }
1486
1487    out_string(c, add_delta(it, incr, delta, temp));
1488    item_remove(it);         /* release our reference */
1489}
1490
1491/*
1492 * adds a delta value to a numeric item.
1493 *
1494 * it    item to adjust
1495 * incr  true to increment value, false to decrement
1496 * delta amount to adjust value by
1497 * buf   buffer for response string
1498 *
1499 * returns a response string to send back to the client.
1500 */
1501char *do_add_delta(item *it, const bool incr, const int64_t delta, char *buf) {
1502    char *ptr;
1503    int64_t value;
1504    int res;
1505
1506    ptr = ITEM_data(it);
1507    while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++;    // BUG: can't be true
1508
1509    value = strtoull(ptr, NULL, 10);
1510
1511    if(errno == ERANGE) {
1512        return "CLIENT_ERROR cannot increment or decrement non-numeric value";
1513    }
1514
1515    if (incr)
1516        value += delta;
1517    else {
1518        if (delta >= value) value = 0;
1519        else value -= delta;
1520    }
1521    sprintf(buf, "%llu", value);
1522    res = strlen(buf);
1523    if (res + 2 > it->nbytes) { /* need to realloc */
1524        item *new_it;
1525        new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1526        if (new_it == 0) {
1527            return "SERVER_ERROR out of memory in incr/decr";
1528        }
1529        memcpy(ITEM_data(new_it), buf, res);
1530        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
1531        do_item_replace(it, new_it);
1532        do_item_remove(new_it);       /* release our reference */
1533    } else { /* replace in-place */
1534        memcpy(ITEM_data(it), buf, res);
1535        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
1536    }
1537
1538    return buf;
1539}
1540
1541static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
1542    char *key;
1543    size_t nkey;
1544    item *it;
1545    time_t exptime = 0;
1546
1547    assert(c != NULL);
1548
1549    set_noreply_maybe(c, tokens, ntokens);
1550
1551    if (settings.managed) {
1552        int bucket = c->bucket;
1553        if (bucket == -1) {
1554            out_string(c, "CLIENT_ERROR no BG data in managed mode");
1555            return;
1556        }
1557        c->bucket = -1;
1558        if (buckets[bucket] != c->gen) {
1559            out_string(c, "ERROR_NOT_OWNER");
1560            return;
1561        }
1562    }
1563
1564    key = tokens[KEY_TOKEN].value;
1565    nkey = tokens[KEY_TOKEN].length;
1566
1567    if(nkey > KEY_MAX_LENGTH) {
1568        out_string(c, "CLIENT_ERROR bad command line format");
1569        return;
1570    }
1571
1572    if(ntokens == (c->noreply ? 5 : 4)) {
1573        exptime = strtol(tokens[2].value, NULL, 10);
1574
1575        if(errno == ERANGE) {
1576            out_string(c, "CLIENT_ERROR bad command line format");
1577            return;
1578        }
1579    }
1580
1581    if (settings.detail_enabled) {
1582        stats_prefix_record_delete(key);
1583    }
1584
1585    it = item_get(key, nkey);
1586    if (it) {
1587        if (exptime == 0) {
1588            item_unlink(it);
1589            item_remove(it);      /* release our reference */
1590            out_string(c, "DELETED");
1591        } else {
1592            /* our reference will be transfered to the delete queue */
1593            out_string(c, defer_delete(it, exptime));
1594        }
1595    } else {
1596        out_string(c, "NOT_FOUND");
1597    }
1598}
1599
1600/*
1601 * Adds an item to the deferred-delete list so it can be reaped later.
1602 *
1603 * Returns the result to send to the client.
1604 */
1605char *do_defer_delete(item *it, time_t exptime)
1606{
1607    if (delcurr >= deltotal) {
1608        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1609        if (new_delete) {
1610            todelete = new_delete;
1611            deltotal *= 2;
1612        } else {
1613            /*
1614             * can't delete it immediately, user wants a delay,
1615             * but we ran out of memory for the delete queue
1616             */
1617            item_remove(it);    /* release reference */
1618            return "SERVER_ERROR out of memory expanding delete queue";
1619        }
1620    }
1621
1622    /* use its expiration time as its deletion time now */
1623    it->exptime = realtime(exptime);
1624    it->it_flags |= ITEM_DELETED;
1625    todelete[delcurr++] = it;
1626
1627    return "DELETED";
1628}
1629
1630static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
1631    unsigned int level;
1632
1633    assert(c != NULL);
1634
1635    set_noreply_maybe(c, tokens, ntokens);
1636
1637    level = strtoul(tokens[1].value, NULL, 10);
1638    settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
1639    out_string(c, "OK");
1640    return;
1641}
1642
1643static void process_command(conn *c, char *command) {
1644
1645    token_t tokens[MAX_TOKENS];
1646    size_t ntokens;
1647    int comm;
1648
1649    assert(c != NULL);
1650
1651    if (settings.verbose > 1)
1652        fprintf(stderr, "<%d %s\n", c->sfd, command);
1653
1654    /*
1655     * for commands set/add/replace, we build an item and read the data
1656     * directly into it, then continue in nread_complete().
1657     */
1658
1659    c->msgcurr = 0;
1660    c->msgused = 0;
1661    c->iovused = 0;
1662    if (add_msghdr(c) != 0) {
1663        out_string(c, "SERVER_ERROR out of memory preparing response");
1664        return;
1665    }
1666
1667    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1668    if (ntokens >= 3 &&
1669        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1670         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1671
1672        process_get_command(c, tokens, ntokens, false);
1673
1674    } else if ((ntokens == 6 || ntokens == 7) &&
1675               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1676                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1677                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
1678                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
1679                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
1680
1681        process_update_command(c, tokens, ntokens, comm, false);
1682
1683    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
1684
1685        process_update_command(c, tokens, ntokens, comm, true);
1686
1687    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1688
1689        process_arithmetic_command(c, tokens, ntokens, 1);
1690
1691    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
1692
1693        process_get_command(c, tokens, ntokens, true);
1694
1695    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1696
1697        process_arithmetic_command(c, tokens, ntokens, 0);
1698
1699    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1700
1701        process_delete_command(c, tokens, ntokens);
1702
1703    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1704        unsigned int bucket, gen;
1705        if (!settings.managed) {
1706            out_string(c, "CLIENT_ERROR not a managed instance");
1707            return;
1708        }
1709
1710        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1711            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1712                out_string(c, "CLIENT_ERROR bucket number out of range");
1713                return;
1714            }
1715            buckets[bucket] = gen;
1716            out_string(c, "OWNED");
1717            return;
1718        } else {
1719            out_string(c, "CLIENT_ERROR bad format");
1720            return;
1721        }
1722
1723    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1724
1725        int bucket;
1726        if (!settings.managed) {
1727            out_string(c, "CLIENT_ERROR not a managed instance");
1728            return;
1729        }
1730        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1731            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1732                out_string(c, "CLIENT_ERROR bucket number out of range");
1733                return;
1734            }
1735            buckets[bucket] = 0;
1736            out_string(c, "DISOWNED");
1737            return;
1738        } else {
1739            out_string(c, "CLIENT_ERROR bad format");
1740            return;
1741        }
1742
1743    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1744        int bucket, gen;
1745        if (!settings.managed) {
1746            out_string(c, "CLIENT_ERROR not a managed instance");
1747            return;
1748        }
1749        if (sscanf(tokens[1].value, "%u:%u", &bucket, &gen) == 2) {
1750            /* we never write anything back, even if input's wrong */
1751            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen <= 0)) {
1752                /* do nothing, bad input */
1753            } else {
1754                c->bucket = bucket;
1755                c->gen = gen;
1756            }
1757            conn_set_state(c, conn_read);
1758            return;
1759        } else {
1760            out_string(c, "CLIENT_ERROR bad format");
1761            return;
1762        }
1763
1764    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1765
1766        process_stat(c, tokens, ntokens);
1767
1768    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
1769        time_t exptime = 0;
1770        set_current_time();
1771
1772        set_noreply_maybe(c, tokens, ntokens);
1773
1774        if(ntokens == (c->noreply ? 3 : 2)) {
1775            settings.oldest_live = current_time - 1;
1776            item_flush_expired();
1777            out_string(c, "OK");
1778            return;
1779        }
1780
1781        exptime = strtol(tokens[1].value, NULL, 10);
1782        if(errno == ERANGE) {
1783            out_string(c, "CLIENT_ERROR bad command line format");
1784            return;
1785        }
1786
1787        /*
1788          If exptime is zero realtime() would return zero too, and
1789          realtime(exptime) - 1 would overflow to the max unsigned
1790          value.  So we process exptime == 0 the same way we do when
1791          no delay is given at all.
1792        */
1793        if (exptime > 0)
1794            settings.oldest_live = realtime(exptime) - 1;
1795        else /* exptime == 0 */
1796            settings.oldest_live = current_time - 1;
1797        item_flush_expired();
1798        out_string(c, "OK");
1799        return;
1800
1801    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
1802
1803        out_string(c, "VERSION " VERSION);
1804
1805    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1806
1807        conn_set_state(c, conn_closing);
1808
1809    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1810                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1811#ifdef ALLOW_SLABS_REASSIGN
1812
1813        int src, dst, rv;
1814
1815        src = strtol(tokens[2].value, NULL, 10);
1816        dst  = strtol(tokens[3].value, NULL, 10);
1817
1818        if(errno == ERANGE) {
1819            out_string(c, "CLIENT_ERROR bad command line format");
1820            return;
1821        }
1822
1823        rv = slabs_reassign(src, dst);
1824        if (rv == 1) {
1825            out_string(c, "DONE");
1826            return;
1827        }
1828        if (rv == 0) {
1829            out_string(c, "CANT");
1830            return;
1831        }
1832        if (rv == -1) {
1833            out_string(c, "BUSY");
1834            return;
1835        }
1836#else
1837        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1838#endif
1839    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
1840        process_verbosity_command(c, tokens, ntokens);
1841    } else {
1842        out_string(c, "ERROR");
1843    }
1844    return;
1845}
1846
1847/*
1848 * if we have a complete line in the buffer, process it.
1849 */
1850static int try_read_command(conn *c) {
1851    char *el, *cont;
1852
1853    assert(c != NULL);
1854    assert(c->rcurr <= (c->rbuf + c->rsize));
1855
1856    if (c->rbytes == 0)
1857        return 0;
1858    el = memchr(c->rcurr, '\n', c->rbytes);
1859    if (!el)
1860        return 0;
1861    cont = el + 1;
1862    if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
1863        el--;
1864    }
1865    *el = '\0';
1866
1867    assert(cont <= (c->rcurr + c->rbytes));
1868
1869    process_command(c, c->rcurr);
1870
1871    c->rbytes -= (cont - c->rcurr);
1872    c->rcurr = cont;
1873
1874    assert(c->rcurr <= (c->rbuf + c->rsize));
1875
1876    return 1;
1877}
1878
1879/*
1880 * read a UDP request.
1881 * return 0 if there's nothing to read.
1882 */
1883static int try_read_udp(conn *c) {
1884    int res;
1885
1886    assert(c != NULL);
1887
1888    c->request_addr_size = sizeof(c->request_addr);
1889    res = recvfrom(c->sfd, c->rbuf, c->rsize,
1890                   0, &c->request_addr, &c->request_addr_size);
1891    if (res > 8) {
1892        unsigned char *buf = (unsigned char *)c->rbuf;
1893        STATS_LOCK();
1894        stats.bytes_read += res;
1895        STATS_UNLOCK();
1896
1897        /* Beginning of UDP packet is the request ID; save it. */
1898        c->request_id = buf[0] * 256 + buf[1];
1899
1900        /* If this is a multi-packet request, drop it. */
1901        if (buf[4] != 0 || buf[5] != 1) {
1902            out_string(c, "SERVER_ERROR multi-packet request not supported");
1903            return 0;
1904        }
1905
1906        /* Don't care about any of the rest of the header. */
1907        res -= 8;
1908        memmove(c->rbuf, c->rbuf + 8, res);
1909
1910        c->rbytes += res;
1911        c->rcurr = c->rbuf;
1912        return 1;
1913    }
1914    return 0;
1915}
1916
1917/*
1918 * read from network as much as we can, handle buffer overflow and connection
1919 * close.
1920 * before reading, move the remaining incomplete fragment of a command
1921 * (if any) to the beginning of the buffer.
1922 * return 0 if there's nothing to read on the first read.
1923 */
1924static int try_read_network(conn *c) {
1925    int gotdata = 0;
1926    int res;
1927
1928    assert(c != NULL);
1929
1930    if (c->rcurr != c->rbuf) {
1931        if (c->rbytes != 0) /* otherwise there's nothing to copy */
1932            memmove(c->rbuf, c->rcurr, c->rbytes);
1933        c->rcurr = c->rbuf;
1934    }
1935
1936    while (1) {
1937        if (c->rbytes >= c->rsize) {
1938            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
1939            if (!new_rbuf) {
1940                if (settings.verbose > 0)
1941                    fprintf(stderr, "Couldn't realloc input buffer\n");
1942                c->rbytes = 0; /* ignore what we read */
1943                out_string(c, "SERVER_ERROR out of memory reading request");
1944                c->write_and_go = conn_closing;
1945                return 1;
1946            }
1947            c->rcurr = c->rbuf = new_rbuf;
1948            c->rsize *= 2;
1949        }
1950
1951        /* unix socket mode doesn't need this, so zeroed out.  but why
1952         * is this done for every command?  presumably for UDP
1953         * mode.  */
1954        if (!settings.socketpath) {
1955            c->request_addr_size = sizeof(c->request_addr);
1956        } else {
1957            c->request_addr_size = 0;
1958        }
1959
1960        int avail = c->rsize - c->rbytes;
1961        res = read(c->sfd, c->rbuf + c->rbytes, avail);
1962        if (res > 0) {
1963            STATS_LOCK();
1964            stats.bytes_read += res;
1965            STATS_UNLOCK();
1966            gotdata = 1;
1967            c->rbytes += res;
1968            if (res == avail) {
1969                continue;
1970            } else {
1971                break;
1972            }
1973        }
1974        if (res == 0) {
1975            /* connection closed */
1976            conn_set_state(c, conn_closing);
1977            return 1;
1978        }
1979        if (res == -1) {
1980            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
1981            /* Should close on unhandled errors. */
1982            conn_set_state(c, conn_closing);
1983            return 1;
1984        }
1985    }
1986    return gotdata;
1987}
1988
1989static bool update_event(conn *c, const int new_flags) {
1990    assert(c != NULL);
1991
1992    struct event_base *base = c->event.ev_base;
1993    if (c->ev_flags == new_flags)
1994        return true;
1995    if (event_del(&c->event) == -1) return false;
1996    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
1997    event_base_set(base, &c->event);
1998    c->ev_flags = new_flags;
1999    if (event_add(&c->event, 0) == -1) return false;
2000    return true;
2001}
2002
2003/*
2004 * Sets whether we are listening for new connections or not.
2005 */
2006void accept_new_conns(const bool do_accept) {
2007    conn *next;
2008
2009    if (! is_listen_thread())
2010        return;
2011
2012    for (next = listen_conn; next; next = next->next) {
2013        if (do_accept) {
2014            update_event(next, EV_READ | EV_PERSIST);
2015            if (listen(next->sfd, 1024) != 0) {
2016                perror("listen");
2017            }
2018        }
2019        else {
2020            update_event(next, 0);
2021            if (listen(next->sfd, 0) != 0) {
2022                perror("listen");
2023            }
2024        }
2025  }
2026}
2027
2028
2029/*
2030 * Transmit the next chunk of data from our list of msgbuf structures.
2031 *
2032 * Returns:
2033 *   TRANSMIT_COMPLETE   All done writing.
2034 *   TRANSMIT_INCOMPLETE More data remaining to write.
2035 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2036 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2037 */
2038static int transmit(conn *c) {
2039    assert(c != NULL);
2040
2041    if (c->msgcurr < c->msgused &&
2042            c->msglist[c->msgcurr].msg_iovlen == 0) {
2043        /* Finished writing the current msg; advance to the next. */
2044        c->msgcurr++;
2045    }
2046    if (c->msgcurr < c->msgused) {
2047        ssize_t res;
2048        struct msghdr *m = &c->msglist[c->msgcurr];
2049
2050        res = sendmsg(c->sfd, m, 0);
2051        if (res > 0) {
2052            STATS_LOCK();
2053            stats.bytes_written += res;
2054            STATS_UNLOCK();
2055
2056            /* We've written some of the data. Remove the completed
2057               iovec entries from the list of pending writes. */
2058            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
2059                res -= m->msg_iov->iov_len;
2060                m->msg_iovlen--;
2061                m->msg_iov++;
2062            }
2063
2064            /* Might have written just part of the last iovec entry;
2065               adjust it so the next write will do the rest. */
2066            if (res > 0) {
2067                m->msg_iov->iov_base += res;
2068                m->msg_iov->iov_len -= res;
2069            }
2070            return TRANSMIT_INCOMPLETE;
2071        }
2072        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2073            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2074                if (settings.verbose > 0)
2075                    fprintf(stderr, "Couldn't update event\n");
2076                conn_set_state(c, conn_closing);
2077                return TRANSMIT_HARD_ERROR;
2078            }
2079            return TRANSMIT_SOFT_ERROR;
2080        }
2081        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2082           we have a real error, on which we close the connection */
2083        if (settings.verbose > 0)
2084            perror("Failed to write, and not due to blocking");
2085
2086        if (c->udp)
2087            conn_set_state(c, conn_read);
2088        else
2089            conn_set_state(c, conn_closing);
2090        return TRANSMIT_HARD_ERROR;
2091    } else {
2092        return TRANSMIT_COMPLETE;
2093    }
2094}
2095
2096static void drive_machine(conn *c) {
2097    bool stop = false;
2098    int sfd, flags = 1;
2099    socklen_t addrlen;
2100    struct sockaddr_storage addr;
2101    int res;
2102
2103    assert(c != NULL);
2104
2105    while (!stop) {
2106
2107        switch(c->state) {
2108        case conn_listening:
2109            addrlen = sizeof(addr);
2110            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
2111                if (errno == EAGAIN || errno == EWOULDBLOCK) {
2112                    /* these are transient, so don't log anything */
2113                    stop = true;
2114                } else if (errno == EMFILE) {
2115                    if (settings.verbose > 0)
2116                        fprintf(stderr, "Too many open connections\n");
2117                    accept_new_conns(false);
2118                    stop = true;
2119                } else {
2120                    perror("accept()");
2121                    stop = true;
2122                }
2123                break;
2124            }
2125            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2126                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2127                perror("setting O_NONBLOCK");
2128                close(sfd);
2129                break;
2130            }
2131            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
2132                                     DATA_BUFFER_SIZE, false);
2133            break;
2134
2135        case conn_read:
2136            if (try_read_command(c) != 0) {
2137                continue;
2138            }
2139            if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) {
2140                continue;
2141            }
2142            /* we have no command line and no data to read from network */
2143            if (!update_event(c, EV_READ | EV_PERSIST)) {
2144                if (settings.verbose > 0)
2145                    fprintf(stderr, "Couldn't update event\n");
2146                conn_set_state(c, conn_closing);
2147                break;
2148            }
2149            stop = true;
2150            break;
2151
2152        case conn_nread:
2153            /* we are reading rlbytes into ritem; */
2154            if (c->rlbytes == 0) {
2155                complete_nread(c);
2156                break;
2157            }
2158            /* first check if we have leftovers in the conn_read buffer */
2159            if (c->rbytes > 0) {
2160                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2161                memcpy(c->ritem, c->rcurr, tocopy);
2162                c->ritem += tocopy;
2163                c->rlbytes -= tocopy;
2164                c->rcurr += tocopy;
2165                c->rbytes -= tocopy;
2166                break;
2167            }
2168
2169            /*  now try reading from the socket */
2170            res = read(c->sfd, c->ritem, c->rlbytes);
2171            if (res > 0) {
2172                STATS_LOCK();
2173                stats.bytes_read += res;
2174                STATS_UNLOCK();
2175                c->ritem += res;
2176                c->rlbytes -= res;
2177                break;
2178            }
2179            if (res == 0) { /* end of stream */
2180                conn_set_state(c, conn_closing);
2181                break;
2182            }
2183            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2184                if (!update_event(c, EV_READ | EV_PERSIST)) {
2185                    if (settings.verbose > 0)
2186                        fprintf(stderr, "Couldn't update event\n");
2187                    conn_set_state(c, conn_closing);
2188                    break;
2189                }
2190                stop = true;
2191                break;
2192            }
2193            /* otherwise we have a real error, on which we close the connection */
2194            if (settings.verbose > 0)
2195                fprintf(stderr, "Failed to read, and not due to blocking\n");
2196            conn_set_state(c, conn_closing);
2197            break;
2198
2199        case conn_swallow:
2200            /* we are reading sbytes and throwing them away */
2201            if (c->sbytes == 0) {
2202                conn_set_state(c, conn_read);
2203                break;
2204            }
2205
2206            /* first check if we have leftovers in the conn_read buffer */
2207            if (c->rbytes > 0) {
2208                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
2209                c->sbytes -= tocopy;
2210                c->rcurr += tocopy;
2211                c->rbytes -= tocopy;
2212                break;
2213            }
2214
2215            /*  now try reading from the socket */
2216            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
2217            if (res > 0) {
2218                STATS_LOCK();
2219                stats.bytes_read += res;
2220                STATS_UNLOCK();
2221                c->sbytes -= res;
2222                break;
2223            }
2224            if (res == 0) { /* end of stream */
2225                conn_set_state(c, conn_closing);
2226                break;
2227            }
2228            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2229                if (!update_event(c, EV_READ | EV_PERSIST)) {
2230                    if (settings.verbose > 0)
2231                        fprintf(stderr, "Couldn't update event\n");
2232                    conn_set_state(c, conn_closing);
2233                    break;
2234                }
2235                stop = true;
2236                break;
2237            }
2238            /* otherwise we have a real error, on which we close the connection */
2239            if (settings.verbose > 0)
2240                fprintf(stderr, "Failed to read, and not due to blocking\n");
2241            conn_set_state(c, conn_closing);
2242            break;
2243
2244        case conn_write:
2245            /*
2246             * We want to write out a simple response. If we haven't already,
2247             * assemble it into a msgbuf list (this will be a single-entry
2248             * list for TCP or a two-entry list for UDP).
2249             */
2250            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
2251                if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
2252                    (c->udp && build_udp_headers(c) != 0)) {
2253                    if (settings.verbose > 0)
2254                        fprintf(stderr, "Couldn't build response\n");
2255                    conn_set_state(c, conn_closing);
2256                    break;
2257                }
2258            }
2259
2260            /* fall through... */
2261
2262        case conn_mwrite:
2263            switch (transmit(c)) {
2264            case TRANSMIT_COMPLETE:
2265                if (c->state == conn_mwrite) {
2266                    while (c->ileft > 0) {
2267                        item *it = *(c->icurr);
2268                        assert((it->it_flags & ITEM_SLABBED) == 0);
2269                        item_remove(it);
2270                        c->icurr++;
2271                        c->ileft--;
2272                    }
2273                    while (c->suffixleft > 0) {
2274                        char *suffix = *(c->suffixcurr);
2275                        if(suffix_add_to_freelist(suffix)) {
2276                            /* Failed to add to freelist, don't leak */
2277                            free(suffix);
2278                        }
2279                        c->suffixcurr++;
2280                        c->suffixleft--;
2281                    }
2282                    conn_set_state(c, conn_read);
2283                } else if (c->state == conn_write) {
2284                    if (c->write_and_free) {
2285                        free(c->write_and_free);
2286                        c->write_and_free = 0;
2287                    }
2288                    conn_set_state(c, c->write_and_go);
2289                } else {
2290                    if (settings.verbose > 0)
2291                        fprintf(stderr, "Unexpected state %d\n", c->state);
2292                    conn_set_state(c, conn_closing);
2293                }
2294                break;
2295
2296            case TRANSMIT_INCOMPLETE:
2297            case TRANSMIT_HARD_ERROR:
2298                break;                   /* Continue in state machine. */
2299
2300            case TRANSMIT_SOFT_ERROR:
2301                stop = true;
2302                break;
2303            }
2304            break;
2305
2306        case conn_closing:
2307            if (c->udp)
2308                conn_cleanup(c);
2309            else
2310                conn_close(c);
2311            stop = true;
2312            break;
2313        }
2314    }
2315
2316    return;
2317}
2318
2319void event_handler(const int fd, const short which, void *arg) {
2320    conn *c;
2321
2322    c = (conn *)arg;
2323    assert(c != NULL);
2324
2325    c->which = which;
2326
2327    /* sanity */
2328    if (fd != c->sfd) {
2329        if (settings.verbose > 0)
2330            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
2331        conn_close(c);
2332        return;
2333    }
2334
2335    drive_machine(c);
2336
2337    /* wait for next event */
2338    return;
2339}
2340
2341static int new_socket(struct addrinfo *ai) {
2342    int sfd;
2343    int flags;
2344
2345    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
2346        perror("socket()");
2347        return -1;
2348    }
2349
2350    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2351        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2352        perror("setting O_NONBLOCK");
2353        close(sfd);
2354        return -1;
2355    }
2356    return sfd;
2357}
2358
2359
2360/*
2361 * Sets a socket's send buffer size to the maximum allowed by the system.
2362 */
2363static void maximize_sndbuf(const int sfd) {
2364    socklen_t intsize = sizeof(int);
2365    int last_good = 0;
2366    int min, max, avg;
2367    int old_size;
2368
2369    /* Start with the default size. */
2370    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
2371        if (settings.verbose > 0)
2372            perror("getsockopt(SO_SNDBUF)");
2373        return;
2374    }
2375
2376    /* Binary-search for the real maximum. */
2377    min = old_size;
2378    max = MAX_SENDBUF_SIZE;
2379
2380    while (min <= max) {
2381        avg = ((unsigned int)(min + max)) / 2;
2382        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
2383            last_good = avg;
2384            min = avg + 1;
2385        } else {
2386            max = avg - 1;
2387        }
2388    }
2389
2390    if (settings.verbose > 1)
2391        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
2392}
2393
2394static int server_socket(const int port, const bool is_udp) {
2395    int sfd;
2396    struct linger ling = {0, 0};
2397    struct addrinfo *ai;
2398    struct addrinfo *next;
2399    struct addrinfo hints;
2400    char port_buf[NI_MAXSERV];
2401    int error;
2402    int success = 0;
2403
2404    int flags =1;
2405
2406    /*
2407     * the memset call clears nonstandard fields in some impementations
2408     * that otherwise mess things up.
2409     */
2410    memset(&hints, 0, sizeof (hints));
2411    hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
2412    if (is_udp)
2413    {
2414        hints.ai_protocol = IPPROTO_UDP;
2415        hints.ai_socktype = SOCK_DGRAM;
2416        hints.ai_family = AF_INET; /* This left here because of issues with OSX 10.5 */
2417    } else {
2418        hints.ai_family = AF_UNSPEC;
2419        hints.ai_protocol = IPPROTO_TCP;
2420        hints.ai_socktype = SOCK_STREAM;
2421    }
2422
2423    snprintf(port_buf, NI_MAXSERV, "%d", port);
2424    error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
2425    if (error != 0) {
2426      if (error != EAI_SYSTEM)
2427        fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
2428      else
2429        perror("getaddrinfo()");
2430
2431      return 1;
2432    }
2433
2434    for (next= ai; next; next= next->ai_next) {
2435        conn *listen_conn_add;
2436        if ((sfd = new_socket(next)) == -1) {
2437            freeaddrinfo(ai);
2438            return 1;
2439        }
2440
2441        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2442        if (is_udp) {
2443            maximize_sndbuf(sfd);
2444        } else {
2445            setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2446            setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2447            setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
2448        }
2449
2450        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
2451            if (errno != EADDRINUSE) {
2452                perror("bind()");
2453                close(sfd);
2454                freeaddrinfo(ai);
2455                return 1;
2456            }
2457            close(sfd);
2458            continue;
2459        } else {
2460          success++;
2461          if (!is_udp && listen(sfd, 1024) == -1) {
2462              perror("listen()");
2463              close(sfd);
2464              freeaddrinfo(ai);
2465              return 1;
2466          }
2467      }
2468
2469      if (is_udp)
2470      {
2471        int c;
2472
2473        for (c = 0; c < settings.num_threads; c++) {
2474            /* this is guaranteed to hit all threads because we round-robin */
2475            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
2476                              UDP_READ_BUFFER_SIZE, 1);
2477        }
2478      } else {
2479        if (!(listen_conn_add = conn_new(sfd, conn_listening,
2480                                         EV_READ | EV_PERSIST, 1, false, main_base))) {
2481            fprintf(stderr, "failed to create listening connection\n");
2482            exit(EXIT_FAILURE);
2483        }
2484
2485        listen_conn_add->next = listen_conn;
2486        listen_conn = listen_conn_add;
2487      }
2488    }
2489
2490    freeaddrinfo(ai);
2491
2492    /* Return zero iff we detected no errors in starting up connections */
2493    return success == 0;
2494}
2495
2496static int new_socket_unix(void) {
2497    int sfd;
2498    int flags;
2499
2500    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
2501        perror("socket()");
2502        return -1;
2503    }
2504
2505    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
2506        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
2507        perror("setting O_NONBLOCK");
2508        close(sfd);
2509        return -1;
2510    }
2511    return sfd;
2512}
2513
2514static int server_socket_unix(const char *path, int access_mask) {
2515    int sfd;
2516    struct linger ling = {0, 0};
2517    struct sockaddr_un addr;
2518    struct stat tstat;
2519    int flags =1;
2520    int old_umask;
2521
2522    if (!path) {
2523        return 1;
2524    }
2525
2526    if ((sfd = new_socket_unix()) == -1) {
2527        return 1;
2528    }
2529
2530    /*
2531     * Clean up a previous socket file if we left it around
2532     */
2533    if (lstat(path, &tstat) == 0) {
2534        if (S_ISSOCK(tstat.st_mode))
2535            unlink(path);
2536    }
2537
2538    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
2539    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
2540    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
2541
2542    /*
2543     * the memset call clears nonstandard fields in some impementations
2544     * that otherwise mess things up.
2545     */
2546    memset(&addr, 0, sizeof(addr));
2547
2548    addr.sun_family = AF_UNIX;
2549    strcpy(addr.sun_path, path);
2550    old_umask=umask( ~(access_mask&0777));
2551    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
2552        perror("bind()");
2553        close(sfd);
2554        umask(old_umask);
2555        return 1;
2556    }
2557    umask(old_umask);
2558    if (listen(sfd, 1024) == -1) {
2559        perror("listen()");
2560        close(sfd);
2561        return 1;
2562    }
2563    if (!(listen_conn = conn_new(sfd, conn_listening,
2564                                     EV_READ | EV_PERSIST, 1, false, main_base))) {
2565        fprintf(stderr, "failed to create listening connection\n");
2566        exit(EXIT_FAILURE);
2567    }
2568
2569    return 0;
2570}
2571
2572/*
2573 * We keep the current time of day in a global variable that's updated by a
2574 * timer event. This saves us a bunch of time() system calls (we really only
2575 * need to get the time once a second, whereas there can be tens of thousands
2576 * of requests a second) and allows us to use server-start-relative timestamps
2577 * rather than absolute UNIX timestamps, a space savings on systems where
2578 * sizeof(time_t) > sizeof(unsigned int).
2579 */
2580volatile rel_time_t current_time;
2581static struct event clockevent;
2582
2583/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
2584static void set_current_time(void) {
2585    struct timeval timer;
2586
2587    gettimeofday(&timer, NULL);
2588    current_time = (rel_time_t) (timer.tv_sec - stats.started);
2589}
2590
2591static void clock_handler(const int fd, const short which, void *arg) {
2592    struct timeval t = {.tv_sec = 1, .tv_usec = 0};
2593    static bool initialized = false;
2594
2595    if (initialized) {
2596        /* only delete the event if it's actually there. */
2597        evtimer_del(&clockevent);
2598    } else {
2599        initialized = true;
2600    }
2601
2602    evtimer_set(&clockevent, clock_handler, 0);
2603    event_base_set(main_base, &clockevent);
2604    evtimer_add(&clockevent, &t);
2605
2606    set_current_time();
2607}
2608
2609static struct event deleteevent;
2610
2611static void delete_handler(const int fd, const short which, void *arg) {
2612    struct timeval t = {.tv_sec = 5, .tv_usec = 0};
2613    static bool initialized = false;
2614
2615    if (initialized) {
2616        /* some versions of libevent don't like deleting events that don't exist,
2617           so only delete once we know this event has been added. */
2618        evtimer_del(&deleteevent);
2619    } else {
2620        initialized = true;
2621    }
2622
2623    evtimer_set(&deleteevent, delete_handler, 0);
2624    event_base_set(main_base, &deleteevent);
2625    evtimer_add(&deleteevent, &t);
2626    run_deferred_deletes();
2627}
2628
2629/* Call run_deferred_deletes instead of this. */
2630void do_run_deferred_deletes(void)
2631{
2632    int i, j = 0;
2633
2634    for (i = 0; i < delcurr; i++) {
2635        item *it = todelete[i];
2636        if (item_delete_lock_over(it)) {
2637            assert(it->refcount > 0);
2638            it->it_flags &= ~ITEM_DELETED;
2639            do_item_unlink(it);
2640            do_item_remove(it);
2641        } else {
2642            todelete[j++] = it;
2643        }
2644    }
2645    delcurr = j;
2646}
2647
2648static void usage(void) {
2649    printf(PACKAGE " " VERSION "\n");
2650    printf("-p <num>      TCP port number to listen on (default: 11211)\n"
2651           "-U <num>      UDP port number to listen on (default: 0, off)\n"
2652           "-s <file>     unix socket path to listen on (disables network support)\n"
2653           "-a <mask>     access mask for unix socket, in octal (default 0700)\n"
2654           "-l <ip_addr>  interface to listen on, default is INDRR_ANY\n"
2655           "-d            run as a daemon\n"
2656           "-r            maximize core file limit\n"
2657           "-u <username> assume identity of <username> (only when run as root)\n"
2658           "-m <num>      max memory to use for items in megabytes, default is 64 MB\n"
2659           "-M            return error on memory exhausted (rather than removing items)\n"
2660           "-c <num>      max simultaneous connections, default is 1024\n"
2661           "-k            lock down all paged memory.  Note that there is a\n"
2662           "              limit on how much memory you may lock.  Trying to\n"
2663           "              allocate more than that would fail, so be sure you\n"
2664           "              set the limit correctly for the user you started\n"
2665           "              the daemon with (not for -u <username> user;\n"
2666           "              under sh this is done with 'ulimit -S -l NUM_KB').\n"
2667           "-v            verbose (print errors/warnings while in event loop)\n"
2668           "-vv           very verbose (also print client commands/reponses)\n"
2669           "-h            print this help and exit\n"
2670           "-i            print memcached and libevent license\n"
2671           "-b            run a managed instanced (mnemonic: buckets)\n"
2672           "-P <file>     save PID in <file>, only used with -d option\n"
2673           "-f <factor>   chunk size growth factor, default 1.25\n"
2674           "-n <bytes>    minimum space allocated for key+value+flags, default 48\n"
2675
2676#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2677           "-L            Try to use large memory pages (if available). Increasing\n"
2678           "              the memory page size could reduce the number of TLB misses\n"
2679           "              and improve the performance. In order to get large pages\n"
2680           "              from the OS, memcached will allocate the total item-cache\n"
2681           "              in one large chunk.\n"
2682#endif
2683           );
2684
2685#ifdef USE_THREADS
2686    printf("-t <num>      number of threads to use, default 4\n");
2687#endif
2688    return;
2689}
2690
2691static void usage_license(void) {
2692    printf(PACKAGE " " VERSION "\n\n");
2693    printf(
2694    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2695    "All rights reserved.\n"
2696    "\n"
2697    "Redistribution and use in source and binary forms, with or without\n"
2698    "modification, are permitted provided that the following conditions are\n"
2699    "met:\n"
2700    "\n"
2701    "    * Redistributions of source code must retain the above copyright\n"
2702    "notice, this list of conditions and the following disclaimer.\n"
2703    "\n"
2704    "    * Redistributions in binary form must reproduce the above\n"
2705    "copyright notice, this list of conditions and the following disclaimer\n"
2706    "in the documentation and/or other materials provided with the\n"
2707    "distribution.\n"
2708    "\n"
2709    "    * Neither the name of the Danga Interactive nor the names of its\n"
2710    "contributors may be used to endorse or promote products derived from\n"
2711    "this software without specific prior written permission.\n"
2712    "\n"
2713    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2714    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2715    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2716    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2717    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2718    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2719    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2720    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2721    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2722    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2723    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2724    "\n"
2725    "\n"
2726    "This product includes software developed by Niels Provos.\n"
2727    "\n"
2728    "[ libevent ]\n"
2729    "\n"
2730    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2731    "All rights reserved.\n"
2732    "\n"
2733    "Redistribution and use in source and binary forms, with or without\n"
2734    "modification, are permitted provided that the following conditions\n"
2735    "are met:\n"
2736    "1. Redistributions of source code must retain the above copyright\n"
2737    "   notice, this list of conditions and the following disclaimer.\n"
2738    "2. Redistributions in binary form must reproduce the above copyright\n"
2739    "   notice, this list of conditions and the following disclaimer in the\n"
2740    "   documentation and/or other materials provided with the distribution.\n"
2741    "3. All advertising materials mentioning features or use of this software\n"
2742    "   must display the following acknowledgement:\n"
2743    "      This product includes software developed by Niels Provos.\n"
2744    "4. The name of the author may not be used to endorse or promote products\n"
2745    "   derived from this software without specific prior written permission.\n"
2746    "\n"
2747    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2748    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2749    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2750    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2751    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2752    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2753    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2754    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2755    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2756    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2757    );
2758
2759    return;
2760}
2761
2762static void save_pid(const pid_t pid, const char *pid_file) {
2763    FILE *fp;
2764    if (pid_file == NULL)
2765        return;
2766
2767    if ((fp = fopen(pid_file, "w")) == NULL) {
2768        fprintf(stderr, "Could not open the pid file %s for writing\n", pid_file);
2769        return;
2770    }
2771
2772    fprintf(fp,"%ld\n", (long)pid);
2773    if (fclose(fp) == -1) {
2774        fprintf(stderr, "Could not close the pid file %s.\n", pid_file);
2775        return;
2776    }
2777}
2778
2779static void remove_pidfile(const char *pid_file) {
2780  if (pid_file == NULL)
2781      return;
2782
2783  if (unlink(pid_file) != 0) {
2784      fprintf(stderr, "Could not remove the pid file %s.\n", pid_file);
2785  }
2786
2787}
2788
2789
2790static void sig_handler(const int sig) {
2791    printf("SIGINT handled.\n");
2792    exit(EXIT_SUCCESS);
2793}
2794
2795#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2796/*
2797 * On systems that supports multiple page sizes we may reduce the
2798 * number of TLB-misses by using the biggest available page size
2799 */
2800int enable_large_pages(void) {
2801    int ret = -1;
2802    size_t sizes[32];
2803    int avail = getpagesizes(sizes, 32);
2804    if (avail != -1) {
2805        size_t max = sizes[0];
2806        struct memcntl_mha arg = {0};
2807        int ii;
2808
2809        for (ii = 1; ii < avail; ++ii) {
2810            if (max < sizes[ii]) {
2811                max = sizes[ii];
2812            }
2813        }
2814
2815        arg.mha_flags   = 0;
2816        arg.mha_pagesize = max;
2817        arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
2818
2819        if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
2820            fprintf(stderr, "Failed to set large pages: %s\n",
2821                    strerror(errno));
2822            fprintf(stderr, "Will use default page size\n");
2823        } else {
2824            ret = 0;
2825        }
2826    } else {
2827        fprintf(stderr, "Failed to get supported pagesizes: %s\n",
2828                strerror(errno));
2829        fprintf(stderr, "Will use default page size\n");
2830    }
2831
2832    return ret;
2833}
2834#endif
2835
2836int main (int argc, char **argv) {
2837    int c;
2838    int x;
2839    bool lock_memory = false;
2840    bool daemonize = false;
2841    bool preallocate = false;
2842    int maxcore = 0;
2843    char *username = NULL;
2844    char *pid_file = NULL;
2845    struct passwd *pw;
2846    struct sigaction sa;
2847    struct rlimit rlim;
2848    /* listening socket */
2849    static int *l_socket = NULL;
2850
2851    /* udp socket */
2852    static int *u_socket = NULL;
2853    static int u_socket_count = 0;
2854
2855    /* handle SIGINT */
2856    signal(SIGINT, sig_handler);
2857
2858    /* init settings */
2859    settings_init();
2860
2861    /* set stderr non-buffering (for running under, say, daemontools) */
2862    setbuf(stderr, NULL);
2863
2864    /* process arguments */
2865    while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:L")) != -1) {
2866        switch (c) {
2867        case 'a':
2868            /* access for unix domain socket, as octal mask (like chmod)*/
2869            settings.access= strtol(optarg,NULL,8);
2870            break;
2871
2872        case 'U':
2873            settings.udpport = atoi(optarg);
2874            break;
2875        case 'b':
2876            settings.managed = true;
2877            break;
2878        case 'p':
2879            settings.port = atoi(optarg);
2880            break;
2881        case 's':
2882            settings.socketpath = optarg;
2883            break;
2884        case 'm':
2885            settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
2886            break;
2887        case 'M':
2888            settings.evict_to_free = 0;
2889            break;
2890        case 'c':
2891            settings.maxconns = atoi(optarg);
2892            break;
2893        case 'h':
2894            usage();
2895            exit(EXIT_SUCCESS);
2896        case 'i':
2897            usage_license();
2898            exit(EXIT_SUCCESS);
2899        case 'k':
2900            lock_memory = true;
2901            break;
2902        case 'v':
2903            settings.verbose++;
2904            break;
2905        case 'l':
2906            settings.inter= strdup(optarg);
2907            break;
2908        case 'd':
2909            daemonize = true;
2910            break;
2911        case 'r':
2912            maxcore = 1;
2913            break;
2914        case 'u':
2915            username = optarg;
2916            break;
2917        case 'P':
2918            pid_file = optarg;
2919            break;
2920        case 'f':
2921            settings.factor = atof(optarg);
2922            if (settings.factor <= 1.0) {
2923                fprintf(stderr, "Factor must be greater than 1\n");
2924                return 1;
2925            }
2926            break;
2927        case 'n':
2928            settings.chunk_size = atoi(optarg);
2929            if (settings.chunk_size == 0) {
2930                fprintf(stderr, "Chunk size must be greater than 0\n");
2931                return 1;
2932            }
2933            break;
2934        case 't':
2935            settings.num_threads = atoi(optarg);
2936            if (settings.num_threads == 0) {
2937                fprintf(stderr, "Number of threads must be greater than 0\n");
2938                return 1;
2939            }
2940            break;
2941        case 'D':
2942            if (! optarg || ! optarg[0]) {
2943                fprintf(stderr, "No delimiter specified\n");
2944                return 1;
2945            }
2946            settings.prefix_delimiter = optarg[0];
2947            settings.detail_enabled = 1;
2948            break;
2949#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
2950        case 'L' :
2951            if (enable_large_pages() == 0) {
2952                preallocate = true;
2953            }
2954            break;
2955#endif
2956        default:
2957            fprintf(stderr, "Illegal argument \"%c\"\n", c);
2958            return 1;
2959        }
2960    }
2961
2962    if (maxcore != 0) {