root/branches/binary/server/memcached.h @ 775

Revision 775, 15.9 kB (checked in by dsallings, 20 months ago)

Syscall optimizations from Trond Norbye.

From Trond:
I have spent the last two days testing the modifications I made to
the binary protocol, and I have not been able to find any new bugs. In my
testing, the binary protocol now uses the same number of system calls as the
textual protocol.

I have tried to use a common state machine and to branch the execution path as
late as possible.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/* $Id$ */
3
4#ifdef HAVE_CONFIG_H
5#include "config.h"
6#endif
7
8#include <sys/types.h>
9#include <sys/socket.h>
10#include <sys/time.h>
11#include <netinet/in.h>
12#include <event.h>
13#include <netdb.h>
14
/* Buffer, packet and send-buffer sizes. */
#define DATA_BUFFER_SIZE     2048
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE      8
#define MAX_SENDBUF_SIZE     (256 * 1024 * 1024)

/*
 * A 64-bit integer printed in decimal needs at most 20 characters; the
 * extra bytes leave room for separating spaces, "\r\n" and the NUL.
 */
#define SUFFIX_SIZE          24

/* Initial allocation sizes for per-connection lists. */
#define ITEM_LIST_INITIAL    200  /**< items being returned by "get" */
#define SUFFIX_LIST_INITIAL  20   /**< CAS suffixes appended to "gets" lines */
#define IOV_LIST_INITIAL     400  /**< sendmsg() scatter/gather array entries */
#define MSG_LIST_INITIAL     10   /**< sendmsg() argument structures */

/* High-water marks used when shrinking buffers. */
#define READ_BUFFER_HIGHWAT  8192
#define ITEM_LIST_HIGHWAT    400
#define IOV_LIST_HIGHWAT     600
#define MSG_LIST_HIGHWAT     100
41
/*
 * Binary protocol: fixed header sizes, in bytes. Field widths below are
 * given in bits.
 */
#define MIN_BIN_PKT_LENGTH 16
#define BIN_SET_HDR_LEN 16         /**< flags:32, expiration:32, cas:64 */
#define BIN_INCR_HDR_LEN 20        /**< incr:64, initial:64, expiration:32 */
#define GET_RES_HDR_LEN (4+8)      /**< flags:32, cas:64 */
#define BIN_DEL_HDR_LEN 4          /**< timeout:32 */
#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))

/* An incr/decr response body is one 64-bit integer. */
#define INCR_RES_LEN 8
/*
 * Text buffer for an incr/decr result: the largest 64-bit unsigned value
 * (18446744073709551615) is 20 decimal digits, plus a few spare bytes.
 */
#define INCR_MAX_STORAGE_LEN 24
58
/* Magic values identifying binary-protocol requests and responses. */
#define BIN_REQ_MAGIC 0x80   /**< request */
#define BIN_RES_MAGIC 0x81   /**< response */

/* Binary-protocol command opcodes. */
#define CMD_GET      0
#define CMD_SET      1
#define CMD_ADD      2
#define CMD_REPLACE  3
#define CMD_DELETE   4
#define CMD_INCR     5
#define CMD_DECR     6
#define CMD_QUIT     7
#define CMD_FLUSH    8
#define CMD_GETQ     9
#define CMD_NOOP     10
#define CMD_VERSION  11

/* Binary-protocol error codes (high range). */
#define ERR_UNKNOWN_CMD        0x81
#define ERR_OUT_OF_MEMORY      0x82

/* Binary-protocol error codes (low range). */
#define ERR_NOT_FOUND          0x1
#define ERR_EXISTS             0x2
#define ERR_TOO_LARGE          0x3
#define ERR_INVALID_ARGUMENTS  0x4
#define ERR_NOT_STORED         0x5
83
/* Get a consistent bool type. */
#if HAVE_STDBOOL_H
# include <stdbool.h>
#else
  typedef enum {false = 0, true = 1} bool;
#endif

/*
 * Fixed-width integer types. This header uses uint8_t, uint32_t, int64_t and
 * uint64_t. When <stdint.h> is unavailable, try <inttypes.h> (which POSIX
 * specifies to provide the same types) before falling back to the minimal
 * uint8_t typedef.
 */
#if HAVE_STDINT_H
# include <stdint.h>
#elif HAVE_INTTYPES_H
# include <inttypes.h>
#else
 typedef unsigned char             uint8_t;
 /* NOTE(review): uint32_t/int64_t/uint64_t remain undefined on this path;
  * presumably some other system header supplies them. */
#endif

/* unistd.h, when configure found it. */
#if HAVE_UNISTD_H
# include <unistd.h>
#endif
101
/**
 * Time relative to server start. Smaller than time_t on 64-bit systems.
 */
typedef unsigned int rel_time_t;

/** Server-wide statistics counters (the global instance is `stats`). */
struct stats {
    unsigned int curr_items;
    unsigned int total_items;
    uint64_t     curr_bytes;
    unsigned int curr_conns;
    unsigned int total_conns;
    unsigned int conn_structs;
    uint64_t     get_cmds;
    uint64_t     set_cmds;
    uint64_t     get_hits;
    uint64_t     get_misses;
    uint64_t     evictions;
    time_t       started;        /**< when the process was started */
    uint64_t     bytes_read;
    uint64_t     bytes_written;
};

/* Highest verbosity level (presumably the cap for settings.verbose). */
#define MAX_VERBOSITY_LEVEL 2

/** Runtime configuration (the global instance is `settings`). */
struct settings {
    size_t     maxbytes;
    int        maxconns;
    int        port;
    int        udpport;
    char      *inter;
    int        verbose;
    rel_time_t oldest_live;      /**< ignore existing items older than this */
    bool       managed;          /**< if true, a tracker manages virtual buckets */
    int        evict_to_free;
    char      *socketpath;       /**< path to unix socket if using local socket */
    int        access;           /**< access mask (a la chmod) for unix domain socket */
    double     factor;           /**< chunk size growth factor */
    int        chunk_size;
    int        num_threads;      /**< number of libevent threads to run */
    char       prefix_delimiter; /**< character that marks a key prefix (for stats) */
    int        detail_enabled;   /**< nonzero if we're collecting detailed stats */
};

extern struct stats    stats;
extern struct settings settings;
145
146#define ITEM_LINKED 1
147#define ITEM_DELETED 2
148
149/* temp */
150#define ITEM_SLABBED 4
151
152typedef struct _stritem {
153    struct _stritem *next;
154    struct _stritem *prev;
155    struct _stritem *h_next;    /* hash chain next */
156    rel_time_t      time;       /* least recent access */
157    rel_time_t      exptime;    /* expire time */
158    int             nbytes;     /* size of data */
159    unsigned short  refcount;
160    uint8_t         nsuffix;    /* length of flags-and-length string */
161    uint8_t         it_flags;   /* ITEM_* above */
162    uint8_t         slabs_clsid;/* which slab class we're in */
163    uint8_t         nkey;       /* key length, w/terminating null and padding */
164    uint64_t        cas_id;     /* the CAS identifier */
165    void * end[];
166    /* then null-terminated key */
167    /* then " flags length\r\n" (no terminating null) */
168    /* then data with terminating \r\n (no terminating null; it's binary!) */
169} item;
170
171#define ITEM_key(item) ((char*)&((item)->end[0]))
172
173/* warning: don't use these macros with a function, as it evals its arg twice */
174#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)
175#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
176#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes)
177
/**
 * States of the per-connection state machine (stored in conn.state).
 *
 * NOTE: If you modify this table you _MUST_ update the function state_text
 */
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_max_state,  /**< Max state value (used for assertion) */
};
194
/**
 * Progress markers inside a binary-protocol command; held in conn.substate.
 * bin_no_state is the zero value.
 */
enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_del_header,
    bin_reading_incr_header,
};
204
/** Wire protocol a connection speaks (stored in conn.protocol). */
enum protocol {
    ascii_prot = 3, /* arbitrary value. */
    ascii_udp_prot,
    binary_prot,
    negotiating_prot, /* Discovering the protocol */
};

/*
 * True when protocol value x is ASCII over UDP. The argument is
 * parenthesized so that compound expressions (e.g. `c ? a : b`) are
 * compared as a whole rather than bound to the `==`.
 */
#define IS_UDP(x) ((x) == ascii_udp_prot)
213
/*
 * Storage command kinds. NOTE(review): presumably the values carried in
 * conn.item_comm and passed as `comm` to store_item(); confirm at call sites.
 */
#define NREAD_ADD      1
#define NREAD_SET      2
#define NREAD_REPLACE  3
#define NREAD_APPEND   4
#define NREAD_PREPEND  5
#define NREAD_CAS      6
220
221typedef struct conn conn;
222struct conn {
223    int    sfd;
224    enum conn_states  state;
225    enum bin_substates substate;
226    struct event event;
227    short  ev_flags;
228    short  which;   /** which events were just triggered */
229
230    char   *rbuf;   /** buffer to read commands into */
231    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
232    int    rsize;   /** total allocated size of rbuf */
233    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */
234
235    char   *wbuf;
236    char   *wcurr;
237    int    wsize;
238    int    wbytes;
239    int    write_and_go; /** which state to go into after finishing current write */
240    void   *write_and_free; /** free this memory after finishing writing */
241
242    char   *ritem;  /** when we read in an item's value, it goes here */
243    int    rlbytes;
244
245    /* data for the nread state */
246
247    /**
248     * item is used to hold an item structure created after reading the command
249     * line of set/add/replace commands, but before we finished reading the actual
250     * data. The data is read into ITEM_data(item) to avoid extra copying.
251     */
252
253    void   *item;     /* for commands set/add/replace  */
254    int    item_comm; /* which one is it: set/add/replace */
255
256    /* data for the swallow state */
257    int    sbytes;    /* how many bytes to swallow */
258
259    /* data for the mwrite state */
260    struct iovec *iov;
261    int    iovsize;   /* number of elements allocated in iov[] */
262    int    iovused;   /* number of elements used in iov[] */
263
264    struct msghdr *msglist;
265    int    msgsize;   /* number of elements allocated in msglist[] */
266    int    msgused;   /* number of elements used in msglist[] */
267    int    msgcurr;   /* element in msglist[] being transmitted now */
268    int    msgbytes;  /* number of bytes in current msg */
269
270    item   **ilist;   /* list of items to write out */
271    int    isize;
272    item   **icurr;
273    int    ileft;
274
275    char   **suffixlist;
276    int    suffixsize;
277    char   **suffixcurr;
278    int    suffixleft;
279
280    enum protocol protocol;   /* which protocol this connection speaks */
281
282    /* data for UDP clients */
283    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
284    struct sockaddr request_addr; /* Who sent the most recent request */
285    socklen_t request_addr_size;
286    unsigned char *hdrbuf; /* udp packet headers */
287    int    hdrsize;   /* number of headers' worth of space is allocated */
288
289    int    binary;    /* are we in binary mode */
290    int    bucket;    /* bucket number for the next command, if running as
291                         a managed instance. -1 (_not_ 0) means invalid. */
292    int    gen;       /* generation requested for the bucket */
293    bool   noreply;   /* True if the reply should not be sent. */
294    /* Binary protocol stuff */
295    /* This is where the binary header goes */
296    uint32_t bin_header[MIN_BIN_PKT_LENGTH/sizeof(uint32_t)];
297    short cmd;
298    int opaque;
299    int keylen;
300    conn   *next;     /* Used for generating a list of conn structures */
301};
302
303
/** Count of virtual buckets available to a managed instance. */
#define MAX_BUCKETS 32768
306
307/* current time of day (updated periodically) */
308extern volatile rel_time_t current_time;
309
310/*
311 * Functions
312 */
313
314conn *do_conn_from_freelist();
315bool do_conn_add_to_freelist(conn *c);
316char *do_suffix_from_freelist();
317bool do_suffix_add_to_freelist(char *s);
318char *do_defer_delete(item *item, time_t exptime);
319void do_run_deferred_deletes(void);
320char *do_add_delta(item *item, const bool incr, const int64_t delta, char *buf);
321int do_store_item(item *item, int comm);
322conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base);
323
324
325#include "stats.h"
326#include "slabs.h"
327#include "assoc.h"
328#include "items.h"
329
330
/*
 * Threading shims.
 *
 * Several cache operations come in a "do_" form that does the work directly
 * and, in USE_THREADS builds, an "mt_" form that usually takes a lock and
 * calls the "do_" form (or replaces its logic). The rest of the server calls
 * the unprefixed names, which are macros resolving to:
 *   - the "mt_" variant when USE_THREADS is defined;
 *   - the "do_" variant otherwise.
 *
 * Calls that need cross-thread communication in multithreaded mode (the
 * libevent-related ones) go through "dispatch_" front ends instead of doing
 * the work in the calling thread; single-threaded builds map those straight
 * onto the underlying call.
 */
#ifdef USE_THREADS

/* Thread setup and cross-thread dispatch. */
void thread_init(int nthreads, struct event_base *main_base);
int  dispatch_event_add(int thread, conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum protocol prot);
int   mt_is_listen_thread(void);

/* Lock wrappers for cache functions that are called from main loop. */

/* ... connection and suffix free lists */
conn *mt_conn_from_freelist(void);
bool  mt_conn_add_to_freelist(conn *c);
char *mt_suffix_from_freelist(void);
bool  mt_suffix_add_to_freelist(char *s);

/* ... item cache */
char *mt_add_delta(item *item, const int incr, const int64_t delta, char *buf);
char *mt_defer_delete(item *it, time_t exptime);
void  mt_run_deferred_deletes(void);
int   mt_store_item(item *item, int comm);
item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
char *mt_item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
void  mt_item_flush_expired(void);
item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked);
int   mt_item_link(item *it);
void  mt_item_remove(item *it);
int   mt_item_replace(item *it, item *new_it);
char *mt_item_stats(int *bytes);
char *mt_item_stats_sizes(int *bytes);
void  mt_item_unlink(item *it);
void  mt_item_update(item *it);

/* ... hash table and slab allocator */
void  mt_assoc_move_next_bucket(void);
void *mt_slabs_alloc(size_t size, unsigned int id);
void  mt_slabs_free(void *ptr, size_t size, unsigned int id);
int   mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
char *mt_slabs_stats(int *buflen);

/* ... global statistics */
void  mt_stats_lock(void);
void  mt_stats_unlock(void);

/* Public names -> mt_ variants. */
# define is_listen_thread()          mt_is_listen_thread()

# define conn_from_freelist()        mt_conn_from_freelist()
# define conn_add_to_freelist(x)     mt_conn_add_to_freelist(x)
# define suffix_from_freelist()      mt_suffix_from_freelist()
# define suffix_add_to_freelist(x)   mt_suffix_add_to_freelist(x)

# define add_delta(x,y,z,a)          mt_add_delta(x,y,z,a)
# define defer_delete(x,y)           mt_defer_delete(x,y)
# define run_deferred_deletes()      mt_run_deferred_deletes()
# define store_item(x,y)             mt_store_item(x,y)
# define item_alloc(x,y,z,a,b)       mt_item_alloc(x,y,z,a,b)
# define item_cachedump(x,y,z)       mt_item_cachedump(x,y,z)
# define item_flush_expired()        mt_item_flush_expired()
# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
# define item_link(x)                mt_item_link(x)
# define item_remove(x)              mt_item_remove(x)
# define item_replace(x,y)           mt_item_replace(x,y)
# define item_stats(x)               mt_item_stats(x)
# define item_stats_sizes(x)         mt_item_stats_sizes(x)
# define item_unlink(x)              mt_item_unlink(x)
# define item_update(x)              mt_item_update(x)

# define assoc_move_next_bucket()    mt_assoc_move_next_bucket()
# define slabs_alloc(x,y)            mt_slabs_alloc(x,y)
# define slabs_free(x,y,z)           mt_slabs_free(x,y,z)
# define slabs_reassign(x,y)         mt_slabs_reassign(x,y)
# define slabs_stats(x)              mt_slabs_stats(x)

# define STATS_LOCK()                mt_stats_lock()
# define STATS_UNLOCK()              mt_stats_unlock()

#else /* !USE_THREADS */

/* Thread setup and dispatch collapse to direct calls or constants. */
# define thread_init(x,y)            0
# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
# define dispatch_event_add(t,c)     event_add(&(c)->event, 0)
# define is_listen_thread()          1

/* Public names -> do_ variants. */
# define conn_from_freelist()        do_conn_from_freelist()
# define conn_add_to_freelist(x)     do_conn_add_to_freelist(x)
# define suffix_from_freelist()      do_suffix_from_freelist()
# define suffix_add_to_freelist(x)   do_suffix_add_to_freelist(x)

# define add_delta(x,y,z,a)          do_add_delta(x,y,z,a)
# define defer_delete(x,y)           do_defer_delete(x,y)
# define run_deferred_deletes()      do_run_deferred_deletes()
# define store_item(x,y)             do_store_item(x,y)
# define item_alloc(x,y,z,a,b)       do_item_alloc(x,y,z,a,b)
# define item_cachedump(x,y,z)       do_item_cachedump(x,y,z)
# define item_flush_expired()        do_item_flush_expired()
# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
# define item_link(x)                do_item_link(x)
# define item_remove(x)              do_item_remove(x)
# define item_replace(x,y)           do_item_replace(x,y)
# define item_stats(x)               do_item_stats(x)
# define item_stats_sizes(x)         do_item_stats_sizes(x)
# define item_unlink(x)              do_item_unlink(x)
# define item_update(x)              do_item_update(x)

# define assoc_move_next_bucket()    do_assoc_move_next_bucket()
# define slabs_alloc(x,y)            do_slabs_alloc(x,y)
# define slabs_free(x,y,z)           do_slabs_free(x,y,z)
# define slabs_reassign(x,y)         do_slabs_reassign(x,y)
# define slabs_stats(x)              do_slabs_stats(x)

/* Single-threaded: statistics need no lock. */
# define STATS_LOCK()                /**/
# define STATS_UNLOCK()              /**/

#endif /* !USE_THREADS */
444
/*
 * Branch-prediction hints. Compilers without __builtin_expect (non-GCC, or
 * GCC older than 2.96) get a pass-through shim so the macros still compile.
 */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define __builtin_expect(x, expected_value) (x)
#endif

/*
 * __builtin_expect(v, c) hints that v == c. Normalizing the argument with
 * `!!` makes the hint correct for any truthy value (a non-NULL pointer, a
 * count greater than 1), not only for exactly 1.
 */
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)
Note: See TracBrowser for help on using the browser.