Changeset 709

Show
Ignore:
Timestamp:
02/22/08 04:57:47 (9 months ago)
Author:
dormando
Message:

Cache::Memcached: use 'noreply' option in void context. (Tomash Brechko <tomash.brechko@gmail.com>)

For add(), set(), replace(), delete(), incr(), decr(), flush_all()
methods set 'noreply' option if the method was called in a void
context.

There's no 'verbosity' Perl binding.

Note that run_command() generic interface is broken. You can't use
it with noreply (and actually with many blocking commands either).

This patch also adds <time> optional parameter handling to
flush_all(). The delay is evenly distributed among all servers.

Add benchmark script for noreply parameter.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/api/perl/lib/Cache/Memcached.pm

    r634 r709  
    336336sub _write_and_read { 
    337337    my Cache::Memcached $self = shift; 
    338     my ($sock, $line, $check_complete) = @_; 
     338    my ($sock, $line, $check_complete, $noreply) = @_; 
    339339    my $res; 
    340340    my ($ret, $offset) = (undef, 0); 
     
    376376            } 
    377377            if ($res == length($line)) { # all sent 
    378                 $state = 1; 
     378                $state = $noreply ? 2 : 1; 
    379379            } else { # we only succeeded in sending some of it 
    380380                substr($line, 0, $res, ''); # delete the part we sent 
     
    411411    return 0 unless $sock; 
    412412 
     413    my @params; 
     414    my $noreply = not defined wantarray; 
     415    push @params, "noreply" if $noreply; 
     416 
    413417    $self->{'stats'}->{"delete"}++; 
    414418    $key = ref $key ? $key->[1] : $key; 
    415419    $time = $time ? " $time" : ""; 
    416     my $cmd = "delete $self->{namespace}$key$time\r\n"; 
    417     my $res = _write_and_read($self, $sock, $cmd); 
     420    my $cmd = "delete $self->{namespace}$key$time @params\r\n"; 
     421    my $res = _write_and_read($self, $sock, $cmd, undef, $noreply); 
    418422 
    419423    if ($self->{'stat_callback'}) { 
     
    447451    return 0 unless $sock; 
    448452 
     453    my @params; 
     454    my $noreply = not defined wantarray; 
     455    push @params, "noreply" if $noreply; 
     456 
    449457    use bytes; # return bytes from length() 
    450458 
     
    478486 
    479487    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL; 
    480     my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n"; 
    481  
    482     my $res = _write_and_read($self, $sock, $line); 
     488    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len @params\r\n$val\r\n"; 
     489 
     490    my $res = _write_and_read($self, $sock, $line, undef, $noreply); 
    483491 
    484492    if ($self->{'debug'} && $line) { 
     
    515523    $value = 1 unless defined $value; 
    516524 
    517     my $line = "$cmdname $self->{namespace}$key $value\r\n"; 
    518     my $res = _write_and_read($self, $sock, $line); 
     525    my @params; 
     526    my $noreply = not defined wantarray; 
     527    push @params, "noreply" if $noreply; 
     528 
     529    my $line = "$cmdname $self->{namespace}$key $value @params\r\n"; 
     530    my $res = _write_and_read($self, $sock, $line, undef, $noreply); 
    519531 
    520532    if ($self->{'stat_callback'}) { 
     
    777789sub flush_all { 
    778790    my Cache::Memcached $self = shift; 
     791    my ($time) = @_; 
     792 
     793    $time = 0 unless defined $time; 
    779794 
    780795    my $success = 1; 
    781796 
     797    my @params; 
     798    my $noreply = not defined wantarray; 
     799    push @params, "noreply" if $noreply; 
     800 
    782801    my @hosts = @{$self->{'buckets'}}; 
     802 
     803    # Distribute the delay among the servers.  For instance, if $time 
     804    # is 30 seconds, and we have 3 servers, they will get 30, 15, 0. 
     805    my $delay_step = 0; 
     806    if (@hosts > 1) { 
     807        $delay_step = $time / (@hosts - 1); 
     808    } 
     809 
    783810    foreach my $host (@hosts) { 
     811        my $delay = int($time); 
     812        $time -= $delay_step; 
     813        my $line = "flush_all $delay @params\r\n"; 
    784814        my $sock = $self->sock_to_host($host); 
    785         my @res = $self->run_command($sock, "flush_all\r\n"); 
    786         $success = 0 unless (@res); 
     815        my $res = _write_and_read($self, $sock, $line, undef, $noreply); 
     816        $success = 0 unless ($noreply or $res eq "OK\r\n"); 
    787817    } 
    788818 
     
    791821 
    792822# returns array of lines, or () on failure. 
     823# FIXME: current implementation is broken. 
    793824sub run_command { 
    794825    my Cache::Memcached $self = shift; 
     
    798829    my $line = $cmd; 
    799830    while (my $res = _write_and_read($self, $sock, $line)) { 
     831        # FIXME: _write_and_read() won't handle undefined $line. 
    800832        undef $line; 
    801833        $ret .= $res; 
     834        # FIXME: end condition is not correct. 
    802835        last if $ret =~ /(?:OK|END|ERROR)\r\n$/; 
    803836    }