Changeset 722
- Timestamp:
- 02/25/08 07:55:50 (6 months ago)
- Files:
-
- branches/binary/api/perl/ChangeLog (modified) (1 diff)
- branches/binary/api/perl/lib/Cache/Memcached.pm (modified) (9 diffs)
- branches/binary/server/ChangeLog (modified) (2 diffs)
- branches/binary/server/devtools/bench_noreply.pl (added)
- branches/binary/server/doc/protocol.txt (modified) (8 diffs)
- branches/binary/server/memcached.c (modified) (15 diffs)
- branches/binary/server/memcached.h (modified) (1 diff)
- branches/binary/server/t/noreply.t (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/binary/api/perl/ChangeLog
r605 r722 1 * 'noreply' support from Tomash Brechko <tomash.brechko@gmail.com> 2 1 3 * various test updates from Ronald J Kimball <rkimball@pangeamedia.com> 2 4 branches/binary/api/perl/lib/Cache/Memcached.pm
r634 r722 336 336 sub _write_and_read { 337 337 my Cache::Memcached $self = shift; 338 my ($sock, $line, $check_complete ) = @_;338 my ($sock, $line, $check_complete, $noreply) = @_; 339 339 my $res; 340 340 my ($ret, $offset) = (undef, 0); … … 376 376 } 377 377 if ($res == length($line)) { # all sent 378 $state = 1;378 $state = $noreply ? 2 : 1; 379 379 } else { # we only succeeded in sending some of it 380 380 substr($line, 0, $res, ''); # delete the part we sent … … 411 411 return 0 unless $sock; 412 412 413 my @params; 414 my $noreply = not defined wantarray; 415 push @params, "noreply" if $noreply; 416 413 417 $self->{'stats'}->{"delete"}++; 414 418 $key = ref $key ? $key->[1] : $key; 415 419 $time = $time ? " $time" : ""; 416 my $cmd = "delete $self->{namespace}$key$time \r\n";417 my $res = _write_and_read($self, $sock, $cmd );420 my $cmd = "delete $self->{namespace}$key$time @params\r\n"; 421 my $res = _write_and_read($self, $sock, $cmd, undef, $noreply); 418 422 419 423 if ($self->{'stat_callback'}) { … … 447 451 return 0 unless $sock; 448 452 453 my @params; 454 my $noreply = not defined wantarray; 455 push @params, "noreply" if $noreply; 456 449 457 use bytes; # return bytes from length() 450 458 … … 478 486 479 487 local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL; 480 my $line = "$cmdname $self->{namespace}$key $flags $exptime $len \r\n$val\r\n";481 482 my $res = _write_and_read($self, $sock, $line );488 my $line = "$cmdname $self->{namespace}$key $flags $exptime $len @params\r\n$val\r\n"; 489 490 my $res = _write_and_read($self, $sock, $line, undef, $noreply); 483 491 484 492 if ($self->{'debug'} && $line) { … … 515 523 $value = 1 unless defined $value; 516 524 517 my $line = "$cmdname $self->{namespace}$key $value\r\n"; 518 my $res = _write_and_read($self, $sock, $line); 525 my @params; 526 my $noreply = not defined wantarray; 527 push @params, "noreply" if $noreply; 528 529 my $line = "$cmdname $self->{namespace}$key $value @params\r\n"; 530 my $res = _write_and_read($self, $sock, $line, undef, $noreply); 519 531 520 532 if ($self->{'stat_callback'}) { … … 777 789 sub flush_all { 778 790 my Cache::Memcached $self = shift; 791 my ($time) = @_; 792 793 $time = 0 unless defined $time; 779 794 780 795 my $success = 1; 781 796 797 my @params; 798 my $noreply = not defined wantarray; 799 push @params, "noreply" if $noreply; 800 782 801 my @hosts = @{$self->{'buckets'}}; 802 803 # Distribute the delay among the servers. For instance, if $time 804 # is 30 seconds, and we have 3 servers, they will get 30, 15, 0. 805 my $delay_step = 0; 806 if (@hosts > 1) { 807 $delay_step = $time / (@hosts - 1); 808 } 809 783 810 foreach my $host (@hosts) { 811 my $delay = int($time); 812 $time -= $delay_step; 813 my $line = "flush_all $delay @params\r\n"; 784 814 my $sock = $self->sock_to_host($host); 785 my @res = $self->run_command($sock, "flush_all\r\n");786 $success = 0 unless ( @res);815 my $res = _write_and_read($self, $sock, $line, undef, $noreply); 816 $success = 0 unless ($noreply or $res eq "OK\r\n"); 787 817 } 788 818 … … 791 821 792 822 # returns array of lines, or () on failure. 823 # FIXME: current implementation is broken. 793 824 sub run_command { 794 825 my Cache::Memcached $self = shift; … … 798 829 my $line = $cmd; 799 830 while (my $res = _write_and_read($self, $sock, $line)) { 831 # FIXME: _write_and_read() won't handle undefined $line. 800 832 undef $line; 801 833 $ret .= $res; 834 # FIXME: end condition is not correct. 802 835 last if $ret =~ /(?:OK|END|ERROR)\r\n$/; 803 836 } branches/binary/server/ChangeLog
r717 r722 1 2008-12-17 1 * 'noreply' support (Tomash Brechko) 2 2 3 3 * IPv6 support, and IPv6 multi-interface support (brian@tangent.org) … … 9 9 10 10 * Use gettimeofday(2) instead of time(2). 11 12 2008-02-1013 11 14 12 * Make -k option work (Tomash Brechko) branches/binary/server/doc/protocol.txt
r717 r722 127 127 First, the client sends a command line which looks like this: 128 128 129 <command name> <key> <flags> <exptime> <bytes> [<cas unqiue>]\r\n 130 131 - <command name> is "set", "add", "replace", "append", "prepend", or "cas" 129 <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n 130 cas <key> <flags> <exptime> <bytes> <cas unqiue> [noreply]\r\n 131 132 - <command name> is "set", "add", "replace", "append" or "prepend" 132 133 133 134 "set" means "store this data". … … 175 176 when issuing "cas" updates. 176 177 178 - "noreply" optional parameter instructs the server to not send the 179 reply. NOTE: if the request line is malformed, the server can't 180 parse "noreply" option reliably. In this case it may send the error 181 to the client, and not reading it on the client side will break 182 things. Client should construct only valid requests. 183 177 184 After this line, the client sends the data block: 178 185 … … 246 253 The command "delete" allows for explicit deletion of items: 247 254 248 delete <key> <time>\r\n255 delete <key> [<time>] [noreply]\r\n 249 256 250 257 - <key> is the key of the item the client wishes the server to delete … … 262 269 storage commands with this key will succeed). 263 270 271 - "noreply" optional parameter instructs the server to not send the 272 reply. See the note in Storage commands regarding malformed 273 requests. 274 264 275 The response line to this command can be one of: 265 276 … … 286 297 The client sends the command line: 287 298 288 incr <key> <value> \r\n299 incr <key> <value> [noreply]\r\n 289 300 290 301 or 291 302 292 decr <key> <value> \r\n303 decr <key> <value> [noreply]\r\n 293 304 294 305 - <key> is the key of the item the client wishes to change … … 296 307 - <value> is the amount by which the client wants to increase/decrease 297 308 the item. It is a decimal representation of a 64-bit unsigned integer. 309 310 - "noreply" optional parameter instructs the server to not send the 311 reply. See the note in Storage commands regarding malformed 312 requests. 298 313 299 314 The response will be one of: … … 400 415 401 416 "flush_all" is a command with an optional numeric argument. It always 402 succeeds, and the server sends "OK\r\n" in response. Its effect is to 403 invalidate all existing items immediately (by default) or after the 404 expiration specified. After invalidation none of the items will be returned 405 in response to a retrieval command (unless it's stored again under the 406 same key *after* flush_all has invalidated the items). flush_all doesn't 407 actually free all the memory taken up by existing items; that will 408 happen gradually as new items are stored. The most precise definition 409 of what flush_all does is the following: it causes all items whose 410 update time is earlier than the time at which flush_all was set to be 411 executed to be ignored for retrieval purposes. 417 succeeds, and the server sends "OK\r\n" in response (unless "noreply" 418 is given as the last parameter). Its effect is to invalidate all 419 existing items immediately (by default) or after the expiration 420 specified. After invalidation none of the items will be returned in 421 response to a retrieval command (unless it's stored again under the 422 same key *after* flush_all has invalidated the items). flush_all 423 doesn't actually free all the memory taken up by existing items; that 424 will happen gradually as new items are stored. The most precise 425 definition of what flush_all does is the following: it causes all 426 items whose update time is earlier than the time at which flush_all 427 was set to be executed to be ignored for retrieval purposes. 412 428 413 429 The intent of flush_all with a delay, was that in a setting where you … … 432 448 server. 433 449 434 "verbosity" is a command with a numeric argument. It always 435 succeeds, and the server sends "OK\r\n" in response. Its effect is to 436 set the verbosity level of the logging output. 450 "verbosity" is a command with a numeric argument. It always succeeds, 451 and the server sends "OK\r\n" in response (unless "noreply" is given 452 as the last parameter). Its effect is to set the verbosity level of 453 the logging output. 437 454 438 455 "quit" is a command with no arguments: branches/binary/server/memcached.c
r721 r722 410 410 c->gen = 0; 411 411 412 c->noreply = false; 413 412 414 event_set(&c->event, sfd, event_flags, event_handler, (void *)c); 413 415 event_base_set(base, &c->event); … … 791 793 792 794 assert(c != NULL); 795 796 if (c->noreply) { 797 if (settings.verbose > 1) 798 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); 799 c->noreply = false; 800 conn_set_state(c, conn_read); 801 return; 802 } 793 803 794 804 if (settings.verbose > 1) … … 1481 1491 #define KEY_MAX_LENGTH 250 1482 1492 1483 #define MAX_TOKENS 71493 #define MAX_TOKENS 8 1484 1494 1485 1495 /* … … 1548 1558 } else { 1549 1559 out_string(c, "SERVER_ERROR out of memory"); 1560 } 1561 } 1562 1563 static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens) 1564 { 1565 int noreply_index = ntokens - 2; 1566 1567 /* 1568 NOTE: this function is not the first place where we are going to 1569 send the reply. We could send it instead from process_command() 1570 if the request line has wrong number of tokens. However parsing 1571 malformed line for "noreply" option is not reliable anyway, so 1572 it can't be helped. 1573 */ 1574 if (tokens[noreply_index].value 1575 && strcmp(tokens[noreply_index].value, "noreply") == 0) { 1576 c->noreply = true; 1550 1577 } 1551 1578 } … … 1930 1957 assert(c != NULL); 1931 1958 1959 set_noreply_maybe(c, tokens, ntokens); 1960 1932 1961 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { 1933 1962 out_string(c, "CLIENT_ERROR bad command line format"); … … 2000 2029 2001 2030 assert(c != NULL); 2031 2032 set_noreply_maybe(c, tokens, ntokens); 2002 2033 2003 2034 if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { … … 2099 2130 assert(c != NULL); 2100 2131 2132 set_noreply_maybe(c, tokens, ntokens); 2133 2101 2134 if (settings.managed) { 2102 2135 int bucket = c->bucket; … … 2120 2153 } 2121 2154 2122 if(ntokens == 4) {2155 if(ntokens == (c->noreply ? 5 : 4)) { 2123 2156 exptime = strtol(tokens[2].value, NULL, 10); 2124 2157 … … 2183 2216 assert(c != NULL); 2184 2217 2218 set_noreply_maybe(c, tokens, ntokens); 2219 2185 2220 level = strtoul(tokens[1].value, NULL, 10); 2186 2221 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level; … … 2220 2255 process_get_command(c, tokens, ntokens, false); 2221 2256 2222 } else if ( ntokens == 6&&2257 } else if ((ntokens == 6 || ntokens == 7) && 2223 2258 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || 2224 2259 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || … … 2229 2264 process_update_command(c, tokens, ntokens, comm, false); 2230 2265 2231 } else if ( ntokens == 7&& (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {2266 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) { 2232 2267 2233 2268 process_update_command(c, tokens, ntokens, comm, true); 2234 2269 2235 } else if ( ntokens == 4&& (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {2270 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { 2236 2271 2237 2272 process_arithmetic_command(c, tokens, ntokens, 1); … … 2241 2276 process_get_command(c, tokens, ntokens, true); 2242 2277 2243 } else if ( ntokens == 4&& (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {2278 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { 2244 2279 2245 2280 process_arithmetic_command(c, tokens, ntokens, 0); 2246 2281 2247 } else if (ntokens >= 3 && ntokens <= 4&& (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {2282 } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) { 2248 2283 2249 2284 process_delete_command(c, tokens, ntokens); … … 2314 2349 process_stat(c, tokens, ntokens); 2315 2350 2316 } else if (ntokens >= 2 && ntokens <= 3&& (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {2351 } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) { 2317 2352 time_t exptime = 0; 2318 2353 set_current_time(); 2319 2354 2320 if(ntokens == 2) { 2355 set_noreply_maybe(c, tokens, ntokens); 2356 2357 if(ntokens == (c->noreply ? 3 : 2)) { 2321 2358 settings.oldest_live = current_time - 1; 2322 2359 item_flush_expired(); … … 2383 2420 out_string(c, "CLIENT_ERROR Slab reassignment not supported"); 2384 2421 #endif 2385 } else if ( ntokens == 3&& (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {2422 } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) { 2386 2423 process_verbosity_command(c, tokens, ntokens); 2387 2424 } else { … … 2516 2553 if (res == -1) { 2517 2554 if (errno == EAGAIN || errno == EWOULDBLOCK) break; 2518 else return 0; 2555 /* Should close on unhandled errors. */ 2556 conn_set_state(c, conn_closing); 2557 return 1; 2519 2558 } 2520 2559 } branches/binary/server/memcached.h
r717 r722 285 285 a managed instance. -1 (_not_ 0) means invalid. */ 286 286 int gen; /* generation requested for the bucket */ 287 287 bool noreply; /* True if the reply should not be sent. */ 288 288 /* Binary protocol stuff */ 289 289 /* This is where the binary header goes */
