Changeset 658
- Timestamp:
- 12/03/07 08:17:24 (1 year ago)
- Files:
-
- branches/binary-1.3.0 (copied) (copied from trunk/server)
- branches/binary-1.3.0/memcached.c (modified) (41 diffs)
- branches/binary-1.3.0/memcached.h (modified) (8 diffs)
- branches/binary-1.3.0/thread.c (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/binary-1.3.0/memcached.c
r654 r658 23 23 #include <sys/resource.h> 24 24 #include <sys/uio.h> 25 #include <ctype.h> 25 26 26 27 /* some POSIX systems need the following definition … … 64 65 */ 65 66 static void drive_machine(conn *c); 66 static int new_socket(const bool is_udp);67 static int server_socket(const int port, const bool is_udp);67 static int new_socket(const int prot); 68 static int server_socket(const int port, const int prot); 68 69 static int try_read_command(conn *c); 69 70 static int try_read_network(conn *c); 70 71 static int try_read_udp(conn *c); 72 static void conn_set_state(conn *, int); 71 73 72 74 /* stats */ … … 107 109 static int deltotal; 108 110 static conn *listen_conn; 111 static conn *bin_listen_conn; 109 112 static struct event_base *main_base; 110 113 … … 229 232 c->msgused++; 230 233 231 if ( c->udp) {234 if (IS_UDP(c->protocol)) { 232 235 /* Leave room for the UDP header, which we'll fill in later. */ 233 236 return add_iov(c, NULL, UDP_HEADER_SIZE); … … 294 297 295 298 conn *conn_new(const int sfd, const int init_state, const int event_flags, 296 const int read_buffer_size, const bool is_udp, struct event_base *base) { 299 const int read_buffer_size, const int prot, 300 struct event_base *base) { 297 301 conn *c = conn_from_freelist(); 298 302 … … 342 346 } 343 347 348 /* unix socket mode doesn't need this, so zeroed out. but why 349 * is this done for every command? presumably for UDP 350 * mode. */ 351 if (!settings.socketpath) { 352 c->request_addr_size = sizeof(c->request_addr); 353 } else { 354 c->request_addr_size = 0; 355 } 356 344 357 if (settings.verbose > 1) { 345 if (init_state == conn_listening) 358 if (init_state == conn_listening) { 346 359 fprintf(stderr, "<%d server listening\n", sfd); 347 else if (is_udp)360 } else if (IS_UDP(prot)) { 348 361 fprintf(stderr, "<%d server listening (udp)\n", sfd); 349 else 350 fprintf(stderr, "<%d new client connection\n", sfd); 362 } else if (prot == binary_prot) { 363 fprintf(stderr, "<%d new binary client connection\n", sfd); 364 } else if (prot == ascii_prot) { 365 fprintf(stderr, "<%d new ascii client connection\n", sfd); 366 } else { 367 fprintf(stderr, "<%d new unknown (%d) client connection\n", 368 sfd, prot); 369 abort(); 370 } 351 371 } 352 372 353 373 c->sfd = sfd; 354 c-> udp = is_udp;374 c->protocol = prot; 355 375 c->state = init_state; 356 376 c->rlbytes = 0; 377 c->cmd = -1; 357 378 c->rbytes = c->wbytes = 0; 358 379 c->wcurr = c->wbuf; … … 367 388 c->msgused = 0; 368 389 369 c->write_and_go = conn_read;390 c->write_and_go = init_state; 370 391 c->write_and_free = 0; 371 392 c->item = 0; … … 468 489 } 469 490 491 static int get_init_state(conn *c) { 492 int rv=0; 493 assert(c != NULL); 494 495 switch(c->protocol) { 496 case binary_prot: 497 rv=conn_bin_init; 498 break; 499 default: 500 rv=conn_read; 501 } 502 return rv; 503 } 504 505 /* Set the given connection to its initial state. The initial state will vary 506 * base don protocol type. */ 507 static void conn_set_init_state(conn *c) { 508 assert(c != NULL); 509 510 conn_set_state(c, get_init_state(c)); 511 } 470 512 471 513 /* … … 480 522 assert(c != NULL); 481 523 482 if ( c->udp)524 if (IS_UDP(c->protocol)) 483 525 return; 484 526 … … 652 694 * UDP_MAX_PAYLOAD_SIZE bytes. 653 695 */ 654 limit_to_mtu = c->udp|| (1 == c->msgused);696 limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); 655 697 656 698 /* We may need to start a new msghdr if this one is full. */ … … 749 791 750 792 conn_set_state(c, conn_write); 751 c->write_and_go = conn_read;793 c->write_and_go = get_init_state(c); 752 794 return; 753 795 } … … 757 799 * has been stored in c->item_comm, and the item is ready in c->item. 758 800 */ 759 760 static void complete_nread(conn *c) { 801 static void complete_nread_ascii(conn *c) { 761 802 assert(c != NULL); 762 803 … … 785 826 item_remove(c->item); /* release the c->item reference */ 786 827 c->item = 0; 828 } 829 830 static void add_bin_header(conn *c, int err, int body_len) { 831 int i=0; 832 uint32_t res_header[BIN_PKT_HDR_WORDS]; 833 834 assert(c); 835 assert(body_len >= 0); 836 837 c->msgcurr = 0; 838 c->msgused = 0; 839 c->iovused = 0; 840 if (add_msghdr(c) != 0) { 841 /* XXX: out_string is inappropriate here */ 842 out_string(c, "SERVER_ERROR out of memory"); 843 return; 844 } 845 846 res_header[0] = BIN_RES_MAGIC << 24; 847 res_header[0] |= ((0xff & c->cmd) << 16); 848 res_header[0] |= (err << 8); 849 850 res_header[1] = c->opaque; 851 res_header[2] = body_len; 852 853 if(settings.verbose > 1) { 854 fprintf(stderr, "Writing bin response: %08x %08x %08x\n", 855 res_header[0], res_header[1], res_header[2]); 856 } 857 858 for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 859 res_header[i] = htonl(res_header[i]); 860 } 861 862 assert(c->wsize >= MIN_BIN_PKT_LENGTH); 863 memcpy(c->wbuf, &res_header, MIN_BIN_PKT_LENGTH); 864 add_iov(c, c->wbuf, MIN_BIN_PKT_LENGTH); 865 } 866 867 static void write_bin_error(conn *c, int err, int swallow) { 868 char *errstr="Unknown error"; 869 switch(err) { 870 case ERR_UNKNOWN_CMD: 871 errstr="Unknown command"; 872 break; 873 case ERR_NOT_FOUND: 874 errstr="Not found"; 875 break; 876 case ERR_INVALID_ARGUMENTS: 877 errstr="Invalid arguments"; 878 break; 879 case ERR_EXISTS: 880 errstr="Data exists for key."; 881 break; 882 case ERR_TOO_LARGE: 883 errstr="Too large."; 884 break; 885 case ERR_NOT_STORED: 886 errstr="Not stored."; 887 break; 888 default: 889 errstr="UNHANDLED ERROR"; 890 fprintf(stderr, "UNHANDLED ERROR: %d\n", err); 891 } 892 if(settings.verbose > 0) { 893 fprintf(stderr, "Writing an error: %s\n", errstr); 894 } 895 add_bin_header(c, err, strlen(errstr)); 896 add_iov(c, errstr, strlen(errstr)); 897 898 conn_set_state(c, conn_mwrite); 899 if(swallow > 0) { 900 c->sbytes=swallow; 901 c->write_and_go = conn_swallow; 902 } else { 903 c->write_and_go = conn_bin_init; 904 } 905 } 906 907 /* Form and send a response to a command over the binary protocol */ 908 static void write_bin_response(conn *c, void *d, int dlen) { 909 add_bin_header(c, 0, dlen); 910 if(dlen > 0) { 911 add_iov(c, d, dlen); 912 } 913 conn_set_state(c, conn_mwrite); 914 c->write_and_go = conn_bin_init; 915 } 916 917 /* Byte swap a 64-bit number */ 918 static int64_t swap64(int64_t in) { 919 #ifdef ENDIAN_LITTLE 920 /* Little endian, flip the bytes around until someone makes a faster/better 921 * way to do this. */ 922 int64_t rv=0; 923 int i=0; 924 for(i=0; i<8; i++) { 925 rv = (rv << 8) | (in & 0xff); 926 in >>= 8; 927 } 928 return rv; 929 #else 930 /* big-endian machines don't need byte swapping */ 931 return in; 932 #endif 933 } 934 935 static void complete_incr_bin(conn *c) { 936 item *it; 937 int64_t delta; 938 uint64_t initial; 939 int32_t exptime; 940 char *key; 941 size_t nkey; 942 int i,res; 943 char *responseBuf = c->wbuf + BIN_INCR_HDR_LEN; 944 945 assert(c != NULL); 946 947 key=c->rbuf + BIN_INCR_HDR_LEN; 948 nkey=c->keylen; 949 key[nkey]=0x00; 950 951 delta = swap64(*((int64_t*)(c->rbuf))); 952 initial = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8))); 953 exptime = ntohl(*((int*)(c->rbuf + 16))); 954 955 if(settings.verbose) { 956 fprintf(stderr, "incr "); 957 for(i=0; i<nkey; i++) { 958 fprintf(stderr, "%c", key[i]); 959 } 960 fprintf(stderr, " %lld, %llu, %d\n", delta, initial, exptime); 961 } 962 963 /* XXX: Not sure what to do with these yet 964 if (settings.managed) { 965 int bucket = c->bucket; 966 if (bucket == -1) { 967 out_string(c, "CLIENT_ERROR no BG data in managed mode"); 968 return; 969 } 970 c->bucket = -1; 971 if (buckets[bucket] != c->gen) { 972 out_string(c, "ERROR_NOT_OWNER"); 973 return; 974 } 975 } 976 */ 977 978 it = item_get(key, nkey); 979 if (it) { 980 /* Weird magic in add_delta forces me to pad here */ 981 memset(responseBuf, ' ', 32); 982 responseBuf[32]=0x00; 983 add_delta(it, true, delta, responseBuf); 984 res = strlen(responseBuf); 985 986 assert(res > 0); 987 write_bin_response(c, responseBuf, res); 988 item_remove(it); /* release our reference */ 989 } else { 990 if(exptime >= 0) { 991 /* Save some room for the response */ 992 assert(c->wsize > BIN_INCR_HDR_LEN + 32); 993 snprintf(responseBuf, BIN_INCR_HDR_LEN + 32, "%llu", initial); 994 995 res = strlen(responseBuf); 996 it = item_alloc(key, nkey, 0, realtime(exptime), res + 2); 997 998 memcpy(ITEM_data(it), responseBuf, res); 999 1000 if(store_item(it, NREAD_SET)) { 1001 write_bin_response(c, responseBuf, res); 1002 } else { 1003 write_bin_error(c, ERR_NOT_STORED, 0); 1004 } 1005 item_remove(it); /* release our reference */ 1006 } else { 1007 write_bin_error(c, ERR_NOT_FOUND, 0); 1008 } 1009 } 1010 } 1011 1012 static void complete_update_bin(conn *c) { 1013 int eno=-1, ret=0; 1014 assert(c != NULL); 1015 1016 item *it = c->item; 1017 1018 STATS_LOCK(); 1019 stats.set_cmds++; 1020 STATS_UNLOCK(); 1021 1022 /* We don't actually receive the trailing two characters in the bin 1023 * protocol, so we're going to just set them here */ 1024 *(ITEM_data(it) + it->nbytes - 2) = '\r'; 1025 *(ITEM_data(it) + it->nbytes - 1) = '\n'; 1026 1027 switch (store_item(it, c->item_comm)) { 1028 case 1: 1029 /* Stored */ 1030 write_bin_response(c, NULL, 0); 1031 break; 1032 case 2: 1033 write_bin_error(c, ERR_EXISTS, 0); 1034 break; 1035 case 3: 1036 write_bin_error(c, ERR_NOT_FOUND, 0); 1037 break; 1038 default: 1039 if(c->item_comm == NREAD_ADD) { 1040 eno=ERR_EXISTS; 1041 } else if(c->item_comm == NREAD_REPLACE) { 1042 eno=ERR_NOT_FOUND; 1043 } else { 1044 eno=ERR_NOT_STORED; 1045 } 1046 write_bin_error(c, eno, 0); 1047 } 1048 1049 item_remove(c->item); /* release the c->item reference */ 1050 c->item = 0; 1051 } 1052 1053 static void process_bin_get(conn *c) { 1054 item *it; 1055 1056 it = item_get(c->rbuf, c->keylen); 1057 if (it) { 1058 int *flags; 1059 1060 assert(c->rsize >= MIN_BIN_PKT_LENGTH + 4); 1061 1062 /* This is a bit of magic. I'm using wbuf as the header, so I'll place 1063 this is int in far enough to cover the header */ 1064 flags=(int*)(c->wbuf + MIN_BIN_PKT_LENGTH); 1065 *flags=htonl(strtoul(ITEM_suffix(it), NULL, 10)); 1066 1067 /* the length has two unnecessary bytes, and then we write four more */ 1068 add_bin_header(c, 0, it->nbytes - 2 + 4 + (c->cmd == CMD_GETS?8:0)); 1069 /* Flags */ 1070 add_iov(c, flags, 4); 1071 /* if it's a gets, add the identifier */ 1072 if(c->cmd == CMD_GETS) { 1073 uint64_t* identifier; 1074 identifier=(uint64_t*)(c->wbuf + MIN_BIN_PKT_LENGTH + 4); 1075 *identifier=swap64((uint32_t)it->cas_id); 1076 add_iov(c, identifier, 8); 1077 } 1078 /* bytes minus the CRLF */ 1079 add_iov(c, ITEM_data(it), it->nbytes - 2); 1080 conn_set_state(c, conn_mwrite); 1081 } else { 1082 if(c->cmd == CMD_GETQ) { 1083 conn_set_state(c, conn_bin_init); 1084 } else { 1085 write_bin_error(c, ERR_NOT_FOUND, 0); 1086 } 1087 } 1088 } 1089 1090 static void bin_read_key(conn *c, int next_substate, int extra) { 1091 assert(c); 1092 c->substate = next_substate; 1093 c->rlbytes = c->keylen + extra; 1094 assert(c->rsize >= c->rlbytes); 1095 c->ritem = c->rbuf; 1096 conn_set_state(c, conn_nread); 1097 } 1098 1099 static void dispatch_bin_command(conn *c) { 1100 time_t exptime = 0; 1101 switch(c->cmd) { 1102 case CMD_VERSION: 1103 write_bin_response(c, VERSION, strlen(VERSION)); 1104 break; 1105 case CMD_FLUSH: 1106 set_current_time(); 1107 1108 settings.oldest_live = current_time - 1; 1109 item_flush_expired(); 1110 write_bin_response(c, NULL, 0); 1111 break; 1112 case CMD_NOOP: 1113 write_bin_response(c, NULL, 0); 1114 break; 1115 case CMD_SET: 1116 /* Fallthrough */ 1117 case CMD_ADD: 1118 /* Fallthrough */ 1119 case CMD_REPLACE: 1120 bin_read_key(c, bin_reading_set_header, BIN_SET_HDR_LEN); 1121 break; 1122 case CMD_CAS: 1123 bin_read_key(c, bin_reading_cas_header, BIN_CAS_HDR_LEN); 1124 break; 1125 case CMD_GETS: 1126 case CMD_GETQ: 1127 case CMD_GET: 1128 bin_read_key(c, bin_reading_get_key, 0); 1129 break; 1130 case CMD_DELETE: 1131 bin_read_key(c, bin_reading_del_header, BIN_DEL_HDR_LEN); 1132 break; 1133 case CMD_INCR: 1134 bin_read_key(c, bin_reading_incr_header, BIN_INCR_HDR_LEN); 1135 break; 1136 default: 1137 write_bin_error(c, ERR_UNKNOWN_CMD, c->bin_header[2]); 1138 } 1139 } 1140 1141 static void process_bin_update(conn *c, bool cas) { 1142 char *key; 1143 int nkey; 1144 int vlen; 1145 int flags; 1146 int exptime; 1147 item *it; 1148 int comm; 1149 int hdrlen=cas ? BIN_CAS_HDR_LEN : BIN_SET_HDR_LEN; 1150 1151 assert(c != NULL); 1152 1153 key=c->rbuf + hdrlen; 1154 nkey=c->keylen; 1155 key[nkey]=0x00; 1156 1157 flags = ntohl(*((int*)(c->rbuf))); 1158 exptime = ntohl(*((int*)(c->rbuf + 4))); 1159 vlen = c->bin_header[2] - (nkey + hdrlen); 1160 1161 if(settings.verbose) { 1162 fprintf(stderr, "Value len is %d\n", vlen); 1163 } 1164 1165 if (settings.detail_enabled) { 1166 stats_prefix_record_set(key); 1167 } 1168 1169 /* Not sure what to do with this. 1170 if (settings.managed) { 1171 int bucket = c->bucket; 1172 if (bucket == -1) { 1173 out_string(c, "CLIENT_ERROR no BG data in managed mode"); 1174 return; 1175 } 1176 c->bucket = -1; 1177 if (buckets[bucket] != c->gen) { 1178 out_string(c, "ERROR_NOT_OWNER"); 1179 return; 1180 } 1181 } 1182 */ 1183 1184 it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); 1185 if(cas) { 1186 it->cas_id = (uint64_t)swap64(*((int64_t*)(c->rbuf + 8))); 1187 } 1188 1189 if (it == 0) { 1190 if (! item_size_ok(nkey, flags, vlen + 2)) { 1191 write_bin_error(c, ERR_TOO_LARGE, vlen); 1192 } else { 1193 write_bin_error(c, ERR_OUT_OF_MEMORY, vlen); 1194 } 1195 /* swallow the data line */ 1196 c->write_and_go = conn_swallow; 1197 return; 1198 } 1199 1200 switch(c->cmd) { 1201 case CMD_ADD: 1202 c->item_comm = NREAD_ADD; 1203 break; 1204 case CMD_CAS: 1205 c->item_comm = NREAD_CAS; 1206 break; 1207 case CMD_SET: 1208 c->item_comm = NREAD_SET; 1209 break; 1210 case CMD_REPLACE: 1211 c->item_comm = NREAD_REPLACE; 1212 break; 1213 default: 1214 assert(0); 1215 } 1216 1217 c->item = it; 1218 c->ritem = ITEM_data(it); 1219 c->rlbytes = vlen; 1220 conn_set_state(c, conn_nread); 1221 c->substate = bin_read_set_value; 1222 } 1223 1224 static void process_bin_delete(conn *c) { 1225 char *key; 1226 size_t nkey; 1227 item *it; 1228 time_t exptime = 0; 1229 1230 assert(c != NULL); 1231 1232 /* XXX: I don't know what to do with this yet 1233 if (settings.managed) { 1234 int bucket = c->bucket; 1235 if (bucket == -1) { 1236 out_string(c, "CLIENT_ERROR no BG data in managed mode"); 1237 return; 1238 } 1239 c->bucket = -1; 1240 if (buckets[bucket] != c->gen) { 1241 out_string(c, "ERROR_NOT_OWNER"); 1242 return; 1243 } 1244 } 1245 */ 1246 1247 exptime = ntohl(*((int*)(c->rbuf))); 1248 key = c->rbuf + 4; 1249 nkey = c->keylen; 1250 key[nkey]=0x00; 1251 1252 if(settings.verbose) { 1253 fprintf(stderr, "Deleting %s with a timeout of %d\n", key, exptime); 1254 } 1255 1256 if (settings.detail_enabled) { 1257 stats_prefix_record_delete(key); 1258 } 1259 1260 it = item_get(key, nkey); 1261 if (it) { 1262 if (exptime == 0) { 1263 item_unlink(it); 1264 item_remove(it); /* release our reference */ 1265 write_bin_response(c, NULL, 0); 1266 } else { 1267 /* XXX: This is really lame, but defer_delete returns a string */ 1268 char *res=defer_delete(it, exptime); 1269 if(res[0] == 'D') { 1270 write_bin_response(c, NULL, 0); 1271 } else { 1272 write_bin_error(c, ERR_OUT_OF_MEMORY, 0); 1273 } 1274 } 1275 } else { 1276 write_bin_error(c, ERR_NOT_FOUND, 0); 1277 } 1278 } 1279 1280 static void complete_nread_binary(conn *c) { 1281 assert(c != NULL); 1282 1283 if(c->cmd < 0) { 1284 /* No command defined. Figure out what they're trying to say. */ 1285 int i=0; 1286 /* I did a bit of hard-coding around the packet sizes */ 1287 assert(BIN_PKT_HDR_WORDS == 3); 1288 for(i=0; i<BIN_PKT_HDR_WORDS; i++) { 1289 c->bin_header[i] = ntohl(c->bin_header[i]); 1290 } 1291 if(settings.verbose) { 1292 fprintf(stderr, "Read binary protocol data: %08x %08x %08x\n", 1293 c->bin_header[0], c->bin_header[1], c->bin_header[2]); 1294 } 1295 if((c->bin_header[0] >> 24) != BIN_REQ_MAGIC) { 1296 if(settings.verbose) { 1297 fprintf(stderr, "Invalid magic: %x\n", c->bin_header[0] >> 24); 1298 } 1299 conn_set_state(c, conn_closing); 1300 return; 1301 } 1302 1303 c->msgcurr = 0; 1304 c->msgused = 0; 1305 c->iovused = 0; 1306 if (add_msghdr(c) != 0) { 1307 out_string(c, "SERVER_ERROR out of memory"); 1308 return; 1309 } 1310 1311 c->cmd = (c->bin_header[0] >> 16) & 0xff; 1312 c->keylen = (c->bin_header[0] >> 8) & 0xff; 1313 c->opaque = c->bin_header[1]; 1314 if(settings.verbose > 1) { 1315 fprintf(stderr, 1316 "Command: %d, opaque=%08x, keylen=%d, total_len=%d\n", c->cmd, 1317 c->opaque, c->keylen, c->bin_header[2]); 1318 } 1319 dispatch_bin_command(c); 1320 } else { 1321 switch(c->substate) { 1322 case bin_reading_set_header: 1323 process_bin_update(c, false); 1324 break; 1325 case bin_reading_cas_header: 1326 process_bin_update(c, true); 1327 break; 1328 case bin_read_set_value: 1329 complete_update_bin(c); 1330 break; 1331 case bin_reading_get_key: 1332 process_bin_get(c); 1333 break; 1334 case bin_reading_del_header: 1335 process_bin_delete(c); 1336 break; 1337 case bin_reading_incr_header: 1338 complete_incr_bin(c); 1339 break; 1340 default: 1341 fprintf(stderr, "Not handling substate %d\n", c->substate); 1342 assert(0); 1343 } 1344 } 1345 } 1346 1347 static void complete_nread(conn *c) { 1348 assert(c != NULL); 1349 1350 if(c->protocol == ascii_prot) { 1351 complete_nread_ascii(c); 1352 } else if(c->protocol == binary_prot) { 1353 complete_nread_binary(c); 1354 } else { 1355 assert(0); /* XXX: Invalid case. Should probably do more here. */ 1356 } 787 1357 } 788 1358 … … 826 1396 do_item_replace(old_it, it); 827 1397 stored = 1; 828 } 829 else 830 { 1398 } else { 1399 if(settings.verbose > 1) { 1400 fprintf(stderr, "CAS: failure: expected %llu, got %llu\n", 1401 old_it->cas_id, it->cas_id); 1402 } 831 1403 stored = 2; 832 1404 } … … 962 1534 c->wbytes = bytes; 963 1535 conn_set_state(c, conn_write); 964 c->write_and_go = conn_read;1536 c->write_and_go = get_init_state(c); 965 1537 } else { 966 1538 out_string(c, "SERVER_ERROR out of memory"); … … 1319 1891 */ 1320 1892 if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 1321 || ( c->udp&& build_udp_headers(c) != 0)) {1893 || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 1322 1894 out_string(c, "SERVER_ERROR out of memory"); 1323 1895 } … … 1483 2055 value += delta; 1484 2056 else { 1485 if (delta >= value) value = 0; 1486 else value -= delta; 2057 value -= delta; 2058 } 2059 if(value < 0) { 2060 value=0; 1487 2061 } 1488 2062 sprintf(buf, "%llu", value); … … 1718 2292 c->gen = gen; 1719 2293 } 1720 conn_set_ state(c, conn_read);2294 conn_set_init_state(c); 1721 2295 return; 1722 2296 } else { … … 1901 2475 } 1902 2476 1903 /* unix socket mode doesn't need this, so zeroed out. but why1904 * is this done for every command? presumably for UDP1905 * mode. */1906 if (!settings.socketpath) {1907 c->request_addr_size = sizeof(c->request_addr);1908 } else {1909 c->request_addr_size = 0;1910 }1911 1912 2477 res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); 1913 2478 if (res > 0) { … … 1965 2530 } 1966 2531 } 1967 1968 2532 1969 2533 /* … … 2024 2588 perror("Failed to write, and not due to blocking"); 2025 2589 2026 if ( c->udp)2590 if (IS_UDP(c->protocol)) 2027 2591 conn_set_state(c, conn_read); 2028 2592 else … … 2037 2601 bool stop = false; 2038 2602 int sfd, flags = 1; 2603 int init_state; /* initial state for a new connection */ 2039 2604 socklen_t addrlen; 2040 2605 struct sockaddr addr; … … 2069 2634 break; 2070 2635 } 2071 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, 2072 DATA_BUFFER_SIZE, false); 2636 init_state = get_init_state(c); 2637 2638 dispatch_conn_new(sfd, init_state, EV_READ | EV_PERSIST, 2639 DATA_BUFFER_SIZE, c->protocol); 2640 2073 2641 break; 2074 2642 … … 2077 2645 continue; 2078 2646 } 2079 if (( c->udp? try_read_udp(c) : try_read_network(c)) != 0) {2647 if ((IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c)) != 0) { 2080 2648 continue; 2081 2649 } … … 2090 2658 break; 2091 2659 2660 case conn_bin_init: /* Reinitialize a binary connection */ 2661 c->rlbytes = MIN_BIN_PKT_LENGTH; 2662 c->write_and_go = conn_bin_init; 2663 c->cmd = -1; 2664 c->substate = bin_no_state; 2665 c->rbytes = c->wbytes = 0; 2666 c->wcurr = c->wbuf; 2667 c->rcurr = c->rbuf; 2668 c->ritem = (char*)c->bin_header; 2669 conn_set_state(c, conn_nread); 2670 conn_shrink(c); 2671 break; 2672 2092 2673 case conn_nread: 2093 /* we are reading rlbytes into ritem; */2094 2674 if (c->rlbytes == 0) { 2095 2675 complete_nread(c); … … 2140 2720 /* we are reading sbytes and throwing them away */ 2141 2721 if (c->sbytes == 0) { 2142 conn_set_ state(c, conn_read);2722 conn_set_init_state(c); 2143 2723 break; 2144 2724 } … … 2188 2768 * list for TCP or a two-entry list for UDP). 2189 2769 */ 2190 if (c->iovused == 0 || ( c->udp&& c->iovused == 1)) {2770 if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { 2191 2771 if (add_iov(c, c->wcurr, c->wbytes) != 0 || 2192 ( c->udp&& build_udp_headers(c) != 0)) {2772 (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { 2193 2773 if (settings.verbose > 0) 2194 2774 fprintf(stderr, "Couldn't build response\n"); … … 2220 2800 c->suffixleft--; 2221 2801 } 2222 conn_set_state(c, conn_read); 2802 /* XXX: I don't know why this wasn't the general case */ 2803 if(c->protocol == binary_prot) { 2804 conn_set_state(c, c->write_and_go); 2805 } else { 2806 conn_set_init_state(c); 2807 } 2223 2808 } else if (c->state == conn_write) { 2224 2809 if (c->write_and_free) { … … 2245 2830 2246 2831 case conn_closing: 2247 if ( c->udp)2832 if (IS_UDP(c->protocol)) 2248 2833 conn_cleanup(c); 2249 2834 else … … 2279 2864 } 2280 2865 2281 static int new_socket(const bool is_udp) {2866 static int new_socket(const int prot) { 2282 2867 int sfd; 2283 2868 int flags; 2284 2869 2285 if ((sfd = socket(AF_INET, is_udp? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {2870 if ((sfd = socket(AF_INET, IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { 2286 2871 perror("socket()"); 2287 2872 return -1; … … 2333 2918 2334 2919 2335 static int server_socket(const int port, const bool is_udp) {2920 static int server_socket(const int port, const int prot) { 2336 2921 int sfd; 2337 2922 struct linger ling = {0, 0}; … … 2339 2924 int flags =1; 2340 2925 2341 if ((sfd = new_socket( is_udp)) == -1) {2926 if ((sfd = new_socket(prot)) == -1) { 2342 2927 return -1; 2343 2928 } 2344 2929 2345 2930 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); 2346 if ( is_udp) {2931 if (IS_UDP(prot)) { 2347 2932 maximize_sndbuf(sfd); 2348 2933 } else { … … 2366 2951 return -1; 2367 2952 } 2368 if (! is_udp&& listen(sfd, 1024) == -1) {2953 if (!IS_UDP(prot) && listen(sfd, 1024) == -1) { 2369 2954 perror("listen()"); 2370 2955 close(sfd); … … 2446 3031 /* listening socket */ 2447 3032 static int l_socket = 0; 3033 /* Listening socket for the binary protocol */ 3034 static int bl_socket = -1; 2448 3035 2449 3036 /* udp socket */ … … 2454 3041 int i; 2455 3042 if (l_socket > -1) close(l_socket); 3043 if (bl_socket > -1) close(bl_socket); 2456 3044 if (u_socket > -1) close(u_socket); 2457 3045 for (i = 3; i <= 500; i++) close(i); /* so lame */ … … 2536 3124 printf("-p <num> TCP port number to listen on (default: 11211)\n" 2537 3125 "-U <num> UDP port number to listen on (default: 0, off)\n" 3126 "-B <num> binary protocol TCP port number to listen on (default: 0, off)\n" 2538 3127 "-s <file> unix socket path to listen on (disables network support)\n" 2539 3128 "-a <mask> access mask for unix socket, in octal (default 0700)\n" … … 2686 3275 2687 3276 /* process arguments */ 2688 while ((c = getopt(argc, argv, "a:bp: s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {3277 while ((c = getopt(argc, argv, "a:bp:B:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { 2689 3278 switch (c) { 2690 3279 case 'a': … … 2701 3290 case 'p': 2702 3291 settings.port = atoi(optarg); 3292 break; 3293 case 'B': 3294 settings.binport = atoi(optarg); 2703 3295 break; 2704 3296 case 's': … … 2841 3433 exit(EXIT_FAILURE); 2842 3434 } 3435 /* Try the binary port. */ 3436 if(settings.binport > 0) { 3437 bl_socket = server_socket(settings.binport, 0); 3438 if (bl_socket == -1) { 3439 fprintf(stderr, "failed to listen to binary protocol\n"); 3440 exit(EXIT_FAILURE); 3441 } 3442 } 2843 3443 } 2844 3444 … … 2933 3533 /* create the initial listening connection */ 2934 3534 if (!(listen_conn = conn_new(l_socket, conn_listening, 2935 EV_READ | EV_PERSIST, 1, false, main_base))) { 3535 EV_READ | EV_PERSIST, 1, ascii_prot, 3536 main_base))) { 2936 3537 fprintf(stderr, "failed to create listening connection"); 2937 3538 exit(EXIT_FAILURE); 2938 3539 } 3540 3541 /* Same for binary protocol */ 3542 if (bl_socket > 0) { 3543 if (!(bin_listen_conn = conn_new(bl_socket, conn_listening, 3544 EV_READ | EV_PERSIST, 1, binary_prot, 3545 main_base))) { 3546 fprintf(stderr, "failed to create listening connection"); 3547 exit(EXIT_FAILURE); 3548 } 3549 } 3550 2939 3551 /* start up worker threads if MT mode */ 2940 3552 thread_init(settings.num_threads, main_base); … … 2958 3570 /* this is guaranteed to hit all threads because we round-robin */ 2959 3571 dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, 2960 UDP_READ_BUFFER_SIZE, 1);3572 UDP_READ_BUFFER_SIZE, ascii_udp_prot); 2961 3573 } 2962 3574 } branches/binary-1.3.0/memcached.h
r651 r658 38 38 #define IOV_LIST_HIGHWAT 600 39 39 #define MSG_LIST_HIGHWAT 100 40 41 /* Binary protocol stuff */ 42 #define MIN_BIN_PKT_LENGTH 12 43 /* flags:32, expiration:32 */ 44 #define BIN_SET_HDR_LEN 8 45 /* Same as set, but with another 64 bits for the CAS identifier */ 46 #define BIN_CAS_HDR_LEN (BIN_SET_HDR_LEN + 8) 47 /* incr:64, initial:64, expiration:32 */ 48 #define BIN_INCR_HDR_LEN 20 49 /* timeout:32 */ 50 #define BIN_DEL_HDR_LEN 4 51 #define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) 52 53 #define BIN_REQ_MAGIC 0x0f 54 #define BIN_RES_MAGIC 0xf0 55 56 #define CMD_GET 0 57 #define CMD_SET 1 58 #define CMD_ADD 2 59 #define CMD_REPLACE 3 60 #define CMD_DELETE 4 61 #define CMD_INCR 5 62 #define CMD_QUIT 6 63 #define CMD_FLUSH 7 64 #define CMD_GETQ 8 65 #define CMD_NOOP 9 66 #define CMD_VERSION 10 67 68 #define CMD_GETS 50 69 #define CMD_CAS 51 70 71 #define ERR_UNKNOWN_CMD 0x81 72 #define ERR_OUT_OF_MEMORY 0x82 73 74 #define ERR_NOT_FOUND 0x1 75 #define ERR_EXISTS 0x2 76 #define ERR_TOO_LARGE 0x3 77 #define ERR_INVALID_ARGUMENTS 0x4 78 #define ERR_NOT_STORED 0x5 40 79 41 80 /* Get a consistent bool type */ … … 84 123 int port; 85 124 int udpport; 125 int binport; /* Port for binary protocol. */ 86 126 struct in_addr interf; 87 127 int verbose; … … 141 181 conn_closing, /** closing this connection */ 142 182 conn_mwrite, /** writing out many items sequentially */ 183 conn_bin_init, /** Reinitializing a binary protocol connection */ 143 184 }; 185 186 enum bin_substates { 187 bin_no_state, 188 bin_reading_set_header, 189 bin_reading_cas_header, 190 bin_read_set_value, 191 bin_reading_get_key, 192 bin_reading_del_header, 193 bin_reading_incr_header, 194 }; <
