root/trunk/api/perl/lib/Cache/Memcached.pm @ 809

Revision 809, 34.3 kB (checked in by bradfitz, 7 months ago)

Checking in changes prior to tagging of version 1.25. Changelog diff is:

Index: ChangeLog
===================================================================
--- ChangeLog (revision 808)
+++ ChangeLog (working copy)
@@ -1,3 +1,8 @@
+2009-05-02: version 1.25
+
+ * Clear @buck2sock when calling disconnect_all. (Dennis Stosberg,
+ [rt.cpan.org #45560]
+

  • Reconnects to a dead connection shouldn't happen every time when the connection has never succeeded. Apply the dead timeout to sockets that never even came up. Add a test.
  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use fields qw{
22    debug no_rehash stats compress_threshold compress_enable stat_callback
23    readonly select_timeout namespace namespace_len servers active buckets
24    pref_ip
25    bucketcount _single_sock _stime
26    connect_timeout cb_connect_fail
27    parser_class
28};
29
# Flag bits sent to the server alongside each stored value.
use constant F_STORABLE => 1;   # value was serialized with Storable::nfreeze
use constant F_COMPRESS => 2;   # value was compressed with Compress::Zlib

# Fraction by which compression has to shrink a value before the
# compressed form is stored in place of the original.
use constant COMPRESS_SAVINGS => 0.20;

use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
$VERSION = "1.25";

# Compress::Zlib is optional; remember at compile time whether it loaded.
BEGIN {
    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
}

# The XS response parser is optional too, and NO_XS in the environment
# forces the pure-Perl parser even when the XS one loaded.
my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
$HAVE_XS = 0 if $ENV{NO_XS};

my $parser_class = "Cache::Memcached::GetParser";
$parser_class .= "XS" if $HAVE_XS;
print "using parser: $parser_class\n" if $ENV{XS_DEBUG};

# MSG_NOSIGNAL is not available everywhere; fall back to 0 when the
# constant cannot be evaluated.
$FLAG_NOSIGNAL = 0;
eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };

# Connection state shared by every object in this process.
my %host_dead;   # host -> unixtime marked dead until
my %cache_sock;  # host -> socket
my @buck2sock;   # bucket number -> $sock

my $PROTO_TCP;   # cached result of getprotobyname('tcp')

our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
# Constructor.  Accepts a single hashref of options or a flat key/value
# list.  When invoked on an existing object, that object is
# re-initialized instead of allocating a new one.
sub new {
    my Cache::Memcached $self = shift;
    $self = fields::new( $self ) unless ref $self;

    my $args = @_ == 1 ? $_[0] : { @_ };  # hashref-ify args

    $self->set_servers($args->{'servers'});
    $self->{'stats'}           = {};
    $self->{'compress_enable'} = 1;

    # options copied through as given (undef when not supplied)
    $self->{$_} = $args->{$_}
        for qw( no_rehash compress_threshold readonly );

    # options that fall back to a default whenever the given value is false
    # TODO: connect_timeout and select_timeout are undocumented
    my %fallback = (
        debug           => 0,
        pref_ip         => {},
        stat_callback   => undef,
        parser_class    => $parser_class,
        connect_timeout => 0.25,
        select_timeout  => 1.0,
        namespace       => '',
    );
    $self->{$_} = $args->{$_} || $fallback{$_}
        for keys %fallback;

    $self->{namespace_len} = length $self->{namespace};

    return $self;
}
88
# Replaces the preferred-IP map consulted by sock_to_host
# (server IP -> IP to try first).
sub set_pref_ip {
    my Cache::Memcached $self = $_[0];
    return $self->{'pref_ip'} = $_[1];
}
93
# Installs a new server list (arrayref; undef means "no servers"),
# rebuilds the bucket table and resets the bucket -> socket cache.
# Returns the object.
sub set_servers {
    my Cache::Memcached $self = shift;
    my $servers = $_[0] || [];

    $self->{'servers'}     = $servers;
    $self->{'active'}      = scalar @$servers;
    $self->{'buckets'}     = undef;   # forces init_buckets to rebuild
    $self->{'bucketcount'} = 0;
    $self->init_buckets;
    @buck2sock = ();

    # with exactly one server entry, remember it so lookups can skip hashing
    $self->{'_single_sock'} = (@$servers == 1) ? $servers->[0] : undef;

    return $self;
}
111
# Sets the callback that sock_to_host invokes (with the address it
# tried) whenever a connection attempt fails.
sub set_cb_connect_fail {
    my Cache::Memcached $self = $_[0];
    return $self->{'cb_connect_fail'} = $_[1];
}
116
# Sets the connect timeout in seconds that is handed to _connect_sock;
# there, a timeout of 0 selects a blocking connect.
sub set_connect_timeout {
    my Cache::Memcached $self = $_[0];
    return $self->{'connect_timeout'} = $_[1];
}
121
# Sets the debug level; any false value is stored as 0.
sub set_debug {
    my Cache::Memcached $self = shift;
    my $level = shift;
    $self->{'debug'} = $level || 0;
}
127
# Sets the readonly flag; while it is true, delete, the set family and
# incr/decr return without contacting a server.
sub set_readonly {
    my Cache::Memcached $self = $_[0];
    return $self->{'readonly'} = $_[1];
}
133
# Sets the no_rehash flag; while it is true, get_sock gives up after
# the first bucket instead of trying alternative ones.
sub set_norehash {
    my Cache::Memcached $self = $_[0];
    return $self->{'no_rehash'} = $_[1];
}
139
# Sets the size in bytes from which values become candidates for
# compression in _set.
sub set_compress_threshold {
    my Cache::Memcached $self = $_[0];
    return $self->{'compress_threshold'} = $_[1];
}
145
# Switches compression on (true) or off (false) without touching the
# configured threshold.
sub enable_compress {
    my Cache::Memcached $self = $_[0];
    return $self->{'compress_enable'} = $_[1];
}
151
# Clears every "dead until" marker so that sock_to_host will try all
# hosts again, and resets the bucket -> socket cache along with it.
sub forget_dead_hosts {
    %host_dead = ( );
    @buck2sock = ( );
}
156
# Sets the timing callback; it is invoked as
# ($start_time, $end_time, $sock, $command_name) after each command.
sub set_stat_callback {
    my Cache::Memcached $self = $_[0];
    return $self->{'stat_callback'} = $_[1];
}
162
my %sock_map;  # stringified-$sock -> "$ip:$port"

# Forgets $sock: removes it from the socket caches and, when $dead_for
# is true, marks its host dead for that many seconds.  The bucket ->
# socket cache is reset in every case.  Returns $ret unchanged so
# callers can write "return _dead_sock(...)".
sub _dead_sock {
    my ($sock, $ret, $dead_for) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        my $now = time();
        if ($dead_for) {
            $host_dead{$ipport} = $now + $dead_for;
        }
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
    return $ret;  # 0 or undef, probably, depending on what caller wants
}
177
# Closes $sock and removes it from the socket caches without marking
# its host dead.  Sockets not present in %sock_map are left alone; the
# bucket -> socket cache is reset either way.
sub _close_sock {
    my ($sock) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        close $sock;
        delete $cache_sock{$ipport};
        delete $sock_map{$sock};
    }

    @buck2sock = ();
}
187
# Connects $sock to the packed address $sin.
#
# A true $timeout (default 0.25 seconds) performs a non-blocking
# connect and waits up to that long for it to complete; a timeout of 0
# performs an ordinary blocking connect.  In both cases the socket is
# non-blocking when this function returns.  Returns true on success.
sub _connect_sock { # sock, sin, timeout
    my ($sock, $sin, $timeout) = @_;
    $timeout = 0.25 unless defined $timeout;

    # non-blocking for a timed connect, blocking for timeout 0
    IO::Handle::blocking($sock, $timeout ? 0 : 1);

    my $ok = connect($sock, $sin);

    if (!$ok && $timeout && $! == EINPROGRESS) {
        # connect is still under way: wait until the socket is writable
        my $wset = '';
        vec($wset, fileno($sock), 1) = 1;

        if (select(undef, $wset, undef, $timeout) > 0) {
            $ok = connect($sock, $sin);
            # EISCONN means connected & won't re-connect, so success
            $ok = 1 if !$ok && $! == EISCONN;
        }
    }

    # socket was temporarily blocking for timeout 0, now revert; from
    # here on non-blocking IO is used for the socket's whole life
    IO::Handle::blocking($sock, 0) unless $timeout;

    return $ok;
}
226
# Returns a connected socket for $host, which is either "ip:port" or,
# when it starts with "/", the path of a unix domain socket.  May be
# called as a method or as a plain function.
#
# A cached socket is returned when there is one.  Returns undef while
# the host is marked dead or when connecting fails; a failed connect
# marks the host dead for 20 to 29 seconds and invokes the
# cb_connect_fail callback, if one is set.
sub sock_to_host { # (host)
    my Cache::Memcached $self = ref $_[0] ? shift : undef;
    my $host = $_[0];

    if (my $cached = $cache_sock{$host}) {
        return $cached;
    }

    my $now = time();
    my ($ip, $port) = $host =~ /(.*):(\d+)/;
    if ($host_dead{$host} && $host_dead{$host} > $now) {
        return undef;
    }

    my ($sock, $sin);
    my $proto = $PROTO_TCP ||= getprotobyname('tcp');

    if (index($host, '/') == 0) {
        # it's a unix domain/local socket
        socket($sock, PF_UNIX, SOCK_STREAM, 0);
        $sock_map{$sock} = $host;
        $sin = Socket::sockaddr_un($host);
        my $timeout = $self ? $self->{connect_timeout} : 0.25;
        if (!_connect_sock($sock, $sin, $timeout)) {
            my $cb = $self ? $self->{cb_connect_fail} : undef;
            $cb->($host) if $cb;
            return _dead_sock($sock, undef, 20 + int(rand(10)));
        }
    }
    else {
        my $connected = 0;

        # if a preferred IP is known, try that first.
        if ($self && $self->{pref_ip}{$ip}) {
            socket($sock, PF_INET, SOCK_STREAM, $proto);
            $sock_map{$sock} = $host;
            my $prefip = $self->{pref_ip}{$ip};
            $sin = Socket::sockaddr_in($port, Socket::inet_aton($prefip));
            if (_connect_sock($sock, $sin, $self->{connect_timeout})) {
                $connected = 1;
            }
            else {
                my $cb = $self->{cb_connect_fail};
                $cb->($prefip) if $cb;
                close $sock;
            }
        }

        # normal path, or fallback path if preferred IP failed
        if (!$connected) {
            socket($sock, PF_INET, SOCK_STREAM, $proto);
            $sock_map{$sock} = $host;
            $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip));
            my $timeout = $self ? $self->{connect_timeout} : 0.25;
            if (!_connect_sock($sock, $sin, $timeout)) {
                my $cb = $self ? $self->{cb_connect_fail} : undef;
                $cb->($ip) if $cb;
                return _dead_sock($sock, undef, 20 + int(rand(10)));
            }
        }
    }

    # make the new socket not buffer writes.
    my $prev_fh = select($sock);
    $| = 1;
    select($prev_fh);

    $cache_sock{$host} = $sock;

    return $sock;
}
293
# Picks the socket responsible for $key, which is either a plain key
# or an arrayref [ $hash_value, $real_key ].
#
# With a single server that server is always used.  Otherwise the key
# is hashed onto a bucket; when that bucket's host is unavailable up to
# 20 buckets in total are tried, unless no_rehash is set.  Returns
# undef when no socket could be obtained.
sub get_sock { # (key)
    my Cache::Memcached $self = $_[0];
    my $key = $_[1];

    if ($self->{'_single_sock'}) {
        return $self->sock_to_host($self->{'_single_sock'});
    }
    return undef unless $self->{'active'};

    my ($hv, $real_key) = ref $key
        ? (int($key->[0]),  $key->[1])
        : (_hashfunc($key), $key);

    for my $attempt (1 .. 20) {
        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
        my $sock = $self->sock_to_host($host);
        return $sock if $sock;
        return undef if $self->{'no_rehash'};
        $hv += _hashfunc($attempt . $real_key);  # stupid, but works
    }

    return undef;
}
312
# Builds the bucket table from the server list unless it already
# exists.  A plain server entry occupies one bucket; an entry of the
# form [ $host, $weight ] occupies $weight buckets.
sub init_buckets {
    my Cache::Memcached $self = shift;
    return if $self->{'buckets'};

    my $slots = $self->{'buckets'} = [];
    for my $entry (@{ $self->{'servers'} }) {
        if (ref $entry eq "ARRAY") {
            my ($host, $weight) = @$entry;
            push @$slots, map { $host } 1 .. $weight;
        }
        else {
            push @$slots, $entry;
        }
    }

    $self->{'bucketcount'} = scalar @$slots;
}
326
# Closes every cached socket and forgets it.
#
# Each socket's %sock_map entry is removed together with its
# %cache_sock entry; otherwise the map would keep an entry for every
# closed socket.  The bucket -> socket cache is reset as well.
sub disconnect_all {
    foreach my $sock (values %cache_sock) {
        delete $sock_map{$sock};
        close $sock;
    }
    %cache_sock = ();
    @buck2sock = ();
}
335
# writes a line, then reads result.  by default stops reading after a
# single line, but caller can override the $check_complete subref,
# which gets passed a scalarref of buffer read thus far.
#
# Returns the data read, or undef on failure.  A send or read error
# closes the socket via _close_sock; a select timeout hands it to
# _dead_sock.  An undefined $sock (sock_to_host returns undef for
# unavailable hosts) is rejected immediately.
sub _write_and_read {
    my Cache::Memcached $self = shift;
    my ($sock, $line, $check_complete) = @_;
    return undef unless $sock;

    my $res;
    my ($ret, $offset) = (undef, 0);

    # default completion test: the buffer ends in "\r\n"
    $check_complete ||= sub {
        return (rindex($ret, "\r\n") + 2 == length($ret));
    };

    # state: 0 - writing, 1 - reading, 2 - done
    my $state = 0;

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    my $copy_state = -1;
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    # the select loop
    while(1) {
        # rebuild the select sets only when the state has changed
        if ($copy_state!=$state) {
            last if $state==2;
            ($rin, $win) = ('', '');
            vec($rin, fileno($sock), 1) = 1 if $state==1;
            vec($win, fileno($sock), 1) = 1 if $state==0;
            $copy_state = $state;
        }
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;   # timed out

        if (vec($wout, fileno($sock), 1)) {
            $res = send($sock, $line, $FLAG_NOSIGNAL);
            next
                if not defined $res and $!==EWOULDBLOCK;
            # undef is any other error, 0 means nothing was sent
            unless (defined $res && $res > 0) {
                _close_sock($sock);
                return undef;
            }
            if ($res == length($line)) { # all sent
                $state = 1;
            } else { # we only succeeded in sending some of it
                substr($line, 0, $res, ''); # delete the part we sent
            }
        }

        if (vec($rout, fileno($sock), 1)) {
            $res = sysread($sock, $ret, 255, $offset);
            next
                if !defined($res) and $!==EWOULDBLOCK;
            # 0 = connection closed, undef = error
            if (!defined($res) || $res == 0) {
                _close_sock($sock);
                return undef;
            }
            $offset += $res;
            $state = 2 if $check_complete->(\$ret);
        }
    }

    unless ($state == 2) {
        _dead_sock($sock); # improperly finished
        return undef;
    }

    return $ret;
}
407
# Deletes $key (a plain key or [ $hash_value, $real_key ]) from the
# server it maps to.  A true $time is appended to the command as an
# extra argument.  Returns true only when the server answers DELETED;
# returns 0 when there are no servers, the object is read-only or no
# socket is available.  Also available as remove().
sub delete {
    my Cache::Memcached $self = shift;
    my ($key, $time) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour (see perlsyn, "Statement Modifiers")
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    $self->{'stats'}->{"delete"}++;
    $key = ref $key ? $key->[1] : $key;
    $time = $time ? " $time" : "";
    my $cmd = "delete $self->{namespace}$key$time\r\n";
    my $res = _write_and_read($self, $sock, $cmd);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
    }

    return defined $res && $res eq "DELETED\r\n";
}
*remove = \&delete;
430
# Stores a value with the "add" command; arguments and return value
# are those of _set.
sub add {
    return _set('add', @_);
}
434
# Stores a value with the "replace" command; arguments and return
# value are those of _set.
sub replace {
    return _set('replace', @_);
}
438
# Stores a value with the "set" command; arguments and return value
# are those of _set.
sub set {
    return _set('set', @_);
}
442
# Shared implementation of add/replace/set.
#
#   $cmdname  - protocol command to send ("add", "replace" or "set")
#   $key      - plain key or [ $hash_value, $real_key ]
#   $val      - value; references are serialized with Storable
#   $exptime  - expiry, sent as an integer (0 when omitted)
#
# Values at or above compress_threshold are gzip-compressed when that
# saves more than COMPRESS_SAVINGS.  Returns true only when the server
# answers STORED; returns 0 when there are no servers, the object is
# read-only or no socket is available.
sub _set {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $val, $exptime) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour (see perlsyn, "Statement Modifiers")
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    use bytes; # return bytes from length()

    $self->{'stats'}->{$cmdname}++;
    my $flags = 0;
    $key = ref $key ? $key->[1] : $key;

    if (ref $val) {
        # presumably raised so that errors from Storable are reported
        # against the code that called add/replace/set
        local $Carp::CarpLevel = 2;
        $val = Storable::nfreeze($val);
        $flags |= F_STORABLE;
    }
    unless (defined $val) {
        warn "value for memkey:$key is not defined";
        # store an empty value; on perls where length(undef) is undef
        # the command line below would otherwise lack its length field
        $val = '';
    }

    my $len = length($val);

    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
        $len >= $self->{'compress_threshold'}) {

        my $c_val = Compress::Zlib::memGzip($val);
        my $c_len = length($c_val);

        # do we want to keep it?
        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
            $val = $c_val;
            $len = $c_len;
            $flags |= F_COMPRESS;
        }
    }

    $exptime = int($exptime || 0);

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";

    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'debug'} && $line) {
        chop $line; chop $line;
        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
    }

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    return defined $res && $res eq "STORED\r\n";
}
500
# Increments a counter; arguments and return value are those of
# _incrdecr.
sub incr {
    return _incrdecr('incr', @_);
}
504
# Decrements a counter; arguments and return value are those of
# _incrdecr.
sub decr {
    return _incrdecr('decr', @_);
}
508
# Shared implementation of incr/decr.
#
#   $cmdname - "incr" or "decr"
#   $key     - plain key or [ $hash_value, $real_key ]
#   $value   - amount, defaults to 1
#
# Returns the number at the start of the server's reply, or undef when
# the reply does not start with digits, there are no servers, the
# object is read-only or no socket is available.
sub _incrdecr {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $value) = @_;
    return undef if ! $self->{'active'} || $self->{'readonly'};

    # declared unconditionally: "my $x = ... if COND" has undefined
    # behaviour (see perlsyn, "Statement Modifiers")
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return undef unless $sock;
    $key = $key->[1] if ref $key;
    $self->{'stats'}->{$cmdname}++;
    $value = 1 unless defined $value;

    my $line = "$cmdname $self->{namespace}$key $value\r\n";
    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    return undef unless defined $res && $res =~ /^(\d+)/;
    return $1;
}
532
# Fetches a single key by delegating to get_multi.  $key is a plain
# key or [ $hash_value, $real_key ]; the result is the value found
# under the real key, or undef.
sub get {
    my Cache::Memcached $self = $_[0];
    my $key = $_[1];

    # TODO: make a fast path for this?  or just keep using get_multi?
    my $real_key = ref $key ? $key->[1] : $key;
    return $self->get_multi($key)->{$real_key};
}
542
# Fetches several keys at once.  Each argument is a plain key or an
# arrayref [ $hash_value, $real_key ].  Returns a hashref mapping real
# keys to values; keys that were not found, or whose server could not
# be reached, are absent.
#
# Keys are grouped per socket and each socket receives one "get".
# When a key's bucket is unavailable, up to 20 alternative buckets are
# tried unless no_rehash is set, matching get_sock.
#
# NOTE(review): @buck2sock is a file-level cache indexed by bucket
# number only, so objects with different server lists appear to share
# it -- confirm whether that is intended.
sub get_multi {
    my Cache::Memcached $self = shift;
    return {} unless $self->{'active'};
    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
    $self->{'stats'}->{"get_multi"}++;

    my %val;        # what we'll be returning a reference to (realkey -> value)
    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]

    if ($self->{'_single_sock'}) {
        my $sock = $self->sock_to_host($self->{'_single_sock'});
        unless ($sock) {
            return {};
        }
        foreach my $key (@_) {
            my $kval = ref $key ? $key->[1] : $key;
            push @{$sock_keys{$sock}}, $kval;
        }
    } else {
        my $bcount = $self->{'bucketcount'};
        my $sock;
      KEY:
        foreach my $key (@_) {
            my ($hv, $real_key) = ref $key ?
                (int($key->[0]),               $key->[1]) :
                ((crc32($key) >> 16) & 0x7fff, $key);

            my $tries;
            while (1) {
                my $bucket = $hv % $bcount;

                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
                #    and last;

                # but this variant doesn't crash:
                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
                if ($sock) {
                    $buck2sock[$bucket] = $sock;
                    last;
                }

                # honour no_rehash the same way get_sock does: do not
                # look for the key on any other server
                next KEY if $self->{'no_rehash'};

                next KEY if $tries++ >= 20;
                $hv += _hashfunc($tries . $real_key);
            }

            push @{$sock_keys{$sock}}, $real_key;
        }
    }

    $self->{'stats'}->{"get_keys"} += @_;
    $self->{'stats'}->{"get_socks"} += keys %sock_keys;

    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    _load_multi($self, \%sock_keys, \%val);

    if ($self->{'debug'}) {
        while (my ($k, $v) = each %val) {
            print STDERR "MemCache: got $k = $v\n";
        }
    }
    return \%val;
}
608
# Sends one "get" per socket and collects the replies.
#
#   $self       - the Cache::Memcached object
#   $sock_keys  - hashref: stringified socket -> arrayref of real keys
#   $ret        - hashref that is filled with real key -> value
#
# All sockets are driven from a single select loop: each starts out
# being written to and switches to reading once its request has been
# sent completely.  Sockets that fail, or that are still active when
# select times out, are closed and handed to _dead_sock.
sub _load_multi {
    use bytes; # return bytes from length()
    my Cache::Memcached $self = $_[0];
    my ($sock_keys, $ret) = @_[1, 2];

    # all keyed by $sockstr:
    my %reading; # $sockstr -> $sock, while we're reading from this socket
    my %writing; # $sockstr -> $sock, while we're writing to this socket
    my %buf;     # request bytes still to be written
    my %parser;  # $sockstr -> response parser object

    my $active_changed = 1; # force rebuilding of select sets

    # gives up on a socket: stops tracking it, discards the value the
    # parser was in the middle of, reports timing, closes the socket
    my $kill_sock = sub {
        my $sock = shift;
        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
        delete $reading{$sock};
        delete $writing{$sock};

        my $p = $parser{$sock};
        if ($p) {
            my $partial_key = $p->current_key;
            delete $ret->{$partial_key} if $partial_key;
        }

        if ($self->{'stat_callback'}) {
            my $etime = Time::HiRes::time();
            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
        }

        close $sock;
        _dead_sock($sock);
    };

    # post-processes received values; handed to the parser, which
    # calls it as either
    #   $finalize->($key, $flags)
    #   $finalize->({ $key => $flags, $key => $flags });
    my $finalize = sub {
        my $map = ref $_[0] ? $_[0] : { @_ };

        while (my ($k, $flags) = each %$map) {

            # remove trailing \r\n
            chop $ret->{$k} for 1 .. 2;

            if ($HAVE_ZLIB && ($flags & F_COMPRESS)) {
                $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k});
            }

            next unless $flags & F_STORABLE;

            # wrapped in eval in case a perl 5.6 Storable tries to
            # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
            # and dies if the version number changes at all.  in 5.8
            # they made it only die if it encounters a new feature)
            eval {
                $ret->{$k} = Storable::thaw($ret->{$k});
            };
            # so if there was a problem, just treat it as a cache miss.
            delete $ret->{$k} if $@;
        }
    };

    # queue one request and create one parser per socket
    for my $sockstr (keys %$sock_keys) {
        my $ipport = $sock_map{$sockstr}  or die "No map found matching for $sockstr";
        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
        print STDERR "processing socket $sockstr\n" if $self->{'debug'} >= 2;
        $writing{$sockstr} = $sock;

        my @wire_keys = @{ $sock_keys->{$sockstr} };
        if (my $ns = $self->{namespace}) {
            @wire_keys = map { $ns . $_ } @wire_keys;
        }
        $buf{$sockstr} = join(" ", 'get', @wire_keys, "\r\n");

        $parser{$sockstr} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
    }

    # feeds readable data to the socket's parser and returns the
    # parser's result: positive when the socket is finished, negative
    # when it had to be killed, 0 if still working
    my $do_read = sub {
        my $sockstr = "$_[0]";  # $sock is $_[0];
        my $p = $parser{$sockstr} or die;
        my $rv = $p->parse_from_sock($_[0]);
        if ($rv > 0) {
            # okay, finished with this socket
            delete $reading{$sockstr};
        }
        elsif ($rv < 0) {
            $kill_sock->($_[0]);
        }
        return $rv;
    };

    # returns 1 when it's done, for success or error.  0 if still working.
    my $do_write = sub {
        my $sock    = $_[0];
        my $sockstr = "$sock";

        my $sent = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);

        return 0
            if not defined $sent and $!==EWOULDBLOCK;
        unless ($sent > 0) {
            $kill_sock->($sock);
            return 1;
        }

        if ($sent != length($buf{$sockstr})) {
            # we only succeeded in sending some of it
            substr($buf{$sockstr}, 0, $sent, ''); # delete the part we sent
            return 0;
        }

        # all sent: switch the socket from writing to reading
        $buf{$sockstr} = "";
        delete $writing{$sockstr};
        $reading{$sockstr} = $sock;
        return 1;
    };

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    # the big select loop
    while (1) {
        if ($active_changed) {
            last unless %reading or %writing; # no sockets left?
            ($rin, $win) = ('', '');
            vec($rin, fileno($_), 1) = 1 for values %reading;
            vec($win, fileno($_), 1) = 1 for values %writing;
            $active_changed = 0;
        }
        # TODO: more intelligent cumulative timeout?
        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        # TODO: possible robustness improvement: we could select
        # writing sockets for reading also, and raise hell if they're
        # ready (input unread from last time, etc.)
        # maybe do that on the first loop only?
        for my $sock (values %writing) {
            next unless vec($wout, fileno($sock), 1);
            $active_changed = 1 if $do_write->($sock);
        }
        for my $sock (values %reading) {
            next unless vec($rout, fileno($sock), 1);
            $active_changed = 1 if $do_read->($sock);
        }
    }

    # if there're active sockets left, they need to die
    $kill_sock->($_) for values %writing;
    $kill_sock->($_) for values %reading;

    return;
}
776
# Maps a key onto a 15-bit hash value taken from bits 16..30 of the
# key's CRC32.
sub _hashfunc {
    my $crc = crc32($_[0]);
    return ($crc >> 16) & 0x7fff;
}
780
# Sends "flush_all" to the host of every bucket.  Returns 1 when each
# of them answered with exactly "OK", otherwise 0.
sub flush_all {
    my Cache::Memcached $self = shift;

    my $all_ok = 1;

    for my $host (@{ $self->{'buckets'} }) {
        my $sock  = $self->sock_to_host($host);
        my @reply = $self->run_command($sock, "flush_all\r\n");
        next if @reply == 1 && ($reply[0] || "") eq "OK\r\n";
        $all_ok = 0;
    }

    return $all_ok;
}
795
# Sends $cmd on $sock and collects the reply until it ends in OK, END
# or ERROR.
#
# returns array of lines (each with its trailing "\r\n"), or () on
# failure.
sub run_command {
    my Cache::Memcached $self = shift;
    my ($sock, $cmd) = @_;
    return () unless $sock;
    my $ret;
    my $line = $cmd;
    # NOTE(review): after the first pass $line is undef, so later
    # passes ask _write_and_read to send nothing -- confirm that this
    # path behaves as intended for replies that arrive in pieces.
    while (my $res = _write_and_read($self, $sock, $line)) {
        undef $line;
        $ret .= $res;
        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
    }

    # nothing was read at all: fail without running chop/split on undef
    return () unless defined $ret;

    chop $ret; chop $ret;
    return map { "$_\r\n" } split(/\r\n/, $ret);
}
811
# Collects statistics.
#
# $types is a single stat type name, an arrayref of them, or false for
# the default set (misc malloc sizes self).  Returns 0 when there are
# no servers or $types is some other kind of reference; otherwise a
# hashref with these keys:
#   self  - this object's own counters (only for the "self" type)
#   hosts - host -> type -> parsed key/value pairs, or the raw text
#           for types that are not key/value
#   total - sums over all hosts for selected "misc" keys and for all
#           "malloc" keys (the latter prefixed with "malloc_")
# Hosts for which no socket is available are skipped.
sub stats {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};
    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
    if (!ref($types)) {
        if (!$types) {
            # I don't much care what the default is, it should just
            # be something reasonable.  Obviously "reset" should not
            # be on the list :) but other types that might go in here
            # include maps, cachedump, slabs, or items.
            $types = [ qw( misc malloc sizes self ) ];
        } else {
            $types = [ $types ];
        }
    }

    my $stats_hr = { };

    # The "self" stat type is special, it only applies to this very
    # object.
    if (grep /^self$/, @$types) {
        $stats_hr->{'self'} = \%{ $self->{'stats'} };
    }

    # "misc" keys that are summed across hosts into {total}
    my %misc_keys = map { $_ => 1 }
      qw/ bytes bytes_read bytes_written
          cmd_get cmd_set connection_structures curr_items
          get_hits get_misses
          total_connections total_items
        /;

    # Now handle the other types, passing each type to each host server.
    my @hosts = @{$self->{'buckets'}};
  HOST: foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);

        # sock_to_host returns undef for hosts that are marked dead or
        # refuse the connection; there is nothing to ask them.
        next HOST unless $sock;

      TYPE: foreach my $typename (grep !/^self$/, @$types) {
            my $type = $typename eq 'misc' ? "" : " $typename";
            # keep reading until a line starting with END or ERROR shows up
            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
                my $bref = shift;
                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
            });
            unless ($lines) {
                _dead_sock($sock);
                next HOST;
            }

            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?

            # And, most lines end in \r\n but 'stats maps' (as of
            # July 2003 at least) ends in \n. ??
            my @lines = split(/\r?\n/, $lines);

            # Some stats are key-value, some are not.  malloc,
            # sizes, and the empty string are key-value.
            # ("self" was handled separately above.)
            if ($typename =~ /^(malloc|sizes|misc)$/) {
                # This stat is key-value.
                foreach my $line (@lines) {
                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
                    if ($key) {
                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
                    }
                    $stats_hr->{'total'}{$key} += $value
                        if $typename eq 'misc' && $key && $misc_keys{$key};
                    $stats_hr->{'total'}{"malloc_$key"} += $value
                        if $typename eq 'malloc' && $key;
                }
            } else {
                # This stat is not key-value so just pull it
                # all out in one blob.
                $lines =~ s/^END\r?\n//m;
                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
            }
        }
    }

    return $stats_hr;
}
892
# Sends "stats reset" to the host of every bucket.  Returns 0 when
# there are no servers, otherwise 1.  A host that does not answer
# RESET has its socket handed to _dead_sock; hosts for which no socket
# is available are skipped.  The $types argument is accepted but not
# used.
sub stats_reset {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};

  HOST: foreach my $host (@{$self->{'buckets'}}) {
        my $sock = $self->sock_to_host($host);
        next HOST unless $sock;

        # commands must be terminated by "\r\n", as every other command
        # built in this module is
        my $ok = _write_and_read($self, $sock, "stats reset\r\n");
        unless (defined $ok && $ok eq "RESET\r\n") {
            _dead_sock($sock);
        }
    }
    return 1;
}
907
9081;
909__END__
910
911=head1 NAME
912
913Cache::Memcached - client library for memcached (memory cache daemon)
914
915=head1 SYNOPSIS
916
917  use Cache::Memcached;
918
919  $memd = new Cache::Memcached {
920    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
921                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
922    'debug' => 0,
923    'compress_threshold' => 10_000,
924  };
925  $memd->set_servers($array_ref);
926  $memd->set_compress_threshold(10_000);
927  $memd->enable_compress(0);
928
929  $memd->set("my_key", "Some value");
930  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
931
932  $val = $memd->get("my_key");
933  $val = $memd->get("object_key");
934  if ($val) { print $val->{'complex'}->[2]; }
935
936  $memd->incr("key");
937  $memd->decr("key");
938  $memd->incr("key", 2);
939
940=head1 DESCRIPTION
941
942This is the Perl API for memcached, a distributed memory cache daemon.
943More information is available at:
944
945  http://www.danga.com/memcached/
946
947=head1 CONSTRUCTOR
948
949=over 4
950
951=item C<new>
952
Takes one parameter, a hashref of options (a flat list of key/value
pairs is accepted as well).  The most important key is C<servers>, but
that can also be set later with the C<set_servers> method.  The servers
must be an arrayref of hosts, each of which is either a scalar of the
form C<10.0.0.10:11211> (or the absolute path of a unix domain socket,
such as C</var/sock/memcached>) or an arrayref of the former and an
integer weight value.  (The default weight if unspecified is 1.)  It's
recommended that weight values be kept as low as possible, as this
module currently allocates memory for bucket distribution proportional
to the total host weights.
961
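Use C<compress_threshold> to set a compression threshold, in bytes.
Values at least this large are gzip-compressed by C<set>, C<add> and
C<replace>, provided Compress::Zlib is installed and compression
shrinks the value by more than 20%.  Compressed values are
decompressed automatically by C<get> and C<get_multi>.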
965
966Use C<no_rehash> to disable finding a new memcached server when one
967goes down.  Your application may or may not need this, depending on
968your expirations and key usage.
969
970Use C<readonly> to disable writes to backend memcached servers.  Only
971get and get_multi will work.  This is useful in bizarre debug and
972profiling cases only.
973
974Use C<namespace> to prefix all keys with the provided namespace value.
975That is, if you set namespace to "app1:" and later do a set of "foo"
976to "bar", memcached is actually seeing you set "app1:foo" to "bar".
977
978The other useful key is C<debug>, which when set to true will produce
979diagnostics on STDERR.
980
981=back
982
983=head1 METHODS
984
985=over 4
986
987=item C<set_servers>
988
989Sets the server list this module distributes key gets and sets between.
990The format is an arrayref of identical form as described in the C<new>
991constructor.
992
993=item C<set_debug>
994
995Sets the C<debug> flag.  See C<new> constructor for more information.
996
997=item C<set_readonly>
998
999Sets the C<readonly> flag.  See C<new> constructor for more information.
1000
1001=item C<set_norehash>
1002
1003Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1004
1005=item C<set_compress_threshold>
1006
1007Sets the compression threshold. See C<new> constructor for more information.
1008
1009=item C<enable_compress>
1010
1011Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1012isn't set, but has an overriding effect if it is.
1013
1014=item C<get>
1015
1016my $val = $memd->get($key);
1017
1018Retrieves a key from the memcache.  Returns the value (automatically
1019thawed with Storable, if necessary) or undef.
1020
1021The $key can optionally be an arrayref, with the first element being the
1022hash value, if you want to avoid making this module calculate a hash
1023value.  You may prefer, for example, to keep all of a given user's
1024objects on the same memcache server, so you could use the user's
1025unique id as the hash value.
1026
1027=item C<get_multi>
1028
1029my $hashref = $memd->get_multi(@keys);
1030
1031Retrieves multiple keys from the memcache doing just one query.
1032Returns a hashref of key/value pairs that were available.
1033
1034This method is recommended over regular 'get' as it lowers the number
1035of total packets flying around your network, reducing total latency,
1036since your app doesn't have to wait for each round-trip of 'get'
1037before sending the next one.
1038
1039=item C<set>
1040
1041$memd->set($key, $value[, $exptime]);
1042
1043Unconditionally sets a key to a given value in the memcache.  Returns true
1044if it was stored successfully.
1045
1046The $key can optionally be an arrayref, with the first element being the
1047hash value, as described above.
1048
1049The $exptime (expiration time) defaults to "never" if unspecified.  If
1050you want the key to expire in memcached, pass an integer $exptime.  If
1051the value is less than 60*60*24*30 (30 days), the time is assumed to be
1052relative to the present.  If larger, it's considered an absolute Unix time.
1053
1054=item C<add>
1055
1056$memd->add($key, $value[, $exptime]);
1057
1058Like C<set>, but only stores in memcache if the key doesn't already exist.
1059
1060=item C<replace>
1061
1062$memd->replace($key, $value[, $exptime]);
1063
1064Like C<set>, but only stores in memcache if the key already exists.  The
1065opposite of C<add>.
1066
1067=item C<delete>
1068
1069$memd->delete($key[, $time]);
1070
1071Deletes a key.  You may optionally provide an integer time value (in seconds) to
1072tell the memcached server to block new writes to this key for that many seconds.
1073(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1074was found and deleted, and false otherwise.
1075
1076You may also use the alternate method name B<remove>, so
1077Cache::Memcached looks like the L<Cache::Cache> API.
1078
1079=item C<incr>
1080
1081$memd->incr($key[, $value]);
1082
1083Sends a command to the server to atomically increment the value for
1084$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1085doesn't exist on server, otherwise it returns the new value after
1086incrementing.  Value should be zero or greater.  Overflow on server
1087is not checked.  Be aware of values approaching 2**32.  See decr.
1088
1089=item C<decr>
1090
1091$memd->decr($key[, $value]);
1092
1093Like incr, but decrements.  Unlike incr, underflow is checked and new
1094values are capped at 0.  If server value is 1, a decrement of 2
1095returns 0, not -1.
1096
1097=item C<stats>
1098
1099$memd->stats([$keys]);
1100
1101Returns a hashref of statistical data regarding the memcache server(s),
1102the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1103single key wanted, or absent (in which case the default value is malloc,
1104sizes, self, and the empty string).  These keys are the values passed
1105to the 'stats' command issued to the memcached server(s), except for
1106'self' which is internal to the $memd object.  Allowed values are:
1107
1108=over 4
1109
1110=item C<misc>
1111
1112The stats returned by a 'stats' command:  pid, uptime, version,
1113bytes, get_hits, etc.
1114
1115=item C<malloc>
1116
1117The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1118
1119=item C<sizes>
1120
1121The stats returned by a 'stats sizes'.
1122
1123=item C<self>
1124
1125The stats for the $memd object itself (a copy of $memd->{'stats'}).
1126
1127=item C<maps>
1128
1129The stats returned by a 'stats maps'.
1130
1131=item C<cachedump>
1132
1133The stats returned by a 'stats cachedump'.
1134
1135=item C<slabs>
1136
1137The stats returned by a 'stats slabs'.
1138
1139=item C<items>
1140
1141The stats returned by a 'stats items'.
1142
1143=back
1144
1145=item C<disconnect_all>
1146
1147$memd->disconnect_all;
1148
1149Closes all cached sockets to all memcached servers.  You must do this
1150if your program forks and the parent has used this module at all.
1151Otherwise the children will try to use cached sockets and they'll fight
1152(as children do) and garble the client/server protocol.
1153
1154=item C<flush_all>
1155
1156$memd->flush_all;
1157
1158Runs the memcached "flush_all" command on all configured hosts,
1159emptying all their caches.  (or rather, invalidating all items
1160in the caches in an O(1) operation...)  Running stats will still
1161show the items as existing; they'll just be non-existent and lazily
1162destroyed the next time you try to fetch any of them.
1163
1164=back
1165
1166=head1 BUGS
1167
1168When a server goes down, this module does detect it, and re-hashes the
1169request to the remaining servers, but the way it does it isn't very
1170clean.  The result may be that it gives up during its rehashing and
1171refuses to get/set something it could've, had it been done right.
1172
1173=head1 COPYRIGHT
1174
1175This module is Copyright (c) 2003 Brad Fitzpatrick.
1176All rights reserved.
1177
1178You may distribute under the terms of either the GNU General Public
1179License or the Artistic License, as specified in the Perl README file.
1180
1181=head1 WARRANTY
1182
1183This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1184
1185=head1 FAQ
1186
1187See the memcached website:
1188   http://www.danga.com/memcached/
1189
1190=head1 AUTHORS
1191
1192Brad Fitzpatrick <brad@danga.com>
1193
1194Anatoly Vorobey <mellon@pobox.com>
1195
1196Brad Whitaker <whitaker@danga.com>
1197
1198Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.