Changeset 658

Show
Ignore:
Timestamp:
12/03/07 08:17:24 (1 year ago)
Author:
dormando
Message:

Binary branch, code from Dustin Sallings and a few others

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/binary-1.3.0/memcached.c

    r654 r658  
    2323#include <sys/resource.h> 
    2424#include <sys/uio.h> 
     25#include <ctype.h> 
    2526 
    2627/* some POSIX systems need the following definition 
     
    6465 */ 
    6566static void drive_machine(conn *c); 
    66 static int new_socket(const bool is_udp); 
    67 static int server_socket(const int port, const bool is_udp); 
     67static int new_socket(const int prot); 
     68static int server_socket(const int port, const int prot); 
    6869static int try_read_command(conn *c); 
    6970static int try_read_network(conn *c); 
    7071static int try_read_udp(conn *c); 
     72static void conn_set_state(conn *, int); 
    7173 
    7274/* stats */ 
     
    107109static int deltotal; 
    108110static conn *listen_conn; 
     111static conn *bin_listen_conn; 
    109112static struct event_base *main_base; 
    110113 
     
    229232    c->msgused++; 
    230233 
    231     if (c->udp) { 
     234    if (IS_UDP(c->protocol)) { 
    232235        /* Leave room for the UDP header, which we'll fill in later. */ 
    233236        return add_iov(c, NULL, UDP_HEADER_SIZE); 
     
    294297 
    295298conn *conn_new(const int sfd, const int init_state, const int event_flags, 
    296                 const int read_buffer_size, const bool is_udp, struct event_base *base) { 
     299                const int read_buffer_size, const int prot, 
     300                struct event_base *base) { 
    297301    conn *c = conn_from_freelist(); 
    298302 
     
    342346    } 
    343347 
     348    /* unix socket mode doesn't need this, so zeroed out.  but why 
     349     * is this done for every command?  presumably for UDP 
     350     * mode.  */ 
     351    if (!settings.socketpath) { 
     352        c->request_addr_size = sizeof(c->request_addr); 
     353    } else { 
     354        c->request_addr_size = 0; 
     355    } 
     356 
    344357    if (settings.verbose > 1) { 
    345         if (init_state == conn_listening) 
     358        if (init_state == conn_listening) { 
    346359            fprintf(stderr, "<%d server listening\n", sfd); 
    347         else if (is_udp) 
     360        } else if (IS_UDP(prot)) { 
    348361            fprintf(stderr, "<%d server listening (udp)\n", sfd); 
    349         else 
    350             fprintf(stderr, "<%d new client connection\n", sfd); 
     362        } else if (prot == binary_prot) { 
     363            fprintf(stderr, "<%d new binary client connection\n", sfd); 
     364        } else if (prot == ascii_prot) { 
     365            fprintf(stderr, "<%d new ascii client connection\n", sfd); 
     366        } else { 
     367            fprintf(stderr, "<%d new unknown (%d) client connection\n", 
     368                sfd, prot); 
     369            abort(); 
     370        } 
    351371    } 
    352372 
    353373    c->sfd = sfd; 
    354     c->udp = is_udp
     374    c->protocol = prot
    355375    c->state = init_state; 
    356376    c->rlbytes = 0; 
     377    c->cmd = -1; 
    357378    c->rbytes = c->wbytes = 0; 
    358379    c->wcurr = c->wbuf; 
     
    367388    c->msgused = 0; 
    368389 
    369     c->write_and_go = conn_read
     390    c->write_and_go = init_state
    370391    c->write_and_free = 0; 
    371392    c->item = 0; 
     
    468489} 
    469490 
     491static int get_init_state(conn *c) { 
     492    int rv=0; 
     493    assert(c != NULL); 
     494 
     495    switch(c->protocol) { 
     496        case binary_prot: 
     497            rv=conn_bin_init; 
     498            break; 
     499        default: 
     500            rv=conn_read; 
     501    } 
     502    return rv; 
     503} 
     504 
     505/* Set the given connection to its initial state.  The initial state will vary 
     506 * base don protocol type. */ 
     507static void conn_set_init_state(conn *c) { 
     508    assert(c != NULL); 
     509 
     510    conn_set_state(c, get_init_state(c)); 
     511} 
    470512 
    471513/* 
     
    480522    assert(c != NULL); 
    481523 
    482     if (c->udp
     524    if (IS_UDP(c->protocol)
    483525        return; 
    484526 
     
    652694         * UDP_MAX_PAYLOAD_SIZE bytes. 
    653695         */ 
    654         limit_to_mtu = c->udp || (1 == c->msgused); 
     696        limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); 
    655697 
    656698        /* We may need to start a new msghdr if this one is full. */ 
     
    749791 
    750792    conn_set_state(c, conn_write); 
    751     c->write_and_go = conn_read
     793    c->write_and_go = get_init_state(c)
    752794    return; 
    753795} 
     
    757799 * has been stored in c->item_comm, and the item is ready in c->item. 
    758800 */ 
    759  
    760 static void complete_nread(conn *c) { 
     801static void complete_nread_ascii(conn *c) { 
    761802    assert(c != NULL); 
    762803 
     
    785826    item_remove(c->item);       /* release the c->item reference */ 
    786827    c->item = 0; 
     828} 
     829 
     830static void add_bin_header(conn *c, int err, int body_len) { 
     831    int i=0; 
     832    uint32_t res_header[BIN_PKT_HDR_WORDS]; 
     833 
     834    assert(c); 
     835    assert(body_len >= 0); 
     836 
     837    c->msgcurr = 0; 
     838    c->msgused = 0; 
     839    c->iovused = 0; 
     840    if (add_msghdr(c) != 0) { 
     841        /* XXX:  out_string is inappropriate here */ 
     842        out_string(c, "SERVER_ERROR out of memory"); 
     843        return; 
     844    } 
     845 
     846    res_header[0] = BIN_RES_MAGIC << 24; 
     847    res_header[0] |= ((0xff & c->cmd) << 16); 
     848    res_header[0] |= (err << 8); 
     849 
     850    res_header[1] = c->opaque; 
     851    res_header[2] = body_len; 
     852 
     853    if(settings.verbose > 1) { 
     854        fprintf(stderr, "Writing bin response:  %08x %08x %08x\n", 
     855            res_header[0], res_header[1], res_header[2]); 
     856    } 
     857 
     858    for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 
     859        res_header[i] = htonl(res_header[i]); 
     860    } 
     861 
     862    assert(c->wsize >= MIN_BIN_PKT_LENGTH); 
     863    memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH); 
     864    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH); 
     865} 
     866 
     867static void write_bin_error(conn *c, int err, int swallow) { 
     868    char *errstr="Unknown error"; 
     869    switch(err) { 
     870        case ERR_UNKNOWN_CMD: 
     871            errstr="Unknown command"; 
     872            break; 
     873        case ERR_NOT_FOUND: 
     874            errstr="Not found"; 
     875            break; 
     876        case ERR_INVALID_ARGUMENTS: 
     877            errstr="Invalid arguments"; 
     878            break; 
     879        case ERR_EXISTS: 
     880            errstr="Data exists for key."; 
     881            break; 
     882        case ERR_TOO_LARGE: 
     883            errstr="Too large."; 
     884            break; 
     885        case ERR_NOT_STORED: 
     886            errstr="Not stored."; 
     887            break; 
     888        default: 
     889            errstr="UNHANDLED ERROR"; 
     890            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err); 
     891    } 
     892    if(settings.verbose > 0) { 
     893        fprintf(stderr, "Writing an error:  %s\n", errstr); 
     894    } 
     895    add_bin_header(c, err, strlen(errstr)); 
     896    add_iov(c, errstr, strlen(errstr)); 
     897 
     898    conn_set_state(c, conn_mwrite); 
     899    if(swallow > 0) { 
     900        c->sbytes=swallow; 
     901        c->write_and_go = conn_swallow; 
     902    } else { 
     903        c->write_and_go = conn_bin_init; 
     904    } 
     905} 
     906 
     907/* Form and send a response to a command over the binary protocol */ 
     908static void write_bin_response(conn *c, void *d, int dlen) { 
     909    add_bin_header(c, 0, dlen); 
     910    if(dlen > 0) { 
     911        add_iov(c, d, dlen); 
     912    } 
     913    conn_set_state(c, conn_mwrite); 
     914    c->write_and_go = conn_bin_init; 
     915} 
     916 
     917/* Byte swap a 64-bit number */ 
     918static int64_t swap64(int64_t in) { 
     919#ifdef ENDIAN_LITTLE 
     920    /* Little endian, flip the bytes around until someone makes a faster/better 
     921    * way to do this. */ 
     922    int64_t rv=0; 
     923    int i=0; 
     924     for(i=0; i<8; i++) { 
     925        rv = (rv << 8) | (in & 0xff); 
     926        in >>= 8; 
     927     } 
     928    return rv; 
     929#else 
     930    /* big-endian machines don't need byte swapping */ 
     931    return in; 
     932#endif 
     933} 
     934 
     935static void complete_incr_bin(conn *c) { 
     936    item *it; 
     937    int64_t delta; 
     938    uint64_t initial; 
     939    int32_t exptime; 
     940    char *key; 
     941    size_t nkey; 
     942    int i,res; 
     943    char *responseBuf = c->wbuf + BIN_INCR_HDR_LEN; 
     944 
     945    assert(c != NULL); 
     946 
     947    key=c->rbuf + BIN_INCR_HDR_LEN; 
     948    nkey=c->keylen; 
     949    key[nkey]=0x00; 
     950 
     951    delta = swap64(*((int64_t*)(c->rbuf))); 
     952    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8))); 
     953    exptime = ntohl(*((int*)(c->rbuf + 16))); 
     954 
     955    if(settings.verbose) { 
     956        fprintf(stderr, "incr "); 
     957        for(i=0; i<nkey; i++) { 
     958            fprintf(stderr, "%c", key[i]); 
     959        } 
     960        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime); 
     961    } 
     962 
     963    /* XXX:  Not sure what to do with these yet 
     964    if (settings.managed) { 
     965        int bucket = c->bucket; 
     966        if (bucket == -1) { 
     967            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     968            return; 
     969        } 
     970        c->bucket = -1; 
     971        if (buckets[bucket] != c->gen) { 
     972            out_string(c, "ERROR_NOT_OWNER"); 
     973            return; 
     974        } 
     975    } 
     976    */ 
     977 
     978    it = item_get(key, nkey); 
     979    if (it) { 
     980        /* Weird magic in add_delta forces me to pad here */ 
     981        memset(responseBuf, ' ', 32); 
     982        responseBuf[32]=0x00; 
     983        add_delta(it, true, delta, responseBuf); 
     984        res = strlen(responseBuf); 
     985         
     986        assert(res > 0); 
     987        write_bin_response(c, responseBuf, res); 
     988        item_remove(it);         /* release our reference */ 
     989    } else { 
     990        if(exptime >= 0) { 
     991            /* Save some room for the response */ 
     992            assert(c->wsize > BIN_INCR_HDR_LEN + 32); 
     993            snprintf(responseBuf, BIN_INCR_HDR_LEN + 32, "%llu", initial); 
     994             
     995            res = strlen(responseBuf); 
     996            it = item_alloc(key, nkey, 0, realtime(exptime), res + 2); 
     997             
     998            memcpy(ITEM_data(it), responseBuf, res); 
     999             
     1000            if(store_item(it, NREAD_SET)) { 
     1001                write_bin_response(c, responseBuf, res); 
     1002            } else { 
     1003                write_bin_error(c, ERR_NOT_STORED, 0); 
     1004            } 
     1005            item_remove(it);         /* release our reference */ 
     1006        } else { 
     1007            write_bin_error(c, ERR_NOT_FOUND, 0); 
     1008        } 
     1009    } 
     1010} 
     1011 
     1012static void complete_update_bin(conn *c) { 
     1013    int eno=-1, ret=0; 
     1014    assert(c != NULL); 
     1015 
     1016    item *it = c->item; 
     1017 
     1018    STATS_LOCK(); 
     1019    stats.set_cmds++; 
     1020    STATS_UNLOCK(); 
     1021 
     1022    /* We don't actually receive the trailing two characters in the bin 
     1023     * protocol, so we're going to just set them here */ 
     1024    *(ITEM_data(it) + it->nbytes - 2) = '\r'; 
     1025    *(ITEM_data(it) + it->nbytes - 1) = '\n'; 
     1026 
     1027    switch (store_item(it, c->item_comm)) { 
     1028        case 1: 
     1029            /* Stored */ 
     1030            write_bin_response(c, NULL, 0); 
     1031            break; 
     1032        case 2: 
     1033            write_bin_error(c, ERR_EXISTS, 0); 
     1034            break; 
     1035        case 3: 
     1036            write_bin_error(c, ERR_NOT_FOUND, 0); 
     1037            break; 
     1038        default: 
     1039            if(c->item_comm == NREAD_ADD) { 
     1040                eno=ERR_EXISTS; 
     1041            } else if(c->item_comm == NREAD_REPLACE) { 
     1042                eno=ERR_NOT_FOUND; 
     1043            } else { 
     1044                eno=ERR_NOT_STORED; 
     1045            } 
     1046            write_bin_error(c, eno, 0); 
     1047    } 
     1048 
     1049    item_remove(c->item);       /* release the c->item reference */ 
     1050    c->item = 0; 
     1051} 
     1052 
     1053static void process_bin_get(conn *c) { 
     1054    item *it; 
     1055 
     1056    it = item_get(c->rbuf, c->keylen); 
     1057    if (it) { 
     1058        int *flags; 
     1059 
     1060        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4); 
     1061 
     1062        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place 
     1063        this is int in far enough to cover the header */ 
     1064        flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH); 
     1065        *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10)); 
     1066 
     1067        /* the length has two unnecessary bytes, and then we write four more */ 
     1068        add_bin_header(c, 0, it->nbytes - 2 + 4 + (c->cmd == CMD_GETS?8:0)); 
     1069        /* Flags */ 
     1070        add_iov(c, flags, 4); 
     1071        /* if it's a gets, add the identifier */ 
     1072        if(c->cmd == CMD_GETS) { 
     1073            uint64_t* identifier; 
     1074            identifier=(uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4); 
     1075            *identifier=swap64((uint32_t)it->cas_id); 
     1076            add_iov(c, identifier, 8); 
     1077        } 
     1078        /* bytes minus the CRLF */ 
     1079        add_iov(c, ITEM_data(it), it->nbytes - 2); 
     1080        conn_set_state(c, conn_mwrite); 
     1081    } else { 
     1082        if(c->cmd == CMD_GETQ) { 
     1083            conn_set_state(c, conn_bin_init); 
     1084        } else { 
     1085            write_bin_error(c, ERR_NOT_FOUND, 0); 
     1086        } 
     1087    } 
     1088} 
     1089 
     1090static void bin_read_key(conn *c, int next_substate, int extra) { 
     1091    assert(c); 
     1092    c->substate = next_substate; 
     1093    c->rlbytes = c->keylen + extra; 
     1094    assert(c->rsize >= c->rlbytes); 
     1095    c->ritem = c->rbuf; 
     1096    conn_set_state(c, conn_nread); 
     1097} 
     1098 
     1099static void dispatch_bin_command(conn *c) { 
     1100    time_t exptime = 0; 
     1101    switch(c->cmd) { 
     1102        case CMD_VERSION: 
     1103            write_bin_response(c, VERSION, strlen(VERSION)); 
     1104            break; 
     1105        case CMD_FLUSH: 
     1106            set_current_time(); 
     1107 
     1108            settings.oldest_live = current_time - 1; 
     1109            item_flush_expired(); 
     1110            write_bin_response(c, NULL, 0); 
     1111            break; 
     1112        case CMD_NOOP: 
     1113            write_bin_response(c, NULL, 0); 
     1114            break; 
     1115        case CMD_SET: 
     1116            /* Fallthrough */ 
     1117        case CMD_ADD: 
     1118            /* Fallthrough */ 
     1119        case CMD_REPLACE: 
     1120            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN); 
     1121            break; 
     1122        case CMD_CAS: 
     1123            bin_read_key(c, bin_reading_cas_header, BIN_CAS_HDR_LEN); 
     1124            break; 
     1125        case CMD_GETS: 
     1126        case CMD_GETQ: 
     1127        case CMD_GET: 
     1128            bin_read_key(c, bin_reading_get_key, 0); 
     1129            break; 
     1130        case CMD_DELETE: 
     1131            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN); 
     1132            break; 
     1133        case CMD_INCR: 
     1134            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN); 
     1135            break; 
     1136        default: 
     1137            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]); 
     1138    } 
     1139} 
     1140 
     1141static void process_bin_update(conn *c, bool cas) { 
     1142    char *key; 
     1143    int nkey; 
     1144    int vlen; 
     1145    int flags; 
     1146    int exptime; 
     1147    item *it; 
     1148    int comm; 
     1149    int hdrlen=cas ? BIN_CAS_HDR_LEN : BIN_SET_HDR_LEN; 
     1150 
     1151    assert(c != NULL); 
     1152 
     1153    key=c->rbuf + hdrlen; 
     1154    nkey=c->keylen; 
     1155    key[nkey]=0x00; 
     1156 
     1157    flags = ntohl(*((int*)(c->rbuf))); 
     1158    exptime = ntohl(*((int*)(c->rbuf + 4))); 
     1159    vlen = c->bin_header[2] - (nkey + hdrlen); 
     1160 
     1161    if(settings.verbose) { 
     1162        fprintf(stderr, "Value len is %d\n", vlen); 
     1163    } 
     1164 
     1165    if (settings.detail_enabled) { 
     1166        stats_prefix_record_set(key); 
     1167    } 
     1168 
     1169    /* Not sure what to do with this. 
     1170    if (settings.managed) { 
     1171        int bucket = c->bucket; 
     1172        if (bucket == -1) { 
     1173            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     1174            return; 
     1175        } 
     1176        c->bucket = -1; 
     1177        if (buckets[bucket] != c->gen) { 
     1178            out_string(c, "ERROR_NOT_OWNER"); 
     1179            return; 
     1180        } 
     1181    } 
     1182    */ 
     1183 
     1184    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); 
     1185    if(cas) { 
     1186        it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8))); 
     1187    } 
     1188 
     1189    if (it == 0) { 
     1190        if (! item_size_ok(nkey, flags, vlen + 2)) { 
     1191            write_bin_error(c, ERR_TOO_LARGE, vlen); 
     1192        } else { 
     1193            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen); 
     1194        } 
     1195        /* swallow the data line */ 
     1196        c->write_and_go = conn_swallow; 
     1197        return; 
     1198    } 
     1199 
     1200    switch(c->cmd) { 
     1201        case CMD_ADD: 
     1202            c->item_comm = NREAD_ADD; 
     1203            break; 
     1204        case CMD_CAS: 
     1205            c->item_comm = NREAD_CAS; 
     1206            break; 
     1207        case CMD_SET: 
     1208            c->item_comm = NREAD_SET; 
     1209            break; 
     1210        case CMD_REPLACE: 
     1211            c->item_comm = NREAD_REPLACE; 
     1212            break; 
     1213        default: 
     1214            assert(0); 
     1215    } 
     1216 
     1217    c->item = it; 
     1218    c->ritem = ITEM_data(it); 
     1219    c->rlbytes = vlen; 
     1220    conn_set_state(c, conn_nread); 
     1221    c->substate = bin_read_set_value; 
     1222} 
     1223 
     1224static void process_bin_delete(conn *c) { 
     1225    char *key; 
     1226    size_t nkey; 
     1227    item *it; 
     1228    time_t exptime = 0; 
     1229 
     1230    assert(c != NULL); 
     1231 
     1232    /* XXX:  I don't know what to do with this yet 
     1233    if (settings.managed) { 
     1234        int bucket = c->bucket; 
     1235        if (bucket == -1) { 
     1236            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     1237            return; 
     1238        } 
     1239        c->bucket = -1; 
     1240        if (buckets[bucket] != c->gen) { 
     1241            out_string(c, "ERROR_NOT_OWNER"); 
     1242            return; 
     1243        } 
     1244    } 
     1245    */ 
     1246 
     1247    exptime = ntohl(*((int*)(c->rbuf))); 
     1248    key = c->rbuf + 4; 
     1249    nkey = c->keylen; 
     1250    key[nkey]=0x00; 
     1251 
     1252    if(settings.verbose) { 
     1253        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime); 
     1254    } 
     1255 
     1256    if (settings.detail_enabled) { 
     1257        stats_prefix_record_delete(key); 
     1258    } 
     1259 
     1260    it = item_get(key, nkey); 
     1261    if (it) { 
     1262        if (exptime == 0) { 
     1263            item_unlink(it); 
     1264            item_remove(it);      /* release our reference */ 
     1265            write_bin_response(c, NULL, 0); 
     1266        } else { 
     1267            /* XXX:  This is really lame, but defer_delete returns a string */ 
     1268            char *res=defer_delete(it, exptime); 
     1269            if(res[0] == 'D') { 
     1270                write_bin_response(c, NULL, 0); 
     1271            } else { 
     1272                write_bin_error(c, ERR_OUT_OF_MEMORY, 0); 
     1273            } 
     1274        } 
     1275    } else { 
     1276        write_bin_error(c, ERR_NOT_FOUND, 0); 
     1277    } 
     1278} 
     1279 
     1280static void complete_nread_binary(conn *c) { 
     1281    assert(c != NULL); 
     1282 
     1283    if(c->cmd < 0) { 
     1284        /* No command defined.  Figure out what they're trying to say. */ 
     1285        int i=0; 
     1286        /* I did a bit of hard-coding around the packet sizes */ 
     1287        assert(BIN_PKT_HDR_WORDS == 3); 
     1288        for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 
     1289            c->bin_header[i] = ntohl(c->bin_header[i]); 
     1290        } 
     1291        if(settings.verbose) { 
     1292            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x\n", 
     1293                c->bin_header[0], c->bin_header[1], c->bin_header[2]); 
     1294        } 
     1295        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) { 
     1296            if(settings.verbose) { 
     1297                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24); 
     1298            } 
     1299            conn_set_state(c, conn_closing); 
     1300            return; 
     1301        } 
     1302     
     1303        c->msgcurr = 0; 
     1304        c->msgused = 0; 
     1305        c->iovused = 0; 
     1306        if (add_msghdr(c) != 0) { 
     1307            out_string(c, "SERVER_ERROR out of memory"); 
     1308            return; 
     1309        } 
     1310     
     1311        c->cmd = (c->bin_header[0] >> 16) & 0xff; 
     1312        c->keylen = (c->bin_header[0] >> 8) & 0xff; 
     1313        c->opaque = c->bin_header[1]; 
     1314        if(settings.verbose > 1) { 
     1315            fprintf(stderr, 
     1316                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd, 
     1317                c->opaque, c->keylen, c->bin_header[2]); 
     1318        } 
     1319        dispatch_bin_command(c); 
     1320    } else { 
     1321        switch(c->substate) { 
     1322            case bin_reading_set_header: 
     1323                process_bin_update(c, false); 
     1324                break; 
     1325            case bin_reading_cas_header: 
     1326                process_bin_update(c, true); 
     1327                break; 
     1328            case bin_read_set_value: 
     1329                complete_update_bin(c); 
     1330                break; 
     1331            case bin_reading_get_key: 
     1332                process_bin_get(c); 
     1333                break; 
     1334            case bin_reading_del_header: 
     1335                process_bin_delete(c); 
     1336                break; 
     1337            case bin_reading_incr_header: 
     1338                complete_incr_bin(c); 
     1339                break; 
     1340            default: 
     1341                fprintf(stderr, "Not handling substate %d\n", c->substate); 
     1342                assert(0); 
     1343        } 
     1344    } 
     1345} 
     1346 
     1347static void complete_nread(conn *c) { 
     1348    assert(c != NULL); 
     1349 
     1350    if(c->protocol == ascii_prot) { 
     1351        complete_nread_ascii(c); 
     1352    } else if(c->protocol == binary_prot) { 
     1353        complete_nread_binary(c); 
     1354    } else { 
     1355        assert(0); /* XXX:  Invalid case.  Should probably do more here. */ 
     1356    } 
    7871357} 
    7881358 
     
    8261396          do_item_replace(old_it, it); 
    8271397          stored = 1; 
    828         } 
    829         else 
    830         { 
     1398        } else { 
     1399          if(settings.verbose > 1) { 
     1400            fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n", 
     1401                                old_it->cas_id, it->cas_id); 
     1402          } 
    8311403          stored = 2; 
    8321404        } 
     
    9621534        c->wbytes = bytes; 
    9631535        conn_set_state(c, conn_write); 
    964         c->write_and_go = conn_read
     1536        c->write_and_go = get_init_state(c)
    9651537    } else { 
    9661538        out_string(c, "SERVER_ERROR out of memory"); 
     
    13191891    */ 
    13201892    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 
    1321         || (c->udp && build_udp_headers(c) != 0)) { 
     1893        || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 
    13221894        out_string(c, "SERVER_ERROR out of memory"); 
    13231895    } 
     
    14832055        value += delta; 
    14842056    else { 
    1485         if (delta >= value) value = 0; 
    1486         else value -= delta; 
     2057        value -= delta; 
     2058    } 
     2059    if(value < 0) { 
     2060        value=0; 
    14872061    } 
    14882062    sprintf(buf, "%llu", value); 
     
    17182292                c->gen = gen; 
    17192293            } 
    1720             conn_set_state(c, conn_read); 
     2294            conn_set_init_state(c); 
    17212295            return; 
    17222296        } else { 
     
    19012475        } 
    19022476 
    1903         /* unix socket mode doesn't need this, so zeroed out.  but why 
    1904          * is this done for every command?  presumably for UDP 
    1905          * mode.  */ 
    1906         if (!settings.socketpath) { 
    1907             c->request_addr_size = sizeof(c->request_addr); 
    1908         } else { 
    1909             c->request_addr_size = 0; 
    1910         } 
    1911  
    19122477        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); 
    19132478        if (res > 0) { 
     
    19652530    } 
    19662531} 
    1967  
    19682532 
    19692533/* 
     
    20242588            perror("Failed to write, and not due to blocking"); 
    20252589 
    2026         if (c->udp
     2590        if (IS_UDP(c->protocol)
    20272591            conn_set_state(c, conn_read); 
    20282592        else 
     
    20372601    bool stop = false; 
    20382602    int sfd, flags = 1; 
     2603    int init_state; /* initial state for a new connection */ 
    20392604    socklen_t addrlen; 
    20402605    struct sockaddr addr; 
     
    20692634                break; 
    20702635            } 
    2071             dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, 
    2072                                      DATA_BUFFER_SIZE, false); 
     2636            init_state = get_init_state(c); 
     2637 
     2638            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST, 
     2639                                     DATA_BUFFER_SIZE, c->protocol); 
     2640 
    20732641            break; 
    20742642 
     
    20772645                continue; 
    20782646            } 
    2079             if ((c->udp ? try_read_udp(c) : try_read_network(c)) != 0) { 
     2647            if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) { 
    20802648                continue; 
    20812649            } 
     
    20902658            break; 
    20912659 
     2660        case conn_bin_init: /* Reinitialize a binary connection */ 
     2661            c->rlbytes = MIN_BIN_PKT_LENGTH; 
     2662            c->write_and_go = conn_bin_init; 
     2663            c->cmd = -1; 
     2664            c->substate = bin_no_state; 
     2665            c->rbytes = c->wbytes = 0; 
     2666            c->wcurr = c->wbuf; 
     2667            c->rcurr = c->rbuf; 
     2668            c->ritem = (char*)c->bin_header; 
     2669            conn_set_state(c, conn_nread); 
     2670            conn_shrink(c); 
     2671            break; 
     2672 
    20922673        case conn_nread: 
    2093             /* we are reading rlbytes into ritem; */ 
    20942674            if (c->rlbytes == 0) { 
    20952675                complete_nread(c); 
     
    21402720            /* we are reading sbytes and throwing them away */ 
    21412721            if (c->sbytes == 0) { 
    2142                 conn_set_state(c, conn_read); 
     2722                conn_set_init_state(c); 
    21432723                break; 
    21442724            } 
     
    21882768             * list for TCP or a two-entry list for UDP). 
    21892769             */ 
    2190             if (c->iovused == 0 || (c->udp && c->iovused == 1)) { 
     2770            if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { 
    21912771                if (add_iov(c, c->wcurr, c->wbytes) != 0 || 
    2192                     (c->udp && build_udp_headers(c) != 0)) { 
     2772                    (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 
    21932773                    if (settings.verbose > 0) 
    21942774                        fprintf(stderr, "Couldn't build response\n"); 
     
    22202800                        c->suffixleft--; 
    22212801                    } 
    2222                     conn_set_state(c, conn_read); 
     2802                    /* XXX:  I don't know why this wasn't the general case */ 
     2803                    if(c->protocol == binary_prot) { 
     2804                        conn_set_state(c, c->write_and_go); 
     2805                    } else { 
     2806                        conn_set_init_state(c); 
     2807                    } 
    22232808                } else if (c->state == conn_write) { 
    22242809                    if (c->write_and_free) { 
     
    22452830 
    22462831        case conn_closing: 
    2247             if (c->udp
     2832            if (IS_UDP(c->protocol)
    22482833                conn_cleanup(c); 
    22492834            else 
     
    22792864} 
    22802865 
    2281 static int new_socket(const bool is_udp) { 
     2866static int new_socket(const int prot) { 
    22822867    int sfd; 
    22832868    int flags; 
    22842869 
    2285     if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 
     2870    if ((sfd = socket(AF_INET, IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 
    22862871        perror("socket()"); 
    22872872        return -1; 
     
    23332918 
    23342919 
    2335 static int server_socket(const int port, const bool is_udp) { 
     2920static int server_socket(const int port, const int prot) { 
    23362921    int sfd; 
    23372922    struct linger ling = {0, 0}; 
     
    23392924    int flags =1; 
    23402925 
    2341     if ((sfd = new_socket(is_udp)) == -1) { 
     2926    if ((sfd = new_socket(prot)) == -1) { 
    23422927        return -1; 
    23432928    } 
    23442929 
    23452930    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 
    2346     if (is_udp) { 
     2931    if (IS_UDP(prot)) { 
    23472932        maximize_sndbuf(sfd); 
    23482933    } else { 
     
    23662951        return -1; 
    23672952    } 
    2368     if (!is_udp && listen(sfd, 1024) == -1) { 
     2953    if (!IS_UDP(prot) && listen(sfd, 1024) == -1) { 
    23692954        perror("listen()"); 
    23702955        close(sfd); 
     
    24463031/* listening socket */ 
    24473032static int l_socket = 0; 
     3033/* Listening socket for the binary protocol */ 
     3034static int bl_socket = -1; 
    24483035 
    24493036/* udp socket */ 
     
    24543041    int i; 
    24553042    if (l_socket > -1) close(l_socket); 
     3043    if (bl_socket > -1) close(bl_socket); 
    24563044    if (u_socket > -1) close(u_socket); 
    24573045    for (i = 3; i <= 500; i++) close(i); /* so lame */ 
     
    25363124    printf("-p <num>      TCP port number to listen on (default: 11211)\n" 
    25373125           "-U <num>      UDP port number to listen on (default: 0, off)\n" 
     3126           "-B <num>      binary protocol TCP port number to listen on (default: 0, off)\n" 
    25383127           "-s <file>     unix socket path to listen on (disables network support)\n" 
    25393128           "-a <mask>     access mask for unix socket, in octal (default 0700)\n" 
     
    26863275 
    26873276    /* process arguments */ 
    2688     while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { 
     3277    while ((c = getopt(argc, argv, "a:bp:B:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { 
    26893278        switch (c) { 
    26903279        case 'a': 
     
    27013290        case 'p': 
    27023291            settings.port = atoi(optarg); 
     3292            break; 
     3293        case 'B': 
     3294            settings.binport = atoi(optarg); 
    27033295            break; 
    27043296        case 's': 
     
    28413433            exit(EXIT_FAILURE); 
    28423434        } 
     3435        /* Try the binary port. */ 
     3436        if(settings.binport > 0) { 
     3437            bl_socket = server_socket(settings.binport, 0); 
     3438            if (bl_socket == -1) { 
     3439                 fprintf(stderr, "failed to listen to binary protocol\n"); 
     3440                    exit(EXIT_FAILURE); 
     3441            } 
     3442        } 
    28433443    } 
    28443444 
     
    29333533    /* create the initial listening connection */ 
    29343534    if (!(listen_conn = conn_new(l_socket, conn_listening, 
    2935                                  EV_READ | EV_PERSIST, 1, false, main_base))) { 
     3535                                 EV_READ | EV_PERSIST, 1, ascii_prot, 
     3536                                 main_base))) { 
    29363537        fprintf(stderr, "failed to create listening connection"); 
    29373538        exit(EXIT_FAILURE); 
    29383539    } 
     3540 
     3541    /* Same for binary protocol */ 
     3542    if (bl_socket > 0) { 
     3543        if (!(bin_listen_conn = conn_new(bl_socket, conn_listening, 
     3544                                 EV_READ | EV_PERSIST, 1, binary_prot, 
     3545                                 main_base))) { 
     3546            fprintf(stderr, "failed to create listening connection"); 
     3547            exit(EXIT_FAILURE); 
     3548        } 
     3549    } 
     3550 
    29393551    /* start up worker threads if MT mode */ 
    29403552    thread_init(settings.num_threads, main_base); 
     
    29583570            /* this is guaranteed to hit all threads because we round-robin */ 
    29593571            dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, 
    2960                               UDP_READ_BUFFER_SIZE, 1); 
     3572                              UDP_READ_BUFFER_SIZE, ascii_udp_prot); 
    29613573        } 
    29623574    } 
  • branches/binary-1.3.0/memcached.h

    <
    r651 r658  
    3838#define IOV_LIST_HIGHWAT 600 
    3939#define MSG_LIST_HIGHWAT 100 
     40 
     41/* Binary protocol stuff */ 
     42#define MIN_BIN_PKT_LENGTH 12 
     43/* flags:32, expiration:32 */ 
     44#define BIN_SET_HDR_LEN 8 
     45/* Same as set, but with another 64 bits for the CAS identifier */ 
     46#define BIN_CAS_HDR_LEN (BIN_SET_HDR_LEN + 8) 
     47/* incr:64, initial:64, expiration:32 */ 
     48#define BIN_INCR_HDR_LEN 20 
     49/* timeout:32 */ 
     50#define BIN_DEL_HDR_LEN 4 
     51#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) 
     52 
     53#define BIN_REQ_MAGIC 0x0f 
     54#define BIN_RES_MAGIC 0xf0 
     55 
     56#define CMD_GET 0 
     57#define CMD_SET 1 
     58#define CMD_ADD 2 
     59#define CMD_REPLACE 3 
     60#define CMD_DELETE 4 
     61#define CMD_INCR 5 
     62#define CMD_QUIT 6 
     63#define CMD_FLUSH 7 
     64#define CMD_GETQ 8 
     65#define CMD_NOOP 9 
     66#define CMD_VERSION 10 
     67 
     68#define CMD_GETS 50 
     69#define CMD_CAS 51 
     70 
     71#define ERR_UNKNOWN_CMD 0x81 
     72#define ERR_OUT_OF_MEMORY 0x82 
     73 
     74#define ERR_NOT_FOUND 0x1 
     75#define ERR_EXISTS 0x2 
     76#define ERR_TOO_LARGE 0x3 
     77#define ERR_INVALID_ARGUMENTS 0x4 
     78#define ERR_NOT_STORED 0x5 
    4079 
    4180/* Get a consistent bool type */ 
     
    84123    int port; 
    85124    int udpport; 
     125    int binport;           /* Port for binary protocol. */ 
    86126    struct in_addr interf; 
    87127    int verbose; 
     
    141181    conn_closing,    /** closing this connection */ 
    142182    conn_mwrite,     /** writing out many items sequentially */ 
     183    conn_bin_init,   /** Reinitializing a binary protocol connection */ 
    143184}; 
     185 
     186enum bin_substates { 
     187    bin_no_state, 
     188    bin_reading_set_header, 
     189    bin_reading_cas_header, 
     190    bin_read_set_value, 
     191    bin_reading_get_key, 
     192    bin_reading_del_header, 
     193    bin_reading_incr_header, 
     194};