root/trunk/api/perl/lib/Cache/Memcached.pm @ 805

Revision 805, 34.2 kB (checked in by hachi, 11 months ago)

Fix reconnects on dead sockets to not be as slow.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use fields qw{
22    debug no_rehash stats compress_threshold compress_enable stat_callback
23    readonly select_timeout namespace namespace_len servers active buckets
24    pref_ip
25    bucketcount _single_sock _stime
26    connect_timeout cb_connect_fail
27    parser_class
28};
29
30# flag definitions
31use constant F_STORABLE => 1;
32use constant F_COMPRESS => 2;
33
34# size savings required before saving compressed value
35use constant COMPRESS_SAVINGS => 0.20; # percent
36
37use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
38$VERSION = "1.24";
39
40BEGIN {
41    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
42}
43
44my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
45$HAVE_XS = 0 if $ENV{NO_XS};
46
47my $parser_class = $HAVE_XS ? "Cache::Memcached::GetParserXS" : "Cache::Memcached::GetParser";
48if ($ENV{XS_DEBUG}) {
49    print "using parser: $parser_class\n";
50}
51
52$FLAG_NOSIGNAL = 0;
53eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
54
55my %host_dead;   # host -> unixtime marked dead until
56my %cache_sock;  # host -> socket
57my @buck2sock;   # bucket number -> $sock
58
59my $PROTO_TCP;
60
61our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62
63sub new {
64    my Cache::Memcached $self = shift;
65    $self = fields::new( $self ) unless ref $self;
66
67    my $args = (@_ == 1) ? shift : { @_ };  # hashref-ify args
68
69    $self->set_servers($args->{'servers'});
70    $self->{'debug'} = $args->{'debug'} || 0;
71    $self->{'no_rehash'} = $args->{'no_rehash'};
72    $self->{'stats'} = {};
73    $self->{'pref_ip'} = $args->{'pref_ip'} || {};
74    $self->{'compress_threshold'} = $args->{'compress_threshold'};
75    $self->{'compress_enable'}    = 1;
76    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
77    $self->{'readonly'} = $args->{'readonly'};
78    $self->{'parser_class'} = $args->{'parser_class'} || $parser_class;
79
80    # TODO: undocumented
81    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
82    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;
83    $self->{namespace} = $args->{namespace} || '';
84    $self->{namespace_len} = length $self->{namespace};
85
86    return $self;
87}
88
89sub set_pref_ip {
90    my Cache::Memcached $self = shift;
91    $self->{'pref_ip'} = shift;
92}
93
94sub set_servers {
95    my Cache::Memcached $self = shift;
96    my ($list) = @_;
97    $self->{'servers'} = $list || [];
98    $self->{'active'} = scalar @{$self->{'servers'}};
99    $self->{'buckets'} = undef;
100    $self->{'bucketcount'} = 0;
101    $self->init_buckets;
102    @buck2sock = ();
103
104    $self->{'_single_sock'} = undef;
105    if (@{$self->{'servers'}} == 1) {
106        $self->{'_single_sock'} = $self->{'servers'}[0];
107    }
108
109    return $self;
110}
111
112sub set_cb_connect_fail {
113    my Cache::Memcached $self = shift;
114    $self->{'cb_connect_fail'} = shift;
115}
116
117sub set_connect_timeout {
118    my Cache::Memcached $self = shift;
119    $self->{'connect_timeout'} = shift;
120}
121
122sub set_debug {
123    my Cache::Memcached $self = shift;
124    my ($dbg) = @_;
125    $self->{'debug'} = $dbg || 0;
126}
127
128sub set_readonly {
129    my Cache::Memcached $self = shift;
130    my ($ro) = @_;
131    $self->{'readonly'} = $ro;
132}
133
134sub set_norehash {
135    my Cache::Memcached $self = shift;
136    my ($val) = @_;
137    $self->{'no_rehash'} = $val;
138}
139
140sub set_compress_threshold {
141    my Cache::Memcached $self = shift;
142    my ($thresh) = @_;
143    $self->{'compress_threshold'} = $thresh;
144}
145
146sub enable_compress {
147    my Cache::Memcached $self = shift;
148    my ($enable) = @_;
149    $self->{'compress_enable'} = $enable;
150}
151
152sub forget_dead_hosts {
153    %host_dead = ();
154    @buck2sock = ();
155}
156
157sub set_stat_callback {
158    my Cache::Memcached $self = shift;
159    my ($stat_callback) = @_;
160    $self->{'stat_callback'} = $stat_callback;
161}
162
163my %sock_map;  # stringified-$sock -> "$ip:$port"
164
165sub _dead_sock {
166    my ($sock, $ret, $dead_for) = @_;
167    if (my $ipport = $sock_map{$sock}) {
168        my $now = time();
169        $host_dead{$ipport} = $now + $dead_for
170            if $dead_for;
171        delete $cache_sock{$ipport};
172        delete $sock_map{$sock};
173    }
174    @buck2sock = ();
175    return $ret;  # 0 or undef, probably, depending on what caller wants
176}
177
178sub _close_sock {
179    my ($sock) = @_;
180    if (my $ipport = $sock_map{$sock}) {
181        close $sock;
182        delete $cache_sock{$ipport};
183        delete $sock_map{$sock};
184    }
185    @buck2sock = ();
186}
187
188sub _connect_sock { # sock, sin, timeout
189    my ($sock, $sin, $timeout) = @_;
190    $timeout = 0.25 if not defined $timeout;
191
192    # make the socket non-blocking from now on,
193    # except if someone wants 0 timeout, meaning
194    # a blocking connect, but even then turn it
195    # non-blocking at the end of this function
196
197    if ($timeout) {
198        IO::Handle::blocking($sock, 0);
199    } else {
200        IO::Handle::blocking($sock, 1);
201    }
202
203    my $ret = connect($sock, $sin);
204
205    if (!$ret && $timeout && $!==EINPROGRESS) {
206
207        my $win='';
208        vec($win, fileno($sock), 1) = 1;
209
210        if (select(undef, $win, undef, $timeout) > 0) {
211            $ret = connect($sock, $sin);
212            # EISCONN means connected & won't re-connect, so success
213            $ret = 1 if !$ret && $!==EISCONN;
214        }
215    }
216
217    unless ($timeout) { # socket was temporarily blocking, now revert
218        IO::Handle::blocking($sock, 0);
219    }
220
221    # from here on, we use non-blocking (async) IO for the duration
222    # of the socket's life
223
224    return $ret;
225}
226
227sub sock_to_host { # (host)
228    my Cache::Memcached $self = ref $_[0] ? shift : undef;
229    my $host = $_[0];
230    return $cache_sock{$host} if $cache_sock{$host};
231
232    my $now = time();
233    my ($ip, $port) = $host =~ /(.*):(\d+)/;
234    return undef if
235        $host_dead{$host} && $host_dead{$host} > $now;
236    my $sock;
237
238    my $connected = 0;
239    my $sin;
240    my $proto = $PROTO_TCP ||= getprotobyname('tcp');
241
242    if ( index($host, '/') != 0 )
243    {
244        # if a preferred IP is known, try that first.
245        if ($self && $self->{pref_ip}{$ip}) {
246            socket($sock, PF_INET, SOCK_STREAM, $proto);
247            $sock_map{$sock} = $host;
248            my $prefip = $self->{pref_ip}{$ip};
249            $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
250            if (_connect_sock($sock,$sin,$self->{connect_timeout})) {
251                $connected = 1;
252            } else {
253                if (my $cb = $self->{cb_connect_fail}) {
254                    $cb->($prefip);
255                }
256                close $sock;
257            }
258        }
259
260        # normal path, or fallback path if preferred IP failed
261        unless ($connected) {
262            socket($sock, PF_INET, SOCK_STREAM, $proto);
263            $sock_map{$sock} = $host;
264            $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
265            my $timeout = $self ? $self->{connect_timeout} : 0.25;
266            unless (_connect_sock($sock,$sin,$timeout)) {
267                my $cb = $self ? $self->{cb_connect_fail} : undef;
268                $cb->($ip) if $cb;
269                return _dead_sock($sock, undef, 20 + int(rand(10)));
270            }
271        }
272    } else { # it's a unix domain/local socket
273        socket($sock, PF_UNIX, SOCK_STREAM, 0);
274        $sock_map{$sock} = $host;
275        $sin = Socket::sockaddr_un($host);
276        my $timeout = $self ? $self->{connect_timeout} : 0.25;
277        unless (_connect_sock($sock,$sin,$timeout)) {
278            my $cb = $self ? $self->{cb_connect_fail} : undef;
279            $cb->($host) if $cb;
280            return _dead_sock($sock, undef, 20 + int(rand(10)));
281        }
282    }
283
284    # make the new socket not buffer writes.
285    my $old = select($sock);
286    $| = 1;
287    select($old);
288
289    $cache_sock{$host} = $sock;
290
291    return $sock;
292}
293
294sub get_sock { # (key)
295    my Cache::Memcached $self = $_[0];
296    my $key = $_[1];
297    return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
298    return undef unless $self->{'active'};
299    my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
300
301    my $real_key = ref $key ? $key->[1] : $key;
302    my $tries = 0;
303    while ($tries++ < 20) {
304        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
305        my $sock = $self->sock_to_host($host);
306        return $sock if $sock;
307        return undef if $self->{'no_rehash'};
308        $hv += _hashfunc($tries . $real_key);  # stupid, but works
309    }
310    return undef;
311}
312
313sub init_buckets {
314    my Cache::Memcached $self = shift;
315    return if $self->{'buckets'};
316    my $bu = $self->{'buckets'} = [];
317    foreach my $v (@{$self->{'servers'}}) {
318        if (ref $v eq "ARRAY") {
319            for (1..$v->[1]) { push @$bu, $v->[0]; }
320        } else {
321            push @$bu, $v;
322        }
323    }
324    $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
325}
326
327sub disconnect_all {
328    my $sock;
329    foreach $sock (values %cache_sock) {
330        close $sock;
331    }
332    %cache_sock = ();
333}
334
335# writes a line, then reads result.  by default stops reading after a
336# single line, but caller can override the $check_complete subref,
337# which gets passed a scalarref of buffer read thus far.
338sub _write_and_read {
339    my Cache::Memcached $self = shift;
340    my ($sock, $line, $check_complete) = @_;
341    my $res;
342    my ($ret, $offset) = (undef, 0);
343
344    $check_complete ||= sub {
345        return (rindex($ret, "\r\n") + 2 == length($ret));
346    };
347
348    # state: 0 - writing, 1 - reading, 2 - done
349    my $state = 0;
350
351    # the bitsets for select
352    my ($rin, $rout, $win, $wout);
353    my $nfound;
354
355    my $copy_state = -1;
356    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
357
358    # the select loop
359    while(1) {
360        if ($copy_state!=$state) {
361            last if $state==2;
362            ($rin, $win) = ('', '');
363            vec($rin, fileno($sock), 1) = 1 if $state==1;
364            vec($win, fileno($sock), 1) = 1 if $state==0;
365            $copy_state = $state;
366        }
367        $nfound = select($rout=$rin, $wout=$win, undef,
368                         $self->{'select_timeout'});
369        last unless $nfound;
370
371        if (vec($wout, fileno($sock), 1)) {
372            $res = send($sock, $line, $FLAG_NOSIGNAL);
373            next
374                if not defined $res and $!==EWOULDBLOCK;
375            unless ($res > 0) {
376                _close_sock($sock);
377                return undef;
378            }
379            if ($res == length($line)) { # all sent
380                $state = 1;
381            } else { # we only succeeded in sending some of it
382                substr($line, 0, $res, ''); # delete the part we sent
383            }
384        }
385
386        if (vec($rout, fileno($sock), 1)) {
387            $res = sysread($sock, $ret, 255, $offset);
388            next
389                if !defined($res) and $!==EWOULDBLOCK;
390            if ($res == 0) { # catches 0=conn closed or undef=error
391                _close_sock($sock);
392                return undef;
393            }
394            $offset += $res;
395            $state = 2 if $check_complete->(\$ret);
396        }
397    }
398
399    unless ($state == 2) {
400        _dead_sock($sock); # improperly finished
401        return undef;
402    }
403
404    return $ret;
405}
406
407sub delete {
408    my Cache::Memcached $self = shift;
409    my ($key, $time) = @_;
410    return 0 if ! $self->{'active'} || $self->{'readonly'};
411    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
412    my $sock = $self->get_sock($key);
413    return 0 unless $sock;
414
415    $self->{'stats'}->{"delete"}++;
416    $key = ref $key ? $key->[1] : $key;
417    $time = $time ? " $time" : "";
418    my $cmd = "delete $self->{namespace}$key$time\r\n";
419    my $res = _write_and_read($self, $sock, $cmd);
420
421    if ($self->{'stat_callback'}) {
422        my $etime = Time::HiRes::time();
423        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
424    }
425
426    return defined $res && $res eq "DELETED\r\n";
427}
428*remove = \&delete;
429
430sub add {
431    _set("add", @_);
432}
433
434sub replace {
435    _set("replace", @_);
436}
437
438sub set {
439    _set("set", @_);
440}
441
442sub _set {
443    my $cmdname = shift;
444    my Cache::Memcached $self = shift;
445    my ($key, $val, $exptime) = @_;
446    return 0 if ! $self->{'active'} || $self->{'readonly'};
447    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
448    my $sock = $self->get_sock($key);
449    return 0 unless $sock;
450
451    use bytes; # return bytes from length()
452
453    $self->{'stats'}->{$cmdname}++;
454    my $flags = 0;
455    $key = ref $key ? $key->[1] : $key;
456
457    if (ref $val) {
458        local $Carp::CarpLevel = 2;
459        $val = Storable::nfreeze($val);
460        $flags |= F_STORABLE;
461    }
462    warn "value for memkey:$key is not defined" unless defined $val;
463
464    my $len = length($val);
465
466    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
467        $len >= $self->{'compress_threshold'}) {
468
469        my $c_val = Compress::Zlib::memGzip($val);
470        my $c_len = length($c_val);
471
472        # do we want to keep it?
473        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
474            $val = $c_val;
475            $len = $c_len;
476            $flags |= F_COMPRESS;
477        }
478    }
479
480    $exptime = int($exptime || 0);
481
482    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
483    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
484
485    my $res = _write_and_read($self, $sock, $line);
486
487    if ($self->{'debug'} && $line) {
488        chop $line; chop $line;
489        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
490    }
491
492    if ($self->{'stat_callback'}) {
493        my $etime = Time::HiRes::time();
494        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
495    }
496
497    return defined $res && $res eq "STORED\r\n";
498}
499
500sub incr {
501    _incrdecr("incr", @_);
502}
503
504sub decr {
505    _incrdecr("decr", @_);
506}
507
508sub _incrdecr {
509    my $cmdname = shift;
510    my Cache::Memcached $self = shift;
511    my ($key, $value) = @_;
512    return undef if ! $self->{'active'} || $self->{'readonly'};
513    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
514    my $sock = $self->get_sock($key);
515    return undef unless $sock;
516    $key = $key->[1] if ref $key;
517    $self->{'stats'}->{$cmdname}++;
518    $value = 1 unless defined $value;
519
520    my $line = "$cmdname $self->{namespace}$key $value\r\n";
521    my $res = _write_and_read($self, $sock, $line);
522
523    if ($self->{'stat_callback'}) {
524        my $etime = Time::HiRes::time();
525        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
526    }
527
528    return undef unless defined $res && $res =~ /^(\d+)/;
529    return $1;
530}
531
532sub get {
533    my Cache::Memcached $self = $_[0];
534    my $key = $_[1];
535
536    # TODO: make a fast path for this?  or just keep using get_multi?
537    my $r = $self->get_multi($key);
538    my $kval = ref $key ? $key->[1] : $key;
539    return $r->{$kval};
540}
541
542sub get_multi {
543    my Cache::Memcached $self = shift;
544    return {} unless $self->{'active'};
545    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
546    $self->{'stats'}->{"get_multi"}++;
547
548    my %val;        # what we'll be returning a reference to (realkey -> value)
549    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
550    my $sock;
551
552    if ($self->{'_single_sock'}) {
553        $sock = $self->sock_to_host($self->{'_single_sock'});
554        unless ($sock) {
555            return {};
556        }
557        foreach my $key (@_) {
558            my $kval = ref $key ? $key->[1] : $key;
559            push @{$sock_keys{$sock}}, $kval;
560        }
561    } else {
562        my $bcount = $self->{'bucketcount'};
563        my $sock;
564      KEY:
565        foreach my $key (@_) {
566            my ($hv, $real_key) = ref $key ?
567                (int($key->[0]),               $key->[1]) :
568                ((crc32($key) >> 16) & 0x7fff, $key);
569
570            my $tries;
571            while (1) {
572                my $bucket = $hv % $bcount;
573
574                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
575                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
576                #    and last;
577
578                # but this variant doesn't crash:
579                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
580                if ($sock) {
581                    $buck2sock[$bucket] = $sock;
582                    last;
583                }
584
585                next KEY if $tries++ >= 20;
586                $hv += _hashfunc($tries . $real_key);
587            }
588
589            push @{$sock_keys{$sock}}, $real_key;
590        }
591    }
592
593    $self->{'stats'}->{"get_keys"} += @_;
594    $self->{'stats'}->{"get_socks"} += keys %sock_keys;
595
596    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
597
598    _load_multi($self, \%sock_keys, \%val);
599
600    if ($self->{'debug'}) {
601        while (my ($k, $v) = each %val) {
602            print STDERR "MemCache: got $k = $v\n";
603        }
604    }
605    return \%val;
606}
607
608sub _load_multi {
609    use bytes; # return bytes from length()
610    my Cache::Memcached $self;
611    my ($sock_keys, $ret);
612
613    ($self, $sock_keys, $ret) = @_;
614
615    # all keyed by $sockstr:
616    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
617    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
618    my %buf;     # buffers, for writing
619
620    my %parser;  # $sockstr -> Cache::Memcached::GetParser
621
622    my $active_changed = 1; # force rebuilding of select sets
623
624    my $dead = sub {
625        my $sock = shift;
626        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
627        delete $reading{$sock};
628        delete $writing{$sock};
629
630        if (my $p = $parser{$sock}) {
631            my $key = $p->current_key;
632            delete $ret->{$key} if $key;
633        }
634
635        if ($self->{'stat_callback'}) {
636            my $etime = Time::HiRes::time();
637            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
638        }
639
640        close $sock;
641        _dead_sock($sock);
642    };
643
644    # $finalize->($key, $flags)
645    # $finalize->({ $key => $flags, $key => $flags });
646    my $finalize = sub {
647        my $map = $_[0];
648        $map = {@_} unless ref $map;
649
650        while (my ($k, $flags) = each %$map) {
651
652            # remove trailing \r\n
653            chop $ret->{$k}; chop $ret->{$k};
654
655            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
656                if $HAVE_ZLIB && $flags & F_COMPRESS;
657            if ($flags & F_STORABLE) {
658                # wrapped in eval in case a perl 5.6 Storable tries to
659                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
660                # and dies if the version number changes at all.  in 5.8
661                # they made it only die if it unencounters a new feature)
662                eval {
663                    $ret->{$k} = Storable::thaw($ret->{$k});
664                };
665                # so if there was a problem, just treat it as a cache miss.
666                if ($@) {
667                    delete $ret->{$k};
668                }
669            }
670        }
671    };
672
673    foreach (keys %$sock_keys) {
674        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
675        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
676        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
677        $writing{$_} = $sock;
678        if ($self->{namespace}) {
679            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
680        } else {
681            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
682        }
683
684        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
685    }
686
687    my $read = sub {
688        my $sockstr = "$_[0]";  # $sock is $_[0];
689        my $p = $parser{$sockstr} or die;
690        my $rv = $p->parse_from_sock($_[0]);
691        if ($rv > 0) {
692            # okay, finished with this socket
693            delete $reading{$sockstr};
694        } elsif ($rv < 0) {
695            $dead->($_[0]);
696        }
697        return $rv;
698    };
699
700    # returns 1 when it's done, for success or error.  0 if still working.
701    my $write = sub {
702        my ($sock, $sockstr) = ($_[0], "$_[0]");
703        my $res;
704
705        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
706
707        return 0
708            if not defined $res and $!==EWOULDBLOCK;
709        unless ($res > 0) {
710            $dead->($sock);
711            return 1;
712        }
713        if ($res == length($buf{$sockstr})) { # all sent
714            $buf{$sockstr} = "";
715
716            # switch the socket from writing to reading
717            delete $writing{$sockstr};
718            $reading{$sockstr} = $sock;
719            return 1;
720        } else { # we only succeeded in sending some of it
721            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
722        }
723        return 0;
724    };
725
726    # the bitsets for select
727    my ($rin, $rout, $win, $wout);
728    my $nfound;
729
730    # the big select loop
731    while(1) {
732        if ($active_changed) {
733            last unless %reading or %writing; # no sockets left?
734            ($rin, $win) = ('', '');
735            foreach (values %reading) {
736                vec($rin, fileno($_), 1) = 1;
737            }
738            foreach (values %writing) {
739                vec($win, fileno($_), 1) = 1;
740            }
741            $active_changed = 0;
742        }
743        # TODO: more intelligent cumulative timeout?
744        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
745        $nfound = select($rout=$rin, $wout=$win, undef,
746                         $self->{'select_timeout'});
747        last unless $nfound;
748
749        # TODO: possible robustness improvement: we could select
750        # writing sockets for reading also, and raise hell if they're
751        # ready (input unread from last time, etc.)
752        # maybe do that on the first loop only?
753        foreach (values %writing) {
754            if (vec($wout, fileno($_), 1)) {
755                $active_changed = 1 if $write->($_);
756            }
757        }
758        foreach (values %reading) {
759            if (vec($rout, fileno($_), 1)) {
760                $active_changed = 1 if $read->($_);
761            }
762        }
763    }
764
765    # if there're active sockets left, they need to die
766    foreach (values %writing) {
767        $dead->($_);
768    }
769    foreach (values %reading) {
770        $dead->($_);
771    }
772
773    return;
774}
775
776sub _hashfunc {
777    return (crc32($_[0]) >> 16) & 0x7fff;
778}
779
780sub flush_all {
781    my Cache::Memcached $self = shift;
782
783    my $success = 1;
784
785    my @hosts = @{$self->{'buckets'}};
786    foreach my $host (@hosts) {
787        my $sock = $self->sock_to_host($host);
788        my @res = $self->run_command($sock, "flush_all\r\n");
789        $success = 0 unless (scalar @res == 1 && (($res[0] || "") eq "OK\r\n"));
790    }
791
792    return $success;
793}
794
795# returns array of lines, or () on failure.
796sub run_command {
797    my Cache::Memcached $self = shift;
798    my ($sock, $cmd) = @_;
799    return () unless $sock;
800    my $ret;
801    my $line = $cmd;
802    while (my $res = _write_and_read($self, $sock, $line)) {
803        undef $line;
804        $ret .= $res;
805        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
806    }
807    chop $ret; chop $ret;
808    return map { "$_\r\n" } split(/\r\n/, $ret);
809}
810
811sub stats {
812    my Cache::Memcached $self = shift;
813    my ($types) = @_;
814    return 0 unless $self->{'active'};
815    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
816    if (!ref($types)) {
817        if (!$types) {
818            # I don't much care what the default is, it should just
819            # be something reasonable.  Obviously "reset" should not
820            # be on the list :) but other types that might go in here
821            # include maps, cachedump, slabs, or items.
822            $types = [ qw( misc malloc sizes self ) ];
823        } else {
824            $types = [ $types ];
825        }
826    }
827
828    my $stats_hr = { };
829
830    # The "self" stat type is special, it only applies to this very
831    # object.
832    if (grep /^self$/, @$types) {
833        $stats_hr->{'self'} = \%{ $self->{'stats'} };
834    }
835
836    my %misc_keys = map { $_ => 1 }
837      qw/ bytes bytes_read bytes_written
838          cmd_get cmd_set connection_structures curr_items
839          get_hits get_misses
840          total_connections total_items
841        /;
842
843    # Now handle the other types, passing each type to each host server.
844    my @hosts = @{$self->{'buckets'}};
845  HOST: foreach my $host (@hosts) {
846        my $sock = $self->sock_to_host($host);
847      TYPE: foreach my $typename (grep !/^self$/, @$types) {
848            my $type = $typename eq 'misc' ? "" : " $typename";
849            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
850                my $bref = shift;
851                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
852            });
853            unless ($lines) {
854                _dead_sock($sock);
855                next HOST;
856            }
857
858            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?
859
860            # And, most lines end in \r\n but 'stats maps' (as of
861            # July 2003 at least) ends in \n. ??
862            my @lines = split(/\r?\n/, $lines);
863
864            # Some stats are key-value, some are not.  malloc,
865            # sizes, and the empty string are key-value.
866            # ("self" was handled separately above.)
867            if ($typename =~ /^(malloc|sizes|misc)$/) {
868                # This stat is key-value.
869                foreach my $line (@lines) {
870                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
871                    if ($key) {
872                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
873                    }
874                    $stats_hr->{'total'}{$key} += $value
875                        if $typename eq 'misc' && $key && $misc_keys{$key};
876                    $stats_hr->{'total'}{"malloc_$key"} += $value
877                        if $typename eq 'malloc' && $key;
878                }
879            } else {
880                # This stat is not key-value so just pull it
881                # all out in one blob.
882                $lines =~ s/^END\r?\n//m;
883                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
884                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
885            }
886        }
887    }
888
889    return $stats_hr;
890}
891
892sub stats_reset {
893    my Cache::Memcached $self = shift;
894    my ($types) = @_;
895    return 0 unless $self->{'active'};
896
897  HOST: foreach my $host (@{$self->{'buckets'}}) {
898        my $sock = $self->sock_to_host($host);
899        my $ok = _write_and_read($self, $sock, "stats reset");
900        unless (defined $ok && $ok eq "RESET\r\n") {
901            _dead_sock($sock);
902        }
903    }
904    return 1;
905}
906
9071;
908__END__
909
910=head1 NAME
911
912Cache::Memcached - client library for memcached (memory cache daemon)
913
914=head1 SYNOPSIS
915
916  use Cache::Memcached;
917
918  $memd = new Cache::Memcached {
919    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
920                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
921    'debug' => 0,
922    'compress_threshold' => 10_000,
923  };
924  $memd->set_servers($array_ref);
925  $memd->set_compress_threshold(10_000);
926  $memd->enable_compress(0);
927
928  $memd->set("my_key", "Some value");
929  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
930
931  $val = $memd->get("my_key");
932  $val = $memd->get("object_key");
933  if ($val) { print $val->{'complex'}->[2]; }
934
935  $memd->incr("key");
936  $memd->decr("key");
937  $memd->incr("key", 2);
938
939=head1 DESCRIPTION
940
941This is the Perl API for memcached, a distributed memory cache daemon.
942More information is available at:
943
944  http://www.danga.com/memcached/
945
946=head1 CONSTRUCTOR
947
948=over 4
949
950=item C<new>
951
952Takes one parameter, a hashref of options.  The most important key is
953C<servers>, but that can also be set later with the C<set_servers>
954method.  The servers must be an arrayref of hosts, each of which is
955either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
956former and an integer weight value.  (The default weight if
957unspecified is 1.)  It's recommended that weight values be kept as low
958as possible, as this module currently allocates memory for bucket
959distribution proportional to the total host weights.
960
961Use C<compress_threshold> to set a compression threshold, in bytes.
962Values larger than this threshold will be compressed by C<set> and
963decompressed by C<get>.
964
965Use C<no_rehash> to disable finding a new memcached server when one
966goes down.  Your application may or may not need this, depending on
967your expirations and key usage.
968
969Use C<readonly> to disable writes to backend memcached servers.  Only
970get and get_multi will work.  This is useful in bizarre debug and
971profiling cases only.
972
973Use C<namespace> to prefix all keys with the provided namespace value.
974That is, if you set namespace to "app1:" and later do a set of "foo"
975to "bar", memcached is actually seeing you set "app1:foo" to "bar".
976
977The other useful key is C<debug>, which when set to true will produce
978diagnostics on STDERR.
979
980=back
981
982=head1 METHODS
983
984=over 4
985
986=item C<set_servers>
987
988Sets the server list this module distributes key gets and sets between.
989The format is an arrayref of identical form as described in the C<new>
990constructor.
991
992=item C<set_debug>
993
994Sets the C<debug> flag.  See C<new> constructor for more information.
995
996=item C<set_readonly>
997
998Sets the C<readonly> flag.  See C<new> constructor for more information.
999
1000=item C<set_norehash>
1001
1002Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1003
1004=item C<set_compress_threshold>
1005
1006Sets the compression threshold. See C<new> constructor for more information.
1007
1008=item C<enable_compress>
1009
1010Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1011isn't set, but has an overriding effect if it is.
1012
1013=item C<get>
1014
1015my $val = $memd->get($key);
1016
1017Retrieves a key from the memcache.  Returns the value (automatically
1018thawed with Storable, if necessary) or undef.
1019
1020The $key can optionally be an arrayref, with the first element being the
1021hash value, if you want to avoid making this module calculate a hash
1022value.  You may prefer, for example, to keep all of a given user's
1023objects on the same memcache server, so you could use the user's
1024unique id as the hash value.
1025
1026=item C<get_multi>
1027
1028my $hashref = $memd->get_multi(@keys);
1029
1030Retrieves multiple keys from the memcache doing just one query.
1031Returns a hashref of key/value pairs that were available.
1032
1033This method is recommended over regular 'get' as it lowers the number
1034of total packets flying around your network, reducing total latency,
1035since your app doesn't have to wait for each round-trip of 'get'
1036before sending the next one.
1037
1038=item C<set>
1039
1040$memd->set($key, $value[, $exptime]);
1041
1042Unconditionally sets a key to a given value in the memcache.  Returns true
1043if it was stored successfully.
1044
1045The $key can optionally be an arrayref, with the first element being the
1046hash value, as described above.
1047
1048The $exptime (expiration time) defaults to "never" if unspecified.  If
1049you want the key to expire in memcached, pass an integer $exptime.  If
1050value is less than 60*60*24*30 (30 days), time is assumed to be relative
1051from the present.  If larger, it's considered an absolute Unix time.
1052
1053=item C<add>
1054
1055$memd->add($key, $value[, $exptime]);
1056
1057Like C<set>, but only stores in memcache if the key doesn't already exist.
1058
1059=item C<replace>
1060
1061$memd->replace($key, $value[, $exptime]);
1062
1063Like C<set>, but only stores in memcache if the key already exists.  The
1064opposite of C<add>.
1065
1066=item C<delete>
1067
1068$memd->delete($key[, $time]);
1069
1070Deletes a key.  You may optionally provide an integer time value (in seconds) to
1071tell the memcached server to block new writes to this key for that many seconds.
1072(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1073was found and deleted, and false otherwise.
1074
1075You may also use the alternate method name B<remove>, so
1076Cache::Memcached looks like the L<Cache::Cache> API.
1077
1078=item C<incr>
1079
1080$memd->incr($key[, $value]);
1081
1082Sends a command to the server to atomically increment the value for
1083$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1084doesn't exist on server, otherwise it returns the new value after
1085incrementing.  Value should be zero or greater.  Overflow on server
1086is not checked.  Be aware of values approaching 2**32.  See decr.
1087
1088=item C<decr>
1089
1090$memd->decr($key[, $value]);
1091
1092Like incr, but decrements.  Unlike incr, underflow is checked and new
1093values are capped at 0.  If server value is 1, a decrement of 2
1094returns 0, not -1.
1095
1096=item C<stats>
1097
1098$memd->stats([$keys]);
1099
1100Returns a hashref of statistical data regarding the memcache server(s),
1101the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1102single key wanted, or absent (in which case the default value is malloc,
1103sizes, self, and the empty string).  These keys are the values passed
1104to the 'stats' command issued to the memcached server(s), except for
1105'self' which is internal to the $memd object.  Allowed values are:
1106
1107=over 4
1108
1109=item C<misc>
1110
1111The stats returned by a 'stats' command:  pid, uptime, version,
1112bytes, get_hits, etc.
1113
1114=item C<malloc>
1115
1116The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1117
1118=item C<sizes>
1119
1120The stats returned by a 'stats sizes'.
1121
1122=item C<self>
1123
1124The stats for the $memd object itself (a copy of $memd->{'stats'}).
1125
1126=item C<maps>
1127
1128The stats returned by a 'stats maps'.
1129
1130=item C<cachedump>
1131
1132The stats returned by a 'stats cachedump'.
1133
1134=item C<slabs>
1135
1136The stats returned by a 'stats slabs'.
1137
1138=item C<items>
1139
1140The stats returned by a 'stats items'.
1141
1142=back
1143
1144=item C<disconnect_all>
1145
1146$memd->disconnect_all;
1147
1148Closes all cached sockets to all memcached servers.  You must do this
1149if your program forks and the parent has used this module at all.
1150Otherwise the children will try to use cached sockets and they'll fight
1151(as children do) and garble the client/server protocol.
1152
1153=item C<flush_all>
1154
1155$memd->flush_all;
1156
1157Runs the memcached "flush_all" command on all configured hosts,
1158emptying all their caches.  (or rather, invalidating all items
1159in the caches in an O(1) operation...)  Running stats will still
1160show the item existing, they're just be non-existent and lazily
1161destroyed next time you try to detch any of them.
1162
1163=back
1164
1165=head1 BUGS
1166
1167When a server goes down, this module does detect it, and re-hashes the
1168request to the remaining servers, but the way it does it isn't very
1169clean.  The result may be that it gives up during its rehashing and
1170refuses to get/set something it could've, had it been done right.
1171
1172=head1 COPYRIGHT
1173
1174This module is Copyright (c) 2003 Brad Fitzpatrick.
1175All rights reserved.
1176
1177You may distribute under the terms of either the GNU General Public
1178License or the Artistic License, as specified in the Perl README file.
1179
1180=head1 WARRANTY
1181
1182This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1183
1184=head1 FAQ
1185
1186See the memcached website:
1187   http://www.danga.com/memcached/
1188
1189=head1 AUTHORS
1190
1191Brad Fitzpatrick <brad@danga.com>
1192
1193Anatoly Vorobey <mellon@pobox.com>
1194
1195Brad Whitaker <whitaker@danga.com>
1196
1197Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.