Changeset 672
- Timestamp:
- 12/11/07 03:28:34 (1 year ago)
- Files:
-
- branches/binary/server/memcached.c (modified) (18 diffs)
- branches/binary/server/memcached.h (modified) (4 diffs)
- branches/binary/server/thread.c (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/binary/server/memcached.c
r667 r672 64 64 */ 65 65 static void drive_machine(conn *c); 66 static int new_socket(const bool is_udp);67 static int server_socket(const int port, const bool is_udp);66 static int new_socket(const int prot); 67 static int server_socket(const int port, const int prot); 68 68 static int try_read_command(conn *c); 69 69 static int try_read_network(conn *c); … … 229 229 c->msgused++; 230 230 231 if ( c->udp) {231 if (IS_UDP(c->protocol)) { 232 232 /* Leave room for the UDP header, which we'll fill in later. */ 233 233 return add_iov(c, NULL, UDP_HEADER_SIZE); … … 294 294 295 295 conn *conn_new(const int sfd, const int init_state, const int event_flags, 296 const int read_buffer_size, const bool is_udp, struct event_base *base) {296 const int read_buffer_size, const int prot, struct event_base *base) { 297 297 conn *c = conn_from_freelist(); 298 298 … … 345 345 if (init_state == conn_listening) 346 346 fprintf(stderr, "<%d server listening\n", sfd); 347 else if ( is_udp)347 else if (IS_UDP(prot)) 348 348 fprintf(stderr, "<%d server listening (udp)\n", sfd); 349 349 else … … 352 352 353 353 c->sfd = sfd; 354 c-> udp = is_udp;354 c->protocol = prot; 355 355 c->state = init_state; 356 356 c->rlbytes = 0; … … 480 480 assert(c != NULL); 481 481 482 if ( c->udp)482 if (IS_UDP(c->protocol)) 483 483 return; 484 484 … … 652 652 * UDP_MAX_PAYLOAD_SIZE bytes. 653 653 */ 654 limit_to_mtu = c->udp|| (1 == c->msgused);654 limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); 655 655 656 656 /* We may need to start a new msghdr if this one is full. */ … … 1319 1319 */ 1320 1320 if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 1321 || ( c->udp&& build_udp_headers(c) != 0)) {1321 || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 1322 1322 out_string(c, "SERVER_ERROR out of memory"); 1323 1323 } … … 2033 2033 perror("Failed to write, and not due to blocking"); 2034 2034 2035 if ( c->udp)2035 if (IS_UDP(c->protocol)) 2036 2036 conn_set_state(c, conn_read); 2037 2037 else … … 2079 2079 } 2080 2080 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, 2081 DATA_BUFFER_SIZE, false);2081 DATA_BUFFER_SIZE, ascii_prot); 2082 2082 break; 2083 2083 … … 2086 2086 continue; 2087 2087 } 2088 if (( c->udp? try_read_udp(c) : try_read_network(c)) != 0) {2088 if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) { 2089 2089 continue; 2090 2090 } … … 2197 2197 * list for TCP or a two-entry list for UDP). 2198 2198 */ 2199 if (c->iovused == 0 || ( c->udp&& c->iovused == 1)) {2199 if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { 2200 2200 if (add_iov(c, c->wcurr, c->wbytes) != 0 || 2201 ( c->udp&& build_udp_headers(c) != 0)) {2201 (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 2202 2202 if (settings.verbose > 0) 2203 2203 fprintf(stderr, "Couldn't build response\n"); … … 2254 2254 2255 2255 case conn_closing: 2256 if ( c->udp)2256 if (IS_UDP(c->protocol)) 2257 2257 conn_cleanup(c); 2258 2258 else … … 2288 2288 } 2289 2289 2290 static int new_socket(const bool is_udp) {2290 static int new_socket(const int prot) { 2291 2291 int sfd; 2292 2292 int flags; 2293 2293 2294 if ((sfd = socket(AF_INET, is_udp? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {2294 if ((sfd = socket(AF_INET, IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 2295 2295 perror("socket()"); 2296 2296 return -1; … … 2342 2342 2343 2343 2344 static int server_socket(const int port, const bool is_udp) {2344 static int server_socket(const int port, const int prot) { 2345 2345 int sfd; 2346 2346 struct linger ling = {0, 0}; … … 2348 2348 int flags =1; 2349 2349 2350 if ((sfd = new_socket( is_udp)) == -1) {2350 if ((sfd = new_socket(prot)) == -1) { 2351 2351 return -1; 2352 2352 } 2353 2353 2354 2354 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 2355 if ( is_udp) {2355 if (IS_UDP(prot)) { 2356 2356 maximize_sndbuf(sfd); 2357 2357 } else { … … 2375 2375 return -1; 2376 2376 } 2377 if (! is_udp&& listen(sfd, 1024) == -1) {2377 if (!IS_UDP(prot) && listen(sfd, 1024) == -1) { 2378 2378 perror("listen()"); 2379 2379 close(sfd); … … 2967 2967 /* this is guaranteed to hit all threads because we round-robin */ 2968 2968 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, 2969 UDP_READ_BUFFER_SIZE, 1);2969 UDP_READ_BUFFER_SIZE, ascii_udp_prot); 2970 2970 } 2971 2971 } branches/binary/server/memcached.h
r651 r672 143 143 }; 144 144 145 enum protocols { 146 ascii_prot = 3, /* arbitrary value. */ 147 ascii_udp_prot 148 }; 149 150 #define IS_UDP(x) (x == ascii_udp_prot) 151 145 152 #define NREAD_ADD 1 146 153 #define NREAD_SET 2 … … 207 214 int suffixleft; 208 215 216 int protocol; /* which protocol this connection speaks */ 217 209 218 /* data for UDP clients */ 210 bool udp; /* is this is a UDP "connection" */211 219 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ 212 220 struct sockaddr request_addr; /* Who sent the most recent request */ … … 243 251 char *do_add_delta(item *item, const bool incr, const int64_t delta, char *buf); 244 252 int do_store_item(item *item, int comm); 245 conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base);253 conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const int prot, struct event_base *base); 246 254 247 255 … … 269 277 void thread_init(int nthreads, struct event_base *main_base); 270 278 int dispatch_event_add(int thread, conn *c); 271 void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);279 void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int prot); 272 280 273 281 /* Lock wrappers for cache functions that are called from main loop. */ branches/binary/server/thread.c
r653 r672 32 32 int event_flags; 33 33 int read_buffer_size; 34 int is_udp;34 int protocol; 35 35 CQ_ITEM *next; 36 36 }; … … 343 343 if (NULL != item) { 344 344 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, 345 item->read_buffer_size, item-> is_udp, me->base);345 item->read_buffer_size, item->protocol, me->base); 346 346 if (c == NULL) { 347 if ( item->is_udp) {347 if (IS_UDP(item->protocol)) { 348 348 fprintf(stderr, "Can't listen for events on UDP socket\n"); 349 349 exit(1); … … 369 369 */ 370 370 void dispatch_conn_new(int sfd, int init_state, int event_flags, 371 int read_buffer_size, int is_udp) {371 int read_buffer_size, int prot) { 372 372 CQ_ITEM *item = cqi_new(); 373 373 int thread = (last_thread + 1) % settings.num_threads; … … 379 379 item->event_flags = event_flags; 380 380 item->read_buffer_size = read_buffer_size; 381 item-> is_udp = is_udp;381 item->protocol = prot; 382 382 383 383 cq_push(&threads[thread].new_conn_queue, item);
