root/trunk/api/perl/lib/Cache/Memcached.pm @ 790

Revision 790, 34.1 kB (checked in by ykerherve, 16 months ago)

Fixed flush_all to only return success when there is a proper OK
returned by all the servers.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use fields qw{
22    debug no_rehash stats compress_threshold compress_enable stat_callback
23    readonly select_timeout namespace namespace_len servers active buckets
24    pref_ip
25    bucketcount _single_sock _stime
26    connect_timeout cb_connect_fail
27    parser_class
28};
29
30# flag definitions
31use constant F_STORABLE => 1;
32use constant F_COMPRESS => 2;
33
34# size savings required before saving compressed value
35use constant COMPRESS_SAVINGS => 0.20; # percent
36
37use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
38$VERSION = "1.24";
39
40BEGIN {
41    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
42}
43
44my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
45$HAVE_XS = 0 if $ENV{NO_XS};
46
47my $parser_class = $HAVE_XS ? "Cache::Memcached::GetParserXS" : "Cache::Memcached::GetParser";
48if ($ENV{XS_DEBUG}) {
49    print "using parser: $parser_class\n";
50}
51
52$FLAG_NOSIGNAL = 0;
53eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
54
55my %host_dead;   # host -> unixtime marked dead until
56my %cache_sock;  # host -> socket
57my @buck2sock;   # bucket number -> $sock
58
59my $PROTO_TCP;
60
61our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62
63sub new {
64    my Cache::Memcached $self = shift;
65    $self = fields::new( $self ) unless ref $self;
66
67    my $args = (@_ == 1) ? shift : { @_ };  # hashref-ify args
68
69    $self->set_servers($args->{'servers'});
70    $self->{'debug'} = $args->{'debug'} || 0;
71    $self->{'no_rehash'} = $args->{'no_rehash'};
72    $self->{'stats'} = {};
73    $self->{'pref_ip'} = $args->{'pref_ip'} || {};
74    $self->{'compress_threshold'} = $args->{'compress_threshold'};
75    $self->{'compress_enable'}    = 1;
76    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
77    $self->{'readonly'} = $args->{'readonly'};
78    $self->{'parser_class'} = $args->{'parser_class'} || $parser_class;
79
80    # TODO: undocumented
81    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
82    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;
83    $self->{namespace} = $args->{namespace} || '';
84    $self->{namespace_len} = length $self->{namespace};
85
86    return $self;
87}
88
89sub set_pref_ip {
90    my Cache::Memcached $self = shift;
91    $self->{'pref_ip'} = shift;
92}
93
94sub set_servers {
95    my Cache::Memcached $self = shift;
96    my ($list) = @_;
97    $self->{'servers'} = $list || [];
98    $self->{'active'} = scalar @{$self->{'servers'}};
99    $self->{'buckets'} = undef;
100    $self->{'bucketcount'} = 0;
101    $self->init_buckets;
102    @buck2sock = ();
103
104    $self->{'_single_sock'} = undef;
105    if (@{$self->{'servers'}} == 1) {
106        $self->{'_single_sock'} = $self->{'servers'}[0];
107    }
108
109    return $self;
110}
111
112sub set_cb_connect_fail {
113    my Cache::Memcached $self = shift;
114    $self->{'cb_connect_fail'} = shift;
115}
116
117sub set_connect_timeout {
118    my Cache::Memcached $self = shift;
119    $self->{'connect_timeout'} = shift;
120}
121
122sub set_debug {
123    my Cache::Memcached $self = shift;
124    my ($dbg) = @_;
125    $self->{'debug'} = $dbg || 0;
126}
127
128sub set_readonly {
129    my Cache::Memcached $self = shift;
130    my ($ro) = @_;
131    $self->{'readonly'} = $ro;
132}
133
134sub set_norehash {
135    my Cache::Memcached $self = shift;
136    my ($val) = @_;
137    $self->{'no_rehash'} = $val;
138}
139
140sub set_compress_threshold {
141    my Cache::Memcached $self = shift;
142    my ($thresh) = @_;
143    $self->{'compress_threshold'} = $thresh;
144}
145
146sub enable_compress {
147    my Cache::Memcached $self = shift;
148    my ($enable) = @_;
149    $self->{'compress_enable'} = $enable;
150}
151
152sub forget_dead_hosts {
153    %host_dead = ();
154    @buck2sock = ();
155}
156
157sub set_stat_callback {
158    my Cache::Memcached $self = shift;
159    my ($stat_callback) = @_;
160    $self->{'stat_callback'} = $stat_callback;
161}
162
163my %sock_map;  # stringified-$sock -> "$ip:$port"
164
165sub _dead_sock {
166    my ($sock, $ret, $dead_for) = @_;
167    if (my $ipport = $sock_map{$sock}) {
168        my $now = time();
169        $host_dead{$ipport} = $now + $dead_for
170            if $dead_for;
171        delete $cache_sock{$ipport};
172        delete $sock_map{$sock};
173    }
174    @buck2sock = ();
175    return $ret;  # 0 or undef, probably, depending on what caller wants
176}
177
178sub _close_sock {
179    my ($sock) = @_;
180    if (my $ipport = $sock_map{$sock}) {
181        close $sock;
182        delete $cache_sock{$ipport};
183        delete $sock_map{$sock};
184    }
185    @buck2sock = ();
186}
187
188sub _connect_sock { # sock, sin, timeout
189    my ($sock, $sin, $timeout) = @_;
190    $timeout = 0.25 if not defined $timeout;
191
192    # make the socket non-blocking from now on,
193    # except if someone wants 0 timeout, meaning
194    # a blocking connect, but even then turn it
195    # non-blocking at the end of this function
196
197    if ($timeout) {
198        IO::Handle::blocking($sock, 0);
199    } else {
200        IO::Handle::blocking($sock, 1);
201    }
202
203    my $ret = connect($sock, $sin);
204
205    if (!$ret && $timeout && $!==EINPROGRESS) {
206
207        my $win='';
208        vec($win, fileno($sock), 1) = 1;
209
210        if (select(undef, $win, undef, $timeout) > 0) {
211            $ret = connect($sock, $sin);
212            # EISCONN means connected & won't re-connect, so success
213            $ret = 1 if !$ret && $!==EISCONN;
214        }
215    }
216
217    unless ($timeout) { # socket was temporarily blocking, now revert
218        IO::Handle::blocking($sock, 0);
219    }
220
221    # from here on, we use non-blocking (async) IO for the duration
222    # of the socket's life
223
224    return $ret;
225}
226
227sub sock_to_host { # (host)
228    my Cache::Memcached $self = ref $_[0] ? shift : undef;
229    my $host = $_[0];
230    return $cache_sock{$host} if $cache_sock{$host};
231
232    my $now = time();
233    my ($ip, $port) = $host =~ /(.*):(\d+)/;
234    return undef if
235        $host_dead{$host} && $host_dead{$host} > $now;
236    my $sock;
237
238    my $connected = 0;
239    my $sin;
240    my $proto = $PROTO_TCP ||= getprotobyname('tcp');
241
242    if ( index($host, '/') != 0 )
243    {
244        # if a preferred IP is known, try that first.
245        if ($self && $self->{pref_ip}{$ip}) {
246            socket($sock, PF_INET, SOCK_STREAM, $proto);
247            my $prefip = $self->{pref_ip}{$ip};
248            $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
249            if (_connect_sock($sock,$sin,$self->{connect_timeout})) {
250                $connected = 1;
251            } else {
252                if (my $cb = $self->{cb_connect_fail}) {
253                    $cb->($prefip);
254                }
255                close $sock;
256            }
257        }
258
259        # normal path, or fallback path if preferred IP failed
260        unless ($connected) {
261            socket($sock, PF_INET, SOCK_STREAM, $proto);
262            $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
263            my $timeout = $self ? $self->{connect_timeout} : 0.25;
264            unless (_connect_sock($sock,$sin,$timeout)) {
265                my $cb = $self ? $self->{cb_connect_fail} : undef;
266                $cb->($ip) if $cb;
267                return _dead_sock($sock, undef, 20 + int(rand(10)));
268            }
269        }
270    } else { # it's a unix domain/local socket
271        socket($sock, PF_UNIX, SOCK_STREAM, 0);
272        $sin = Socket::sockaddr_un($host);
273        my $timeout = $self ? $self->{connect_timeout} : 0.25;
274        unless (_connect_sock($sock,$sin,$timeout)) {
275            my $cb = $self ? $self->{cb_connect_fail} : undef;
276            $cb->($host) if $cb;
277            return _dead_sock($sock, undef, 20 + int(rand(10)));
278        }
279    }
280
281    # make the new socket not buffer writes.
282    my $old = select($sock);
283    $| = 1;
284    select($old);
285
286    $sock_map{$sock} = $host;
287    $cache_sock{$host} = $sock;
288
289    return $sock;
290}
291
292sub get_sock { # (key)
293    my Cache::Memcached $self = $_[0];
294    my $key = $_[1];
295    return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
296    return undef unless $self->{'active'};
297    my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
298
299    my $real_key = ref $key ? $key->[1] : $key;
300    my $tries = 0;
301    while ($tries++ < 20) {
302        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
303        my $sock = $self->sock_to_host($host);
304        return $sock if $sock;
305        return undef if $self->{'no_rehash'};
306        $hv += _hashfunc($tries . $real_key);  # stupid, but works
307    }
308    return undef;
309}
310
311sub init_buckets {
312    my Cache::Memcached $self = shift;
313    return if $self->{'buckets'};
314    my $bu = $self->{'buckets'} = [];
315    foreach my $v (@{$self->{'servers'}}) {
316        if (ref $v eq "ARRAY") {
317            for (1..$v->[1]) { push @$bu, $v->[0]; }
318        } else {
319            push @$bu, $v;
320        }
321    }
322    $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
323}
324
325sub disconnect_all {
326    my $sock;
327    foreach $sock (values %cache_sock) {
328        close $sock;
329    }
330    %cache_sock = ();
331}
332
333# writes a line, then reads result.  by default stops reading after a
334# single line, but caller can override the $check_complete subref,
335# which gets passed a scalarref of buffer read thus far.
336sub _write_and_read {
337    my Cache::Memcached $self = shift;
338    my ($sock, $line, $check_complete) = @_;
339    my $res;
340    my ($ret, $offset) = (undef, 0);
341
342    $check_complete ||= sub {
343        return (rindex($ret, "\r\n") + 2 == length($ret));
344    };
345
346    # state: 0 - writing, 1 - reading, 2 - done
347    my $state = 0;
348
349    # the bitsets for select
350    my ($rin, $rout, $win, $wout);
351    my $nfound;
352
353    my $copy_state = -1;
354    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
355
356    # the select loop
357    while(1) {
358        if ($copy_state!=$state) {
359            last if $state==2;
360            ($rin, $win) = ('', '');
361            vec($rin, fileno($sock), 1) = 1 if $state==1;
362            vec($win, fileno($sock), 1) = 1 if $state==0;
363            $copy_state = $state;
364        }
365        $nfound = select($rout=$rin, $wout=$win, undef,
366                         $self->{'select_timeout'});
367        last unless $nfound;
368
369        if (vec($wout, fileno($sock), 1)) {
370            $res = send($sock, $line, $FLAG_NOSIGNAL);
371            next
372                if not defined $res and $!==EWOULDBLOCK;
373            unless ($res > 0) {
374                _close_sock($sock);
375                return undef;
376            }
377            if ($res == length($line)) { # all sent
378                $state = 1;
379            } else { # we only succeeded in sending some of it
380                substr($line, 0, $res, ''); # delete the part we sent
381            }
382        }
383
384        if (vec($rout, fileno($sock), 1)) {
385            $res = sysread($sock, $ret, 255, $offset);
386            next
387                if !defined($res) and $!==EWOULDBLOCK;
388            if ($res == 0) { # catches 0=conn closed or undef=error
389                _close_sock($sock);
390                return undef;
391            }
392            $offset += $res;
393            $state = 2 if $check_complete->(\$ret);
394        }
395    }
396
397    unless ($state == 2) {
398        _dead_sock($sock); # improperly finished
399        return undef;
400    }
401
402    return $ret;
403}
404
405sub delete {
406    my Cache::Memcached $self = shift;
407    my ($key, $time) = @_;
408    return 0 if ! $self->{'active'} || $self->{'readonly'};
409    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
410    my $sock = $self->get_sock($key);
411    return 0 unless $sock;
412
413    $self->{'stats'}->{"delete"}++;
414    $key = ref $key ? $key->[1] : $key;
415    $time = $time ? " $time" : "";
416    my $cmd = "delete $self->{namespace}$key$time\r\n";
417    my $res = _write_and_read($self, $sock, $cmd);
418
419    if ($self->{'stat_callback'}) {
420        my $etime = Time::HiRes::time();
421        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
422    }
423
424    return defined $res && $res eq "DELETED\r\n";
425}
426*remove = \&delete;
427
428sub add {
429    _set("add", @_);
430}
431
432sub replace {
433    _set("replace", @_);
434}
435
436sub set {
437    _set("set", @_);
438}
439
440sub _set {
441    my $cmdname = shift;
442    my Cache::Memcached $self = shift;
443    my ($key, $val, $exptime) = @_;
444    return 0 if ! $self->{'active'} || $self->{'readonly'};
445    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
446    my $sock = $self->get_sock($key);
447    return 0 unless $sock;
448
449    use bytes; # return bytes from length()
450
451    $self->{'stats'}->{$cmdname}++;
452    my $flags = 0;
453    $key = ref $key ? $key->[1] : $key;
454
455    if (ref $val) {
456        local $Carp::CarpLevel = 2;
457        $val = Storable::nfreeze($val);
458        $flags |= F_STORABLE;
459    }
460
461    my $len = length($val);
462
463    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
464        $len >= $self->{'compress_threshold'}) {
465
466        my $c_val = Compress::Zlib::memGzip($val);
467        my $c_len = length($c_val);
468
469        # do we want to keep it?
470        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
471            $val = $c_val;
472            $len = $c_len;
473            $flags |= F_COMPRESS;
474        }
475    }
476
477    $exptime = int($exptime || 0);
478
479    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
480    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
481
482    my $res = _write_and_read($self, $sock, $line);
483
484    if ($self->{'debug'} && $line) {
485        chop $line; chop $line;
486        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
487    }
488
489    if ($self->{'stat_callback'}) {
490        my $etime = Time::HiRes::time();
491        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
492    }
493
494    return defined $res && $res eq "STORED\r\n";
495}
496
497sub incr {
498    _incrdecr("incr", @_);
499}
500
501sub decr {
502    _incrdecr("decr", @_);
503}
504
505sub _incrdecr {
506    my $cmdname = shift;
507    my Cache::Memcached $self = shift;
508    my ($key, $value) = @_;
509    return undef if ! $self->{'active'} || $self->{'readonly'};
510    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
511    my $sock = $self->get_sock($key);
512    return undef unless $sock;
513    $key = $key->[1] if ref $key;
514    $self->{'stats'}->{$cmdname}++;
515    $value = 1 unless defined $value;
516
517    my $line = "$cmdname $self->{namespace}$key $value\r\n";
518    my $res = _write_and_read($self, $sock, $line);
519
520    if ($self->{'stat_callback'}) {
521        my $etime = Time::HiRes::time();
522        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
523    }
524
525    return undef unless defined $res && $res =~ /^(\d+)/;
526    return $1;
527}
528
529sub get {
530    my Cache::Memcached $self = $_[0];
531    my $key = $_[1];
532
533    # TODO: make a fast path for this?  or just keep using get_multi?
534    my $r = $self->get_multi($key);
535    my $kval = ref $key ? $key->[1] : $key;
536    return $r->{$kval};
537}
538
539sub get_multi {
540    my Cache::Memcached $self = shift;
541    return {} unless $self->{'active'};
542    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
543    $self->{'stats'}->{"get_multi"}++;
544
545    my %val;        # what we'll be returning a reference to (realkey -> value)
546    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
547    my $sock;
548
549    if ($self->{'_single_sock'}) {
550        $sock = $self->sock_to_host($self->{'_single_sock'});
551        unless ($sock) {
552            return {};
553        }
554        foreach my $key (@_) {
555            my $kval = ref $key ? $key->[1] : $key;
556            push @{$sock_keys{$sock}}, $kval;
557        }
558    } else {
559        my $bcount = $self->{'bucketcount'};
560        my $sock;
561      KEY:
562        foreach my $key (@_) {
563            my ($hv, $real_key) = ref $key ?
564                (int($key->[0]),               $key->[1]) :
565                ((crc32($key) >> 16) & 0x7fff, $key);
566
567            my $tries;
568            while (1) {
569                my $bucket = $hv % $bcount;
570
571                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
572                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
573                #    and last;
574
575                # but this variant doesn't crash:
576                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
577                if ($sock) {
578                    $buck2sock[$bucket] = $sock;
579                    last;
580                }
581
582                next KEY if $tries++ >= 20;
583                $hv += _hashfunc($tries . $real_key);
584            }
585
586            push @{$sock_keys{$sock}}, $real_key;
587        }
588    }
589
590    $self->{'stats'}->{"get_keys"} += @_;
591    $self->{'stats'}->{"get_socks"} += keys %sock_keys;
592
593    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
594
595    _load_multi($self, \%sock_keys, \%val);
596
597    if ($self->{'debug'}) {
598        while (my ($k, $v) = each %val) {
599            print STDERR "MemCache: got $k = $v\n";
600        }
601    }
602    return \%val;
603}
604
605sub _load_multi {
606    use bytes; # return bytes from length()
607    my Cache::Memcached $self;
608    my ($sock_keys, $ret);
609
610    ($self, $sock_keys, $ret) = @_;
611
612    # all keyed by $sockstr:
613    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
614    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
615    my %buf;     # buffers, for writing
616
617    my %parser;  # $sockstr -> Cache::Memcached::GetParser
618
619    my $active_changed = 1; # force rebuilding of select sets
620
621    my $dead = sub {
622        my $sock = shift;
623        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
624        delete $reading{$sock};
625        delete $writing{$sock};
626
627        if (my $p = $parser{$sock}) {
628            my $key = $p->current_key;
629            delete $ret->{$key} if $key;
630        }
631
632        if ($self->{'stat_callback'}) {
633            my $etime = Time::HiRes::time();
634            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
635        }
636
637        close $sock;
638        _dead_sock($sock);
639    };
640
641    # $finalize->($key, $flags)
642    # $finalize->({ $key => $flags, $key => $flags });
643    my $finalize = sub {
644        my $map = $_[0];
645        $map = {@_} unless ref $map;
646
647        while (my ($k, $flags) = each %$map) {
648
649            # remove trailing \r\n
650            chop $ret->{$k}; chop $ret->{$k};
651
652            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
653                if $HAVE_ZLIB && $flags & F_COMPRESS;
654            if ($flags & F_STORABLE) {
655                # wrapped in eval in case a perl 5.6 Storable tries to
656                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
657                # and dies if the version number changes at all.  in 5.8
658                # they made it only die if it unencounters a new feature)
659                eval {
660                    $ret->{$k} = Storable::thaw($ret->{$k});
661                };
662                # so if there was a problem, just treat it as a cache miss.
663                if ($@) {
664                    delete $ret->{$k};
665                }
666            }
667        }
668    };
669
670    foreach (keys %$sock_keys) {
671        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
672        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
673        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
674        $writing{$_} = $sock;
675        if ($self->{namespace}) {
676            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
677        } else {
678            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
679        }
680
681        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
682    }
683
684    my $read = sub {
685        my $sockstr = "$_[0]";  # $sock is $_[0];
686        my $p = $parser{$sockstr} or die;
687        my $rv = $p->parse_from_sock($_[0]);
688        if ($rv > 0) {
689            # okay, finished with this socket
690            delete $reading{$sockstr};
691        } elsif ($rv < 0) {
692            $dead->($_[0]);
693        }
694        return $rv;
695    };
696
697    # returns 1 when it's done, for success or error.  0 if still working.
698    my $write = sub {
699        my ($sock, $sockstr) = ($_[0], "$_[0]");
700        my $res;
701
702        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
703
704        return 0
705            if not defined $res and $!==EWOULDBLOCK;
706        unless ($res > 0) {
707            $dead->($sock);
708            return 1;
709        }
710        if ($res == length($buf{$sockstr})) { # all sent
711            $buf{$sockstr} = "";
712
713            # switch the socket from writing to reading
714            delete $writing{$sockstr};
715            $reading{$sockstr} = $sock;
716            return 1;
717        } else { # we only succeeded in sending some of it
718            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
719        }
720        return 0;
721    };
722
723    # the bitsets for select
724    my ($rin, $rout, $win, $wout);
725    my $nfound;
726
727    # the big select loop
728    while(1) {
729        if ($active_changed) {
730            last unless %reading or %writing; # no sockets left?
731            ($rin, $win) = ('', '');
732            foreach (values %reading) {
733                vec($rin, fileno($_), 1) = 1;
734            }
735            foreach (values %writing) {
736                vec($win, fileno($_), 1) = 1;
737            }
738            $active_changed = 0;
739        }
740        # TODO: more intelligent cumulative timeout?
741        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
742        $nfound = select($rout=$rin, $wout=$win, undef,
743                         $self->{'select_timeout'});
744        last unless $nfound;
745
746        # TODO: possible robustness improvement: we could select
747        # writing sockets for reading also, and raise hell if they're
748        # ready (input unread from last time, etc.)
749        # maybe do that on the first loop only?
750        foreach (values %writing) {
751            if (vec($wout, fileno($_), 1)) {
752                $active_changed = 1 if $write->($_);
753            }
754        }
755        foreach (values %reading) {
756            if (vec($rout, fileno($_), 1)) {
757                $active_changed = 1 if $read->($_);
758            }
759        }
760    }
761
762    # if there're active sockets left, they need to die
763    foreach (values %writing) {
764        $dead->($_);
765    }
766    foreach (values %reading) {
767        $dead->($_);
768    }
769
770    return;
771}
772
773sub _hashfunc {
774    return (crc32($_[0]) >> 16) & 0x7fff;
775}
776
777sub flush_all {
778    my Cache::Memcached $self = shift;
779
780    my $success = 1;
781
782    my @hosts = @{$self->{'buckets'}};
783    foreach my $host (@hosts) {
784        my $sock = $self->sock_to_host($host);
785        my @res = $self->run_command($sock, "flush_all\r\n");
786        $success = 0 unless (scalar @res == 1 && (($res[0] || "") eq "OK\r\n"));
787    }
788
789    return $success;
790}
791
792# returns array of lines, or () on failure.
793sub run_command {
794    my Cache::Memcached $self = shift;
795    my ($sock, $cmd) = @_;
796    return () unless $sock;
797    my $ret;
798    my $line = $cmd;
799    while (my $res = _write_and_read($self, $sock, $line)) {
800        undef $line;
801        $ret .= $res;
802        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
803    }
804    chop $ret; chop $ret;
805    return map { "$_\r\n" } split(/\r\n/, $ret);
806}
807
808sub stats {
809    my Cache::Memcached $self = shift;
810    my ($types) = @_;
811    return 0 unless $self->{'active'};
812    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
813    if (!ref($types)) {
814        if (!$types) {
815            # I don't much care what the default is, it should just
816            # be something reasonable.  Obviously "reset" should not
817            # be on the list :) but other types that might go in here
818            # include maps, cachedump, slabs, or items.
819            $types = [ qw( misc malloc sizes self ) ];
820        } else {
821            $types = [ $types ];
822        }
823    }
824
825    my $stats_hr = { };
826
827    # The "self" stat type is special, it only applies to this very
828    # object.
829    if (grep /^self$/, @$types) {
830        $stats_hr->{'self'} = \%{ $self->{'stats'} };
831    }
832
833    my %misc_keys = map { $_ => 1 }
834      qw/ bytes bytes_read bytes_written
835          cmd_get cmd_set connection_structures curr_items
836          get_hits get_misses
837          total_connections total_items
838        /;
839
840    # Now handle the other types, passing each type to each host server.
841    my @hosts = @{$self->{'buckets'}};
842  HOST: foreach my $host (@hosts) {
843        my $sock = $self->sock_to_host($host);
844      TYPE: foreach my $typename (grep !/^self$/, @$types) {
845            my $type = $typename eq 'misc' ? "" : " $typename";
846            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
847                my $bref = shift;
848                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
849            });
850            unless ($lines) {
851                _dead_sock($sock);
852                next HOST;
853            }
854
855            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?
856
857            # And, most lines end in \r\n but 'stats maps' (as of
858            # July 2003 at least) ends in \n. ??
859            my @lines = split(/\r?\n/, $lines);
860
861            # Some stats are key-value, some are not.  malloc,
862            # sizes, and the empty string are key-value.
863            # ("self" was handled separately above.)
864            if ($typename =~ /^(malloc|sizes|misc)$/) {
865                # This stat is key-value.
866                foreach my $line (@lines) {
867                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
868                    if ($key) {
869                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
870                    }
871                    $stats_hr->{'total'}{$key} += $value
872                        if $typename eq 'misc' && $key && $misc_keys{$key};
873                    $stats_hr->{'total'}{"malloc_$key"} += $value
874                        if $typename eq 'malloc' && $key;
875                }
876            } else {
877                # This stat is not key-value so just pull it
878                # all out in one blob.
879                $lines =~ s/^END\r?\n//m;
880                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
881                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
882            }
883        }
884    }
885
886    return $stats_hr;
887}
888
889sub stats_reset {
890    my Cache::Memcached $self = shift;
891    my ($types) = @_;
892    return 0 unless $self->{'active'};
893
894  HOST: foreach my $host (@{$self->{'buckets'}}) {
895        my $sock = $self->sock_to_host($host);
896        my $ok = _write_and_read($self, $sock, "stats reset");
897        unless (defined $ok && $ok eq "RESET\r\n") {
898            _dead_sock($sock);
899        }
900    }
901    return 1;
902}
903
9041;
905__END__
906
907=head1 NAME
908
909Cache::Memcached - client library for memcached (memory cache daemon)
910
911=head1 SYNOPSIS
912
913  use Cache::Memcached;
914
915  $memd = new Cache::Memcached {
916    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
917                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
918    'debug' => 0,
919    'compress_threshold' => 10_000,
920  };
921  $memd->set_servers($array_ref);
922  $memd->set_compress_threshold(10_000);
923  $memd->enable_compress(0);
924
925  $memd->set("my_key", "Some value");
926  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
927
928  $val = $memd->get("my_key");
929  $val = $memd->get("object_key");
930  if ($val) { print $val->{'complex'}->[2]; }
931
932  $memd->incr("key");
933  $memd->decr("key");
934  $memd->incr("key", 2);
935
936=head1 DESCRIPTION
937
938This is the Perl API for memcached, a distributed memory cache daemon.
939More information is available at:
940
941  http://www.danga.com/memcached/
942
943=head1 CONSTRUCTOR
944
945=over 4
946
947=item C<new>
948
949Takes one parameter, a hashref of options.  The most important key is
950C<servers>, but that can also be set later with the C<set_servers>
951method.  The servers must be an arrayref of hosts, each of which is
952either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
953former and an integer weight value.  (The default weight if
954unspecified is 1.)  It's recommended that weight values be kept as low
955as possible, as this module currently allocates memory for bucket
956distribution proportional to the total host weights.
957
958Use C<compress_threshold> to set a compression threshold, in bytes.
959Values larger than this threshold will be compressed by C<set> and
960decompressed by C<get>.
961
962Use C<no_rehash> to disable finding a new memcached server when one
963goes down.  Your application may or may not need this, depending on
964your expirations and key usage.
965
966Use C<readonly> to disable writes to backend memcached servers.  Only
967get and get_multi will work.  This is useful in bizarre debug and
968profiling cases only.
969
970Use C<namespace> to prefix all keys with the provided namespace value.
971That is, if you set namespace to "app1:" and later do a set of "foo"
972to "bar", memcached is actually seeing you set "app1:foo" to "bar".
973
974The other useful key is C<debug>, which when set to true will produce
975diagnostics on STDERR.
976
977=back
978
979=head1 METHODS
980
981=over 4
982
983=item C<set_servers>
984
985Sets the server list this module distributes key gets and sets between.
986The format is an arrayref of identical form as described in the C<new>
987constructor.
988
989=item C<set_debug>
990
991Sets the C<debug> flag.  See C<new> constructor for more information.
992
993=item C<set_readonly>
994
995Sets the C<readonly> flag.  See C<new> constructor for more information.
996
997=item C<set_norehash>
998
999Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1000
1001=item C<set_compress_threshold>
1002
1003Sets the compression threshold. See C<new> constructor for more information.
1004
1005=item C<enable_compress>
1006
1007Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1008isn't set, but has an overriding effect if it is.
1009
1010=item C<get>
1011
1012my $val = $memd->get($key);
1013
1014Retrieves a key from the memcache.  Returns the value (automatically
1015thawed with Storable, if necessary) or undef.
1016
1017The $key can optionally be an arrayref, with the first element being the
1018hash value, if you want to avoid making this module calculate a hash
1019value.  You may prefer, for example, to keep all of a given user's
1020objects on the same memcache server, so you could use the user's
1021unique id as the hash value.
1022
1023=item C<get_multi>
1024
1025my $hashref = $memd->get_multi(@keys);
1026
1027Retrieves multiple keys from the memcache doing just one query.
1028Returns a hashref of key/value pairs that were available.
1029
1030This method is recommended over regular 'get' as it lowers the number
1031of total packets flying around your network, reducing total latency,
1032since your app doesn't have to wait for each round-trip of 'get'
1033before sending the next one.
1034
1035=item C<set>
1036
1037$memd->set($key, $value[, $exptime]);
1038
1039Unconditionally sets a key to a given value in the memcache.  Returns true
1040if it was stored successfully.
1041
1042The $key can optionally be an arrayref, with the first element being the
1043hash value, as described above.
1044
1045The $exptime (expiration time) defaults to "never" if unspecified.  If
1046you want the key to expire in memcached, pass an integer $exptime.  If
1047value is less than 60*60*24*30 (30 days), time is assumed to be relative
1048from the present.  If larger, it's considered an absolute Unix time.
1049
1050=item C<add>
1051
1052$memd->add($key, $value[, $exptime]);
1053
1054Like C<set>, but only stores in memcache if the key doesn't already exist.
1055
1056=item C<replace>
1057
1058$memd->replace($key, $value[, $exptime]);
1059
1060Like C<set>, but only stores in memcache if the key already exists.  The
1061opposite of C<add>.
1062
1063=item C<delete>
1064
1065$memd->delete($key[, $time]);
1066
1067Deletes a key.  You may optionally provide an integer time value (in seconds) to
1068tell the memcached server to block new writes to this key for that many seconds.
1069(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1070was found and deleted, and false otherwise.
1071
1072You may also use the alternate method name B<remove>, so
1073Cache::Memcached looks like the L<Cache::Cache> API.
1074
1075=item C<incr>
1076
1077$memd->incr($key[, $value]);
1078
1079Sends a command to the server to atomically increment the value for
1080$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1081doesn't exist on server, otherwise it returns the new value after
1082incrementing.  Value should be zero or greater.  Overflow on server
1083is not checked.  Be aware of values approaching 2**32.  See decr.
1084
1085=item C<decr>
1086
1087$memd->decr($key[, $value]);
1088
1089Like incr, but decrements.  Unlike incr, underflow is checked and new
1090values are capped at 0.  If server value is 1, a decrement of 2
1091returns 0, not -1.
1092
1093=item C<stats>
1094
1095$memd->stats([$keys]);
1096
1097Returns a hashref of statistical data regarding the memcache server(s),
1098the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1099single key wanted, or absent (in which case the default value is malloc,
1100sizes, self, and the empty string).  These keys are the values passed
1101to the 'stats' command issued to the memcached server(s), except for
1102'self' which is internal to the $memd object.  Allowed values are:
1103
1104=over 4
1105
1106=item C<misc>
1107
1108The stats returned by a 'stats' command:  pid, uptime, version,
1109bytes, get_hits, etc.
1110
1111=item C<malloc>
1112
1113The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1114
1115=item C<sizes>
1116
1117The stats returned by a 'stats sizes'.
1118
1119=item C<self>
1120
1121The stats for the $memd object itself (a copy of $memd->{'stats'}).
1122
1123=item C<maps>
1124
1125The stats returned by a 'stats maps'.
1126
1127=item C<cachedump>
1128
1129The stats returned by a 'stats cachedump'.
1130
1131=item C<slabs>
1132
1133The stats returned by a 'stats slabs'.
1134
1135=item C<items>
1136
1137The stats returned by a 'stats items'.
1138
1139=back
1140
1141=item C<disconnect_all>
1142
1143$memd->disconnect_all;
1144
1145Closes all cached sockets to all memcached servers.  You must do this
1146if your program forks and the parent has used this module at all.
1147Otherwise the children will try to use cached sockets and they'll fight
1148(as children do) and garble the client/server protocol.
1149
1150=item C<flush_all>
1151
1152$memd->flush_all;
1153
1154Runs the memcached "flush_all" command on all configured hosts,
1155emptying all their caches.  (or rather, invalidating all items
1156in the caches in an O(1) operation...)  Running stats will still
1157show the item existing, they're just be non-existent and lazily
1158destroyed next time you try to detch any of them.
1159
1160=back
1161
1162=head1 BUGS
1163
1164When a server goes down, this module does detect it, and re-hashes the
1165request to the remaining servers, but the way it does it isn't very
1166clean.  The result may be that it gives up during its rehashing and
1167refuses to get/set something it could've, had it been done right.
1168
1169=head1 COPYRIGHT
1170
1171This module is Copyright (c) 2003 Brad Fitzpatrick.
1172All rights reserved.
1173
1174You may distribute under the terms of either the GNU General Public
1175License or the Artistic License, as specified in the Perl README file.
1176
1177=head1 WARRANTY
1178
1179This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1180
1181=head1 FAQ
1182
1183See the memcached website:
1184   http://www.danga.com/memcached/
1185
1186=head1 AUTHORS
1187
1188Brad Fitzpatrick <brad@danga.com>
1189
1190Anatoly Vorobey <mellon@pobox.com>
1191
1192Brad Whitaker <whitaker@danga.com>
1193
1194Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.