Changeset 673

Show
Ignore:
Timestamp:
12/11/07 03:28:46 (1 year ago)
Author:
dormando
Message:

Binary server patches by Dustin Sallings and Chris Goffinet

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/binary/server/memcached.c

    r672 r673  
    2323#include <sys/resource.h> 
    2424#include <sys/uio.h> 
     25#include <ctype.h> 
    2526 
    2627/* some POSIX systems need the following definition 
     
    6970static int try_read_network(conn *c); 
    7071static int try_read_udp(conn *c); 
     72static void conn_set_state(conn *, int); 
    7173 
    7274/* stats */ 
     
    107109static int deltotal; 
    108110static conn *listen_conn; 
     111static conn *bin_listen_conn; 
    109112static struct event_base *main_base; 
    110113 
     
    294297 
    295298conn *conn_new(const int sfd, const int init_state, const int event_flags, 
    296                 const int read_buffer_size, const int prot, struct event_base *base) { 
     299                const int read_buffer_size, const int prot, 
     300                struct event_base *base) { 
    297301    conn *c = conn_from_freelist(); 
    298302 
     
    342346    } 
    343347 
     348    /* unix socket mode doesn't need this, so zeroed out.  but why 
     349     * is this done for every command?  presumably for UDP 
     350     * mode.  */ 
     351    if (!settings.socketpath) { 
     352        c->request_addr_size = sizeof(c->request_addr); 
     353    } else { 
     354        c->request_addr_size = 0; 
     355    } 
     356 
    344357    if (settings.verbose > 1) { 
    345         if (init_state == conn_listening) 
     358        if (init_state == conn_listening) { 
    346359            fprintf(stderr, "<%d server listening\n", sfd); 
    347         else if (IS_UDP(prot)) 
     360        } else if (IS_UDP(prot)) { 
    348361            fprintf(stderr, "<%d server listening (udp)\n", sfd); 
    349         else 
    350             fprintf(stderr, "<%d new client connection\n", sfd); 
     362        } else if (prot == binary_prot) { 
     363            fprintf(stderr, "<%d new binary client connection\n", sfd); 
     364        } else if (prot == ascii_prot) { 
     365            fprintf(stderr, "<%d new ascii client connection\n", sfd); 
     366        } else { 
     367            fprintf(stderr, "<%d new unknown (%d) client connection\n", 
     368                sfd, prot); 
     369            abort(); 
     370        } 
    351371    } 
    352372 
     
    355375    c->state = init_state; 
    356376    c->rlbytes = 0; 
     377    c->cmd = -1; 
    357378    c->rbytes = c->wbytes = 0; 
    358379    c->wcurr = c->wbuf; 
     
    367388    c->msgused = 0; 
    368389 
    369     c->write_and_go = conn_read
     390    c->write_and_go = init_state
    370391    c->write_and_free = 0; 
    371392    c->item = 0; 
     
    468489} 
    469490 
     491static int get_init_state(conn *c) { 
     492    int rv=0; 
     493    assert(c != NULL); 
     494 
     495    switch(c->protocol) { 
     496        case binary_prot: 
     497            rv=conn_bin_init; 
     498            break; 
     499        default: 
     500            rv=conn_read; 
     501    } 
     502    return rv; 
     503} 
     504 
     505/* Set the given connection to its initial state.  The initial state will vary 
     506 * base don protocol type. */ 
     507static void conn_set_init_state(conn *c) { 
     508    assert(c != NULL); 
     509 
     510    conn_set_state(c, get_init_state(c)); 
     511} 
    470512 
    471513/* 
     
    749791 
    750792    conn_set_state(c, conn_write); 
    751     c->write_and_go = conn_read
     793    c->write_and_go = get_init_state(c)
    752794    return; 
    753795} 
     
    757799 * has been stored in c->item_comm, and the item is ready in c->item. 
    758800 */ 
    759  
    760 static void complete_nread(conn *c) { 
     801static void complete_nread_ascii(conn *c) { 
    761802    assert(c != NULL); 
    762803 
     
    785826    item_remove(c->item);       /* release the c->item reference */ 
    786827    c->item = 0; 
     828} 
     829 
     830static void add_bin_header(conn *c, int err, int body_len) { 
     831    int i=0; 
     832    uint32_t res_header[BIN_PKT_HDR_WORDS]; 
     833 
     834    assert(c); 
     835    assert(body_len >= 0); 
     836 
     837    c->msgcurr = 0; 
     838    c->msgused = 0; 
     839    c->iovused = 0; 
     840    if (add_msghdr(c) != 0) { 
     841        /* XXX:  out_string is inappropriate here */ 
     842        out_string(c, "SERVER_ERROR out of memory"); 
     843        return; 
     844    } 
     845 
     846    res_header[0] = BIN_RES_MAGIC << 24; 
     847    res_header[0] |= ((0xff & c->cmd) << 16); 
     848    res_header[0] |= (err << 8); 
     849 
     850    res_header[1] = c->opaque; 
     851    res_header[2] = body_len; 
     852 
     853    if(settings.verbose > 1) { 
     854        fprintf(stderr, "Writing bin response:  %08x %08x %08x\n", 
     855            res_header[0], res_header[1], res_header[2]); 
     856    } 
     857 
     858    for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 
     859        res_header[i] = htonl(res_header[i]); 
     860    } 
     861 
     862    assert(c->wsize >= MIN_BIN_PKT_LENGTH); 
     863    memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH); 
     864    add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH); 
     865} 
     866 
     867static void write_bin_error(conn *c, int err, int swallow) { 
     868    char *errstr="Unknown error"; 
     869    switch(err) { 
     870        case ERR_UNKNOWN_CMD: 
     871            errstr="Unknown command"; 
     872            break; 
     873        case ERR_NOT_FOUND: 
     874            errstr="Not found"; 
     875            break; 
     876        case ERR_INVALID_ARGUMENTS: 
     877            errstr="Invalid arguments"; 
     878            break; 
     879        case ERR_EXISTS: 
     880            errstr="Data exists for key."; 
     881            break; 
     882        case ERR_TOO_LARGE: 
     883            errstr="Too large."; 
     884            break; 
     885        case ERR_NOT_STORED: 
     886            errstr="Not stored."; 
     887            break; 
     888        default: 
     889            errstr="UNHANDLED ERROR"; 
     890            fprintf(stderr, "UNHANDLED ERROR:  %d\n", err); 
     891    } 
     892    if(settings.verbose > 0) { 
     893        fprintf(stderr, "Writing an error:  %s\n", errstr); 
     894    } 
     895    add_bin_header(c, err, strlen(errstr)); 
     896    add_iov(c, errstr, strlen(errstr)); 
     897 
     898    conn_set_state(c, conn_mwrite); 
     899    if(swallow > 0) { 
     900        c->sbytes=swallow; 
     901        c->write_and_go = conn_swallow; 
     902    } else { 
     903        c->write_and_go = conn_bin_init; 
     904    } 
     905} 
     906 
     907/* Form and send a response to a command over the binary protocol */ 
     908static void write_bin_response(conn *c, void *d, int dlen) { 
     909    add_bin_header(c, 0, dlen); 
     910    if(dlen > 0) { 
     911        add_iov(c, d, dlen); 
     912    } 
     913    conn_set_state(c, conn_mwrite); 
     914    c->write_and_go = conn_bin_init; 
     915} 
     916 
     917/* Byte swap a 64-bit number */ 
     918static int64_t swap64(int64_t in) { 
     919#ifdef ENDIAN_LITTLE 
     920    /* Little endian, flip the bytes around until someone makes a faster/better 
     921    * way to do this. */ 
     922    int64_t rv=0; 
     923    int i=0; 
     924     for(i=0; i<8; i++) { 
     925        rv = (rv << 8) | (in & 0xff); 
     926        in >>= 8; 
     927     } 
     928    return rv; 
     929#else 
     930    /* big-endian machines don't need byte swapping */ 
     931    return in; 
     932#endif 
     933} 
     934 
     935static void complete_incr_bin(conn *c) { 
     936    item *it; 
     937    int64_t delta; 
     938    uint64_t initial; 
     939    int32_t exptime; 
     940    char *key; 
     941    size_t nkey; 
     942    int i,res; 
     943    char *responseBuf = c->wbuf + BIN_INCR_HDR_LEN; 
     944 
     945    assert(c != NULL); 
     946 
     947    key=c->rbuf + BIN_INCR_HDR_LEN; 
     948    nkey=c->keylen; 
     949    key[nkey]=0x00; 
     950 
     951    delta = swap64(*((int64_t*)(c->rbuf))); 
     952    initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8))); 
     953    exptime = ntohl(*((int*)(c->rbuf + 16))); 
     954 
     955    if(settings.verbose) { 
     956        fprintf(stderr, "incr "); 
     957        for(i=0; i<nkey; i++) { 
     958            fprintf(stderr, "%c", key[i]); 
     959        } 
     960        fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime); 
     961    } 
     962 
     963    /* XXX:  Not sure what to do with these yet 
     964    if (settings.managed) { 
     965        int bucket = c->bucket; 
     966        if (bucket == -1) { 
     967            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     968            return; 
     969        } 
     970        c->bucket = -1; 
     971        if (buckets[bucket] != c->gen) { 
     972            out_string(c, "ERROR_NOT_OWNER"); 
     973            return; 
     974        } 
     975    } 
     976    */ 
     977 
     978    it = item_get(key, nkey); 
     979    if (it) { 
     980        /* Weird magic in add_delta forces me to pad here */ 
     981        memset(responseBuf, ' ', 32); 
     982        responseBuf[32]=0x00; 
     983        add_delta(it, true, delta, responseBuf); 
     984        res = strlen(responseBuf); 
     985         
     986        assert(res > 0); 
     987        write_bin_response(c, responseBuf, res); 
     988        item_remove(it);         /* release our reference */ 
     989    } else { 
     990        if(exptime >= 0) { 
     991            /* Save some room for the response */ 
     992            assert(c->wsize > BIN_INCR_HDR_LEN + 32); 
     993            snprintf(responseBuf, BIN_INCR_HDR_LEN + 32, "%llu", initial); 
     994             
     995            res = strlen(responseBuf); 
     996            it = item_alloc(key, nkey, 0, realtime(exptime), res + 2); 
     997             
     998            memcpy(ITEM_data(it), responseBuf, res); 
     999             
     1000            if(store_item(it, NREAD_SET)) { 
     1001                write_bin_response(c, responseBuf, res); 
     1002            } else { 
     1003                write_bin_error(c, ERR_NOT_STORED, 0); 
     1004            } 
     1005            item_remove(it);         /* release our reference */ 
     1006        } else { 
     1007            write_bin_error(c, ERR_NOT_FOUND, 0); 
     1008        } 
     1009    } 
     1010} 
     1011 
     1012static void complete_update_bin(conn *c) { 
     1013    assert(c != NULL); 
     1014 
     1015    item *it = c->item; 
     1016 
     1017    STATS_LOCK(); 
     1018    stats.set_cmds++; 
     1019    STATS_UNLOCK(); 
     1020 
     1021    /* We don't actually receive the trailing two characters in the bin 
     1022     * protocol, so we're going to just set them here */ 
     1023    *(ITEM_data(it) + it->nbytes - 2) = '\r'; 
     1024    *(ITEM_data(it) + it->nbytes - 1) = '\n'; 
     1025 
     1026    if (store_item(it, c->item_comm)) { 
     1027        /* Stored */ 
     1028        write_bin_response(c, NULL, 0); 
     1029    } else { 
     1030        /* not Stored */ 
     1031        int eno=-1; 
     1032        if(c->item_comm == NREAD_ADD) { 
     1033            eno=ERR_EXISTS; 
     1034        } else if(c->item_comm == NREAD_REPLACE) { 
     1035            eno=ERR_NOT_FOUND; 
     1036        } else { 
     1037            eno=ERR_NOT_STORED; 
     1038        } 
     1039        write_bin_error(c, eno, 0); 
     1040    } 
     1041 
     1042    item_remove(c->item);       /* release the c->item reference */ 
     1043    c->item = 0; 
     1044} 
     1045 
     1046static void process_bin_get(conn *c) { 
     1047    item *it; 
     1048 
     1049    it = item_get(c->rbuf, c->keylen); 
     1050    if (it) { 
     1051        int *flags; 
     1052 
     1053        assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4); 
     1054 
     1055        /* This is a bit of magic.  I'm using wbuf as the header, so I'll place 
     1056        this is int in far enough to cover the header */ 
     1057        flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH); 
     1058        *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10)); 
     1059 
     1060        /* the length has two unnecessary bytes, and then we write four more */ 
     1061        add_bin_header(c, 0, it->nbytes - 2 + 4); 
     1062        /* Flags and value */ 
     1063        add_iov(c, flags, 4); 
     1064        /* bytes minus the CRLF */ 
     1065        add_iov(c, ITEM_data(it), it->nbytes - 2); 
     1066        conn_set_state(c, conn_mwrite); 
     1067    } else { 
     1068        if(c->cmd == CMD_GETQ) { 
     1069            conn_set_state(c, conn_bin_init); 
     1070        } else { 
     1071            write_bin_error(c, ERR_NOT_FOUND, 0); 
     1072        } 
     1073    } 
     1074} 
     1075 
     1076static void bin_read_key(conn *c, int next_substate, int extra) { 
     1077    assert(c); 
     1078    c->substate = next_substate; 
     1079    c->rlbytes = c->keylen + extra; 
     1080    assert(c->rsize >= c->rlbytes); 
     1081    c->ritem = c->rbuf; 
     1082    conn_set_state(c, conn_nread); 
     1083} 
     1084 
     1085static void dispatch_bin_command(conn *c) { 
     1086    time_t exptime = 0; 
     1087    switch(c->cmd) { 
     1088        case CMD_VERSION: 
     1089            write_bin_response(c, VERSION, strlen(VERSION)); 
     1090            break; 
     1091        case CMD_FLUSH: 
     1092            set_current_time(); 
     1093 
     1094            settings.oldest_live = current_time - 1; 
     1095            item_flush_expired(); 
     1096            write_bin_response(c, NULL, 0); 
     1097            break; 
     1098        case CMD_NOOP: 
     1099            write_bin_response(c, NULL, 0); 
     1100            break; 
     1101        case CMD_SET: 
     1102            /* Fallthrough */ 
     1103        case CMD_ADD: 
     1104            /* Fallthrough */ 
     1105        case CMD_REPLACE: 
     1106            bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN); 
     1107            break; 
     1108        case CMD_GETQ: 
     1109        case CMD_GET: 
     1110            bin_read_key(c, bin_reading_get_key, 0); 
     1111            break; 
     1112        case CMD_DELETE: 
     1113            bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN); 
     1114            break; 
     1115        case CMD_INCR: 
     1116            bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN); 
     1117            break; 
     1118        default: 
     1119            write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]); 
     1120    } 
     1121} 
     1122 
     1123static void process_bin_update(conn *c) { 
     1124    char *key; 
     1125    int nkey; 
     1126    int vlen; 
     1127    int flags; 
     1128    int exptime; 
     1129    item *it; 
     1130    int comm; 
     1131 
     1132    assert(c != NULL); 
     1133 
     1134    key=c->rbuf + BIN_SET_HDR_LEN; 
     1135    nkey=c->keylen; 
     1136    key[nkey]=0x00; 
     1137 
     1138    flags = ntohl(*((int*)(c->rbuf))); 
     1139    exptime = ntohl(*((int*)(c->rbuf + 4))); 
     1140    vlen = c->bin_header[2] - (nkey + BIN_SET_HDR_LEN); 
     1141 
     1142    if (settings.detail_enabled) { 
     1143        stats_prefix_record_set(key); 
     1144    } 
     1145 
     1146    /* Not sure what to do with this. 
     1147    if (settings.managed) { 
     1148        int bucket = c->bucket; 
     1149        if (bucket == -1) { 
     1150            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     1151            return; 
     1152        } 
     1153        c->bucket = -1; 
     1154        if (buckets[bucket] != c->gen) { 
     1155            out_string(c, "ERROR_NOT_OWNER"); 
     1156            return; 
     1157        } 
     1158    } 
     1159    */ 
     1160 
     1161    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); 
     1162 
     1163    if (it == 0) { 
     1164        if (! item_size_ok(nkey, flags, vlen + 2)) { 
     1165            write_bin_error(c, ERR_TOO_LARGE, vlen); 
     1166        } else { 
     1167            write_bin_error(c, ERR_OUT_OF_MEMORY, vlen); 
     1168        } 
     1169        /* swallow the data line */ 
     1170        c->write_and_go = conn_swallow; 
     1171        return; 
     1172    } 
     1173 
     1174    switch(c->cmd) { 
     1175        case CMD_ADD: 
     1176            c->item_comm = NREAD_ADD; 
     1177            break; 
     1178        case CMD_SET: 
     1179            c->item_comm = NREAD_SET; 
     1180            break; 
     1181        case CMD_REPLACE: 
     1182            c->item_comm = NREAD_REPLACE; 
     1183            break; 
     1184        default: 
     1185            assert(0); 
     1186    } 
     1187 
     1188    c->item = it; 
     1189    c->ritem = ITEM_data(it); 
     1190    c->rlbytes = vlen; 
     1191    conn_set_state(c, conn_nread); 
     1192    c->substate = bin_read_set_value; 
     1193} 
     1194 
     1195static void process_bin_delete(conn *c) { 
     1196    char *key; 
     1197    size_t nkey; 
     1198    item *it; 
     1199    time_t exptime = 0; 
     1200 
     1201    assert(c != NULL); 
     1202 
     1203    /* XXX:  I don't know what to do with this yet 
     1204    if (settings.managed) { 
     1205        int bucket = c->bucket; 
     1206        if (bucket == -1) { 
     1207            out_string(c, "CLIENT_ERROR no BG data in managed mode"); 
     1208            return; 
     1209        } 
     1210        c->bucket = -1; 
     1211        if (buckets[bucket] != c->gen) { 
     1212            out_string(c, "ERROR_NOT_OWNER"); 
     1213            return; 
     1214        } 
     1215    } 
     1216    */ 
     1217 
     1218    exptime = ntohl(*((int*)(c->rbuf))); 
     1219    key = c->rbuf + 4; 
     1220    nkey = c->keylen; 
     1221    key[nkey]=0x00; 
     1222 
     1223    if(settings.verbose) { 
     1224        fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime); 
     1225    } 
     1226 
     1227    if (settings.detail_enabled) { 
     1228        stats_prefix_record_delete(key); 
     1229    } 
     1230 
     1231    it = item_get(key, nkey); 
     1232    if (it) { 
     1233        if (exptime == 0) { 
     1234            item_unlink(it); 
     1235            item_remove(it);      /* release our reference */ 
     1236            write_bin_response(c, NULL, 0); 
     1237        } else { 
     1238            /* XXX:  This is really lame, but defer_delete returns a string */ 
     1239            char *res=defer_delete(it, exptime); 
     1240            if(res[0] == 'D') { 
     1241                write_bin_response(c, NULL, 0); 
     1242            } else { 
     1243                write_bin_error(c, ERR_OUT_OF_MEMORY, 0); 
     1244            } 
     1245        } 
     1246    } else { 
     1247        write_bin_error(c, ERR_NOT_FOUND, 0); 
     1248    } 
     1249} 
     1250 
     1251static void complete_nread_binary(conn *c) { 
     1252    assert(c != NULL); 
     1253 
     1254    if(c->cmd < 0) { 
     1255        /* No command defined.  Figure out what they're trying to say. */ 
     1256        int i=0; 
     1257        /* I did a bit of hard-coding around the packet sizes */ 
     1258        assert(BIN_PKT_HDR_WORDS == 3); 
     1259        for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 
     1260            c->bin_header[i] = ntohl(c->bin_header[i]); 
     1261        } 
     1262        if(settings.verbose) { 
     1263            fprintf(stderr, "Read binary protocol data:  %08x %08x %08x\n", 
     1264                c->bin_header[0], c->bin_header[1], c->bin_header[2]); 
     1265        } 
     1266        if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) { 
     1267            if(settings.verbose) { 
     1268                fprintf(stderr, "Invalid magic:  %x\n", c->bin_header[0] >> 24); 
     1269            } 
     1270            conn_set_state(c, conn_closing); 
     1271            return; 
     1272        } 
     1273     
     1274        c->msgcurr = 0; 
     1275        c->msgused = 0; 
     1276        c->iovused = 0; 
     1277        if (add_msghdr(c) != 0) { 
     1278            out_string(c, "SERVER_ERROR out of memory"); 
     1279            return; 
     1280        } 
     1281     
     1282        c->cmd = (c->bin_header[0] >> 16) & 0xff; 
     1283        c->keylen = (c->bin_header[0] >> 8) & 0xff; 
     1284        c->opaque = c->bin_header[1]; 
     1285        if(settings.verbose > 1) { 
     1286            fprintf(stderr, 
     1287                "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd, 
     1288                c->opaque, c->keylen, c->bin_header[2]); 
     1289        } 
     1290        dispatch_bin_command(c); 
     1291    } else { 
     1292        switch(c->substate) { 
     1293            case bin_reading_set_header: 
     1294                process_bin_update(c); 
     1295                break; 
     1296            case bin_read_set_value: 
     1297                complete_update_bin(c); 
     1298                break; 
     1299            case bin_reading_get_key: 
     1300                process_bin_get(c); 
     1301                break; 
     1302            case bin_reading_del_header: 
     1303                process_bin_delete(c); 
     1304                break; 
     1305            case bin_reading_incr_header: 
     1306                complete_incr_bin(c); 
     1307                break; 
     1308            default: 
     1309                fprintf(stderr, "Not handling substate %d\n", c->substate); 
     1310                assert(0); 
     1311        } 
     1312    } 
     1313} 
     1314 
     1315static void complete_nread(conn *c) { 
     1316    assert(c != NULL); 
     1317 
     1318    if(c->protocol == ascii_prot) { 
     1319        complete_nread_ascii(c); 
     1320    } else if(c->protocol == binary_prot) { 
     1321        complete_nread_binary(c); 
     1322    } else { 
     1323        assert(0); /* XXX:  Invalid case.  Should probably do more here. */ 
     1324    } 
    7871325} 
    7881326 
     
    9621500        c->wbytes = bytes; 
    9631501        conn_set_state(c, conn_write); 
    964         c->write_and_go = conn_read
     1502        c->write_and_go = get_init_state(c)
    9651503    } else { 
    9661504        out_string(c, "SERVER_ERROR out of memory"); 
     
    14832021        value += delta; 
    14842022    else { 
    1485         if (delta >= value) value = 0; 
    1486         else value -= delta; 
     2023        value -= delta; 
     2024    } 
     2025    if(value < 0) { 
     2026        value=0; 
    14872027    } 
    14882028    sprintf(buf, "%llu", value); 
     
    17182258                c->gen = gen; 
    17192259            } 
    1720             conn_set_state(c, conn_read); 
     2260            conn_set_init_state(c); 
    17212261            return; 
    17222262        } else { 
     
    19102450        } 
    19112451 
    1912         /* unix socket mode doesn't need this, so zeroed out.  but why 
    1913          * is this done for every command?  presumably for UDP 
    1914          * mode.  */ 
    1915         if (!settings.socketpath) { 
    1916             c->request_addr_size = sizeof(c->request_addr); 
    1917         } else { 
    1918             c->request_addr_size = 0; 
    1919         } 
    1920  
    19212452        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); 
    19222453        if (res > 0) { 
     
    19742505    } 
    19752506} 
    1976  
    19772507 
    19782508/* 
     
    20462576    bool stop = false; 
    20472577    int sfd, flags = 1; 
     2578    int init_state; /* initial state for a new connection */ 
    20482579    socklen_t addrlen; 
    20492580    struct sockaddr addr; 
     
    20782609                break; 
    20792610            } 
    2080             dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, 
    2081                                      DATA_BUFFER_SIZE, ascii_prot); 
     2611            init_state = get_init_state(c); 
     2612 
     2613            dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST, 
     2614                                     DATA_BUFFER_SIZE, c->protocol); 
     2615 
    20822616            break; 
    20832617 
     
    20992633            break; 
    21002634 
     2635        case conn_bin_init: /* Reinitialize a binary connection */ 
     2636            c->rlbytes = MIN_BIN_PKT_LENGTH; 
     2637            c->write_and_go = conn_bin_init; 
     2638            c->cmd = -1; 
     2639            c->substate = bin_no_state; 
     2640            c->rbytes = c->wbytes = 0; 
     2641            c->wcurr = c->wbuf; 
     2642            c->rcurr = c->rbuf; 
     2643            c->ritem = (char*)c->bin_header; 
     2644            conn_set_state(c, conn_nread); 
     2645            conn_shrink(c); 
     2646            break; 
     2647 
    21012648        case conn_nread: 
    2102             /* we are reading rlbytes into ritem; */ 
    21032649            if (c->rlbytes == 0) { 
    21042650                complete_nread(c); 
     
    21492695            /* we are reading sbytes and throwing them away */ 
    21502696            if (c->sbytes == 0) { 
    2151                 conn_set_state(c, conn_read); 
     2697                conn_set_init_state(c); 
    21522698                break; 
    21532699            } 
     
    22292775                        c->suffixleft--; 
    22302776                    } 
    2231                     conn_set_state(c, conn_read); 
     2777                    /* XXX:  I don't know why this wasn't the general case */ 
     2778                    if(c->protocol == binary_prot) { 
     2779                        conn_set_state(c, c->write_and_go); 
     2780                    } else { 
     2781                        conn_set_init_state(c); 
     2782                    } 
    22322783                } else if (c->state == conn_write) { 
    22332784                    if (c->write_and_free) { 
     
    24553006/* listening socket */ 
    24563007static int l_socket = 0; 
     3008/* Listening socket for the binary protocol */ 
     3009static int bl_socket = -1; 
    24573010 
    24583011/* udp socket */ 
     
    24633016    int i; 
    24643017    if (l_socket > -1) close(l_socket); 
     3018    if (bl_socket > -1) close(bl_socket); 
    24653019    if (u_socket > -1) close(u_socket); 
    24663020    for (i = 3; i <= 500; i++) close(i); /* so lame */ 
     
    25453099    printf("-p <num>      TCP port number to listen on (default: 11211)\n" 
    25463100           "-U <num>      UDP port number to listen on (default: 0, off)\n" 
     3101           "-B <num>      binary protocol TCP port number to listen on (default: 0, off)\n" 
    25473102           "-s <file>     unix socket path to listen on (disables network support)\n" 
    25483103           "-a <mask>     access mask for unix socket, in octal (default 0700)\n" 
     
    26953250 
    26963251    /* process arguments */ 
    2697     while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { 
     3252    while ((c = getopt(argc, argv, "a:bp:B:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { 
    26983253        switch (c) { 
    26993254        case 'a': 
     
    27103265        case 'p': 
    27113266            settings.port = atoi(optarg); 
     3267            break; 
     3268        case 'B': 
     3269            settings.binport = atoi(optarg); 
    27123270            break; 
    27133271        case 's': 
     
    28503408            exit(EXIT_FAILURE); 
    28513409        } 
     3410        /* Try the binary port. */ 
     3411        if(settings.binport > 0) { 
     3412            bl_socket = server_socket(settings.binport, 0); 
     3413            if (bl_socket == -1) { 
     3414                 fprintf(stderr, "failed to listen to binary protocol\n"); 
     3415                    exit(EXIT_FAILURE); 
     3416            } 
     3417        } 
    28523418    } 
    28533419 
     
    29423508    /* create the initial listening connection */ 
    29433509    if (!(listen_conn = conn_new(l_socket, conn_listening, 
    2944                                  EV_READ | EV_PERSIST, 1, false, main_base))) { 
     3510                                 EV_READ | EV_PERSIST, 1, ascii_prot, 
     3511                                 main_base))) { 
    29453512        fprintf(stderr, "failed to create listening connection"); 
    29463513        exit(EXIT_FAILURE); 
    29473514    } 
     3515 
     3516    /* Same for binary protocol */ 
     3517    if (bl_socket > 0) { 
     3518        if (!(bin_listen_conn = conn_new(bl_socket, conn_listening, 
     3519                                 EV_READ | EV_PERSIST, 1, binary_prot, 
     3520                                 main_base))) { 
     3521            fprintf(stderr, "failed to create listening connection"); 
     3522            exit(EXIT_FAILURE); 
     3523        } 
     3524    } 
     3525 
    29483526    /* start up worker threads if MT mode */ 
    29493527    thread_init(settings.num_threads, main_base); 
  • branches/binary/server/memcached.h

    r672 r673  
    3838#define IOV_LIST_HIGHWAT 600 
    3939#define MSG_LIST_HIGHWAT 100 
     40 
     41/* Binary protocol stuff */ 
     42#define MIN_BIN_PKT_LENGTH 12 
     43/* flags:32, expiration:32 */ 
     44#define BIN_SET_HDR_LEN 8 
     45/* incr:64, initial:64, expiration:32 */ 
     46#define BIN_INCR_HDR_LEN 20 
     47/* timeout:32 */ 
     48#define BIN_DEL_HDR_LEN 4 
     49#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) 
     50 
     51#define BIN_REQ_MAGIC 0x0f 
     52#define BIN_RES_MAGIC 0xf0 
     53 
     54#define CMD_GET 0 
     55#define CMD_SET 1 
     56#define CMD_ADD 2 
     57#define CMD_REPLACE 3 
     58#define CMD_DELETE 4 
     59#define CMD_INCR 5 
     60#define CMD_QUIT 6 
     61#define CMD_FLUSH 7 
     62#define CMD_GETQ 8 
     63#define CMD_NOOP 9 
     64#define CMD_VERSION 10 
     65 
     66#define ERR_UNKNOWN_CMD 0x81 
     67#define ERR_OUT_OF_MEMORY 0x82 
     68 
     69#define ERR_NOT_FOUND 0x1 
     70#define ERR_EXISTS 0x2 
     71#define ERR_TOO_LARGE 0x3 
     72#define ERR_INVALID_ARGUMENTS 0x4 
     73#define ERR_NOT_STORED 0x5 
    4074 
    4175/* Get a consistent bool type */ 
     
    84118    int port; 
    85119    int udpport; 
     120    int binport;           /* Port for binary protocol. */ 
    86121    struct in_addr interf; 
    87122    int verbose; 
     
    141176    conn_closing,    /** closing this connection */ 
    142177    conn_mwrite,     /** writing out many items sequentially */ 
     178    conn_bin_init,   /** Reinitializing a binary protocol connection */ 
     179}; 
     180 
     181enum bin_substates { 
     182    bin_no_state, 
     183    bin_reading_set_header, 
     184    bin_read_set_value, 
     185    bin_reading_get_key, 
     186    bin_reading_del_header, 
     187    bin_reading_incr_header, 
    143188}; 
    144189 
    145190enum protocols { 
    146191    ascii_prot = 3, /* arbitrary value. */ 
    147     ascii_udp_prot 
     192    ascii_udp_prot, 
     193    binary_prot 
    148194}; 
    149195 
     
    160206    int    sfd; 
    161207    int    state; 
     208    int    substate; 
    162209    struct event event; 
    163210    short  ev_flags; 
     
    227274                         a managed instance. -1 (_not_ 0) means invalid. */ 
    228275    int    gen;       /* generation requested for the bucket */ 
     276 
     277    /* Binary protocol stuff */ 
     278    /* This is where the binary header goes */ 
     279    uint32_t bin_header[MIN_BIN_PKT_LENGTH/sizeof(uint32_t)]; 
     280    short cmd; 
     281    int opaque; 
     282    int keylen; 
     283 
    229284} conn; 
    230285