| 1 | /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
|---|
| 2 | /* $Id$ */ |
|---|
| 3 | |
|---|
| 4 | #ifdef HAVE_CONFIG_H |
|---|
| 5 | #include "config.h" |
|---|
| 6 | #endif |
|---|
| 7 | |
|---|
| 8 | #include <sys/types.h> |
|---|
| 9 | #include <sys/socket.h> |
|---|
| 10 | #include <sys/time.h> |
|---|
| 11 | #include <netinet/in.h> |
|---|
| 12 | #include <event.h> |
|---|
| 13 | |
|---|
/*
 * Network buffer sizing constants.
 *
 * NOTE(review): the role given next to each value is inferred from the macro
 * name only; confirm against the code that uses it.
 */
#define DATA_BUFFER_SIZE        2048    /* presumably the default connection buffer size */
#define UDP_READ_BUFFER_SIZE    65536   /* presumably the read buffer size for UDP sockets */
#define UDP_MAX_PAYLOAD_SIZE    1400    /* presumably the payload limit per UDP datagram */
#define UDP_HEADER_SIZE         8       /* presumably the size of one UDP frame header */
#define MAX_SENDBUF_SIZE        (256 * 1024 * 1024)     /* evaluates to 268435456 */
|---|
| 19 | |
|---|
/*
 * Sizing of the growable per-request arrays.  Each resource has an initial
 * size and a high water mark used for buffer shrinking; the two are listed
 * together here so they can be compared at a glance.
 */

/** List of items being returned by "get": initial size / high water mark. */
#define ITEM_LIST_INITIAL   200
#define ITEM_LIST_HIGHWAT   400

/** sendmsg() scatter/gather array: initial size / high water mark. */
#define IOV_LIST_INITIAL    400
#define IOV_LIST_HIGHWAT    600

/** sendmsg() argument structures: initial count / high water mark. */
#define MSG_LIST_INITIAL    10
#define MSG_LIST_HIGHWAT    100

/** High water mark for shrinking the read buffer. */
#define READ_BUFFER_HIGHWAT 8192
|---|
| 34 | |
|---|
| 35 | /* Get a consistent bool type */ |
|---|
| 36 | #if HAVE_STDBOOL_H |
|---|
| 37 | # include <stdbool.h> |
|---|
| 38 | #else |
|---|
| 39 | typedef enum {false = 0, true = 1} bool; |
|---|
| 40 | #endif |
|---|
| 41 | |
|---|
| 42 | #if HAVE_STDINT_H |
|---|
| 43 | # include <stdint.h> |
|---|
| 44 | #else |
|---|
| 45 | typedef unsigned char uint8_t; |
|---|
| 46 | #endif |
|---|
| 47 | |
|---|
| 48 | /* unistd.h is here */ |
|---|
| 49 | #if HAVE_UNISTD_H |
|---|
| 50 | # include <unistd.h> |
|---|
| 51 | #endif |
|---|
| 52 | |
|---|
/**
 * A point in time expressed relative to server start rather than as an
 * absolute time_t; an unsigned int, so it is smaller than time_t on 64-bit
 * systems.
 */
typedef unsigned int rel_time_t;
|---|
| 55 | |
|---|
/**
 * Server-wide counters.
 *
 * NOTE(review): apart from `started`, the grouping comments below are
 * inferred from the field names; confirm against the code that updates them.
 */
struct stats {
    /* items */
    unsigned int  curr_items;
    unsigned int  total_items;
    uint64_t      curr_bytes;

    /* connections */
    unsigned int  curr_conns;
    unsigned int  total_conns;
    unsigned int  conn_structs;

    /* command counters */
    uint64_t      get_cmds;
    uint64_t      set_cmds;
    uint64_t      get_hits;
    uint64_t      get_misses;
    uint64_t      evictions;

    time_t        started;          /* when the process was started */

    /* network traffic */
    uint64_t      bytes_read;
    uint64_t      bytes_written;
};
|---|
| 72 | |
|---|
| 73 | #define MAX_VERBOSITY_LEVEL 2 |
|---|
| 74 | |
|---|
| 75 | struct settings { |
|---|
| 76 | size_t maxbytes; |
|---|
| 77 | int maxconns; |
|---|
| 78 | int port; |
|---|
| 79 | int udpport; |
|---|
| 80 | struct in_addr interf; |
|---|
| 81 | int verbose; |
|---|
| 82 | rel_time_t oldest_live; /* ignore existing items older than this */ |
|---|
| 83 | bool managed; /* if 1, a tracker manages virtual buckets */ |
|---|
| 84 | int evict_to_free; |
|---|
| 85 | char *socketpath; /* path to unix socket if using local socket */ |
|---|
| 86 | double factor; /* chunk size growth factor */ |
|---|
| 87 | int chunk_size; |
|---|
| 88 | int num_threads; /* number of libevent threads to run */ |
|---|
| 89 | char prefix_delimiter; /* character that marks a key prefix (for stats) */ |
|---|
| 90 | int detail_enabled; /* nonzero if we're collecting detailed stats */ |
|---|
| 91 | }; |
|---|
| 92 | |
|---|
| 93 | extern struct stats stats; |
|---|
| 94 | extern struct settings settings; |
|---|
| 95 | |
|---|
| 96 | #define ITEM_LINKED 1 |
|---|
| 97 | #define ITEM_DELETED 2 |
|---|
| 98 | |
|---|
| 99 | /* temp */ |
|---|
| 100 | #define ITEM_SLABBED 4 |
|---|
| 101 | |
|---|
| 102 | typedef struct _stritem { |
|---|
| 103 | struct _stritem *next; |
|---|
| 104 | struct _stritem *prev; |
|---|
| 105 | struct _stritem *h_next; /* hash chain next */ |
|---|
| 106 | rel_time_t time; /* least recent access */ |
|---|
| 107 | rel_time_t exptime; /* expire time */ |
|---|
| 108 | int nbytes; /* size of data */ |
|---|
| 109 | unsigned short refcount; |
|---|
| 110 | uint8_t nsuffix; /* length of flags-and-length string */ |
|---|
| 111 | uint8_t it_flags; /* ITEM_* above */ |
|---|
| 112 | uint8_t slabs_clsid;/* which slab class we're in */ |
|---|
| 113 | uint8_t nkey; /* key length, w/terminating null and padding */ |
|---|
| 114 | void * end[]; |
|---|
| 115 | /* then null-terminated key */ |
|---|
| 116 | /* then " flags length\r\n" (no terminating null) */ |
|---|
| 117 | /* then data with terminating \r\n (no terminating null; it's binary!) */ |
|---|
| 118 | } item; |
|---|
| 119 | |
|---|
| 120 | #define ITEM_key(item) ((char*)&((item)->end[0])) |
|---|
| 121 | |
|---|
| 122 | /* warning: don't use these macros with a function, as it evals its arg twice */ |
|---|
| 123 | #define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1) |
|---|
| 124 | #define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix) |
|---|
| 125 | #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes) |
|---|
| 126 | |
|---|
/**
 * States a connection can be in.  The numeric values are spelled out so the
 * mapping stays obvious if the list is ever edited.
 */
enum conn_states {
    conn_listening = 0, /** the socket which listens for connections */
    conn_read      = 1, /** reading in a command line */
    conn_write     = 2, /** writing out a simple response */
    conn_nread     = 3, /** reading in a fixed number of bytes */
    conn_swallow   = 4, /** swallowing unnecessary bytes w/o storing */
    conn_closing   = 5, /** closing this connection */
    conn_mwrite    = 6  /** writing out many items sequentially */
};

/*
 * Storage command codes.
 * NOTE(review): presumably the values held in conn.item_comm -- confirm.
 */
#define NREAD_ADD     1
#define NREAD_SET     2
#define NREAD_REPLACE 3
|---|
| 140 | |
|---|
| 141 | typedef struct { |
|---|
| 142 | int sfd; |
|---|
| 143 | int state; |
|---|
| 144 | struct event event; |
|---|
| 145 | short ev_flags; |
|---|
| 146 | short which; /** which events were just triggered */ |
|---|
| 147 | |
|---|
| 148 | char *rbuf; /** buffer to read commands into */ |
|---|
| 149 | char *rcurr; /** but if we parsed some already, this is where we stopped */ |
|---|
| 150 | int rsize; /** total allocated size of rbuf */ |
|---|
| 151 | int rbytes; /** how much data, starting from rcur, do we have unparsed */ |
|---|
| 152 | |
|---|
| 153 | char *wbuf; |
|---|
| 154 | char *wcurr; |
|---|
| 155 | int wsize; |
|---|
| 156 | int wbytes; |
|---|
| 157 | int write_and_go; /** which state to go into after finishing current write */ |
|---|
| 158 | void *write_and_free; /** free this memory after finishing writing */ |
|---|
| 159 | |
|---|
| 160 | char *ritem; /** when we read in an item's value, it goes here */ |
|---|
| 161 | int rlbytes; |
|---|
| 162 | |
|---|
| 163 | /* data for the nread state */ |
|---|
| 164 | |
|---|
| 165 | /** |
|---|
| 166 | * item is used to hold an item structure created after reading the command |
|---|
| 167 | * line of set/add/replace commands, but before we finished reading the actual |
|---|
| 168 | * data. The data is read into ITEM_data(item) to avoid extra copying. |
|---|
| 169 | */ |
|---|
| 170 | |
|---|
| 171 | void *item; /* for commands set/add/replace */ |
|---|
| 172 | int item_comm; /* which one is it: set/add/replace */ |
|---|
| 173 | |
|---|
| 174 | /* data for the swallow state */ |
|---|
| 175 | int sbytes; /* how many bytes to swallow */ |
|---|
| 176 | |
|---|
| 177 | /* data for the mwrite state */ |
|---|
| 178 | struct iovec *iov; |
|---|
| 179 | int iovsize; /* number of elements allocated in iov[] */ |
|---|
| 180 | int iovused; /* number of elements used in iov[] */ |
|---|
| 181 | |
|---|
| 182 | struct msghdr *msglist; |
|---|
| 183 | int msgsize; /* number of elements allocated in msglist[] */ |
|---|
| 184 | int msgused; /* number of elements used in msglist[] */ |
|---|
| 185 | int msgcurr; /* element in msglist[] being transmitted now */ |
|---|
| 186 | int msgbytes; /* number of bytes in current msg */ |
|---|
| 187 | |
|---|
| 188 | item **ilist; /* list of items to write out */ |
|---|
| 189 | int isize; |
|---|
| 190 | item **icurr; |
|---|
| 191 | int ileft; |
|---|
| 192 | |
|---|
| 193 | /* data for UDP clients */ |
|---|
| 194 | bool udp; /* is this is a UDP "connection" */ |
|---|
| 195 | int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ |
|---|
| 196 | struct sockaddr request_addr; /* Who sent the most recent request */ |
|---|
| 197 | socklen_t request_addr_size; |
|---|
| 198 | unsigned char *hdrbuf; /* udp packet headers */ |
|---|
| 199 | int hdrsize; /* number of headers' worth of space is allocated */ |
|---|
| 200 | |
|---|
| 201 | int binary; /* are we in binary mode */ |
|---|
| 202 | int bucket; /* bucket number for the next command, if running as |
|---|
| 203 | a managed instance. -1 (_not_ 0) means invalid. */ |
|---|
| 204 | int gen; /* generation requested for the bucket */ |
|---|
| 205 | } conn; |
|---|
| 206 | |
|---|
/* How many virtual buckets a managed instance has. */
#define MAX_BUCKETS 32768
|---|
| 209 | |
|---|
| 210 | /* current time of day (updated periodically) */ |
|---|
| 211 | extern volatile rel_time_t current_time; |
|---|
| 212 | |
|---|
| 213 | /* temporary hack */ |
|---|
| 214 | /* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); } |
|---|
| 215 | void pre_gdb (); */ |
|---|
| 216 | |
|---|
| 217 | /* |
|---|
| 218 | * Functions |
|---|
| 219 | */ |
|---|
| 220 | |
|---|
| 221 | conn *do_conn_from_freelist(); |
|---|
| 222 | bool do_conn_add_to_freelist(conn *c); |
|---|
| 223 | char *do_defer_delete(item *item, time_t exptime); |
|---|
| 224 | void do_run_deferred_deletes(void); |
|---|
| 225 | char *do_add_delta(item *item, int incr, const unsigned int delta, char *buf); |
|---|
| 226 | int do_store_item(item *item, int comm); |
|---|
| 227 | conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base); |
|---|
| 228 | |
|---|
| 229 | |
|---|
| 230 | #include "stats.h" |
|---|
| 231 | #include "slabs.h" |
|---|
| 232 | #include "assoc.h" |
|---|
| 233 | #include "items.h" |
|---|
| 234 | |
|---|
| 235 | |
|---|
| 236 | /* |
|---|
| 237 | * In multithreaded mode, we wrap certain functions with lock management and |
|---|
| 238 | * replace the logic of some other functions. All wrapped functions have |
|---|
| 239 | * "mt_" and "do_" variants. In multithreaded mode, the plain version of a |
|---|
| 240 | * function is #define-d to the "mt_" variant, which often just grabs a |
|---|
| 241 | * lock and calls the "do_" function. In singlethreaded mode, the "do_" |
|---|
| 242 | * function is called directly. |
|---|
| 243 | * |
|---|
| 244 | * Functions such as the libevent-related calls that need to do cross-thread |
|---|
| 245 | * communication in multithreaded mode (rather than actually doing the work |
|---|
| 246 | * in the current thread) are called via "dispatch_" frontends, which are |
|---|
| 247 | * also #define-d to directly call the underlying code in singlethreaded mode. |
|---|
| 248 | */ |
|---|
| 249 | #ifdef USE_THREADS |
|---|
| 250 | |
|---|
| 251 | void thread_init(int nthreads, struct event_base *main_base); |
|---|
| 252 | int dispatch_event_add(int thread, conn *c); |
|---|
| 253 | void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp); |
|---|
| 254 | |
|---|
| 255 | /* Lock wrappers for cache functions that are called from main loop. */ |
|---|
| 256 | char *mt_add_delta(item *item, const int incr, const unsigned int delta, char *buf); |
|---|
| 257 | void mt_assoc_move_next_bucket(void); |
|---|
| 258 | conn *mt_conn_from_freelist(void); |
|---|
| 259 | bool mt_conn_add_to_freelist(conn *c); |
|---|
| 260 | char *mt_defer_delete(item *it, time_t exptime); |
|---|
| 261 | int mt_is_listen_thread(void); |
|---|
| 262 | item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); |
|---|
| 263 | char *mt_item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes); |
|---|
| 264 | void mt_item_flush_expired(void); |
|---|
| 265 | item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked); |
|---|
| 266 | int mt_item_link(item *it); |
|---|
| 267 | void mt_item_remove(item *it); |
|---|
| 268 | int mt_item_replace(item *it, item *new_it); |
|---|
| 269 | char *mt_item_stats(int *bytes); |
|---|
| 270 | char *mt_item_stats_sizes(int *bytes); |
|---|
| 271 | void mt_item_unlink(item *it); |
|---|
| 272 | void mt_item_update(item *it); |
|---|
| 273 | void mt_run_deferred_deletes(void); |
|---|
| 274 | void *mt_slabs_alloc(size_t size); |
|---|
| 275 | void mt_slabs_free(void *ptr, size_t size); |
|---|
| 276 | int mt_slabs_reassign(unsigned char srcid, unsigned char dstid); |
|---|
| 277 | char *mt_slabs_stats(int *buflen); |
|---|
| 278 | void mt_stats_lock(void); |
|---|
| 279 | void mt_stats_unlock(void); |
|---|
| 280 | int mt_store_item(item *item, int comm); |
|---|
| 281 | |
|---|
| 282 | |
|---|
/*
 * Multithreaded build: every plain name forwards to its "mt_" variant.
 * Macro parameters carry descriptive names; each is passed through unchanged
 * and in the same order.
 */

/* connection free list */
# define conn_from_freelist()                       mt_conn_from_freelist()
# define conn_add_to_freelist(c)                    mt_conn_add_to_freelist(c)
# define is_listen_thread()                         mt_is_listen_thread()

/* hash table */
# define assoc_move_next_bucket()                   mt_assoc_move_next_bucket()

/* items */
# define item_alloc(key,nkey,flags,exptime,nbytes)  mt_item_alloc(key,nkey,flags,exptime,nbytes)
# define item_get_notedeleted(key,nkey,locked)      mt_item_get_notedeleted(key,nkey,locked)
# define item_link(it)                              mt_item_link(it)
# define item_unlink(it)                            mt_item_unlink(it)
# define item_remove(it)                            mt_item_remove(it)
# define item_replace(old_it,new_it)                mt_item_replace(old_it,new_it)
# define item_update(it)                            mt_item_update(it)
# define item_flush_expired()                       mt_item_flush_expired()
# define item_cachedump(clsid,limit,bytes)          mt_item_cachedump(clsid,limit,bytes)
# define item_stats(bytes)                          mt_item_stats(bytes)
# define item_stats_sizes(bytes)                    mt_item_stats_sizes(bytes)

/* storage commands and deferred deletes */
# define store_item(it,comm)                        mt_store_item(it,comm)
# define add_delta(it,incr,delta,buf)               mt_add_delta(it,incr,delta,buf)
# define defer_delete(it,exptime)                   mt_defer_delete(it,exptime)
# define run_deferred_deletes()                     mt_run_deferred_deletes()

/* slab allocator */
# define slabs_alloc(size)                          mt_slabs_alloc(size)
# define slabs_free(ptr,size)                       mt_slabs_free(ptr,size)
# define slabs_reassign(src,dst)                    mt_slabs_reassign(src,dst)
# define slabs_stats(buflen)                        mt_slabs_stats(buflen)

/* stats lock */
# define STATS_LOCK()                               mt_stats_lock()
# define STATS_UNLOCK()                             mt_stats_unlock()
|---|
| 309 | |
|---|
| 310 | #else /* !USE_THREADS */ |
|---|
| 311 | |
|---|
/*
 * Singlethreaded build: every plain name forwards straight to its "do_"
 * variant, and the threading frontends collapse to direct calls or constants.
 * Macro parameters carry descriptive names; each is passed through unchanged
 * and in the same order.
 */

/* threading frontends */
# define thread_init(nthreads,base)                 0
# define is_listen_thread()                         1
/* NOTE: expects a variable named main_base to be in scope at the call site. */
# define dispatch_conn_new(sfd,state,flags,rsize,is_udp) conn_new(sfd,state,flags,rsize,is_udp,main_base)
/* The thread argument is ignored; the event is added in the calling thread. */
# define dispatch_event_add(thread,c)               event_add(&(c)->event, 0)

/* connection free list */
# define conn_from_freelist()                       do_conn_from_freelist()
# define conn_add_to_freelist(c)                    do_conn_add_to_freelist(c)

/* hash table */
# define assoc_move_next_bucket()                   do_assoc_move_next_bucket()

/* items */
# define item_alloc(key,nkey,flags,exptime,nbytes)  do_item_alloc(key,nkey,flags,exptime,nbytes)
# define item_get_notedeleted(key,nkey,locked)      do_item_get_notedeleted(key,nkey,locked)
# define item_link(it)                              do_item_link(it)
# define item_unlink(it)                            do_item_unlink(it)
# define item_remove(it)                            do_item_remove(it)
# define item_replace(old_it,new_it)                do_item_replace(old_it,new_it)
# define item_update(it)                            do_item_update(it)
# define item_flush_expired()                       do_item_flush_expired()
# define item_cachedump(clsid,limit,bytes)          do_item_cachedump(clsid,limit,bytes)
# define item_stats(bytes)                          do_item_stats(bytes)
# define item_stats_sizes(bytes)                    do_item_stats_sizes(bytes)

/* storage commands and deferred deletes */
# define store_item(it,comm)                        do_store_item(it,comm)
# define add_delta(it,incr,delta,buf)               do_add_delta(it,incr,delta,buf)
# define defer_delete(it,exptime)                   do_defer_delete(it,exptime)
# define run_deferred_deletes()                     do_run_deferred_deletes()

/* slab allocator */
# define slabs_alloc(size)                          do_slabs_alloc(size)
# define slabs_free(ptr,size)                       do_slabs_free(ptr,size)
# define slabs_reassign(src,dst)                    do_slabs_reassign(src,dst)
# define slabs_stats(buflen)                        do_slabs_stats(buflen)

/* No locking is needed with a single thread, so these expand to nothing. */
# define STATS_LOCK()                               /**/
# define STATS_UNLOCK()                             /**/
|---|
| 341 | |
|---|
| 342 | #endif /* !USE_THREADS */ |
|---|
| 343 | |
|---|
| 344 | |
|---|