root/tags/R_1_0/memcached.c

Revision 13, 33.1 kB (checked in by bradfitz, 7 years ago)

fixup makefile, add Id lines

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *  memcached - memory caching daemon
4 *
5 *       http://www.danga.com/memcached/
6 *
7 *  Copyright 2003 Danga Interactive, Inc.  All rights reserved.
8 *
9 *  Use and distribution licensed under the GNU General Public License (GPL)
10 *
11 *  Authors:
12 *      Anatoly Vorobey <mellon@pobox.com>
13 *      Brad Fitzpatrick <brad@danga.com>
14 *
15 *  $Id$
16 */
17
18#include <sys/types.h>
19#include <sys/stat.h>
20#include <sys/time.h>
21#include <sys/socket.h>
22#include <sys/signal.h>
23#include <sys/resource.h>
24#include <sys/mman.h>
25#include <fcntl.h>
26#include <stdlib.h>
27#include <stdio.h>
28#include <string.h>
29#include <unistd.h>
30#include <netinet/in.h>
31#include <errno.h>
32#include <event.h>
33#include <malloc.h>
34#include <Judy.h>
35
36#include "memcached.h"
37
38struct stats stats;
39struct settings settings;
40
41static item **todelete = 0;
42static int delcurr;
43static int deltotal;
44
45/* associative array, using Judy */
46static Pvoid_t PJSLArray = (Pvoid_t) NULL;
47
48void assoc_init(void) {
49    return;
50}
51
52void *assoc_find(char *key) {
53    Word_t * PValue; 
54    JSLG( PValue, PJSLArray, key);
55    if (PValue) {
56        return ((void *)*PValue);
57    } else return 0;
58}
59
60int assoc_insert(char *key, void *value) {
61    Word_t *PValue;
62    JSLI( PValue, PJSLArray, key);
63    if (PValue) {
64        *PValue = (Word_t) value;
65        return 1;
66    } else return 0;
67}
68
69void assoc_delete(char *key) {
70    int Rc_int;
71    JSLD( Rc_int, PJSLArray, key);
72    return;
73}
74
75void stats_init(void) {
76    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
77    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
78    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
79    stats.started = time(0);
80}
81
82void stats_reset(void) {
83    stats.total_items = stats.total_conns = 0;
84    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
85    stats.bytes_read = stats.bytes_written = 0;
86}
87
88void settings_init(void) {
89    settings.port = 11211;
90    settings.interface.s_addr = htonl(INADDR_ANY);
91    settings.maxbytes = 64*1024*1024; /* default is 64MB */
92    settings.maxitems = 0;            /* no limit on no. of items by default */
93    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
94    settings.verbose = 0;
95}
96
97conn **freeconns;
98int freetotal;
99int freecurr;
100
101void conn_init(void) {
102    freetotal = 200;
103    freecurr = 0;
104    freeconns = (conn **)malloc(sizeof (conn *)*freetotal);
105    return;
106}
107
108conn *conn_new(int sfd, int init_state, int event_flags) {
109    conn *c;
110
111    /* do we have a free conn structure from a previous close? */
112    if (freecurr > 0) {
113        c = freeconns[--freecurr];
114    } else { /* allocate a new one */
115        if (!(c = (conn *)malloc(sizeof(conn)))) {
116            perror("malloc()");
117            return 0;
118        }
119        c->rbuf = c->wbuf = 0;
120        c->ilist = 0;
121
122        c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);
123        c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);
124        c->ilist = (item **) malloc(sizeof(item *)*200);
125
126        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
127            if (c->rbuf != 0) free(c->rbuf);
128            if (c->wbuf != 0) free(c->wbuf);
129            if (c->ilist !=0) free(c->ilist);
130            free(c);
131            perror("malloc()");
132            return 0;
133        }
134        c->rsize = c->wsize = DATA_BUFFER_SIZE;
135        c->isize = 200;
136        stats.conn_structs++;
137    }
138
139    c->sfd = sfd;
140    c->state = init_state;
141    c->rlbytes = 0;
142    c->rbytes = c->wbytes = 0;
143    c->wcurr = c->wbuf;
144    c->rcurr = c->rbuf;
145    c->icurr = c->ilist; 
146    c->ileft = 0;
147    c->iptr = c->ibuf;
148    c->ibytes = 0;
149
150    c->write_and_go = conn_read;
151    c->write_and_free = 0;
152    c->item = 0;
153
154    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
155    c->ev_flags = event_flags;
156
157    if (event_add(&c->event, 0) == -1) {
158        free(c);
159        return 0;
160    }
161
162    stats.curr_conns++;
163    stats.total_conns++;
164
165    return c;
166}
167
168void conn_close(conn *c) {
169    /* delete the event, the socket and the conn */
170    event_del(&c->event);
171
172    close(c->sfd);
173
174    if (c->item) {
175        item_free(c->item);
176    }
177
178    if (c->ileft) {
179        for (; c->ileft > 0; c->ileft--,c->icurr++) {
180            item_remove(*(c->icurr));
181        }
182    }
183
184    if (c->write_and_free) {
185        free(c->write_and_free);
186    }
187
188    /* if we have enough space in the free connections array, put the structure there */
189    if (freecurr < freetotal) {
190        freeconns[freecurr++] = c;
191    } else {
192        /* try to enlarge free connections array */
193        conn **new_freeconns = realloc(freeconns, sizeof(conn *)*freetotal*2);
194        if (new_freeconns) {
195            freetotal *= 2;
196            freeconns = new_freeconns;
197            freeconns[freecurr++] = c;
198        } else {
199            free(c->rbuf);
200            free(c->wbuf);
201            free(c->ilist);
202            free(c);
203        }
204    }
205
206    stats.curr_conns--;
207
208    return;
209}
210
211void out_string(conn *c, char *str) {
212    int len;
213
214    len = strlen(str);
215    if (len + 2 > c->wsize) {
216        /* ought to be always enough. just fail for simplicity */
217        str = "SERVER_ERROR output line too long";
218        len = strlen(str);
219    }
220
221    strcpy(c->wbuf, str);
222    strcat(c->wbuf, "\r\n");
223    c->wbytes = len + 2;
224    c->wcurr = c->wbuf;
225
226    c->state = conn_write;
227    c->write_and_go = conn_read;
228    return;
229}
230
231/*
232 * we get here after reading the value in set/add/replace commands. The command
233 * has been stored in c->item_comm, and the item is ready in c->item.
234 */
235
236void complete_nread(conn *c) {
237    item *it = c->item;
238    int comm = c->item_comm;
239    item *old_it;
240    time_t now = time(0);
241
242    stats.set_cmds++;
243
244    while(1) {
245        if (strncmp((char *)(it->data) + it->nbytes - 2, "\r\n", 2) != 0) {
246            out_string(c, "CLIENT_ERROR bad data chunk");
247            break;
248        }
249
250        old_it = (item *)assoc_find(it->key);
251
252        if (old_it && old_it->exptime && old_it->exptime < now) {
253            item_unlink(old_it);
254            old_it = 0;
255        }
256
257        if (old_it && comm==NREAD_ADD) {
258            item_update(old_it);
259            out_string(c, "NOT_STORED");
260            break;
261        }
262       
263        if (!old_it && comm == NREAD_REPLACE) {
264            out_string(c, "NOT_STORED");
265            break;
266        }
267
268        if (old_it && (old_it->it_flags & ITEM_DELETED) && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
269            out_string(c, "NOT_STORED");
270            break;
271        }
272       
273        if (old_it) {
274            item_replace(old_it, it);
275        } else item_link(it);
276       
277        c->item = 0;
278        out_string(c, "STORED");
279        return;
280    }
281           
282    item_free(it); 
283    c->item = 0; 
284    return;
285}
286
287void process_stat(conn *c, char *command) {
288    time_t now = time(0);
289
290    if (strcmp(command, "stats") == 0) {
291        char temp[768];
292        pid_t pid = getpid();
293        char *pos = temp;
294
295        pos += sprintf(pos, "STAT pid %u\r\n", pid);
296        pos += sprintf(pos, "STAT uptime %u\r\n", now - stats.started);
297        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
298        pos += sprintf(pos, "STAT total_items %u\r\n", stats.total_items);
299        pos += sprintf(pos, "STAT bytes %llu\r\n", stats.curr_bytes);
300        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
301        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
302        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
303        pos += sprintf(pos, "STAT cmd_get %u\r\n", stats.get_cmds);
304        pos += sprintf(pos, "STAT cmd_set %u\r\n", stats.set_cmds);
305        pos += sprintf(pos, "STAT get_hits %u\r\n", stats.get_hits);
306        pos += sprintf(pos, "STAT get_misses %u\r\n", stats.get_misses);
307        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
308        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
309        pos += sprintf(pos, "STAT limit_maxbytes %u\r\n", settings.maxbytes);
310        pos += sprintf(pos, "STAT limit_maxitems %u\r\n", settings.maxitems);
311        pos += sprintf(pos, "END");
312        out_string(c, temp);
313        return;
314    }
315
316    if (strcmp(command, "stats reset") == 0) {
317        stats_reset();
318        out_string(c, "RESET");
319        return;
320    }
321
322    if (strcmp(command, "stats malloc") == 0) {
323        char temp[512];
324        struct mallinfo info;
325
326        info = mallinfo();
327        sprintf(temp, "STAT arena_size %d\r\nSTAT free_chunks %d\r\nSTAT fastbin_blocks %d\r\nSTAT mmaped_regions %d\r\nSTAT mmapped_space %d\r\nSTAT max_total_alloc %d\r\nSTAT fastbin_space %d\r\nSTAT total_alloc %d\r\nSTAT total_free %d\r\nSTAT releasable_space %d\r\nEND", 
328                info.arena, info.ordblks, info.smblks, info.hblks, info.hblkhd, info.usmblks, info.fsmblks, info.uordblks, info.fordblks, info.keepcost);
329        out_string(c, temp);
330        return;
331    }
332
333    if (strcmp(command, "stats maps") == 0) {
334        char *wbuf;
335        int wsize = 8192; /* should be enough */
336        int fd;
337        int res;
338
339        wbuf = (char *)malloc(wsize);
340        if (wbuf == 0) {
341            out_string(c, "SERVER_ERROR out of memory");
342            return;
343        }
344           
345        fd = open("/proc/self/maps", O_RDONLY);
346        if (fd == -1) {
347            out_string(c, "SERVER_ERROR cannot open the maps file");
348            free(wbuf);
349            return;
350        }
351
352        res = read(fd, wbuf, wsize - 6);  /* 6 = END\r\n\0 */
353        if (res == wsize - 6) {
354            out_string(c, "SERVER_ERROR buffer overflow");
355            free(wbuf); close(fd);
356            return;
357        }
358        if (res == 0 || res == -1) {
359            out_string(c, "SERVER_ERROR can't read the maps file");
360            free(wbuf); close(fd);
361            return;
362        }
363        strcpy(wbuf + res, "END\r\n");
364        c->write_and_free=wbuf;
365        c->wcurr=wbuf;
366        c->wbytes = res + 6;
367        c->state = conn_write;
368        c->write_and_go = conn_read;
369        close(fd);
370        return;
371    }
372
373    if (strncmp(command, "stats cachedump", 15) == 0) {
374        char *buf;
375        unsigned int bytes, id, limit = 0;
376        char *start = command + 15;
377        if (sscanf(start, "%u %u\r\n", &id, &limit) < 1) {
378            out_string(c, "CLIENT_ERROR bad command line");
379            return;
380        }
381
382        buf = item_cachedump(id, limit, &bytes);
383        if (buf == 0) {
384            out_string(c, "SERVER_ERROR out of memory");
385            return;
386        }
387
388        c->write_and_free = buf;
389        c->wcurr = buf;
390        c->wbytes = bytes;
391        c->state = conn_write;
392        c->write_and_go = conn_read;
393        return;
394    }
395
396    if (strcmp(command, "stats slabs")==0) {
397        char buffer[4096];
398        slabs_stats(buffer, 4096);
399        out_string(c, buffer);
400        return;
401    }
402
403    if (strcmp(command, "stats items")==0) {
404        char buffer[4096];
405        item_stats(buffer, 4096);
406        out_string(c, buffer);
407        return;
408    }
409
410    if (strcmp(command, "stats sizes")==0) {
411        int bytes = 0;
412        char *buf = item_stats_sizes(&bytes);
413        if (! buf) {
414            out_string(c, "SERVER_ERROR out of memory");
415            return;
416        }
417
418        c->write_and_free = buf;
419        c->wcurr = buf;
420        c->wbytes = bytes;
421        c->state = conn_write;
422        c->write_and_go = conn_read;
423        return;
424    }
425
426    out_string(c, "ERROR");
427}
428
429void process_command(conn *c, char *command) {
430   
431    int comm = 0;
432    int incr = 0;
433
434    /*
435     * for commands set/add/replace, we build an item and read the data
436     * directly into it, then continue in nread_complete().
437     */ 
438
439    if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) || 
440        (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
441        (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
442
443        char s_comm[10];
444        char key[256];
445        int flags;
446        time_t expire;
447        int len, res;
448        item *it;
449
450        res = sscanf(command, "%s %s %u %u %d\n", s_comm, key, &flags, &expire, &len);
451        if (res!=5 || strlen(key)==0 ) {
452            out_string(c, "CLIENT_ERROR bad command line format");
453            return;
454        }
455        it = item_alloc(key, flags, expire, len+2);
456        if (it == 0) {
457            out_string(c, "SERVER_ERROR out of memory");
458            /* swallow the data line */
459            c->write_and_go = conn_swallow;
460            c->sbytes = len+2;
461            return;
462        }
463
464        c->item_comm = comm;
465        c->item = it;
466        c->rcurr = it->data;
467        c->rlbytes = it->nbytes;
468        c->state = conn_nread;
469        return;
470    }
471
472    if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) ||
473        (strncmp(command, "decr ", 5) == 0)) {
474        char temp[32];
475        char s_comm[10];
476        unsigned int value;
477        item *it;
478        unsigned int delta;
479        char key[255];
480        int res;
481        char *ptr;
482        time_t now = time(0);
483
484        res = sscanf(command, "%s %s %u\n", s_comm, key, &delta);
485        if (res!=3 || strlen(key)==0 ) {
486            out_string(c, "CLIENT_ERROR bad command line format");
487            return;
488        }
489       
490        it = assoc_find(key);
491        if (it && (it->it_flags & ITEM_DELETED)) {
492            it = 0;
493        }
494        if (it && it->exptime && it->exptime < now) {
495            item_unlink(it);
496            it = 0;
497        }
498
499        if (!it) {
500            out_string(c, "NOT_FOUND");
501            return;
502        }
503
504        ptr = it->data;
505        while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
506       
507        value = atoi(ptr);
508
509        if (incr)
510            value+=delta;
511        else {
512            if (delta >= value) value = 0;
513            else value-=delta;
514        }
515
516        sprintf(temp, "%u", value);
517        res = strlen(temp);
518        if (res + 2 > it->nbytes) { /* need to realloc */
519            item *new_it;
520            new_it = item_alloc(it->key, it->flags, it->exptime, res + 2 );
521            if (new_it == 0) {
522                out_string(c, "SERVER_ERROR out of memory");
523                return;
524            }
525            memcpy(new_it->data, temp, res);
526            memcpy((char *)(new_it->data) + res, "\r\n", 2);
527            item_replace(it, new_it);
528        } else { /* replace in-place */
529            memcpy(it->data, temp, res);
530            memset(it->data + res, ' ', it->nbytes-res-2);
531        }
532        out_string(c, temp);
533        return;
534    }
535       
536    if (strncmp(command, "get ", 4) == 0) {
537
538        char *start = command + 4;
539        char key[256];
540        int next;
541        int i = 0;
542        item *it;
543        time_t now = time(0);
544
545        while(sscanf(start, " %s%n", key, &next) >= 1) {
546            start+=next;
547            stats.get_cmds++;
548            it = (item *)assoc_find(key);
549            if (it && (it->it_flags & ITEM_DELETED)) {
550                it = 0;
551            }
552            if (it && it->exptime && it->exptime < now) {
553                item_unlink(it);
554                it = 0;
555            }
556
557            if (it) {
558                stats.get_hits++;
559                it->refcount++;
560                item_update(it);
561                *(c->ilist + i) = it;
562                i++;
563                if (i > c->isize) {
564                    c->isize *= 2;
565                    c->ilist = realloc(c->ilist, sizeof(item *)*c->isize);
566                }
567            } else stats.get_misses++;
568        }
569        c->icurr = c->ilist;
570        c->ileft = i;
571        if (c->ileft) {
572            c->ipart = 0;
573            c->state = conn_mwrite;
574            c->ibytes = 0;
575            return;
576        } else {
577            out_string(c, "END");
578            return;
579        }
580    }
581
582    if (strncmp(command, "delete ", 7) == 0) {
583        char key [256];
584        char *start = command+7;
585        item *it;
586
587        sscanf(start, " %s", key);
588        it = assoc_find(key);
589        if (!it) {
590            out_string(c, "NOT_FOUND");
591            return;
592        } else {
593            it->refcount++;
594            /* use its expiration time as its deletion time now */
595            it->exptime = time(0) + 4;
596            it->it_flags |= ITEM_DELETED;
597            todelete[delcurr++] = it;
598            if (delcurr >= deltotal) {
599                deltotal *= 2;
600                todelete = realloc(todelete, sizeof(item *)*deltotal);
601            }
602        }
603        out_string(c, "DELETED");
604        return;
605    }
606       
607    if (strncmp(command, "stats", 5) == 0) {
608        process_stat(c, command);
609        return;
610    }
611
612    if (strcmp(command, "version") == 0) {
613        out_string(c, "VERSION 2.0");
614        return;
615    }
616
617    if (strcmp(command, "quit") == 0) {
618        c->state = conn_closing;
619        return;
620    }
621
622    out_string(c, "ERROR");
623    return;
624}
625
626/*
627 * if we have a complete line in the buffer, process it and move whatever
628 * remains in the buffer to its beginning.
629 */
630int try_read_command(conn *c) {
631    char *el, *cont;
632
633    if (!c->rbytes)
634        return 0;
635    el = memchr(c->rbuf, '\n', c->rbytes);
636    if (!el)
637        return 0;
638    cont = el + 1;
639    if (el - c->rbuf > 1 && *(el - 1) == '\r') {
640        el--;
641    }
642    *el = '\0';
643
644    process_command(c, c->rbuf);
645
646    if (cont - c->rbuf < c->rbytes) { /* more stuff in the buffer */
647        memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
648    }
649    c->rbytes -= (cont - c->rbuf);
650    return 1;
651}
652
653/*
654 * read from network as much as we can, handle buffer overflow and connection
655 * close.
656 * return 0 if there's nothing to read on the first read.
657 */
658int try_read_network(conn *c) {
659    int gotdata = 0;
660    int res;
661    while (1) {
662        if (c->rbytes >= c->rsize) {
663            char *new_rbuf = realloc(c->rbuf, c->rsize*2);
664            if (!new_rbuf) {
665                if(settings.verbose)
666                    fprintf(stderr, "Couldn't realloc input buffer\n");
667                c->rbytes = 0; /* ignore what we read */
668                out_string(c, "SERVER_ERROR out of memory");
669                c->write_and_go = conn_closing;
670                return 1;
671            }
672            c->rbuf = new_rbuf; c->rsize *= 2;
673        }
674        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
675        if (res > 0) {
676            stats.bytes_read += res;
677            gotdata = 1;
678            c->rbytes += res;
679            continue;
680        }
681        if (res == 0) {
682            /* connection closed */
683            c->state = conn_closing;
684            return 1;
685        }
686        if (res == -1) {
687            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
688            else return 0;
689        }
690    }
691    return gotdata;
692}
693
694int update_event(conn *c, int new_flags) {
695    if (c->ev_flags == new_flags)
696        return;
697    if (event_del(&c->event) == -1) return 0;
698    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
699    c->ev_flags = new_flags;
700    if (event_add(&c->event, 0) == -1) return 0;
701    return 1;
702}
703   
704void drive_machine(conn *c) {
705
706    int exit = 0;
707    int sfd, flags = 1;
708    socklen_t addrlen;
709    struct sockaddr addr;
710    conn *newc;
711    int res;
712
713    while (!exit) {
714      /*printf("state %d\n", c->state); */
715        switch(c->state) {
716        case conn_listening:
717            addrlen = sizeof(addr);
718            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
719                if (errno == EAGAIN || errno == EWOULDBLOCK) {
720                    perror("accept() shouldn't block");
721                } else {
722                    perror("accept()");
723                }
724                return;
725            }
726            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
727                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
728                perror("setting O_NONBLOCK");
729                close(sfd);
730                return;
731            }           
732            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
733            if (!newc) {
734                if(settings.verbose)
735                    fprintf(stderr, "couldn't create new connection\n");
736                close(sfd);
737                return;
738            }
739            exit = 1;
740            break;
741
742        case conn_read:
743            if (try_read_command(c)) {
744                continue;
745            }
746            if (try_read_network(c)) {
747                continue;
748            }
749            /* we have no command line and no data to read from network */
750            if (!update_event(c, EV_READ | EV_PERSIST)) {
751                if(settings.verbose)
752                    fprintf(stderr, "Couldn't update event\n");
753                c->state = conn_closing;
754                break;
755            }
756            exit = 1;
757            break;
758
759        case conn_nread:
760            /* we are reading rlbytes into rcurr; */
761            if (c->rlbytes == 0) {
762                complete_nread(c);
763                break;
764            }
765            /* first check if we have leftovers in the conn_read buffer */
766            if (c->rbytes > 0) {
767                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
768                memcpy(c->rcurr, c->rbuf, tocopy);
769                c->rcurr += tocopy;
770                c->rlbytes -= tocopy;
771                if (c->rbytes > tocopy) {
772                    memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
773                }
774                c->rbytes -= tocopy;
775                break;
776            }
777
778            /*  now try reading from the socket */
779            res = read(c->sfd, c->rcurr, c->rlbytes);
780            if (res > 0) {
781                stats.bytes_read += res;
782                c->rcurr += res;
783                c->rlbytes -= res;
784                break;
785            }
786            if (res == 0) { /* end of stream */
787                c->state = conn_closing;
788                break;
789            }
790            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
791                if (!update_event(c, EV_READ | EV_PERSIST)) {
792                    if(settings.verbose) 
793                        fprintf(stderr, "Couldn't update event\n");
794                    c->state = conn_closing;
795                    break;
796                }
797                exit = 1;
798                break;
799            }
800            /* otherwise we have a real error, on which we close the connection */
801            if(settings.verbose)
802                fprintf(stderr, "Failed to read, and not due to blocking\n");
803            c->state = conn_closing;
804            break;
805
806        case conn_swallow:
807            /* we are reading sbytes and throwing them away */
808            if (c->sbytes == 0) {
809                c->state = conn_read;
810                break;
811            }
812
813            /* first check if we have leftovers in the conn_read buffer */
814            if (c->rbytes > 0) {
815                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
816                c->sbytes -= tocopy;
817                if (c->rbytes > tocopy) {
818                    memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
819                }
820                c->rbytes -= tocopy;
821                break;
822            }
823
824            /*  now try reading from the socket */
825            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
826            if (res > 0) {
827                stats.bytes_read += res;
828                c->sbytes -= res;
829                break;
830            }
831            if (res == 0) { /* end of stream */
832                c->state = conn_closing;
833                break;
834            }
835            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
836                if (!update_event(c, EV_READ | EV_PERSIST)) {
837                    if(settings.verbose)
838                        fprintf(stderr, "Couldn't update event\n");
839                    c->state = conn_closing;
840                    break;
841                }
842                exit = 1;
843                break;
844            }
845            /* otherwise we have a real error, on which we close the connection */
846            if(settings.verbose)
847                fprintf(stderr, "Failed to read, and not due to blocking\n");
848            c->state = conn_closing;
849            break;
850
851        case conn_write:
852            /* we are writing wbytes bytes starting from wcurr */
853            if (c->wbytes == 0) {
854                if (c->write_and_free) {
855                    free(c->write_and_free);
856                    c->write_and_free = 0;
857                }
858                c->state = c->write_and_go;
859                break;
860            }
861            res = write(c->sfd, c->wcurr, c->wbytes);
862            if (res > 0) {
863                stats.bytes_written += res;
864                c->wcurr  += res;
865                c->wbytes -= res;
866                break;
867            }
868            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
869                if (!update_event(c, EV_WRITE | EV_PERSIST)) {
870                    if(settings.verbose)
871                        fprintf(stderr, "Couldn't update event\n");
872                    c->state = conn_closing;
873                    break;
874                }               
875                exit = 1;
876                break;
877            }
878            /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
879               we have a real error, on which we close the connection */
880            if(settings.verbose)
881                fprintf(stderr, "Failed to write, and not due to blocking\n");
882            c->state = conn_closing;
883            break;
884        case conn_mwrite:
885            /*
886             * we're writing ibytes bytes from iptr. iptr alternates between
887             * ibuf, where we build a string "VALUE...", and it->data for the
888             * current item. When we finish a chunk, we choose the next one using
889             * ipart, which has the following semantics: 0 - start the loop, 1 -
890             * we finished ibuf, go to current it->data; 2 - we finished it->data,
891             * move to the next item and build its ibuf; 3 - we finished all items,
892             * write "END".
893             */
894            if (c->ibytes > 0) {
895                res = write(c->sfd, c->iptr, c->ibytes);
896                if (res > 0) {
897                    stats.bytes_written += res;
898                    c->iptr += res;
899                    c->ibytes -= res;
900                    break;
901                }
902                if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
903                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
904                        if(settings.verbose)
905                            fprintf(stderr, "Couldn't update event\n");
906                        c->state = conn_closing;
907                        break;
908                    }
909                    exit = 1;
910                    break;
911                }
912                /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
913                   we have a real error, on which we close the connection */
914                if(settings.verbose)
915                    fprintf(stderr, "Failed to write, and not due to blocking\n");
916                c->state = conn_closing;
917                break;
918            } else {
919                item *it;
920                /* we finished a chunk, decide what to do next */
921                switch (c->ipart) {
922                case 1:
923                    it = *(c->icurr);
924                    c->iptr = it->data;
925                    c->ibytes = it->nbytes;
926                    c->ipart = 2;
927                    break;
928                case 2:
929                    it = *(c->icurr);
930                    item_remove(it);
931                    if (c->ileft <= 1) {
932                        c->ipart = 3;
933                        break;
934                    } else {
935                        c->ileft--;
936                        c->icurr++;
937                    }
938                    /* FALL THROUGH */
939                case 0:
940                    it = *(c->icurr);
941                    sprintf(c->ibuf, "VALUE %s %u %u\r\n", it->key, it->flags, it->nbytes - 2);
942                    c->iptr = c->ibuf;
943                    c->ibytes = strlen(c->iptr);
944                    c->ipart = 1;
945                    break;
946                case 3:
947                    out_string(c, "END");
948                    break;
949                }
950            }
951            break;
952
953        case conn_closing:
954            conn_close(c);
955            exit = 1;
956            break;
957        }
958
959    }
960
961    return;
962}
963
964
965void event_handler(int fd, short which, void *arg) {
966    conn *c;
967   
968    c = (conn *)arg;
969    c->which = which;
970
971    /* sanity */
972    if (fd != c->sfd) {
973        if(settings.verbose)
974            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
975        conn_close(c);
976        return;
977    }
978
979    /* do as much I/O as possible until we block */
980    drive_machine(c);
981
982    /* wait for next event */
983    return;
984}
985
986int new_socket(void) {
987    int sfd;
988    int flags;
989
990    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
991        perror("socket()");
992        return -1;
993    }
994
995    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
996        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
997        perror("setting O_NONBLOCK");
998        close(sfd);
999        return -1;
1000    }
1001    return sfd;
1002}
1003
1004int server_socket(int port) {
1005    int sfd;
1006    struct linger ling = {0, 0};
1007    struct sockaddr_in addr;
1008    int flags =1;
1009
1010    if ((sfd = new_socket()) == -1) {
1011        return -1;
1012    }
1013
1014    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
1015    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
1016    setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
1017
1018    addr.sin_family = AF_INET;
1019    addr.sin_port = htons(port);
1020    addr.sin_addr = settings.interface;
1021    if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
1022        perror("bind()");
1023        close(sfd);
1024        return -1;
1025    }
1026    if (listen(sfd, 1024) == -1) {
1027        perror("listen()");
1028        close(sfd);
1029        return -1;
1030    }
1031    return sfd;
1032}
1033
1034
1035struct event deleteevent;
1036
1037void delete_handler(int fd, short which, void *arg) {
1038    struct timeval t;
1039
1040    evtimer_del(&deleteevent);
1041    evtimer_set(&deleteevent, delete_handler, 0);
1042    t.tv_sec = 5; t.tv_usec=0;
1043    evtimer_add(&deleteevent, &t);
1044
1045    {
1046        int i, j=0;
1047        time_t now = time(0);
1048        for (i=0; i<delcurr; i++) {
1049            if (todelete[i]->exptime < now) {
1050                /* no longer mark it deleted. it's now expired, same as dead */
1051                todelete[i]->it_flags &= ~ITEM_DELETED;
1052                todelete[i]->refcount--;
1053            } else {
1054                todelete[j++] = todelete[i];
1055            }
1056        }
1057        delcurr = j;
1058    }
1059               
1060    return;
1061}
1062       
1063void usage(void) {
1064    printf("-p <num>      port number to listen on\n");
1065    printf("-l <ip_addr>  interface to listen on, default is INDRR_ANY\n");
1066    printf("-s <num>      maximum number of items to store, default is unlimited\n");
1067    printf("-m <num>      max memory to use for items in megabytes, default is 64 MB\n");
1068    printf("-c <num>      max simultaneous connections, default is 1024\n");
1069    printf("-k            lock down all paged memory\n");
1070    printf("-v            verbose (print errors/warnings while in event loop)\n");
1071    printf("-d            run as a daemon\n");
1072    printf("-h            print this help and exit\n");
1073
1074    return;
1075}
1076
1077int main (int argc, char **argv) {
1078    int c;
1079    int l_socket;
1080    conn *l_conn;
1081    struct in_addr addr;
1082    int lock_memory = 0;
1083    int daemonize = 0;
1084
1085    /* init settings */
1086    settings_init();
1087
1088    /* process arguments */
1089    while ((c = getopt(argc, argv, "p:s:m:c:khvdl:")) != -1) {
1090        switch (c) {
1091        case 'p':
1092            settings.port = atoi(optarg);
1093            break;
1094        case 's':
1095            settings.maxitems = atoi(optarg);
1096            break;
1097        case 'm':
1098            settings.maxbytes = atoi(optarg)*1024*1024;
1099            break;
1100        case 'c':
1101            settings.maxconns = atoi(optarg);
1102            break;
1103        case 'h':
1104            usage();
1105            exit(0);
1106        case 'k':
1107            lock_memory = 1;
1108            break;
1109        case 'v':
1110            settings.verbose = 1;
1111            break;
1112        case 'l':
1113            if (!inet_aton(optarg, &addr)) {
1114                fprintf(stderr, "Illegal address: %s\n", optarg);
1115                return 1;
1116            } else {
1117                settings.interface = addr;
1118            }
1119            break;
1120        case 'd':
1121            daemonize = 1;
1122            break;
1123        default:
1124            fprintf(stderr, "Illegal argument \"%c\"\n", c);
1125            return 1;
1126        }
1127    }
1128
1129    /* initialize other stuff stuff */
1130    item_init();
1131    event_init();
1132    stats_init();
1133    assoc_init();
1134    conn_init();
1135    slabs_init(settings.maxbytes);
1136
1137
1138    if (daemonize) {
1139        int res;
1140        res = daemon(0, 0);
1141        if (res == -1) {
1142            fprintf(stderr, "failed to fork() in order to daemonize\n");
1143            return 1;
1144        }
1145    }
1146
1147    /* lock paged memory if needed */
1148    if (lock_memory) {
1149        mlockall(MCL_CURRENT | MCL_FUTURE);
1150    }
1151
1152    /* create the listening socket and bind it */
1153    l_socket = server_socket(settings.port);
1154    if (l_socket == -1) {
1155        fprintf(stderr, "failed to listen\n");
1156        exit(1);
1157    }
1158
1159    /* create the initial listening connection */
1160    if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST))) {
1161        fprintf(stderr, "failed to create listening connection");
1162        exit(1);
1163    }
1164
1165    /* initialise deletion array and timer event */
1166    deltotal = 200; delcurr = 0;
1167    todelete = malloc(sizeof(item *)*deltotal);
1168    delete_handler(0,0,0); /* sets up the event */
1169
1170    /* enter the loop */
1171    event_loop(0);
1172
1173    return;
1174}
1175
Note: See TracBrowser for help on using the browser.