Changeset 775
- Timestamp:
- 04/02/08 17:41:43 (19 months ago)
- Location:
- branches/binary/server
- Files:
-
- 2 modified
-
memcached.c (modified) (28 diffs)
-
memcached.h (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/binary/server/memcached.c
r771 r775 299 299 } 300 300 301 static c har *prot_text(enum protocol prot) {301 static const char *prot_text(enum protocol prot) { 302 302 char *rv = "unknown"; 303 303 switch(prot) { … … 384 384 } else if (IS_UDP(prot)) { 385 385 fprintf(stderr, "<%d server listening (udp)\n", sfd); 386 } else if (prot == binary_prot) {387 fprintf(stderr, "<%d new binary client connection\n", sfd);388 } else if (prot == ascii_prot) {389 fprintf(stderr, "<%d new ascii client connection\n", sfd);390 386 } else if (prot == negotiating_prot) { 391 fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); 387 fprintf(stderr, "<%d new auto-negotiating client connection\n", 388 sfd); 392 389 } else { 393 390 fprintf(stderr, "<%d new unknown (%d) client connection\n", … … 518 515 } 519 516 520 static enum conn_states get_init_state(conn *c) {521 int rv = 0;522 assert(c != NULL);523 524 switch(c->protocol) {525 case binary_prot:526 rv = conn_bin_init;527 break;528 case negotiating_prot:529 rv = conn_negotiate;530 break;531 default:532 rv = conn_read;533 }534 return rv;535 }536 537 /* Set the given connection to its initial state. The initial state will vary538 * base don protocol type. */539 static void conn_set_init_state(conn *c) {540 assert(c != NULL);541 542 conn_set_state(c, get_init_state(c));543 }544 545 517 /* 546 518 * Shrinks a connection's buffers if they're too big. This prevents … … 601 573 } 602 574 575 /** 576 * Convert a state name to a human readable form. 577 */ 578 static const char *state_text(enum conn_states state) { 579 const char* const statenames[] = { "conn_listening", 580 "conn_new_cmd", 581 "conn_waiting", 582 "conn_read", 583 "conn_parse_cmd", 584 "conn_write", 585 "conn_nread", 586 "conn_swallow", 587 "conn_closing", 588 "conn_mwrite" }; 589 return statenames[state]; 590 } 591 603 592 /* 604 593 * Sets a connection's current state in the state machine. Any special … … 608 597 static void conn_set_state(conn *c, enum conn_states state) { 609 598 assert(c != NULL); 599 assert(state >= conn_listening && state < conn_max_state); 610 600 611 601 if (state != c->state) { 612 if (state == conn_read) { 613 conn_shrink(c); 614 assoc_move_next_bucket(); 615 } 602 if (settings.verbose > 2) { 603 fprintf(stderr, "%d: going from %s to %s\n", 604 c->sfd, state_text(c->state), 605 state_text(state)); 606 } 607 616 608 c->state = state; 617 609 } … … 811 803 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); 812 804 c->noreply = false; 813 conn_set_state(c, conn_ read);805 conn_set_state(c, conn_new_cmd); 814 806 return; 815 807 } … … 831 823 832 824 conn_set_state(c, conn_write); 833 c->write_and_go = get_init_state(c);825 c->write_and_go = conn_new_cmd; 834 826 return; 835 827 } … … 909 901 910 902 static void write_bin_error(conn *c, int err, int swallow) { 911 c har *errstr = "Unknown error";903 const char *errstr = "Unknown error"; 912 904 switch(err) { 905 case ERR_OUT_OF_MEMORY: 906 errstr = "Out of memory"; 907 break; 913 908 case ERR_UNKNOWN_CMD: 914 909 errstr = "Unknown command"; … … 944 939 c->write_and_go = conn_swallow; 945 940 } else { 946 c->write_and_go = conn_ bin_init;941 c->write_and_go = conn_new_cmd; 947 942 } 948 943 } … … 955 950 } 956 951 conn_set_state(c, conn_mwrite); 957 c->write_and_go = conn_ bin_init;952 c->write_and_go = conn_new_cmd; 958 953 } 959 954 … … 1113 1108 } else { 1114 1109 if(c->cmd == CMD_GETQ) { 1115 conn_set_state(c, conn_ bin_init);1110 conn_set_state(c, conn_new_cmd); 1116 1111 } else { 1117 1112 write_bin_error(c, ERR_NOT_FOUND, 0); … … 1279 1274 static void complete_nread_binary(conn *c) { 1280 1275 assert(c != NULL); 1281 1282 if(c->cmd < 0) { 1283 /* No command defined. Figure out what they're trying to say. */ 1284 int i = 0; 1285 /* I did a bit of hard-coding around the packet sizes */ 1286 assert(BIN_PKT_HDR_WORDS == 4); 1287 for(i = 0; i<BIN_PKT_HDR_WORDS; i++) { 1288 c->bin_header[i] = ntohl(c->bin_header[i]); 1289 } 1290 if(settings.verbose) { 1291 fprintf(stderr, "Read binary protocol data: %08x %08x %08x %08x\n", 1292 c->bin_header[0], c->bin_header[1], c->bin_header[2], 1293 c->bin_header[3]); 1294 } 1295 if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) { 1296 if(settings.verbose) { 1297 fprintf(stderr, "Invalid magic: %x\n", c->bin_header[0] >> 24); 1298 } 1299 conn_set_state(c, conn_closing); 1300 return; 1301 } 1302 1303 c->msgcurr = 0; 1304 c->msgused = 0; 1305 c->iovused = 0; 1306 if (add_msghdr(c) != 0) { 1307 out_string(c, "SERVER_ERROR out of memory"); 1308 return; 1309 } 1310 1311 c->cmd = (c->bin_header[0] >> 16) & 0xff; 1312 c->keylen = c->bin_header[0] & 0xffff; 1313 c->opaque = c->bin_header[3]; 1314 if(settings.verbose > 1) { 1315 fprintf(stderr, 1316 "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd, 1317 c->opaque, c->keylen, c->bin_header[2]); 1318 } 1319 dispatch_bin_command(c); 1320 } else { 1321 switch(c->substate) { 1322 case bin_reading_set_header: 1323 process_bin_update(c); 1324 break; 1325 case bin_read_set_value: 1326 complete_update_bin(c); 1327 break; 1328 case bin_reading_get_key: 1329 process_bin_get(c); 1330 break; 1331 case bin_reading_del_header: 1332 process_bin_delete(c); 1333 break; 1334 case bin_reading_incr_header: 1335 complete_incr_bin(c); 1336 break; 1337 default: 1338 fprintf(stderr, "Not handling substate %d\n", c->substate); 1339 assert(0); 1340 } 1341 } 1342 } 1343 1344 static void reinit_bin_connection(conn *c) { 1345 if (settings.verbose > 1) 1346 fprintf(stderr, "*** Reinitializing binary connection.\n"); 1347 c->rlbytes = MIN_BIN_PKT_LENGTH; 1348 c->write_and_go = conn_bin_init; 1276 assert(c->cmd >= 0); 1277 1278 switch(c->substate) { 1279 case bin_reading_set_header: 1280 process_bin_update(c); 1281 break; 1282 case bin_read_set_value: 1283 complete_update_bin(c); 1284 break; 1285 case bin_reading_get_key: 1286 process_bin_get(c); 1287 break; 1288 case bin_reading_del_header: 1289 process_bin_delete(c); 1290 break; 1291 case bin_reading_incr_header: 1292 complete_incr_bin(c); 1293 break; 1294 default: 1295 fprintf(stderr, "Not handling substate %d\n", c->substate); 1296 assert(0); 1297 } 1298 } 1299 1300 static void reset_cmd_handler(conn *c) { 1349 1301 c->cmd = -1; 1350 1302 c->substate = bin_no_state; 1351 c->rbytes = c->wbytes = 0;1352 c->ritem = (char*)c->bin_header;1353 c->rcurr = c->rbuf;1354 c->wcurr = c->wbuf;1355 1303 conn_shrink(c); 1356 conn_set_state(c, conn_nread); 1357 } 1358 1359 /* These do the initial protocol switch. At this point, we should've read 1360 * exactly one byte, and must treat that byte as the beginning of a command. */ 1361 static void setup_bin_protocol(conn *c) { 1362 char *loc = (char*)c->bin_header; 1363 if (settings.verbose > 1) 1364 fprintf(stderr, "Negotiated protocol as binary.\n"); 1365 1366 c->protocol = binary_prot; 1367 reinit_bin_connection(c); 1368 /* Emulate a read of the first byte */ 1369 c->ritem[0] = c->rbuf[0]; 1370 c->ritem++; 1371 c->rlbytes--; 1372 } 1373 1374 static void setup_ascii_protocol(conn *c) { 1375 if (settings.verbose > 1) 1376 fprintf(stderr, "Negotiated protocol as ascii.\n"); 1377 c->protocol = ascii_prot; 1378 1379 /* We've already got the first letter of the command, so pretend like we 1380 * Did a single byte read from try_read_command */ 1381 c->rcurr = c->rbuf; 1382 c->rbytes = 1; 1383 conn_set_state(c, conn_read); 1304 if (c->rbytes > 0) { 1305 conn_set_state(c, conn_parse_cmd); 1306 } else { 1307 conn_set_state(c, conn_waiting); 1308 } 1384 1309 } 1385 1310 1386 1311 static void complete_nread(conn *c) { 1387 1312 assert(c != NULL); 1313 assert(c->protocol == ascii_prot || c->protocol == binary_prot); 1388 1314 1389 1315 if(c->protocol == ascii_prot) { 1390 1316 complete_nread_ascii(c); 1391 } else if (c->protocol == binary_prot) {1317 } else if (c->protocol == binary_prot) { 1392 1318 complete_nread_binary(c); 1393 } else if(c->protocol == negotiating_prot) {1394 /* The first byte is either BIN_REQ_MAGIC, or we're speaking ascii */1395 if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC)1396 setup_bin_protocol(c);1397 else1398 setup_ascii_protocol(c);1399 } else {1400 assert(0); /* XXX: Invalid case. Should probably do more here. */1401 1319 } 1402 1320 } … … 1579 1497 c->wbytes = bytes; 1580 1498 conn_set_state(c, conn_write); 1581 c->write_and_go = get_init_state(c);1499 c->write_and_go = conn_new_cmd; 1582 1500 } else { 1583 1501 out_string(c, "SERVER_ERROR out of memory writing stats"); … … 1866 1784 */ 1867 1785 1868 if (return_cas == true)1786 if (return_cas) 1869 1787 { 1870 1788 /* Goofy mid-flight realloc. */ … … 2362 2280 c->gen = gen; 2363 2281 } 2364 conn_set_ init_state(c);2282 conn_set_state(c, conn_new_cmd); 2365 2283 return; 2366 2284 } else { … … 2456 2374 */ 2457 2375 static int try_read_command(conn *c) { 2458 char *el, *cont;2459 2460 2376 assert(c != NULL); 2461 2377 assert(c->rcurr <= (c->rbuf + c->rsize)); 2462 2463 if (c->rbytes == 0) 2464 return 0; 2465 el = memchr(c->rcurr, '\n', c->rbytes); 2466 if (!el) 2467 return 0; 2468 cont = el + 1; 2469 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { 2470 el--; 2471 } 2472 *el = '\0'; 2473 2474 assert(cont <= (c->rcurr + c->rbytes)); 2475 2476 process_command(c, c->rcurr); 2477 2478 c->rbytes -= (cont - c->rcurr); 2479 c->rcurr = cont; 2480 2481 assert(c->rcurr <= (c->rbuf + c->rsize)); 2378 assert(c->rbytes > 0); 2379 2380 if (c->protocol == negotiating_prot) { 2381 if ((c->rbuf[0] & 0xff) == BIN_REQ_MAGIC) { 2382 c->protocol = binary_prot; 2383 } else { 2384 c->protocol = ascii_prot; 2385 } 2386 2387 if (settings.verbose) { 2388 fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd, 2389 prot_text(c->protocol)); 2390 } 2391 } 2392 2393 if (c->protocol == binary_prot) { 2394 /* Do we have the complete packet header? */ 2395 if (c->rbytes < MIN_BIN_PKT_LENGTH) { 2396 /* need more data! */ 2397 return 0; 2398 } else { 2399 int i = 0; 2400 memcpy(c->bin_header, c->rcurr, sizeof(c->bin_header)); 2401 assert(BIN_PKT_HDR_WORDS == 4); 2402 for (i = 0; i<BIN_PKT_HDR_WORDS; i++) { 2403 c->bin_header[i] = ntohl(c->bin_header[i]); 2404 } 2405 2406 if (settings.verbose) { 2407 fprintf(stderr, 2408 "Read binary protocol data: %08x %08x %08x %08x\n", 2409 c->bin_header[0], c->bin_header[1], c->bin_header[2], 2410 c->bin_header[3]); 2411 } 2412 2413 if ((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) { 2414 if (settings.verbose) { 2415 fprintf(stderr, "Invalid magic: %x\n", 2416 c->bin_header[0] >> 24); 2417 } 2418 conn_set_state(c, conn_closing); 2419 return 0; 2420 } 2421 2422 c->msgcurr = 0; 2423 c->msgused = 0; 2424 c->iovused = 0; 2425 if (add_msghdr(c) != 0) { 2426 out_string(c, "SERVER_ERROR out of memory"); 2427 return 0; 2428 } 2429 2430 c->cmd = (c->bin_header[0] >> 16) & 0xff; 2431 c->keylen = c->bin_header[0] & 0xffff; 2432 c->opaque = c->bin_header[3]; 2433 if (settings.verbose > 1) { 2434 fprintf(stderr, 2435 "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", 2436 c->cmd, c->opaque, c->keylen, c->bin_header[2]); 2437 } 2438 2439 dispatch_bin_command(c); 2440 2441 c->rbytes -= MIN_BIN_PKT_LENGTH; 2442 c->rcurr += MIN_BIN_PKT_LENGTH; 2443 } 2444 } else { 2445 char *el, *cont; 2446 2447 if (c->rbytes == 0) 2448 return 0; 2449 el = memchr(c->rcurr, '\n', c->rbytes); 2450 if (!el) 2451 return 0; 2452 cont = el + 1; 2453 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') { 2454 el--; 2455 } 2456 *el = '\0'; 2457 2458 assert(cont <= (c->rcurr + c->rbytes)); 2459 2460 process_command(c, c->rcurr); 2461 2462 c->rbytes -= (cont - c->rcurr); 2463 c->rcurr = cont; 2464 2465 assert(c->rcurr <= (c->rbuf + c->rsize)); 2466 } 2482 2467 2483 2468 return 1; … … 2527 2512 * before reading, move the remaining incomplete fragment of a command 2528 2513 * (if any) to the beginning of the buffer. 2529 * return 0 if there's nothing to read on the first read. 2514 * @return 1 data received 2515 * 0 no data received 2516 * -1 an error occured (on the socket) (or client closed connection) 2517 * -2 memory error (failed to allocate more memory) 2530 2518 */ 2531 2519 static int try_read_network(conn *c) { … … 2550 2538 out_string(c, "SERVER_ERROR out of memory reading request"); 2551 2539 c->write_and_go = conn_closing; 2552 return 1;2540 return -2; 2553 2541 } 2554 2542 c->rcurr = c->rbuf = new_rbuf; … … 2571 2559 } 2572 2560 if (res == 0) { 2573 /* connection closed */ 2574 conn_set_state(c, conn_closing); 2575 return 1; 2561 return -1; 2576 2562 } 2577 2563 if (res == -1) { 2578 if (errno == EAGAIN || errno == EWOULDBLOCK) break;2579 /* Should close on unhandled errors. */2580 conn_set_state(c, conn_closing);2581 return 1;2564 if (errno == EAGAIN || errno == EWOULDBLOCK) { 2565 break; 2566 } 2567 return -1; 2582 2568 } 2583 2569 } … … 2694 2680 bool stop = false; 2695 2681 int sfd, flags = 1; 2696 enum conn_states init_state; /* initial state for a new connection */2697 2682 socklen_t addrlen; 2698 2683 struct sockaddr_storage addr; … … 2727 2712 break; 2728 2713 } 2729 init_state = get_init_state(c); 2730 2731 dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST, 2714 2715 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, 2732 2716 DATA_BUFFER_SIZE, c->protocol); 2733 2734 break; 2735 2736 case conn_negotiate: 2737 if (settings.verbose > 1) 2738 fprintf(stderr, "Negotiating protocol for a new connection\n"); 2739 c->rlbytes = 1; 2740 c->ritem = c->rbuf; 2741 c->rcurr = c->rbuf; 2742 c->wcurr = c->wbuf; 2743 conn_set_state(c, conn_nread); 2744 break; 2745 2746 case conn_read: 2747 if (try_read_command(c) != 0) { 2748 continue; 2749 } 2750 if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) { 2751 continue; 2752 } 2753 /* we have no command line and no data to read from network */ 2717 stop = true; 2718 break; 2719 2720 case conn_waiting: 2754 2721 if (!update_event(c, EV_READ | EV_PERSIST)) { 2755 2722 if (settings.verbose > 0) … … 2758 2725 break; 2759 2726 } 2727 2728 conn_set_state(c, conn_read); 2760 2729 stop = true; 2761 2730 break; 2762 2731 2763 case conn_bin_init: /* Reinitialize a binary connection */ 2764 reinit_bin_connection(c); 2732 case conn_read: 2733 res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c); 2734 switch (res) { 2735 case 0 : 2736 conn_set_state(c, conn_waiting); 2737 break; 2738 case 1: 2739 conn_set_state(c, conn_parse_cmd); 2740 break; 2741 case -1: 2742 conn_set_state(c, conn_closing); 2743 break; 2744 case -2: /* Failed to allocate more memory */ 2745 /* State already set by try_read_network */ 2746 break; 2747 } 2748 break; 2749 2750 case conn_parse_cmd : 2751 if (try_read_command(c) == 0) { 2752 /* wee need more data! */ 2753 conn_set_state(c, conn_waiting); 2754 } 2755 2756 break; 2757 2758 case conn_new_cmd: 2759 reset_cmd_handler(c); 2760 assoc_move_next_bucket(); 2765 2761 break; 2766 2762 … … 2773 2769 if (c->rbytes > 0) { 2774 2770 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; 2775 mem cpy(c->ritem, c->rcurr, tocopy);2771 memmove(c->ritem, c->rcurr, tocopy); 2776 2772 c->ritem += tocopy; 2777 2773 c->rlbytes -= tocopy; 2778 2774 c->rcurr += tocopy; 2779 2775 c->rbytes -= tocopy; 2780 break; 2776 if (c->rlbytes == 0) { 2777 break; 2778 } 2781 2779 } 2782 2780 … … 2814 2812 /* we are reading sbytes and throwing them away */ 2815 2813 if (c->sbytes == 0) { 2816 conn_set_ init_state(c);2814 conn_set_state(c, conn_new_cmd); 2817 2815 break; 2818 2816 } … … 2898 2896 conn_set_state(c, c->write_and_go); 2899 2897 } else { 2900 conn_set_ init_state(c);2898 conn_set_state(c, conn_new_cmd); 2901 2899 } 2902 2900 } else if (c->state == conn_write) { … … 3286 3284 "-v verbose (print errors/warnings while in event loop)\n" 3287 3285 "-vv very verbose (also print client commands/reponses)\n" 3286 "-vvv extremely verbose (also print internal state transitions)\n" 3288 3287 "-h print this help and exit\n" 3289 3288 "-i print memcached and libevent license\n" … … 3481 3480 3482 3481 /* process arguments */ 3483 while ((c = getopt(argc, argv, "a:bp: B:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {3482 while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:L")) != -1) { 3484 3483 switch (c) { 3485 3484 case 'a': … … 3565 3564 settings.detail_enabled = 1; 3566 3565 break; 3566 case 'L' : 3567 3567 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL) 3568 case 'L' :3569 3568 if (enable_large_pages() == 0) { 3570 3569 preallocate = true; 3571 3570 } 3572 break;3573 3571 #endif 3572 break; 3574 3573 default: 3575 3574 fprintf(stderr, "Illegal argument \"%c\"\n", c); -
branches/binary/server/memcached.h
r772 r775 176 176 #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes) 177 177 178 /** 179 * NOTE: If you modify this table you _MUST_ update the function state_text 180 */ 178 181 enum conn_states { 179 182 conn_listening, /** the socket which listens for connections */ 183 conn_new_cmd, /** Prepare connection for next command */ 184 conn_waiting, /** waiting for a readable socket */ 180 185 conn_read, /** reading in a command line */ 186 conn_parse_cmd, /** try to parse a command from the input buffer */ 181 187 conn_write, /** writing out a simple response */ 182 188 conn_nread, /** reading in a fixed number of bytes */ … … 184 190 conn_closing, /** closing this connection */ 185 191 conn_mwrite, /** writing out many items sequentially */ 186 conn_bin_init, /** Reinitializing a binary protocol connection */ 187 conn_negotiate, /** Negotiating a protocol */ 192 conn_max_state, /** Max state value (used for assertion) */ 188 193 }; 189 194
