root/trunk/api/perl/lib/Cache/Memcached.pm @ 811

Revision 811, 34.4 kB (checked in by bradfitz, 7 months ago)

Checking in changes prior to tagging of version 1.26. Changelog diff is:

Index: ChangeLog
===================================================================
--- ChangeLog (revision 809)
+++ ChangeLog (working copy)
@@ -1,3 +1,8 @@
+2009-05-04: version 1.26
+
+ * don't include "stats sizes" by default in the stats method,
+ as that can hang big servers for a few seconds (Brad Fitzpatrick)
+

2009-05-02: version 1.25


  • Clear @buck2sock when calling disconnect_all. (Dennis Stosberg,
  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use fields qw{
22    debug no_rehash stats compress_threshold compress_enable stat_callback
23    readonly select_timeout namespace namespace_len servers active buckets
24    pref_ip
25    bucketcount _single_sock _stime
26    connect_timeout cb_connect_fail
27    parser_class
28};
29
# Flag bits sent to the server together with each stored value.
use constant {
    F_STORABLE => 1,   # value was serialized with Storable::nfreeze
    F_COMPRESS => 2,   # value was compressed with Compress::Zlib::memGzip
};

# A compressed value is only kept when it is smaller than the original
# by at least this fraction (0.20 == 20%).
use constant COMPRESS_SAVINGS => 0.20;

our ($VERSION, $HAVE_ZLIB, $FLAG_NOSIGNAL);
$VERSION = "1.26";

# Compression support is optional: probe for Compress::Zlib at compile time.
BEGIN {
    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
}

# Prefer the XS response parser when it loads, unless NO_XS is set in the
# environment.
my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
if ($ENV{NO_XS}) {
    $HAVE_XS = 0;
}

my $parser_class = "Cache::Memcached::GetParser";
$parser_class .= "XS" if $HAVE_XS;
print "using parser: $parser_class\n" if $ENV{XS_DEBUG};

# MSG_NOSIGNAL may die on platforms that do not define it; fall back to 0,
# in which case callers locally ignore SIGPIPE instead.
$FLAG_NOSIGNAL = eval { MSG_NOSIGNAL } || 0;

# Connection state shared by every instance in this process.
my %host_dead;   # host -> unixtime until which the host is considered dead
my %cache_sock;  # host -> connected socket
my @buck2sock;   # bucket number -> socket

my $PROTO_TCP;   # cached protocol number from getprotobyname('tcp')

our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62
# Constructor.  May be called on the class or on an existing object (which
# is then re-initialized).  Options are given either as one hashref or as a
# flat key/value list.  Returns the object.
sub new {
    my Cache::Memcached $self = shift;
    unless (ref $self) {
        $self = fields::new($self);
    }

    # Normalize the arguments into a hashref.
    my $args;
    if (@_ == 1) {
        $args = $_[0];
    } else {
        $args = { @_ };
    }

    # Server list first: this also builds the bucket table.
    $self->set_servers($args->{'servers'});

    # Options copied through unchanged (undef when not supplied).
    $self->{'no_rehash'}          = $args->{'no_rehash'};
    $self->{'compress_threshold'} = $args->{'compress_threshold'};
    $self->{'readonly'}           = $args->{'readonly'};

    # Options with a fallback when missing or false.
    $self->{'debug'}         = $args->{'debug'} || 0;
    $self->{'pref_ip'}       = $args->{'pref_ip'} || {};
    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
    $self->{'parser_class'}  = $args->{'parser_class'} || $parser_class;

    # Fixed initial state.
    $self->{'stats'}           = {};
    $self->{'compress_enable'} = 1;

    # TODO: undocumented
    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;

    # The namespace length is cached because it is handed to the response
    # parser on every multi-get.
    my $ns = $args->{namespace} || '';
    $self->{namespace}     = $ns;
    $self->{namespace_len} = length $ns;

    return $self;
}
88
# Replaces the preferred-IP table (a hashref mapping a server IP to the
# address that should be tried first).  Returns the new value.
sub set_pref_ip {
    my Cache::Memcached $self = $_[0];
    return $self->{'pref_ip'} = $_[1];
}
93
# Installs a new server list (arrayref; undef means "no servers"), rebuilds
# the bucket table and forgets the bucket->socket cache.  Returns the object.
sub set_servers {
    my Cache::Memcached $self = shift;
    my $servers = $_[0] || [];

    $self->{'servers'} = $servers;
    $self->{'active'}  = scalar @$servers;

    # Force init_buckets to rebuild from scratch.
    $self->{'buckets'}     = undef;
    $self->{'bucketcount'} = 0;
    $self->init_buckets;
    @buck2sock = ();

    # With exactly one server the hashing step can be skipped entirely.
    $self->{'_single_sock'} = (@$servers == 1) ? $servers->[0] : undef;

    return $self;
}
111
# Sets the callback invoked with the address that could not be connected to.
# Returns the new value.
sub set_cb_connect_fail {
    my Cache::Memcached $self = $_[0];
    return $self->{'cb_connect_fail'} = $_[1];
}
116
# Sets the connect timeout in seconds; 0 requests a blocking connect (see
# _connect_sock).  Returns the new value.
sub set_connect_timeout {
    my Cache::Memcached $self = $_[0];
    return $self->{'connect_timeout'} = $_[1];
}
121
# Sets the debug level; any false value is stored as 0.  Returns the stored
# value.
sub set_debug {
    my Cache::Memcached $self = shift;
    my $level = $_[0];
    return $self->{'debug'} = $level ? $level : 0;
}
127
# Sets the readonly flag; while true, delete/set/add/replace/incr/decr
# return early without contacting a server.  Returns the new value.
sub set_readonly {
    my Cache::Memcached $self = shift;
    return $self->{'readonly'} = $_[0];
}
133
# Sets the no_rehash flag, which stops get_sock from trying another bucket
# when the first choice is unreachable.  Returns the new value.
sub set_norehash {
    my Cache::Memcached $self = shift;
    return $self->{'no_rehash'} = $_[0];
}
139
# Sets the size in bytes from which values become candidates for
# compression.  Returns the new value.
sub set_compress_threshold {
    my Cache::Memcached $self = shift;
    return $self->{'compress_threshold'} = $_[0];
}
145
# Turns compression on or off without touching the configured threshold.
# Returns the new value.
sub enable_compress {
    my Cache::Memcached $self = shift;
    return $self->{'compress_enable'} = $_[0];
}
151
# Clears the process-wide dead-host table so every host may be retried
# immediately, and drops the bucket->socket cache.
sub forget_dead_hosts {
    undef %host_dead;
    @buck2sock = ();
}
156
# Sets the timing callback.  It is invoked as
# ($start_time, $end_time, $sock, $command_name) after each command.
# Returns the new value.
sub set_stat_callback {
    my Cache::Memcached $self = shift;
    return $self->{'stat_callback'} = $_[0];
}
162
my %sock_map;  # stringified-$sock -> "$ip:$port"

# Forgets a socket: removes it from the socket caches and, when $dead_for
# is true, marks its host as dead for that many seconds.  The socket itself
# is not closed here.  Always resets the bucket->socket cache.
#
# Returns $ret unchanged, so callers can write "return _dead_sock(...)".
sub _dead_sock {
    my ($sock, $ret, $dead_for) = @_;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        if ($dead_for) {
            $host_dead{$ipport} = time() + $dead_for;
        }
        delete $sock_map{$sock};
        delete $cache_sock{$ipport};
    }

    @buck2sock = ();
    return $ret;
}
177
# Closes a known socket and removes it from the socket caches; the host is
# not marked dead.  Always resets the bucket->socket cache.
sub _close_sock {
    my $sock = shift;

    my $ipport = $sock_map{$sock};
    if ($ipport) {
        close $sock;
        delete $sock_map{$sock};
        delete $cache_sock{$ipport};
    }

    @buck2sock = ();
}
187
# Connects $sock to the packed address $sin.
#
#   $timeout > 0   non-blocking connect, waiting up to $timeout seconds
#   $timeout == 0  plain blocking connect
#   undef          defaults to 0.25 seconds
#
# Whatever the outcome, the socket is left in non-blocking mode, which is
# how the rest of the module uses it.  Returns true on success.
sub _connect_sock { # sock, sin, timeout
    my ($sock, $sin, $timeout) = @_;
    $timeout = 0.25 unless defined $timeout;

    # Block only when the caller explicitly asked for a zero timeout.
    IO::Handle::blocking($sock, $timeout ? 0 : 1);

    my $ok = connect($sock, $sin);
    my $pending = !$ok && $timeout && $! == EINPROGRESS;

    if ($pending) {
        # Wait until the socket becomes writable, then ask again.
        my $wbits = '';
        vec($wbits, fileno($sock), 1) = 1;

        if (select(undef, $wbits, undef, $timeout) > 0) {
            $ok = connect($sock, $sin);
            # EISCONN means connected & won't re-connect, so success
            if (!$ok && $! == EISCONN) {
                $ok = 1;
            }
        }
    }

    # A blocking connect was temporary; switch to async I/O for the rest
    # of the socket's life.
    IO::Handle::blocking($sock, 0) unless $timeout;

    return $ok;
}
226
# Returns a connected socket for $host, or undef when the host is currently
# marked dead or cannot be reached.
#
# $host is either "ip:port" or, when it starts with "/", the path of a unix
# domain socket.  May be called as a method or as a plain function; without
# an object the connect timeout is 0.25s and there is no pref_ip table or
# failure callback.
#
# Sockets are cached per host.  A host that cannot be connected to is
# marked dead for 20-29 seconds.
sub sock_to_host { # (host)
    my Cache::Memcached $self = ref $_[0] ? shift : undef;
    my $host = $_[0];
    return $cache_sock{$host} if $cache_sock{$host};

    my $now = time();
    my ($ip, $port) = $host =~ /(.*):(\d+)/;
    return undef if
        $host_dead{$host} && $host_dead{$host} > $now;
    my $sock;

    my $connected = 0;
    my $proto   = $PROTO_TCP ||= getprotobyname('tcp');
    my $timeout = $self ? $self->{connect_timeout} : 0.25;
    my $cb_fail = $self ? $self->{cb_connect_fail} : undef;

    if ( index($host, '/') != 0 )
    {
        # if a preferred IP is known, try that first.
        if ($self && $self->{pref_ip}{$ip}) {
            socket($sock, PF_INET, SOCK_STREAM, $proto);
            $sock_map{$sock} = $host;
            my $prefip = $self->{pref_ip}{$ip};

            # inet_aton returns undef for a name that does not resolve;
            # handing that to sockaddr_in would croak, so treat it as an
            # ordinary connect failure.
            my $addr = Socket::inet_aton($prefip);
            if (defined $addr
                && _connect_sock($sock, Socket::sockaddr_in($port, $addr), $timeout)) {
                $connected = 1;
            } else {
                $cb_fail->($prefip) if $cb_fail;
                close $sock;
            }
        }

        # normal path, or fallback path if preferred IP failed
        unless ($connected) {
            socket($sock, PF_INET, SOCK_STREAM, $proto);
            $sock_map{$sock} = $host;

            # Same guard as above for an unresolvable address.
            my $addr = Socket::inet_aton($ip);
            unless (defined $addr
                    && _connect_sock($sock, Socket::sockaddr_in($port, $addr), $timeout)) {
                $cb_fail->($ip) if $cb_fail;
                return _dead_sock($sock, undef, 20 + int(rand(10)));
            }
        }
    } else { # it's a unix domain/local socket
        socket($sock, PF_UNIX, SOCK_STREAM, 0);
        $sock_map{$sock} = $host;
        my $sin = Socket::sockaddr_un($host);
        unless (_connect_sock($sock, $sin, $timeout)) {
            $cb_fail->($host) if $cb_fail;
            return _dead_sock($sock, undef, 20 + int(rand(10)));
        }
    }

    # make the new socket not buffer writes.
    my $old = select($sock);
    $| = 1;
    select($old);

    $cache_sock{$host} = $sock;

    return $sock;
}
293
# Picks the server socket responsible for $key.
#
# $key is a plain string or an arrayref [ $hash_value, $real_key ], in
# which case the supplied hash value is used instead of hashing the key.
# When the chosen bucket's host is unreachable, up to 20 buckets are tried
# in total unless no_rehash is set.  Returns the socket or undef.
sub get_sock { # (key)
    my Cache::Memcached $self = $_[0];
    my $key = $_[1];

    # Fast paths: one configured server, or none at all.
    if ($self->{'_single_sock'}) {
        return $self->sock_to_host($self->{'_single_sock'});
    }
    return undef unless $self->{'active'};

    my ($hv, $real_key);
    if (ref $key) {
        ($hv, $real_key) = (int($key->[0]), $key->[1]);
    } else {
        ($hv, $real_key) = (_hashfunc($key), $key);
    }

    for my $attempt (1 .. 20) {
        my $bucket = $hv % $self->{'bucketcount'};
        my $sock   = $self->sock_to_host($self->{'buckets'}->[$bucket]);
        return $sock if $sock;
        return undef if $self->{'no_rehash'};

        # Derive a different bucket from the attempt number and the key.
        $hv += _hashfunc($attempt . $real_key);  # stupid, but works
    }

    return undef;
}
312
# Builds the bucket table from the server list unless it already exists.
# A plain server entry takes one bucket; an [ $host, $weight ] entry takes
# $weight buckets.  Also records the bucket count.
sub init_buckets {
    my Cache::Memcached $self = shift;
    return if $self->{'buckets'};

    my @slots;
    for my $server (@{ $self->{'servers'} }) {
        if (ref $server eq "ARRAY") {
            my ($host, $weight) = ($server->[0], $server->[1]);
            push @slots, $host for 1 .. $weight;
        } else {
            push @slots, $server;
        }
    }

    $self->{'buckets'}     = \@slots;
    $self->{'bucketcount'} = scalar @slots;
}
326
# Closes every cached server connection held by this process and empties
# the socket caches.  Hosts are not marked dead, so the next request simply
# reconnects.
sub disconnect_all {
    foreach my $sock (values %cache_sock) {
        close $sock;
        # Also drop the reverse mapping; otherwise %sock_map keeps one
        # stale entry per connection ever closed this way.
        delete $sock_map{$sock};
    }
    %cache_sock = ();
    @buck2sock = ();
}
335
# Writes $line to $sock, then reads the reply, using non-blocking I/O
# driven by select().
#
# By default reading stops as soon as the buffer ends in "\r\n" (a
# single-line reply).  The caller can supply $check_complete, a subref that
# receives a scalarref to the buffer read so far and returns true once the
# reply is complete.
#
# Returns the reply, or undef on failure: on a send/read error the socket
# is closed; on a select timeout it is dropped from the caches via
# _dead_sock.
sub _write_and_read {
    my Cache::Memcached $self = shift;
    my ($sock, $line, $check_complete) = @_;
    my $res;
    my ($ret, $offset) = (undef, 0);

    $check_complete ||= sub {
        return (rindex($ret, "\r\n") + 2 == length($ret));
    };

    # state: 0 - writing, 1 - reading, 2 - done
    my $state = 0;

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    my $copy_state = -1;
    # Without MSG_NOSIGNAL a write to a closed peer raises SIGPIPE.
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    # the select loop
    while(1) {
        # Rebuild the select sets only when the state has changed.
        if ($copy_state!=$state) {
            last if $state==2;
            ($rin, $win) = ('', '');
            vec($rin, fileno($sock), 1) = 1 if $state==1;
            vec($win, fileno($sock), 1) = 1 if $state==0;
            $copy_state = $state;
        }
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;   # timed out

        if (vec($wout, fileno($sock), 1)) {
            $res = send($sock, $line, $FLAG_NOSIGNAL);
            next
                if not defined $res and $!==EWOULDBLOCK;
            # undef (any other error) or nothing sent: give up.  The
            # defined() test keeps undef out of the numeric comparison.
            unless (defined $res && $res > 0) {
                _close_sock($sock);
                return undef;
            }
            if ($res == length($line)) { # all sent
                $state = 1;
            } else { # we only succeeded in sending some of it
                substr($line, 0, $res, ''); # delete the part we sent
            }
        }

        if (vec($rout, fileno($sock), 1)) {
            $res = sysread($sock, $ret, 255, $offset);
            next
                if !defined($res) and $!==EWOULDBLOCK;
            # 0 = connection closed, undef = read error
            if (!defined($res) || $res == 0) {
                _close_sock($sock);
                return undef;
            }
            $offset += $res;
            $state = 2 if $check_complete->(\$ret);
        }
    }

    unless ($state == 2) {
        _dead_sock($sock); # improperly finished
        return undef;
    }

    return $ret;
}
407
# Deletes $key from the cache.  The optional $time is appended to the
# command as an extra argument.
#
# $key may be a plain string or [ $hash_value, $real_key ].
# Returns true only when the server answers "DELETED"; returns 0 when there
# are no servers, the object is read-only, or no socket is available.
# Also available under the name "remove".
sub delete {
    my Cache::Memcached $self = shift;
    my ($key, $time) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # Declared separately: "my $x = ... if COND" has undefined behaviour
    # (perlsyn, "Statement Modifiers").
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    $self->{'stats'}->{"delete"}++;
    $key = ref $key ? $key->[1] : $key;
    $time = $time ? " $time" : "";
    my $cmd = "delete $self->{namespace}$key$time\r\n";
    my $res = _write_and_read($self, $sock, $cmd);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
    }

    return defined $res && $res eq "DELETED\r\n";
}
*remove = \&delete;
430
# Sends an "add" command for the key; arguments are ($key, $val, $exptime).
# See _set for the return value.
sub add {
    return _set(add => @_);
}
434
# Sends a "replace" command for the key; arguments are
# ($key, $val, $exptime).  See _set for the return value.
sub replace {
    return _set(replace => @_);
}
438
# Sends a "set" command for the key; arguments are ($key, $val, $exptime).
# See _set for the return value.
sub set {
    return _set(set => @_);
}
442
# Shared implementation of set/add/replace.
#
#   $cmdname  protocol command: "set", "add" or "replace"
#   $key      plain string or [ $hash_value, $real_key ]
#   $val      scalar, or a reference (serialized with Storable::nfreeze)
#   $exptime  expiry passed to the server as an integer; defaults to 0
#
# Values of at least compress_threshold bytes are gzip-compressed when
# compression is enabled and Compress::Zlib is available, but only if that
# saves at least COMPRESS_SAVINGS of the size.
#
# Returns true only when the server answers "STORED"; returns 0 when there
# are no servers, the object is read-only, or no socket is available.
sub _set {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $val, $exptime) = @_;
    return 0 if ! $self->{'active'} || $self->{'readonly'};

    # Declared separately: "my $x = ... if COND" has undefined behaviour
    # (perlsyn, "Statement Modifiers").
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return 0 unless $sock;

    use bytes; # return bytes from length()

    $self->{'stats'}->{$cmdname}++;
    my $flags = 0;
    $key = ref $key ? $key->[1] : $key;

    if (ref $val) {
        local $Carp::CarpLevel = 2;
        $val = Storable::nfreeze($val);
        $flags |= F_STORABLE;
    }
    warn "value for memkey:$key is not defined" unless defined $val;

    my $len = length($val);

    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
        $len >= $self->{'compress_threshold'}) {

        my $c_val = Compress::Zlib::memGzip($val);
        my $c_len = length($c_val);

        # do we want to keep it?
        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
            $val = $c_val;
            $len = $c_len;
            $flags |= F_COMPRESS;
        }
    }

    $exptime = int($exptime || 0);

    # Without MSG_NOSIGNAL a write to a closed peer raises SIGPIPE.
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";

    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'debug'} && $line) {
        chop $line; chop $line;   # strip the trailing "\r\n" for display
        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
    }

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    return defined $res && $res eq "STORED\r\n";
}
500
# Sends an "incr" command; arguments are ($key, $value), with $value
# defaulting to 1.  See _incrdecr for the return value.
sub incr {
    return _incrdecr(incr => @_);
}
504
# Sends a "decr" command; arguments are ($key, $value), with $value
# defaulting to 1.  See _incrdecr for the return value.
sub decr {
    return _incrdecr(decr => @_);
}
508
# Shared implementation of incr/decr.
#
#   $cmdname  "incr" or "decr"
#   $key      plain string or [ $hash_value, $real_key ]
#   $value    amount to apply; defaults to 1
#
# Returns the leading number of the server's reply, or undef when the
# reply does not start with digits, there are no servers, the object is
# read-only, or no socket is available.
sub _incrdecr {
    my $cmdname = shift;
    my Cache::Memcached $self = shift;
    my ($key, $value) = @_;
    return undef if ! $self->{'active'} || $self->{'readonly'};

    # Declared separately: "my $x = ... if COND" has undefined behaviour
    # (perlsyn, "Statement Modifiers").
    my $stime;
    $stime = Time::HiRes::time() if $self->{'stat_callback'};

    my $sock = $self->get_sock($key);
    return undef unless $sock;
    $key = $key->[1] if ref $key;
    $self->{'stats'}->{$cmdname}++;
    $value = 1 unless defined $value;

    my $line = "$cmdname $self->{namespace}$key $value\r\n";
    my $res = _write_and_read($self, $sock, $line);

    if ($self->{'stat_callback'}) {
        my $etime = Time::HiRes::time();
        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
    }

    return undef unless defined $res && $res =~ /^(\d+)/;
    return $1;
}
532
# Fetches a single key by delegating to get_multi.  $key may be a plain
# string or [ $hash_value, $real_key ].  Returns the value, or undef when
# the key is not in the result.
sub get {
    my Cache::Memcached $self = shift;
    my ($key) = @_;

    # TODO: make a fast path for this?  or just keep using get_multi?
    my $found = $self->get_multi($key);

    # Results are keyed by the real key, never by the arrayref form.
    my $real_key = $key;
    $real_key = $key->[1] if ref $key;
    return $found->{$real_key};
}
542
# Fetches many keys at once.
#
# Each argument is a plain key or [ $hash_value, $real_key ].  Keys are
# grouped by the server responsible for them and all servers are queried
# in parallel by _load_multi.  Returns a hashref of real key -> value
# holding only the keys that were found.
sub get_multi {
    my Cache::Memcached $self = shift;
    return {} unless $self->{'active'};
    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
    $self->{'stats'}->{"get_multi"}++;

    my %val;        # what we'll be returning a reference to (realkey -> value)
    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]

    if ($self->{'_single_sock'}) {
        my $sock = $self->sock_to_host($self->{'_single_sock'});
        return {} unless $sock;
        foreach my $key (@_) {
            my $kval = ref $key ? $key->[1] : $key;
            push @{$sock_keys{$sock}}, $kval;
        }
    } else {
        my $bcount = $self->{'bucketcount'};
      KEY:
        foreach my $key (@_) {
            my ($hv, $real_key) = ref $key ?
                (int($key->[0]),  $key->[1]) :
                (_hashfunc($key), $key);

            my $tries = 0;
            my $sock;
            while (1) {
                # Resolve the bucket through sock_to_host, which caches
                # sockets per host.  The file-global @buck2sock is
                # deliberately not consulted: it is indexed by bucket
                # number only and shared by every object in the process,
                # so an object with a different server list could be handed
                # a socket belonging to another object's bucket.
                $sock = $self->sock_to_host($self->{'buckets'}[ $hv % $bcount ]);
                last if $sock;

                # Same policy as get_sock: never fall back to another
                # server when rehashing is disabled.
                next KEY if $self->{'no_rehash'};

                next KEY if $tries++ >= 20;
                $hv += _hashfunc($tries . $real_key);
            }

            push @{$sock_keys{$sock}}, $real_key;
        }
    }

    $self->{'stats'}->{"get_keys"} += @_;
    $self->{'stats'}->{"get_socks"} += keys %sock_keys;

    # Without MSG_NOSIGNAL a write to a closed peer raises SIGPIPE.
    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;

    _load_multi($self, \%sock_keys, \%val);

    if ($self->{'debug'}) {
        while (my ($k, $v) = each %val) {
            print STDERR "MemCache: got $k = $v\n";
        }
    }
    return \%val;
}
608
# Performs the network part of get_multi.
#
#   $self       the client object
#   $sock_keys  hashref: stringified socket -> arrayref of real keys
#   $ret        hashref that receives real key -> value
#
# One "get" request is sent to each socket and the replies are parsed as
# they arrive, all multiplexed through one select() loop.  A socket that
# errors out, or is still busy when select() times out, is closed and
# dropped from the caches.  Returns nothing; results are left in %$ret.
sub _load_multi {
    use bytes; # return bytes from length()
    my Cache::Memcached $self;
    my ($sock_keys, $ret);

    ($self, $sock_keys, $ret) = @_;

    # all keyed by $sockstr:
    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
    my %buf;     # buffers, for writing

    my %parser;  # $sockstr -> Cache::Memcached::GetParser

    my $active_changed = 1; # force rebuilding of select sets

    # Abandons a socket: discards the value that was being read, reports
    # to the stat callback, closes the socket and forgets it.
    my $dead = sub {
        my $sock = shift;
        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
        delete $reading{$sock};
        delete $writing{$sock};

        if (my $p = $parser{$sock}) {
            my $key = $p->current_key;
            delete $ret->{$key} if $key;
        }

        if ($self->{'stat_callback'}) {
            my $etime = Time::HiRes::time();
            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
        }

        close $sock;
        _dead_sock($sock);
    };

    # Post-processes received values in place (strip the trailing "\r\n",
    # decompress, thaw).  Called as either
    # $finalize->($key, $flags)
    # $finalize->({ $key => $flags, $key => $flags });
    my $finalize = sub {
        my $map = $_[0];
        $map = {@_} unless ref $map;

        while (my ($k, $flags) = each %$map) {

            # remove trailing \r\n
            chop $ret->{$k}; chop $ret->{$k};

            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
                if $HAVE_ZLIB && $flags & F_COMPRESS;
            if ($flags & F_STORABLE) {
                # wrapped in eval in case a perl 5.6 Storable tries to
                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
                # and dies if the version number changes at all.  in 5.8
                # they made it only die if it encounters a new feature)
                eval {
                    $ret->{$k} = Storable::thaw($ret->{$k});
                };
                # so if there was a problem, just treat it as a cache miss.
                if ($@) {
                    delete $ret->{$k};
                }
            }
        }
    };

    # Queue one request and create one parser per socket.
    foreach (keys %$sock_keys) {
        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
        $writing{$_} = $sock;
        if ($self->{namespace}) {
            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
        } else {
            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
        }

        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
    }

    # Feeds available input to the socket's parser.  A positive result
    # means the reply is complete, a negative one kills the socket.
    my $read = sub {
        my $sockstr = "$_[0]";  # $sock is $_[0];
        my $p = $parser{$sockstr} or die;
        my $rv = $p->parse_from_sock($_[0]);
        if ($rv > 0) {
            # okay, finished with this socket
            delete $reading{$sockstr};
        } elsif ($rv < 0) {
            $dead->($_[0]);
        }
        return $rv;
    };

    # returns 1 when it's done, for success or error.  0 if still working.
    my $write = sub {
        my ($sock, $sockstr) = ($_[0], "$_[0]");
        my $res;

        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);

        return 0
            if not defined $res and $!==EWOULDBLOCK;
        # undef (any other error) or nothing sent: give up on this socket.
        # The defined() test keeps undef out of the numeric comparison.
        unless (defined $res && $res > 0) {
            $dead->($sock);
            return 1;
        }
        if ($res == length($buf{$sockstr})) { # all sent
            $buf{$sockstr} = "";

            # switch the socket from writing to reading
            delete $writing{$sockstr};
            $reading{$sockstr} = $sock;
            return 1;
        } else { # we only succeeded in sending some of it
            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
        }
        return 0;
    };

    # the bitsets for select
    my ($rin, $rout, $win, $wout);
    my $nfound;

    # the big select loop
    while(1) {
        if ($active_changed) {
            last unless %reading or %writing; # no sockets left?
            ($rin, $win) = ('', '');
            foreach (values %reading) {
                vec($rin, fileno($_), 1) = 1;
            }
            foreach (values %writing) {
                vec($win, fileno($_), 1) = 1;
            }
            $active_changed = 0;
        }
        # TODO: more intelligent cumulative timeout?
        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
        $nfound = select($rout=$rin, $wout=$win, undef,
                         $self->{'select_timeout'});
        last unless $nfound;

        # TODO: possible robustness improvement: we could select
        # writing sockets for reading also, and raise hell if they're
        # ready (input unread from last time, etc.)
        # maybe do that on the first loop only?
        foreach (values %writing) {
            if (vec($wout, fileno($_), 1)) {
                $active_changed = 1 if $write->($_);
            }
        }
        foreach (values %reading) {
            if (vec($rout, fileno($_), 1)) {
                $active_changed = 1 if $read->($_);
            }
        }
    }

    # if there're active sockets left, they need to die
    foreach (values %writing) {
        $dead->($_);
    }
    foreach (values %reading) {
        $dead->($_);
    }

    return;
}
776
# Maps a key string to a 15-bit hash value: bits 16..30 of its CRC32.
sub _hashfunc {
    my $crc = crc32($_[0]);
    return ($crc >> 16) & 0x7fff;
}
780
# Sends "flush_all" to the host of every bucket (a weighted host is
# therefore contacted once per bucket).  Returns 1 when every reply was
# exactly "OK", otherwise 0.
sub flush_all {
    my Cache::Memcached $self = shift;

    my $all_ok = 1;

    my @hosts = @{ $self->{'buckets'} };
    for my $host (@hosts) {
        my $sock  = $self->sock_to_host($host);
        my @reply = $self->run_command($sock, "flush_all\r\n");

        # Anything other than a single "OK" line counts as a failure.
        next if scalar @reply == 1 && ($reply[0] || "") eq "OK\r\n";
        $all_ok = 0;
    }

    return $all_ok;
}
795
# Sends the raw protocol command $cmd (which must include its trailing
# "\r\n") to $sock and collects the reply.
#
# returns array of lines, or () on failure.  Each returned line ends in
# "\r\n".
sub run_command {
    my Cache::Memcached $self = shift;
    my ($sock, $cmd) = @_;
    return () unless $sock;

    # Start from an empty string so a failed first read does not leave an
    # undefined value for the code below.
    my $ret = '';
    my $line = $cmd;
    # NOTE(review): after the first pass $line is undefined, so a second
    # pass asks _write_and_read to send nothing; that looks like it fails
    # instead of continuing to read -- confirm before relying on
    # multi-read replies.
    while (my $res = _write_and_read($self, $sock, $line)) {
        undef $line;
        $ret .= $res;
        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
    }

    # Nothing was read at all: report failure as documented.
    return () unless length $ret;

    chop $ret; chop $ret;   # drop the final "\r\n" before splitting
    return map { "$_\r\n" } split(/\r\n/, $ret);
}
811
# Collects statistics.
#
# $types is a single type name, an arrayref of type names, or false for
# the default set (misc, malloc, self).  "self" reports this object's own
# counters; "misc" sends a bare "stats"; any other name is sent to each
# server as "stats <name>".
#
# Returns a hashref with any of the keys
#   self   this object's counters
#   hosts  { $host => { $type => \%key_values or raw text } }
#   total  sums over all hosts of selected misc keys and of malloc keys
#          (the latter prefixed "malloc_")
# or 0 when there are no servers or $types is some other kind of reference.
# Hosts that cannot be reached or do not answer are skipped.
sub stats {
    my Cache::Memcached $self = shift;
    my ($types) = @_;
    return 0 unless $self->{'active'};
    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
    if (!ref($types)) {
        if (!$types) {
            # I don't much care what the default is, it should just
            # be something reasonable.  Obviously "reset" should not
            # be on the list :) but other types that might go in here
            # include maps, cachedump, slabs, or items.  Note that
            # this does NOT include 'sizes' anymore, as that can freeze
            # big servers for a couple seconds.
            $types = [ qw( misc malloc self ) ];
        } else {
            $types = [ $types ];
        }
    }

    my $stats_hr = { };

    # The "self" stat type is special, it only applies to this very
    # object.
    if (grep /^self$/, @$types) {
        $stats_hr->{'self'} = \%{ $self->{'stats'} };
    }

    my %misc_keys = map { $_ => 1 }
      qw/ bytes bytes_read bytes_written
          cmd_get cmd_set connection_structures curr_items
          get_hits get_misses
          total_connections total_items
        /;

    # Now handle the other types, passing each type to each host server.
    my @hosts = @{$self->{'buckets'}};
  HOST: foreach my $host (@hosts) {
        my $sock = $self->sock_to_host($host);
        # sock_to_host returns undef for a dead or unreachable host;
        # there is nothing to talk to in that case.
        next HOST unless $sock;
      TYPE: foreach my $typename (grep !/^self$/, @$types) {
            my $type = $typename eq 'misc' ? "" : " $typename";
            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
                my $bref = shift;
                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
            });
            unless ($lines) {
                _dead_sock($sock);
                next HOST;
            }

            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?

            # And, most lines end in \r\n but 'stats maps' (as of
            # July 2003 at least) ends in \n. ??
            my @lines = split(/\r?\n/, $lines);

            # Some stats are key-value, some are not.  malloc,
            # sizes, and the empty string are key-value.
            # ("self" was handled separately above.)
            if ($typename =~ /^(malloc|sizes|misc)$/) {
                # This stat is key-value.
                foreach my $line (@lines) {
                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
                    if ($key) {
                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
                    }
                    $stats_hr->{'total'}{$key} += $value
                        if $typename eq 'misc' && $key && $misc_keys{$key};
                    $stats_hr->{'total'}{"malloc_$key"} += $value
                        if $typename eq 'malloc' && $key;
                }
            } else {
                # This stat is not key-value so just pull it
                # all out in one blob.
                $lines =~ s/^END\r?\n//m;
                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
            }
        }
    }

    return $stats_hr;
}
894
# Sends "stats reset" to the host of every bucket.
#
# Returns 0 when there are no servers, otherwise 1 regardless of the
# individual replies.  A server that does not answer "RESET" has its
# socket dropped from the caches; unreachable hosts are skipped.
sub stats_reset {
    my Cache::Memcached $self = shift;
    return 0 unless $self->{'active'};

  HOST: foreach my $host (@{$self->{'buckets'}}) {
        my $sock = $self->sock_to_host($host);
        # sock_to_host returns undef for a dead or unreachable host.
        next HOST unless $sock;

        # The command must end in "\r\n"; without it the server keeps
        # waiting for the rest of the line and the request times out.
        my $ok = _write_and_read($self, $sock, "stats reset\r\n");
        unless (defined $ok && $ok eq "RESET\r\n") {
            _dead_sock($sock);
        }
    }
    return 1;
}
909
9101;
911__END__
912
913=head1 NAME
914
915Cache::Memcached - client library for memcached (memory cache daemon)
916
917=head1 SYNOPSIS
918
919  use Cache::Memcached;
920
921  $memd = new Cache::Memcached {
922    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
923                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
924    'debug' => 0,
925    'compress_threshold' => 10_000,
926  };
927  $memd->set_servers($array_ref);
928  $memd->set_compress_threshold(10_000);
929  $memd->enable_compress(0);
930
931  $memd->set("my_key", "Some value");
932  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
933
934  $val = $memd->get("my_key");
935  $val = $memd->get("object_key");
936  if ($val) { print $val->{'complex'}->[2]; }
937
938  $memd->incr("key");
939  $memd->decr("key");
940  $memd->incr("key", 2);
941
942=head1 DESCRIPTION
943
944This is the Perl API for memcached, a distributed memory cache daemon.
945More information is available at:
946
947  http://www.danga.com/memcached/
948
949=head1 CONSTRUCTOR
950
951=over 4
952
953=item C<new>
954
Takes one parameter, a hashref of options (a flat list of key/value
pairs is also accepted).  The most important key is
956C<servers>, but that can also be set later with the C<set_servers>
957method.  The servers must be an arrayref of hosts, each of which is
958either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
959former and an integer weight value.  (The default weight if
960unspecified is 1.)  It's recommended that weight values be kept as low
961as possible, as this module currently allocates memory for bucket
962distribution proportional to the total host weights.
963
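Use C<compress_threshold> to set a compression threshold, in bytes.
Values at least this large are compressed by C<set>, C<add> and
C<replace>, provided Compress::Zlib is installed and compression saves
at least 20% of the size.  Compressed values are decompressed
transparently by C<get>.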
967
968Use C<no_rehash> to disable finding a new memcached server when one
969goes down.  Your application may or may not need this, depending on
970your expirations and key usage.
971
972Use C<readonly> to disable writes to backend memcached servers.  Only
973get and get_multi will work.  This is useful in bizarre debug and
974profiling cases only.
975
976Use C<namespace> to prefix all keys with the provided namespace value.
977That is, if you set namespace to "app1:" and later do a set of "foo"
978to "bar", memcached is actually seeing you set "app1:foo" to "bar".
979
980The other useful key is C<debug>, which when set to true will produce
981diagnostics on STDERR.
982
983=back
984
985=head1 METHODS
986
987=over 4
988
989=item C<set_servers>
990
991Sets the server list this module distributes key gets and sets between.
992The format is an arrayref of identical form as described in the C<new>
993constructor.
994
995=item C<set_debug>
996
997Sets the C<debug> flag.  See C<new> constructor for more information.
998
999=item C<set_readonly>
1000
1001Sets the C<readonly> flag.  See C<new> constructor for more information.
1002
1003=item C<set_norehash>
1004
1005Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1006
1007=item C<set_compress_threshold>
1008
1009Sets the compression threshold. See C<new> constructor for more information.
1010
1011=item C<enable_compress>
1012
1013Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1014isn't set, but has an overriding effect if it is.
1015
1016=item C<get>
1017
1018my $val = $memd->get($key);
1019
1020Retrieves a key from the memcache.  Returns the value (automatically
1021thawed with Storable, if necessary) or undef.
1022
1023The $key can optionally be an arrayref, with the first element being the
1024hash value, if you want to avoid making this module calculate a hash
1025value.  You may prefer, for example, to keep all of a given user's
1026objects on the same memcache server, so you could use the user's
1027unique id as the hash value.
1028
1029=item C<get_multi>
1030
1031my $hashref = $memd->get_multi(@keys);
1032
1033Retrieves multiple keys from the memcache doing just one query.
1034Returns a hashref of key/value pairs that were available.
1035
1036This method is recommended over regular 'get' as it lowers the number
1037of total packets flying around your network, reducing total latency,
1038since your app doesn't have to wait for each round-trip of 'get'
1039before sending the next one.
1040
1041=item C<set>
1042
1043$memd->set($key, $value[, $exptime]);
1044
1045Unconditionally sets a key to a given value in the memcache.  Returns true
1046if it was stored successfully.
1047
1048The $key can optionally be an arrayref, with the first element being the
1049hash value, as described above.
1050
1051The $exptime (expiration time) defaults to "never" if unspecified.  If
1052you want the key to expire in memcached, pass an integer $exptime.  If
1053value is less than 60*60*24*30 (30 days), time is assumed to be relative
1054from the present.  If larger, it's considered an absolute Unix time.
1055
1056=item C<add>
1057
1058$memd->add($key, $value[, $exptime]);
1059
1060Like C<set>, but only stores in memcache if the key doesn't already exist.
1061
1062=item C<replace>
1063
1064$memd->replace($key, $value[, $exptime]);
1065
1066Like C<set>, but only stores in memcache if the key already exists.  The
1067opposite of C<add>.
1068
1069=item C<delete>
1070
1071$memd->delete($key[, $time]);
1072
1073Deletes a key.  You may optionally provide an integer time value (in seconds) to
1074tell the memcached server to block new writes to this key for that many seconds.
1075(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1076was found and deleted, and false otherwise.
1077
1078You may also use the alternate method name B<remove>, so
1079Cache::Memcached looks like the L<Cache::Cache> API.
1080
1081=item C<incr>
1082
1083$memd->incr($key[, $value]);
1084
1085Sends a command to the server to atomically increment the value for
1086$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1087doesn't exist on server, otherwise it returns the new value after
1088incrementing.  Value should be zero or greater.  Overflow on server
1089is not checked.  Be aware of values approaching 2**32.  See decr.
1090
1091=item C<decr>
1092
1093$memd->decr($key[, $value]);
1094
1095Like incr, but decrements.  Unlike incr, underflow is checked and new
1096values are capped at 0.  If server value is 1, a decrement of 2
1097returns 0, not -1.
1098
1099=item C<stats>
1100
1101$memd->stats([$keys]);
1102
1103Returns a hashref of statistical data regarding the memcache server(s),
1104the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1105single key wanted, or absent (in which case the default is misc, malloc,
1106and self; sizes is not included).  These keys are the values passed
1107to the 'stats' command issued to the memcached server(s), except for
1108'self' which is internal to the $memd object.  Allowed values are:
1109
1110=over 4
1111
1112=item C<misc>
1113
1114The stats returned by a 'stats' command:  pid, uptime, version,
1115bytes, get_hits, etc.
1116
1117=item C<malloc>
1118
1119The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1120
1121=item C<sizes>
1122
1123The stats returned by a 'stats sizes'.
1124
1125=item C<self>
1126
1127The stats for the $memd object itself (a copy of $memd->{'stats'}).
1128
1129=item C<maps>
1130
1131The stats returned by a 'stats maps'.
1132
1133=item C<cachedump>
1134
1135The stats returned by a 'stats cachedump'.
1136
1137=item C<slabs>
1138
1139The stats returned by a 'stats slabs'.
1140
1141=item C<items>
1142
1143The stats returned by a 'stats items'.
1144
1145=back
1146
1147=item C<disconnect_all>
1148
1149$memd->disconnect_all;
1150
1151Closes all cached sockets to all memcached servers.  You must do this
1152if your program forks and the parent has used this module at all.
1153Otherwise the children will try to use cached sockets and they'll fight
1154(as children do) and garble the client/server protocol.
1155
1156=item C<flush_all>
1157
1158$memd->flush_all;
1159
1160Runs the memcached "flush_all" command on all configured hosts,
1161emptying all their caches.  (or rather, invalidating all items
1162in the caches in an O(1) operation...)  Running stats will still
1163show the items existing; they'll just be non-existent and lazily
1164destroyed next time you try to fetch any of them.
1165
1166=back
1167
1168=head1 BUGS
1169
1170When a server goes down, this module does detect it, and re-hashes the
1171request to the remaining servers, but the way it does it isn't very
1172clean.  The result may be that it gives up during its rehashing and
1173refuses to get/set something it could've, had it been done right.
1174
1175=head1 COPYRIGHT
1176
1177This module is Copyright (c) 2003 Brad Fitzpatrick.
1178All rights reserved.
1179
1180You may distribute under the terms of either the GNU General Public
1181License or the Artistic License, as specified in the Perl README file.
1182
1183=head1 WARRANTY
1184
1185This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1186
1187=head1 FAQ
1188
1189See the memcached website:
1190   http://www.danga.com/memcached/
1191
1192=head1 AUTHORS
1193
1194Brad Fitzpatrick <brad@danga.com>
1195
1196Anatoly Vorobey <mellon@pobox.com>
1197
1198Brad Whitaker <whitaker@danga.com>
1199
1200Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.