Changeset 701
- Timestamp:
- 02/18/08 09:22:47 (9 months ago)
- Files:
-
- trunk/server/memcached.c (modified) (23 diffs)
- trunk/server/memcached.h (modified) (3 diffs)
- trunk/server/t/lib/MemcachedTest.pm (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/server/memcached.c
r700 r701 65 65 static void drive_machine(conn *c); 66 66 static int new_socket(struct addrinfo *ai); 67 static int server_socket(const int port, const bool is_udp);67 static int *server_socket(const int port, const bool is_udp, int *count); 68 68 static int try_read_command(conn *c); 69 69 static int try_read_network(conn *c); … … 95 95 (to avoid 64 bit time_t) */ 96 96 97 void pre_gdb(void);98 97 static void conn_free(conn *c); 99 98 … … 106 105 static int delcurr; 107 106 static int deltotal; 108 static conn *listen_conn ;107 static conn *listen_conn = NULL; 109 108 static struct event_base *main_base; 110 109 … … 252 251 freecurr = 0; 253 252 if ((freeconns = (conn **)malloc(sizeof(conn *) * freetotal)) == NULL) { 254 perror("malloc()");253 fprintf(stderr, "malloc()\n"); 255 254 } 256 255 return; … … 300 299 if (NULL == c) { 301 300 if (!(c = (conn *)malloc(sizeof(conn)))) { 302 perror("malloc()");301 fprintf(stderr, "malloc()\n"); 303 302 return NULL; 304 303 } … … 334 333 if (c->msglist != 0) free(c->msglist); 335 334 free(c); 336 perror("malloc()");335 fprintf(stderr, "malloc()\n"); 337 336 return NULL; 338 337 } … … 382 381 conn_free(c); 383 382 } 383 perror("event_add"); 384 384 return NULL; 385 385 } … … 559 559 freesuffix = (char **)malloc( sizeof(char *) * freesuffixtotal ); 560 560 if (freesuffix == NULL) { 561 perror("malloc()");561 fprintf(stderr, "malloc()\n"); 562 562 } 563 563 return; … … 1965 1965 */ 1966 1966 void accept_new_conns(const bool do_accept) { 1967 conn *next; 1968 1967 1969 if (! is_listen_thread()) 1968 1970 return; 1969 if (do_accept) { 1970 update_event(listen_conn, EV_READ | EV_PERSIST); 1971 if (listen(listen_conn->sfd, 1024) != 0) { 1972 perror("listen"); 1973 } 1974 } 1975 else { 1976 update_event(listen_conn, 0); 1977 if (listen(listen_conn->sfd, 0) != 0) { 1978 perror("listen"); 1979 } 1980 } 1971 1972 for (next = listen_conn; next; next = next->next) { 1973 if (do_accept) { 1974 update_event(next, EV_READ | EV_PERSIST); 1975 if (listen(next->sfd, 1024) != 0) { 1976 perror("listen"); 1977 } 1978 } 1979 else { 1980 update_event(next, 0); 1981 if (listen(next->sfd, 0) != 0) { 1982 perror("listen"); 1983 } 1984 } 1985 } 1981 1986 } 1982 1987 … … 2348 2353 2349 2354 2350 static int server_socket(const int port, const bool is_udp) {2355 static int *server_socket(const int port, const bool is_udp, int *count) { 2351 2356 int sfd; 2357 int *sfd_list; 2358 int *sfd_ptr; 2352 2359 struct linger ling = {0, 0}; 2353 2360 struct addrinfo *ai; 2361 struct addrinfo *next; 2354 2362 struct addrinfo hints; 2355 2363 char port_buf[NI_MAXSERV]; 2356 2364 int error; 2365 int success = 0; 2357 2366 2358 2367 int flags =1; … … 2383 2392 perror("getaddrinfo()"); 2384 2393 2385 return -1; 2386 } 2387 2388 if ((sfd = new_socket(ai)) == -1) { 2389 freeaddrinfo(ai); 2390 return -1; 2391 } 2392 2393 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 2394 if (is_udp) { 2395 maximize_sndbuf(sfd); 2396 } else { 2397 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); 2398 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); 2399 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); 2400 } 2401 2402 if (bind(sfd, ai->ai_addr, ai->ai_addrlen) == -1) { 2403 perror("bind()"); 2404 close(sfd); 2405 freeaddrinfo(ai); 2406 return -1; 2407 } 2408 if (!is_udp && listen(sfd, 1024) == -1) { 2409 perror("listen()"); 2410 close(sfd); 2411 freeaddrinfo(ai); 2412 return -1; 2394 return NULL; 2395 } 2396 2397 for (*count= 1, next= ai; next->ai_next; next= next->ai_next, (*count)++); 2398 2399 sfd_list= (int *)calloc(*count, sizeof(int)); 2400 if (sfd_list == NULL) { 2401 fprintf(stderr, "calloc()\n"); 2402 return NULL; 2403 } 2404 memset(sfd_list, -1, sizeof(int) * (*count)); 2405 2406 for (sfd_ptr= sfd_list, next= ai; next; next= next->ai_next, sfd_ptr++) { 2407 if ((sfd = new_socket(next)) == -1) { 2408 free(sfd_list); 2409 freeaddrinfo(ai); 2410 return NULL; 2411 } 2412 2413 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 2414 if (is_udp) { 2415 maximize_sndbuf(sfd); 2416 } else { 2417 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); 2418 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); 2419 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); 2420 } 2421 2422 if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { 2423 if (errno != EADDRINUSE) { 2424 perror("bind()"); 2425 /* If we are not at the first element, loop back and close all sockets */ 2426 if (sfd_ptr != sfd_list) { 2427 do { 2428 --sfd_ptr; 2429 close(*sfd_ptr); 2430 } while (sfd_ptr != sfd_list); 2431 } 2432 close(sfd); 2433 freeaddrinfo(ai); 2434 free(sfd_list); 2435 return NULL; 2436 } 2437 close(sfd); 2438 *sfd_ptr= -1; 2439 } else { 2440 success++; 2441 *sfd_ptr= sfd; 2442 if (!is_udp && listen(sfd, 1024) == -1) { 2443 perror("listen()"); 2444 if (sfd_ptr != sfd_list) { 2445 do { 2446 --sfd_ptr; 2447 close(*sfd_ptr); 2448 } while (sfd_ptr != sfd_list); 2449 } 2450 close(sfd); 2451 freeaddrinfo(ai); 2452 free(sfd_list); 2453 return NULL; 2454 } 2455 } 2413 2456 } 2414 2457 2415 2458 freeaddrinfo(ai); 2416 2459 2417 return sfd; 2460 if (success == 0) { 2461 free(sfd_list); 2462 return NULL; 2463 } 2464 2465 return sfd_list; 2418 2466 } 2419 2467 … … 2436 2484 } 2437 2485 2438 static int server_socket_unix(const char *path, int access_mask) {2486 static int *server_socket_unix(const char *path, int access_mask) { 2439 2487 int sfd; 2488 int *sfd_list; 2440 2489 struct linger ling = {0, 0}; 2441 2490 struct sockaddr_un addr; … … 2445 2494 2446 2495 if (!path) { 2447 return -1;2496 return NULL; 2448 2497 } 2449 2498 2450 2499 if ((sfd = new_socket_unix()) == -1) { 2451 return -1; 2452 } 2500 return NULL; 2501 } 2502 2503 /* 2504 When UNIX domain sockets get refactored, this will go away. 2505 For now we know there is just one socket. 2506 */ 2507 sfd_list= (int *)calloc(1, sizeof(int)); 2508 if (sfd_list == NULL) { 2509 fprintf(stderr, "calloc()\n"); 2510 return NULL; 2511 } 2512 memset(sfd_list, -1, sizeof(int) * 1); 2453 2513 2454 2514 /* … … 2476 2536 perror("bind()"); 2477 2537 close(sfd); 2538 free(sfd_list); 2478 2539 umask(old_umask); 2479 return -1;2540 return NULL; 2480 2541 } 2481 2542 umask(old_umask); … … 2483 2544 perror("listen()"); 2484 2545 close(sfd); 2485 return -1; 2486 } 2487 return sfd; 2488 } 2489 2490 /* listening socket */ 2491 static int l_socket = 0; 2492 2493 /* udp socket */ 2494 static int u_socket = -1; 2495 2496 /* invoke right before gdb is called, on assert */ 2497 void pre_gdb(void) { 2498 int i; 2499 if (l_socket > -1) close(l_socket); 2500 if (u_socket > -1) close(u_socket); 2501 for (i = 3; i <= 500; i++) close(i); /* so lame */ 2502 kill(getpid(), SIGABRT); 2546 free(sfd_list); 2547 return NULL; 2548 } 2549 2550 *sfd_list= sfd; 2551 2552 return sfd_list; 2503 2553 } 2504 2554 … … 2715 2765 int main (int argc, char **argv) { 2716 2766 int c; 2767 int x; 2717 2768 bool lock_memory = false; 2718 2769 bool daemonize = false; … … 2723 2774 struct sigaction sa; 2724 2775 struct rlimit rlim; 2776 /* listening socket */ 2777 static int *l_socket = NULL; 2778 static int l_socket_count = 0; 2779 2780 /* udp socket */ 2781 static int *u_socket = NULL; 2782 static int u_socket_count = 0; 2725 2783 2726 2784 /* handle SIGINT */ … … 2879 2937 /* create the listening socket and bind it */ 2880 2938 if (settings.socketpath == NULL) { 2881 l_socket = server_socket(settings.port, 0 );2882 if (l_socket == -1) {2939 l_socket = server_socket(settings.port, 0, &l_socket_count); 2940 if (l_socket == NULL) { 2883 2941 fprintf(stderr, "failed to listen\n"); 2884 2942 exit(EXIT_FAILURE); … … 2888 2946 if (settings.udpport > 0 && settings.socketpath == NULL) { 2889 2947 /* create the UDP listening socket and bind it */ 2890 u_socket = server_socket(settings.udpport, 1 );2891 if (u_socket == -1) {2948 u_socket = server_socket(settings.udpport, 1, &u_socket_count); 2949 if (u_socket == NULL) { 2892 2950 fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport); 2893 2951 exit(EXIT_FAILURE); … … 2927 2985 if (settings.socketpath != NULL) { 2928 2986 l_socket = server_socket_unix(settings.socketpath,settings.access); 2929 if (l_socket == -1) { 2930 fprintf(stderr, "failed to listen\n"); 2931 exit(EXIT_FAILURE); 2932 } 2987 if (l_socket == NULL) { 2988 fprintf(stderr, "failed to listen\n"); 2989 exit(EXIT_FAILURE); 2990 } 2991 /* We only support one of these, so whe know the count */ 2992 l_socket_count = 1; 2933 2993 } 2934 2994 … … 2979 3039 } 2980 3040 /* create the initial listening connection */ 2981 if (!(listen_conn = conn_new(l_socket, conn_listening, 2982 EV_READ | EV_PERSIST, 1, false, main_base))) { 2983 fprintf(stderr, "failed to create listening connection"); 2984 exit(EXIT_FAILURE); 3041 int *l_socket_ptr; 3042 conn *next = NULL; 3043 for (l_socket_ptr= l_socket, x = 0; x < l_socket_count; l_socket_ptr++, x++) { 3044 conn *listen_conn_add; 3045 if (*l_socket_ptr > -1 ) { 3046 if (!(listen_conn_add = conn_new(*l_socket_ptr, conn_listening, 3047 EV_READ | EV_PERSIST, 1, false, main_base))) { 3048 fprintf(stderr, "failed to create listening connection\n"); 3049 exit(EXIT_FAILURE); 3050 } 3051 3052 if (listen_conn == NULL) { 3053 next = listen_conn = listen_conn_add; 3054 } else { 3055 next->next= listen_conn_add; 3056 } 3057 } 2985 3058 } 2986 3059 /* start up worker threads if MT mode */ … … 3001 3074 delete_handler(0, 0, 0); /* sets up the event */ 3002 3075 /* create the initial listening udp connection, monitored on all threads */ 3003 if (u_socket > -1) {3076 if (u_socket) { 3004 3077 for (c = 0; c < settings.num_threads; c++) { 3005 3078 /* this is guaranteed to hit all threads because we round-robin */ 3006 dispatch_conn_new( u_socket, conn_read, EV_READ | EV_PERSIST,3079 dispatch_conn_new(*u_socket, conn_read, EV_READ | EV_PERSIST, 3007 3080 UDP_READ_BUFFER_SIZE, 1); 3008 3081 } … … 3016 3089 if (settings.inter) 3017 3090 free(settings.inter); 3091 if (l_socket) 3092 free(l_socket); 3093 if (u_socket) 3094 free(u_socket); 3018 3095 3019 3096 return 0; trunk/server/memcached.h
r700 r701 151 151 #define NREAD_CAS 6 152 152 153 typedef struct { 153 typedef struct conn conn; 154 struct conn { 154 155 int sfd; 155 156 int state; … … 220 221 a managed instance. -1 (_not_ 0) means invalid. */ 221 222 int gen; /* generation requested for the bucket */ 222 } conn; 223 conn *next; /* Used for generating a list of conn structures */ 224 }; 223 225 224 226 /* number of virtual buckets for a managed instance */ … … 227 229 /* current time of day (updated periodically) */ 228 230 extern volatile rel_time_t current_time; 229 230 /* temporary hack */231 /* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }232 void pre_gdb (); */233 231 234 232 /* trunk/server/t/lib/MemcachedTest.pm
r641 r701 143 143 144 144 sub new_memcached { 145 my $args = shift || "";146 my $port = free_port();145 my ($args, $passed_port) = @_; 146 my $port = $passed_port || free_port(); 147 147 my $udpport = free_port("udp"); 148 148 $args .= " -p $port";
