Changeset 672

Show
Ignore:
Timestamp:
12/11/07 03:28:34 (1 year ago)
Author:
dormando
Message:

protocol-flag patch from Dustin Sallings

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/binary/server/memcached.c

    r667 r672  
    6464 */ 
    6565static void drive_machine(conn *c); 
    66 static int new_socket(const bool is_udp); 
    67 static int server_socket(const int port, const bool is_udp); 
     66static int new_socket(const int prot); 
     67static int server_socket(const int port, const int prot); 
    6868static int try_read_command(conn *c); 
    6969static int try_read_network(conn *c); 
     
    229229    c->msgused++; 
    230230 
    231     if (c->udp) { 
     231    if (IS_UDP(c->protocol)) { 
    232232        /* Leave room for the UDP header, which we'll fill in later. */ 
    233233        return add_iov(c, NULL, UDP_HEADER_SIZE); 
     
    294294 
    295295conn *conn_new(const int sfd, const int init_state, const int event_flags, 
    296                 const int read_buffer_size, const bool is_udp, struct event_base *base) { 
     296                const int read_buffer_size, const int prot, struct event_base *base) { 
    297297    conn *c = conn_from_freelist(); 
    298298 
     
    345345        if (init_state == conn_listening) 
    346346            fprintf(stderr, "<%d server listening\n", sfd); 
    347         else if (is_udp
     347        else if (IS_UDP(prot)
    348348            fprintf(stderr, "<%d server listening (udp)\n", sfd); 
    349349        else 
     
    352352 
    353353    c->sfd = sfd; 
    354     c->udp = is_udp
     354    c->protocol = prot
    355355    c->state = init_state; 
    356356    c->rlbytes = 0; 
     
    480480    assert(c != NULL); 
    481481 
    482     if (c->udp
     482    if (IS_UDP(c->protocol)
    483483        return; 
    484484 
     
    652652         * UDP_MAX_PAYLOAD_SIZE bytes. 
    653653         */ 
    654         limit_to_mtu = c->udp || (1 == c->msgused); 
     654        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); 
    655655 
    656656        /* We may need to start a new msghdr if this one is full. */ 
     
    13191319    */ 
    13201320    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 
    1321         || (c->udp && build_udp_headers(c) != 0)) { 
     1321        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 
    13221322        out_string(c, "SERVER_ERROR out of memory"); 
    13231323    } 
     
    20332033            perror("Failed to write, and not due to blocking"); 
    20342034 
    2035         if (c->udp
     2035        if (IS_UDP(c->protocol)
    20362036            conn_set_state(c, conn_read); 
    20372037        else 
     
    20792079            } 
    20802080            dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, 
    2081                                      DATA_BUFFER_SIZE, false); 
     2081                                     DATA_BUFFER_SIZE, ascii_prot); 
    20822082            break; 
    20832083 
     
    20862086                continue; 
    20872087            } 
    2088             if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) { 
     2088            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) { 
    20892089                continue; 
    20902090            } 
     
    21972197             * list for TCP or a two-entry list for UDP). 
    21982198             */ 
    2199             if (c->iovused == 0 || (c->udp && c->iovused == 1)) { 
     2199            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { 
    22002200                if (add_iov(c, c->wcurr, c->wbytes) != 0 || 
    2201                     (c->udp && build_udp_headers(c) != 0)) { 
     2201                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 
    22022202                    if (settings.verbose > 0) 
    22032203                        fprintf(stderr, "Couldn't build response\n"); 
     
    22542254 
    22552255        case conn_closing: 
    2256             if (c->udp
     2256            if (IS_UDP(c->protocol)
    22572257                conn_cleanup(c); 
    22582258            else 
     
    22882288} 
    22892289 
    2290 static int new_socket(const bool is_udp) { 
     2290static int new_socket(const int prot) { 
    22912291    int sfd; 
    22922292    int flags; 
    22932293 
    2294     if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 
     2294    if ((sfd = socket(AF_INET, IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 
    22952295        perror("socket()"); 
    22962296        return -1; 
     
    23422342 
    23432343 
    2344 static int server_socket(const int port, const bool is_udp) { 
     2344static int server_socket(const int port, const int prot) { 
    23452345    int sfd; 
    23462346    struct linger ling = {0, 0}; 
     
    23482348    int flags =1; 
    23492349 
    2350     if ((sfd = new_socket(is_udp)) == -1) { 
     2350    if ((sfd = new_socket(prot)) == -1) { 
    23512351        return -1; 
    23522352    } 
    23532353 
    23542354    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 
    2355     if (is_udp) { 
     2355    if (IS_UDP(prot)) { 
    23562356        maximize_sndbuf(sfd); 
    23572357    } else { 
     
    23752375        return -1; 
    23762376    } 
    2377     if (!is_udp && listen(sfd, 1024) == -1) { 
     2377    if (!IS_UDP(prot) && listen(sfd, 1024) == -1) { 
    23782378        perror("listen()"); 
    23792379        close(sfd); 
     
    29672967            /* this is guaranteed to hit all threads because we round-robin */ 
    29682968            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, 
    2969                               UDP_READ_BUFFER_SIZE, 1); 
     2969                              UDP_READ_BUFFER_SIZE, ascii_udp_prot); 
    29702970        } 
    29712971    } 
  • branches/binary/server/memcached.h

    r651 r672  
    143143}; 
    144144 
     145enum protocols { 
     146    ascii_prot = 3, /* arbitrary value. */ 
     147    ascii_udp_prot 
     148}; 
     149 
     150#define IS_UDP(x) (x == ascii_udp_prot) 
     151 
    145152#define NREAD_ADD 1 
    146153#define NREAD_SET 2 
     
    207214    int    suffixleft; 
    208215 
     216    int protocol;   /* which protocol this connection speaks */ 
     217 
    209218    /* data for UDP clients */ 
    210     bool   udp;       /* is this is a UDP "connection" */ 
    211219    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ 
    212220    struct sockaddr request_addr; /* Who sent the most recent request */ 
     
    243251char *do_add_delta(item *item, const bool incr, const int64_t delta, char *buf); 
    244252int do_store_item(item *item, int comm); 
    245 conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base); 
     253conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const int prot, struct event_base *base); 
    246254 
    247255 
     
    269277void thread_init(int nthreads, struct event_base *main_base); 
    270278int  dispatch_event_add(int thread, conn *c); 
    271 void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp); 
     279void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int prot); 
    272280 
    273281/* Lock wrappers for cache functions that are called from main loop. */ 
  • branches/binary/server/thread.c

    r653 r672  
    3232    int     event_flags; 
    3333    int     read_buffer_size; 
    34     int     is_udp
     34    int     protocol
    3535    CQ_ITEM *next; 
    3636}; 
     
    343343    if (NULL != item) { 
    344344        conn *c = conn_new(item->sfd, item->init_state, item->event_flags, 
    345                            item->read_buffer_size, item->is_udp, me->base); 
     345                           item->read_buffer_size, item->protocol, me->base); 
    346346        if (c == NULL) { 
    347             if (item->is_udp) { 
     347            if (IS_UDP(item->protocol)) { 
    348348                fprintf(stderr, "Can't listen for events on UDP socket\n"); 
    349349                exit(1); 
     
    369369 */ 
    370370void dispatch_conn_new(int sfd, int init_state, int event_flags, 
    371                        int read_buffer_size, int is_udp) { 
     371                       int read_buffer_size, int prot) { 
    372372    CQ_ITEM *item = cqi_new(); 
    373373    int thread = (last_thread + 1) % settings.num_threads; 
     
    379379    item->event_flags = event_flags; 
    380380    item->read_buffer_size = read_buffer_size; 
    381     item->is_udp = is_udp
     381    item->protocol = prot
    382382 
    383383    cq_push(&threads[thread].new_conn_queue, item);