Changeset 646

Show
Ignore:
Timestamp:
11/20/07 07:01:27 (1 year ago)
Author:
dormando
Message:

Dynamic suffix buffer work.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/server/memcached.c

    r645 r646  
    304304        c->rbuf = c->wbuf = 0; 
    305305        c->ilist = 0; 
     306        c->suffixlist = 0; 
    306307        c->iov = 0; 
    307308        c->msglist = 0; 
     
    311312        c->wsize = DATA_BUFFER_SIZE; 
    312313        c->isize = ITEM_LIST_INITIAL; 
     314        c->suffixsize = SUFFIX_LIST_INITIAL; 
    313315        c->iovsize = IOV_LIST_INITIAL; 
    314316        c->msgsize = MSG_LIST_INITIAL; 
     
    318320        c->wbuf = (char *)malloc((size_t)c->wsize); 
    319321        c->ilist = (item **)malloc(sizeof(item *) * c->isize); 
     322        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize); 
    320323        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize); 
    321324        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize); 
    322325 
    323326        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 || 
    324                 c->msglist == 0) { 
     327                c->msglist == 0 || c->suffixlist == 0) { 
    325328            if (c->rbuf != 0) free(c->rbuf); 
    326329            if (c->wbuf != 0) free(c->wbuf); 
    327330            if (c->ilist !=0) free(c->ilist); 
     331            if (c->suffixlist != 0) free(c->suffixlist); 
    328332            if (c->iov != 0) free(c->iov); 
    329333            if (c->msglist != 0) free(c->msglist); 
     
    356360    c->ritem = 0; 
    357361    c->icurr = c->ilist; 
     362    c->suffixcurr = c->suffixlist; 
    358363    c->ileft = 0; 
     364    c->suffixleft = 0; 
    359365    c->iovused = 0; 
    360366    c->msgcurr = 0; 
     
    397403        for (; c->ileft > 0; c->ileft--,c->icurr++) { 
    398404            item_remove(*(c->icurr)); 
     405        } 
     406    } 
     407 
     408    if (c->suffixleft != 0) { 
     409        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) { 
     410            suffix_add_to_freelist(*(c->suffixcurr)); 
    399411        } 
    400412    } 
     
    421433        if (c->ilist) 
    422434            free(c->ilist); 
     435        if (c->suffixlist) 
     436            free(c->suffixlist); 
    423437        if (c->iov) 
    424438            free(c->iov); 
     
    528542} 
    529543 
     544/* 
     545 * Free list management for suffix buffers. 
     546 */ 
     547 
     548static char **freesuffix; 
     549static int freesuffixtotal; 
     550static int freesuffixcurr; 
     551 
     552static void suffix_init(void) { 
     553    freesuffixtotal = 500; 
     554    freesuffixcurr  = 0; 
     555 
     556    freesuffix = (char **)malloc( sizeof(char *) * 
     557                                  21 * freesuffixtotal ); 
     558    if (freesuffix == NULL) { 
     559        perror("malloc()"); 
     560    } 
     561    return; 
     562} 
     563 
     564/* 
     565 * Returns a suffix buffer from the freelist, if any. Should call this using 
     566 * suffix_from_freelist() for thread safety. 
     567 */ 
     568char *do_suffix_from_freelist() { 
     569    char *s; 
     570 
     571    if (freesuffixcurr > 0) { 
     572        s = freesuffix[--freesuffixcurr]; 
     573    } else { 
     574        /* FIXME: global define? */ 
     575        /* If malloc fails, let the logic fall through without spamming 
     576         * STDERR on the server. */ 
     577        s = malloc( sizeof(char *) * SUFFIX_SIZE ); 
     578    } 
     579 
     580    return s; 
     581} 
     582 
     583/* 
     584 * Adds a connection to the freelist. 0 = success. Should call this using 
     585 * conn_add_to_freelist() for thread safety. 
     586 */ 
     587bool do_suffix_add_to_freelist(char *s) { 
     588    if (freesuffixcurr < freesuffixtotal) { 
     589        freesuffix[freesuffixcurr++] = s; 
     590        return false; 
     591    } else { 
     592        /* try to enlarge free connections array */ 
     593        char **new_freesuffix = realloc(freesuffix, 
     594                                        SUFFIX_SIZE * freesuffixtotal * 2); 
     595        if (new_freesuffix) { 
     596            freesuffixtotal *= 2; 
     597            freesuffix = new_freesuffix; 
     598            freesuffix[freesuffixcurr++] = s; 
     599            return false; 
     600        } 
     601    } 
     602    return true; 
     603} 
    530604 
    531605/* 
     
    10961170 
    10971171/* ntokens is overwritten here... shrug.. */ 
    1098 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_key_ptr) { 
     1172static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) { 
    10991173    char *key; 
    11001174    size_t nkey; 
     
    11021176    item *it; 
    11031177    token_t *key_token = &tokens[KEY_TOKEN]; 
    1104     char suffix[255]
     1178    char *suffix
    11051179    int stats_get_cmds   = 0; 
    11061180    int stats_get_hits   = 0; 
     
    11591233                 */ 
    11601234 
    1161                 if(return_key_ptr == true) 
     1235                if(return_cas == true) 
    11621236                { 
    1163                   sprintf(suffix," %d %d %llu\r\n", atoi(ITEM_suffix(it) + 1), 
    1164                       it->nbytes - 2, it->cas_id); 
     1237                  /* Goofy mid-flight realloc. */ 
     1238                  if (i >= c->suffixsize) { 
     1239                    char **new_suffix_list = realloc(c->suffixlist, 
     1240                                           sizeof(char *) * c->suffixsize * 2); 
     1241                    if (new_suffix_list) { 
     1242                      c->suffixsize *= 2; 
     1243                      c->suffixlist  = new_suffix_list; 
     1244                    } 
     1245                  } 
     1246 
     1247                  suffix = suffix_from_freelist(); 
     1248                  if (suffix == NULL) { 
     1249                    STATS_LOCK(); 
     1250                    stats.get_cmds   += stats_get_cmds; 
     1251                    stats.get_hits   += stats_get_hits; 
     1252                    stats.get_misses += stats_get_misses; 
     1253                    STATS_UNLOCK(); 
     1254                    out_string(c, "SERVER_ERROR out of memory"); 
     1255                    return; 
     1256                  } 
     1257                  *(c->suffixlist + i) = suffix; 
     1258                  sprintf(suffix, " %llu\r\n", it->cas_id); 
    11651259                  if (add_iov(c, "VALUE ", 6) != 0 || 
    11661260                      add_iov(c, ITEM_key(it), it->nkey) != 0 || 
     1261                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 || 
    11671262                      add_iov(c, suffix, strlen(suffix)) != 0 || 
    11681263                      add_iov(c, ITEM_data(it), it->nbytes) != 0) 
     
    12111306    c->icurr = c->ilist; 
    12121307    c->ileft = i; 
     1308    if (return_cas) { 
     1309        c->suffixcurr = c->suffixlist; 
     1310        c->suffixleft = i; 
     1311    } 
    12131312 
    12141313    if (settings.verbose > 1) 
     
    12231322        || (c->udp && build_udp_headers(c) != 0)) { 
    12241323        out_string(c, "SERVER_ERROR out of memory"); 
     1324    } 
     1325    else if (return_cas) { 
     1326        conn_set_state(c, conn_caswrite); 
     1327        c->msgcurr = 0; 
    12251328    } 
    12261329    else { 
     
    21032206 
    21042207        case conn_mwrite: 
     2208        case conn_caswrite: 
    21052209            switch (transmit(c)) { 
    21062210            case TRANSMIT_COMPLETE: 
    2107                 if (c->state == conn_mwrite) { 
     2211                if (c->state == conn_mwrite || c->state == conn_caswrite) { 
    21082212                    while (c->ileft > 0) { 
    21092213                        item *it = *(c->icurr); 
     
    21122216                        c->icurr++; 
    21132217                        c->ileft--; 
     2218                    } 
     2219                    if (c->state == conn_caswrite) { 
     2220                        while (c->suffixleft > 0) { 
     2221                            char *suffix = *(c->suffixcurr); 
     2222                            suffix_add_to_freelist(suffix); 
     2223                            c->suffixcurr++; 
     2224                            c->suffixleft--; 
     2225                        } 
    21142226                    } 
    21152227                    conn_set_state(c, conn_read); 
     
    27892901    assoc_init(); 
    27902902    conn_init(); 
     2903    /* Hacky suffix buffers. */ 
     2904    suffix_init(); 
    27912905    slabs_init(settings.maxbytes, settings.factor); 
    27922906 
  • trunk/server/memcached.h

    r636 r646  
    1717#define UDP_HEADER_SIZE 8 
    1818#define MAX_SENDBUF_SIZE (256 * 1024 * 1024) 
     19/* I'm told the max legnth of a 64-bit num converted to string is 20 bytes. 
     20 * Plus one for good luck. */ 
     21#define SUFFIX_SIZE 21 
    1922 
    2023/** Initial size of list of items being returned by "get". */ 
    2124#define ITEM_LIST_INITIAL 200 
     25 
     26/** Initial size of list of CAS suffixes appended to "gets" lines. */ 
     27#define SUFFIX_LIST_INITIAL 20 
    2228 
    2329/** Initial size of the sendmsg() scatter/gather array. */ 
     
    134140    conn_swallow,    /** swallowing unnecessary bytes w/o storing */ 
    135141    conn_closing,    /** closing this connection */ 
    136     conn_mwrite      /** writing out many items sequentially */ 
     142    conn_mwrite,     /** writing out many items sequentially */ 
     143    conn_caswrite,   /** writing out many items sequentially with cas value */ 
    137144}; 
    138145 
     
    195202    item   **icurr; 
    196203    int    ileft; 
     204 
     205    char   **suffixlist; 
     206    int    suffixsize; 
     207    char   **suffixcurr; 
     208    int    suffixleft; 
    197209 
    198210    /* data for UDP clients */ 
     
    226238conn *do_conn_from_freelist(); 
    227239bool do_conn_add_to_freelist(conn *c); 
     240char *do_suffix_from_freelist(); 
     241bool do_suffix_add_to_freelist(char *s); 
    228242char *do_defer_delete(item *item, time_t exptime); 
    229243void do_run_deferred_deletes(void); 
     
    263277conn *mt_conn_from_freelist(void); 
    264278bool  mt_conn_add_to_freelist(conn *c); 
     279char *mt_suffix_from_freelist(void); 
     280bool  mt_suffix_add_to_freelist(char *s); 
    265281char *mt_defer_delete(item *it, time_t exptime); 
    266282int   mt_is_listen_thread(void); 
     
    290306# define conn_from_freelist()        mt_conn_from_freelist() 
    291307# define conn_add_to_freelist(x)     mt_conn_add_to_freelist(x) 
     308# define suffix_from_freelist()      mt_suffix_from_freelist() 
     309# define suffix_add_to_freelist(x)   mt_suffix_add_to_freelist(x) 
    292310# define defer_delete(x,y)           mt_defer_delete(x,y) 
    293311# define is_listen_thread()          mt_is_listen_thread() 
     
    319337# define conn_from_freelist()        do_conn_from_freelist() 
    320338# define conn_add_to_freelist(x)     do_conn_add_to_freelist(x) 
     339# define suffix_from_freelist()      do_suffix_from_freelist() 
     340# define suffix_add_to_freelist(x)   do_suffix_add_to_freelist(x) 
    321341# define defer_delete(x,y)           do_defer_delete(x,y) 
    322342# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base) 
  • trunk/server/thread.c

    r619 r646  
    4848static pthread_mutex_t conn_lock; 
    4949 
     50/* Lock for alternative item suffix freelist */ 
     51static pthread_mutex_t suffix_lock; 
     52 
    5053/* Lock for cache operations (item_*, assoc_*) */ 
    5154static pthread_mutex_t cache_lock; 
     
    245248    return result; 
    246249} 
     250 
     251/* 
     252 * Pulls a suffix buffer from the freelist, if one is available. 
     253 */ 
     254char *mt_suffix_from_freelist() { 
     255    char *s; 
     256 
     257    pthread_mutex_lock(&suffix_lock); 
     258    s = do_suffix_from_freelist(); 
     259    pthread_mutex_unlock(&suffix_lock); 
     260 
     261    return s; 
     262} 
     263 
     264 
     265/* 
     266 * Adds a suffix buffer to the freelist. 
     267 * 
     268 * Returns 0 on success, 1 if the buffer couldn't be added. 
     269 */ 
     270bool mt_suffix_add_to_freelist(char *s) { 
     271    bool result; 
     272 
     273    pthread_mutex_lock(&conn_lock); 
     274    result = do_conn_add_to_freelist(s); 
     275    pthread_mutex_unlock(&conn_lock); 
     276 
     277    return result; 
     278} 
     279 
    247280 
    248281/****************************** LIBEVENT THREADS *****************************/