root/trunk/api/perl/lib/Cache/Memcached.pm @ 795

Revision 795, 34.1 kB (checked in by henrylyne, 12 months ago)

If a memcache value is undefined, default it to an empty string. This fixes errors reported for using length on undef, and trying to concatenate an undef value.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use fields qw{
22    debug no_rehash stats compress_threshold compress_enable stat_callback
23    readonly select_timeout namespace namespace_len servers active buckets
24    pref_ip
25    bucketcount _single_sock _stime
26    connect_timeout cb_connect_fail
27    parser_class
28};
29
30# flag definitions
31use constant F_STORABLE => 1;
32use constant F_COMPRESS => 2;
33
34# size savings required before saving compressed value
35use constant COMPRESS_SAVINGS => 0.20; # percent
36
37use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
38$VERSION = "1.24";
39
40BEGIN {
41    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
42}
43
44my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
45$HAVE_XS = 0 if $ENV{NO_XS};
46
47my $parser_class = $HAVE_XS ? "Cache::Memcached::GetParserXS" : "Cache::Memcached::GetParser";
48if ($ENV{XS_DEBUG}) {
49    print "using parser: $parser_class\n";
50}
51
52$FLAG_NOSIGNAL = 0;
53eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
54
55my %host_dead;   # host -> unixtime marked dead until
56my %cache_sock;  # host -> socket
57my @buck2sock;   # bucket number -> $sock
58
59my $PROTO_TCP;
60
61our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
62
63sub new {
64    my Cache::Memcached $self = shift;
65    $self = fields::new( $self ) unless ref $self;
66
67    my $args = (@_ == 1) ? shift : { @_ };  # hashref-ify args
68
69    $self->set_servers($args->{'servers'});
70    $self->{'debug'} = $args->{'debug'} || 0;
71    $self->{'no_rehash'} = $args->{'no_rehash'};
72    $self->{'stats'} = {};
73    $self->{'pref_ip'} = $args->{'pref_ip'} || {};
74    $self->{'compress_threshold'} = $args->{'compress_threshold'};
75    $self->{'compress_enable'}    = 1;
76    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
77    $self->{'readonly'} = $args->{'readonly'};
78    $self->{'parser_class'} = $args->{'parser_class'} || $parser_class;
79
80    # TODO: undocumented
81    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
82    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;
83    $self->{namespace} = $args->{namespace} || '';
84    $self->{namespace_len} = length $self->{namespace};
85
86    return $self;
87}
88
89sub set_pref_ip {
90    my Cache::Memcached $self = shift;
91    $self->{'pref_ip'} = shift;
92}
93
94sub set_servers {
95    my Cache::Memcached $self = shift;
96    my ($list) = @_;
97    $self->{'servers'} = $list || [];
98    $self->{'active'} = scalar @{$self->{'servers'}};
99    $self->{'buckets'} = undef;
100    $self->{'bucketcount'} = 0;
101    $self->init_buckets;
102    @buck2sock = ();
103
104    $self->{'_single_sock'} = undef;
105    if (@{$self->{'servers'}} == 1) {
106        $self->{'_single_sock'} = $self->{'servers'}[0];
107    }
108
109    return $self;
110}
111
112sub set_cb_connect_fail {
113    my Cache::Memcached $self = shift;
114    $self->{'cb_connect_fail'} = shift;
115}
116
117sub set_connect_timeout {
118    my Cache::Memcached $self = shift;
119    $self->{'connect_timeout'} = shift;
120}
121
122sub set_debug {
123    my Cache::Memcached $self = shift;
124    my ($dbg) = @_;
125    $self->{'debug'} = $dbg || 0;
126}
127
128sub set_readonly {
129    my Cache::Memcached $self = shift;
130    my ($ro) = @_;
131    $self->{'readonly'} = $ro;
132}
133
134sub set_norehash {
135    my Cache::Memcached $self = shift;
136    my ($val) = @_;
137    $self->{'no_rehash'} = $val;
138}
139
140sub set_compress_threshold {
141    my Cache::Memcached $self = shift;
142    my ($thresh) = @_;
143    $self->{'compress_threshold'} = $thresh;
144}
145
146sub enable_compress {
147    my Cache::Memcached $self = shift;
148    my ($enable) = @_;
149    $self->{'compress_enable'} = $enable;
150}
151
152sub forget_dead_hosts {
153    %host_dead = ();
154    @buck2sock = ();
155}
156
157sub set_stat_callback {
158    my Cache::Memcached $self = shift;
159    my ($stat_callback) = @_;
160    $self->{'stat_callback'} = $stat_callback;
161}
162
163my %sock_map;  # stringified-$sock -> "$ip:$port"
164
165sub _dead_sock {
166    my ($sock, $ret, $dead_for) = @_;
167    if (my $ipport = $sock_map{$sock}) {
168        my $now = time();
169        $host_dead{$ipport} = $now + $dead_for
170            if $dead_for;
171        delete $cache_sock{$ipport};
172        delete $sock_map{$sock};
173    }
174    @buck2sock = ();
175    return $ret;  # 0 or undef, probably, depending on what caller wants
176}
177
178sub _close_sock {
179    my ($sock) = @_;
180    if (my $ipport = $sock_map{$sock}) {
181        close $sock;
182        delete $cache_sock{$ipport};
183        delete $sock_map{$sock};
184    }
185    @buck2sock = ();
186}
187
188sub _connect_sock { # sock, sin, timeout
189    my ($sock, $sin, $timeout) = @_;
190    $timeout = 0.25 if not defined $timeout;
191
192    # make the socket non-blocking from now on,
193    # except if someone wants 0 timeout, meaning
194    # a blocking connect, but even then turn it
195    # non-blocking at the end of this function
196
197    if ($timeout) {
198        IO::Handle::blocking($sock, 0);
199    } else {
200        IO::Handle::blocking($sock, 1);
201    }
202
203    my $ret = connect($sock, $sin);
204
205    if (!$ret && $timeout && $!==EINPROGRESS) {
206
207        my $win='';
208        vec($win, fileno($sock), 1) = 1;
209
210        if (select(undef, $win, undef, $timeout) > 0) {
211            $ret = connect($sock, $sin);
212            # EISCONN means connected & won't re-connect, so success
213            $ret = 1 if !$ret && $!==EISCONN;
214        }
215    }
216
217    unless ($timeout) { # socket was temporarily blocking, now revert
218        IO::Handle::blocking($sock, 0);
219    }
220
221    # from here on, we use non-blocking (async) IO for the duration
222    # of the socket's life
223
224    return $ret;
225}
226
227sub sock_to_host { # (host)
228    my Cache::Memcached $self = ref $_[0] ? shift : undef;
229    my $host = $_[0];
230    return $cache_sock{$host} if $cache_sock{$host};
231
232    my $now = time();
233    my ($ip, $port) = $host =~ /(.*):(\d+)/;
234    return undef if
235        $host_dead{$host} && $host_dead{$host} > $now;
236    my $sock;
237
238    my $connected = 0;
239    my $sin;
240    my $proto = $PROTO_TCP ||= getprotobyname('tcp');
241
242    if ( index($host, '/') != 0 )
243    {
244        # if a preferred IP is known, try that first.
245        if ($self && $self->{pref_ip}{$ip}) {
246            socket($sock, PF_INET, SOCK_STREAM, $proto);
247            my $prefip = $self->{pref_ip}{$ip};
248            $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
249            if (_connect_sock($sock,$sin,$self->{connect_timeout})) {
250                $connected = 1;
251            } else {
252                if (my $cb = $self->{cb_connect_fail}) {
253                    $cb->($prefip);
254                }
255                close $sock;
256            }
257        }
258
259        # normal path, or fallback path if preferred IP failed
260        unless ($connected) {
261            socket($sock, PF_INET, SOCK_STREAM, $proto);
262            $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
263            my $timeout = $self ? $self->{connect_timeout} : 0.25;
264            unless (_connect_sock($sock,$sin,$timeout)) {
265                my $cb = $self ? $self->{cb_connect_fail} : undef;
266                $cb->($ip) if $cb;
267                return _dead_sock($sock, undef, 20 + int(rand(10)));
268            }
269        }
270    } else { # it's a unix domain/local socket
271        socket($sock, PF_UNIX, SOCK_STREAM, 0);
272        $sin = Socket::sockaddr_un($host);
273        my $timeout = $self ? $self->{connect_timeout} : 0.25;
274        unless (_connect_sock($sock,$sin,$timeout)) {
275            my $cb = $self ? $self->{cb_connect_fail} : undef;
276            $cb->($host) if $cb;
277            return _dead_sock($sock, undef, 20 + int(rand(10)));
278        }
279    }
280
281    # make the new socket not buffer writes.
282    my $old = select($sock);
283    $| = 1;
284    select($old);
285
286    $sock_map{$sock} = $host;
287    $cache_sock{$host} = $sock;
288
289    return $sock;
290}
291
292sub get_sock { # (key)
293    my Cache::Memcached $self = $_[0];
294    my $key = $_[1];
295    return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
296    return undef unless $self->{'active'};
297    my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
298
299    my $real_key = ref $key ? $key->[1] : $key;
300    my $tries = 0;
301    while ($tries++ < 20) {
302        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
303        my $sock = $self->sock_to_host($host);
304        return $sock if $sock;
305        return undef if $self->{'no_rehash'};
306        $hv += _hashfunc($tries . $real_key);  # stupid, but works
307    }
308    return undef;
309}
310
311sub init_buckets {
312    my Cache::Memcached $self = shift;
313    return if $self->{'buckets'};
314    my $bu = $self->{'buckets'} = [];
315    foreach my $v (@{$self->{'servers'}}) {
316        if (ref $v eq "ARRAY") {
317            for (1..$v->[1]) { push @$bu, $v->[0]; }
318        } else {
319            push @$bu, $v;
320        }
321    }
322    $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
323}
324
325sub disconnect_all {
326    my $sock;
327    foreach $sock (values %cache_sock) {
328        close $sock;
329    }
330    %cache_sock = ();
331}
332
333# writes a line, then reads result.  by default stops reading after a
334# single line, but caller can override the $check_complete subref,
335# which gets passed a scalarref of buffer read thus far.
336sub _write_and_read {
337    my Cache::Memcached $self = shift;
338    my ($sock, $line, $check_complete) = @_;
339    my $res;
340    my ($ret, $offset) = (undef, 0);
341
342    $check_complete ||= sub {
343        return (rindex($ret, "\r\n") + 2 == length($ret));
344    };
345
346    # state: 0 - writing, 1 - reading, 2 - done
347    my $state = 0;
348
349    # the bitsets for select
350    my ($rin, $rout, $win, $wout);
351    my $nfound;
352
353    my $copy_state = -1;
354    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
355
356    # the select loop
357    while(1) {
358        if ($copy_state!=$state) {
359            last if $state==2;
360            ($rin, $win) = ('', '');
361            vec($rin, fileno($sock), 1) = 1 if $state==1;
362            vec($win, fileno($sock), 1) = 1 if $state==0;
363            $copy_state = $state;
364        }
365        $nfound = select($rout=$rin, $wout=$win, undef,
366                         $self->{'select_timeout'});
367        last unless $nfound;
368
369        if (vec($wout, fileno($sock), 1)) {
370            $res = send($sock, $line, $FLAG_NOSIGNAL);
371            next
372                if not defined $res and $!==EWOULDBLOCK;
373            unless ($res > 0) {
374                _close_sock($sock);
375                return undef;
376            }
377            if ($res == length($line)) { # all sent
378                $state = 1;
379            } else { # we only succeeded in sending some of it
380                substr($line, 0, $res, ''); # delete the part we sent
381            }
382        }
383
384        if (vec($rout, fileno($sock), 1)) {
385            $res = sysread($sock, $ret, 255, $offset);
386            next
387                if !defined($res) and $!==EWOULDBLOCK;
388            if ($res == 0) { # catches 0=conn closed or undef=error
389                _close_sock($sock);
390                return undef;
391            }
392            $offset += $res;
393            $state = 2 if $check_complete->(\$ret);
394        }
395    }
396
397    unless ($state == 2) {
398        _dead_sock($sock); # improperly finished
399        return undef;
400    }
401
402    return $ret;
403}
404
405sub delete {
406    my Cache::Memcached $self = shift;
407    my ($key, $time) = @_;
408    return 0 if ! $self->{'active'} || $self->{'readonly'};
409    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
410    my $sock = $self->get_sock($key);
411    return 0 unless $sock;
412
413    $self->{'stats'}->{"delete"}++;
414    $key = ref $key ? $key->[1] : $key;
415    $time = $time ? " $time" : "";
416    my $cmd = "delete $self->{namespace}$key$time\r\n";
417    my $res = _write_and_read($self, $sock, $cmd);
418
419    if ($self->{'stat_callback'}) {
420        my $etime = Time::HiRes::time();
421        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
422    }
423
424    return defined $res && $res eq "DELETED\r\n";
425}
426*remove = \&delete;
427
428sub add {
429    _set("add", @_);
430}
431
432sub replace {
433    _set("replace", @_);
434}
435
436sub set {
437    _set("set", @_);
438}
439
440sub _set {
441    my $cmdname = shift;
442    my Cache::Memcached $self = shift;
443    my ($key, $val, $exptime) = @_;
444    $val = '' unless (defined $val);
445    return 0 if ! $self->{'active'} || $self->{'readonly'};
446    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
447    my $sock = $self->get_sock($key);
448    return 0 unless $sock;
449
450    use bytes; # return bytes from length()
451
452    $self->{'stats'}->{$cmdname}++;
453    my $flags = 0;
454    $key = ref $key ? $key->[1] : $key;
455
456    if (ref $val) {
457        local $Carp::CarpLevel = 2;
458        $val = Storable::nfreeze($val);
459        $flags |= F_STORABLE;
460    }
461
462    my $len = length($val);
463
464    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
465        $len >= $self->{'compress_threshold'}) {
466
467        my $c_val = Compress::Zlib::memGzip($val);
468        my $c_len = length($c_val);
469
470        # do we want to keep it?
471        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
472            $val = $c_val;
473            $len = $c_len;
474            $flags |= F_COMPRESS;
475        }
476    }
477
478    $exptime = int($exptime || 0);
479
480    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
481    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
482
483    my $res = _write_and_read($self, $sock, $line);
484
485    if ($self->{'debug'} && $line) {
486        chop $line; chop $line;
487        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
488    }
489
490    if ($self->{'stat_callback'}) {
491        my $etime = Time::HiRes::time();
492        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
493    }
494
495    return defined $res && $res eq "STORED\r\n";
496}
497
498sub incr {
499    _incrdecr("incr", @_);
500}
501
502sub decr {
503    _incrdecr("decr", @_);
504}
505
506sub _incrdecr {
507    my $cmdname = shift;
508    my Cache::Memcached $self = shift;
509    my ($key, $value) = @_;
510    return undef if ! $self->{'active'} || $self->{'readonly'};
511    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
512    my $sock = $self->get_sock($key);
513    return undef unless $sock;
514    $key = $key->[1] if ref $key;
515    $self->{'stats'}->{$cmdname}++;
516    $value = 1 unless defined $value;
517
518    my $line = "$cmdname $self->{namespace}$key $value\r\n";
519    my $res = _write_and_read($self, $sock, $line);
520
521    if ($self->{'stat_callback'}) {
522        my $etime = Time::HiRes::time();
523        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
524    }
525
526    return undef unless defined $res && $res =~ /^(\d+)/;
527    return $1;
528}
529
530sub get {
531    my Cache::Memcached $self = $_[0];
532    my $key = $_[1];
533
534    # TODO: make a fast path for this?  or just keep using get_multi?
535    my $r = $self->get_multi($key);
536    my $kval = ref $key ? $key->[1] : $key;
537    return $r->{$kval};
538}
539
540sub get_multi {
541    my Cache::Memcached $self = shift;
542    return {} unless $self->{'active'};
543    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
544    $self->{'stats'}->{"get_multi"}++;
545
546    my %val;        # what we'll be returning a reference to (realkey -> value)
547    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
548    my $sock;
549
550    if ($self->{'_single_sock'}) {
551        $sock = $self->sock_to_host($self->{'_single_sock'});
552        unless ($sock) {
553            return {};
554        }
555        foreach my $key (@_) {
556            my $kval = ref $key ? $key->[1] : $key;
557            push @{$sock_keys{$sock}}, $kval;
558        }
559    } else {
560        my $bcount = $self->{'bucketcount'};
561        my $sock;
562      KEY:
563        foreach my $key (@_) {
564            my ($hv, $real_key) = ref $key ?
565                (int($key->[0]),               $key->[1]) :
566                ((crc32($key) >> 16) & 0x7fff, $key);
567
568            my $tries;
569            while (1) {
570                my $bucket = $hv % $bcount;
571
572                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
573                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
574                #    and last;
575
576                # but this variant doesn't crash:
577                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
578                if ($sock) {
579                    $buck2sock[$bucket] = $sock;
580                    last;
581                }
582
583                next KEY if $tries++ >= 20;
584                $hv += _hashfunc($tries . $real_key);
585            }
586
587            push @{$sock_keys{$sock}}, $real_key;
588        }
589    }
590
591    $self->{'stats'}->{"get_keys"} += @_;
592    $self->{'stats'}->{"get_socks"} += keys %sock_keys;
593
594    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
595
596    _load_multi($self, \%sock_keys, \%val);
597
598    if ($self->{'debug'}) {
599        while (my ($k, $v) = each %val) {
600            print STDERR "MemCache: got $k = $v\n";
601        }
602    }
603    return \%val;
604}
605
606sub _load_multi {
607    use bytes; # return bytes from length()
608    my Cache::Memcached $self;
609    my ($sock_keys, $ret);
610
611    ($self, $sock_keys, $ret) = @_;
612
613    # all keyed by $sockstr:
614    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
615    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
616    my %buf;     # buffers, for writing
617
618    my %parser;  # $sockstr -> Cache::Memcached::GetParser
619
620    my $active_changed = 1; # force rebuilding of select sets
621
622    my $dead = sub {
623        my $sock = shift;
624        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
625        delete $reading{$sock};
626        delete $writing{$sock};
627
628        if (my $p = $parser{$sock}) {
629            my $key = $p->current_key;
630            delete $ret->{$key} if $key;
631        }
632
633        if ($self->{'stat_callback'}) {
634            my $etime = Time::HiRes::time();
635            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
636        }
637
638        close $sock;
639        _dead_sock($sock);
640    };
641
642    # $finalize->($key, $flags)
643    # $finalize->({ $key => $flags, $key => $flags });
644    my $finalize = sub {
645        my $map = $_[0];
646        $map = {@_} unless ref $map;
647
648        while (my ($k, $flags) = each %$map) {
649
650            # remove trailing \r\n
651            chop $ret->{$k}; chop $ret->{$k};
652
653            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
654                if $HAVE_ZLIB && $flags & F_COMPRESS;
655            if ($flags & F_STORABLE) {
656                # wrapped in eval in case a perl 5.6 Storable tries to
657                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
658                # and dies if the version number changes at all.  in 5.8
659                # they made it only die if it unencounters a new feature)
660                eval {
661                    $ret->{$k} = Storable::thaw($ret->{$k});
662                };
663                # so if there was a problem, just treat it as a cache miss.
664                if ($@) {
665                    delete $ret->{$k};
666                }
667            }
668        }
669    };
670
671    foreach (keys %$sock_keys) {
672        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
673        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
674        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
675        $writing{$_} = $sock;
676        if ($self->{namespace}) {
677            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
678        } else {
679            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
680        }
681
682        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
683    }
684
685    my $read = sub {
686        my $sockstr = "$_[0]";  # $sock is $_[0];
687        my $p = $parser{$sockstr} or die;
688        my $rv = $p->parse_from_sock($_[0]);
689        if ($rv > 0) {
690            # okay, finished with this socket
691            delete $reading{$sockstr};
692        } elsif ($rv < 0) {
693            $dead->($_[0]);
694        }
695        return $rv;
696    };
697
698    # returns 1 when it's done, for success or error.  0 if still working.
699    my $write = sub {
700        my ($sock, $sockstr) = ($_[0], "$_[0]");
701        my $res;
702
703        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
704
705        return 0
706            if not defined $res and $!==EWOULDBLOCK;
707        unless ($res > 0) {
708            $dead->($sock);
709            return 1;
710        }
711        if ($res == length($buf{$sockstr})) { # all sent
712            $buf{$sockstr} = "";
713
714            # switch the socket from writing to reading
715            delete $writing{$sockstr};
716            $reading{$sockstr} = $sock;
717            return 1;
718        } else { # we only succeeded in sending some of it
719            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
720        }
721        return 0;
722    };
723
724    # the bitsets for select
725    my ($rin, $rout, $win, $wout);
726    my $nfound;
727
728    # the big select loop
729    while(1) {
730        if ($active_changed) {
731            last unless %reading or %writing; # no sockets left?
732            ($rin, $win) = ('', '');
733            foreach (values %reading) {
734                vec($rin, fileno($_), 1) = 1;
735            }
736            foreach (values %writing) {
737                vec($win, fileno($_), 1) = 1;
738            }
739            $active_changed = 0;
740        }
741        # TODO: more intelligent cumulative timeout?
742        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
743        $nfound = select($rout=$rin, $wout=$win, undef,
744                         $self->{'select_timeout'});
745        last unless $nfound;
746
747        # TODO: possible robustness improvement: we could select
748        # writing sockets for reading also, and raise hell if they're
749        # ready (input unread from last time, etc.)
750        # maybe do that on the first loop only?
751        foreach (values %writing) {
752            if (vec($wout, fileno($_), 1)) {
753                $active_changed = 1 if $write->($_);
754            }
755        }
756        foreach (values %reading) {
757            if (vec($rout, fileno($_), 1)) {
758                $active_changed = 1 if $read->($_);
759            }
760        }
761    }
762
763    # if there're active sockets left, they need to die
764    foreach (values %writing) {
765        $dead->($_);
766    }
767    foreach (values %reading) {
768        $dead->($_);
769    }
770
771    return;
772}
773
774sub _hashfunc {
775    return (crc32($_[0]) >> 16) & 0x7fff;
776}
777
778sub flush_all {
779    my Cache::Memcached $self = shift;
780
781    my $success = 1;
782
783    my @hosts = @{$self->{'buckets'}};
784    foreach my $host (@hosts) {
785        my $sock = $self->sock_to_host($host);
786        my @res = $self->run_command($sock, "flush_all\r\n");
787        $success = 0 unless (scalar @res == 1 && (($res[0] || "") eq "OK\r\n"));
788    }
789
790    return $success;
791}
792
793# returns array of lines, or () on failure.
794sub run_command {
795    my Cache::Memcached $self = shift;
796    my ($sock, $cmd) = @_;
797    return () unless $sock;
798    my $ret;
799    my $line = $cmd;
800    while (my $res = _write_and_read($self, $sock, $line)) {
801        undef $line;
802        $ret .= $res;
803        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
804    }
805    chop $ret; chop $ret;
806    return map { "$_\r\n" } split(/\r\n/, $ret);
807}
808
809sub stats {
810    my Cache::Memcached $self = shift;
811    my ($types) = @_;
812    return 0 unless $self->{'active'};
813    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
814    if (!ref($types)) {
815        if (!$types) {
816            # I don't much care what the default is, it should just
817            # be something reasonable.  Obviously "reset" should not
818            # be on the list :) but other types that might go in here
819            # include maps, cachedump, slabs, or items.
820            $types = [ qw( misc malloc sizes self ) ];
821        } else {
822            $types = [ $types ];
823        }
824    }
825
826    my $stats_hr = { };
827
828    # The "self" stat type is special, it only applies to this very
829    # object.
830    if (grep /^self$/, @$types) {
831        $stats_hr->{'self'} = \%{ $self->{'stats'} };
832    }
833
834    my %misc_keys = map { $_ => 1 }
835      qw/ bytes bytes_read bytes_written
836          cmd_get cmd_set connection_structures curr_items
837          get_hits get_misses
838          total_connections total_items
839        /;
840
841    # Now handle the other types, passing each type to each host server.
842    my @hosts = @{$self->{'buckets'}};
843  HOST: foreach my $host (@hosts) {
844        my $sock = $self->sock_to_host($host);
845      TYPE: foreach my $typename (grep !/^self$/, @$types) {
846            my $type = $typename eq 'misc' ? "" : " $typename";
847            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
848                my $bref = shift;
849                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
850            });
851            unless ($lines) {
852                _dead_sock($sock);
853                next HOST;
854            }
855
856            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?
857
858            # And, most lines end in \r\n but 'stats maps' (as of
859            # July 2003 at least) ends in \n. ??
860            my @lines = split(/\r?\n/, $lines);
861
862            # Some stats are key-value, some are not.  malloc,
863            # sizes, and the empty string are key-value.
864            # ("self" was handled separately above.)
865            if ($typename =~ /^(malloc|sizes|misc)$/) {
866                # This stat is key-value.
867                foreach my $line (@lines) {
868                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
869                    if ($key) {
870                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
871                    }
872                    $stats_hr->{'total'}{$key} += $value
873                        if $typename eq 'misc' && $key && $misc_keys{$key};
874                    $stats_hr->{'total'}{"malloc_$key"} += $value
875                        if $typename eq 'malloc' && $key;
876                }
877            } else {
878                # This stat is not key-value so just pull it
879                # all out in one blob.
880                $lines =~ s/^END\r?\n//m;
881                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
882                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
883            }
884        }
885    }
886
887    return $stats_hr;
888}
889
890sub stats_reset {
891    my Cache::Memcached $self = shift;
892    my ($types) = @_;
893    return 0 unless $self->{'active'};
894
895  HOST: foreach my $host (@{$self->{'buckets'}}) {
896        my $sock = $self->sock_to_host($host);
897        my $ok = _write_and_read($self, $sock, "stats reset");
898        unless (defined $ok && $ok eq "RESET\r\n") {
899            _dead_sock($sock);
900        }
901    }
902    return 1;
903}
904
9051;
906__END__
907
908=head1 NAME
909
910Cache::Memcached - client library for memcached (memory cache daemon)
911
912=head1 SYNOPSIS
913
914  use Cache::Memcached;
915
916  $memd = new Cache::Memcached {
917    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
918                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
919    'debug' => 0,
920    'compress_threshold' => 10_000,
921  };
922  $memd->set_servers($array_ref);
923  $memd->set_compress_threshold(10_000);
924  $memd->enable_compress(0);
925
926  $memd->set("my_key", "Some value");
927  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
928
929  $val = $memd->get("my_key");
930  $val = $memd->get("object_key");
931  if ($val) { print $val->{'complex'}->[2]; }
932
933  $memd->incr("key");
934  $memd->decr("key");
935  $memd->incr("key", 2);
936
937=head1 DESCRIPTION
938
939This is the Perl API for memcached, a distributed memory cache daemon.
940More information is available at:
941
942  http://www.danga.com/memcached/
943
944=head1 CONSTRUCTOR
945
946=over 4
947
948=item C<new>
949
950Takes one parameter, a hashref of options.  The most important key is
951C<servers>, but that can also be set later with the C<set_servers>
952method.  The servers must be an arrayref of hosts, each of which is
953either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
954former and an integer weight value.  (The default weight if
955unspecified is 1.)  It's recommended that weight values be kept as low
956as possible, as this module currently allocates memory for bucket
957distribution proportional to the total host weights.
958
959Use C<compress_threshold> to set a compression threshold, in bytes.
960Values larger than this threshold will be compressed by C<set> and
961decompressed by C<get>.
962
963Use C<no_rehash> to disable finding a new memcached server when one
964goes down.  Your application may or may not need this, depending on
965your expirations and key usage.
966
967Use C<readonly> to disable writes to backend memcached servers.  Only
968get and get_multi will work.  This is useful in bizarre debug and
969profiling cases only.
970
971Use C<namespace> to prefix all keys with the provided namespace value.
972That is, if you set namespace to "app1:" and later do a set of "foo"
973to "bar", memcached is actually seeing you set "app1:foo" to "bar".
974
975The other useful key is C<debug>, which when set to true will produce
976diagnostics on STDERR.
977
978=back
979
980=head1 METHODS
981
982=over 4
983
984=item C<set_servers>
985
986Sets the server list this module distributes key gets and sets between.
987The format is an arrayref of identical form as described in the C<new>
988constructor.
989
990=item C<set_debug>
991
992Sets the C<debug> flag.  See C<new> constructor for more information.
993
994=item C<set_readonly>
995
996Sets the C<readonly> flag.  See C<new> constructor for more information.
997
998=item C<set_norehash>
999
1000Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1001
1002=item C<set_compress_threshold>
1003
1004Sets the compression threshold. See C<new> constructor for more information.
1005
1006=item C<enable_compress>
1007
1008Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1009isn't set, but has an overriding effect if it is.
1010
1011=item C<get>
1012
1013my $val = $memd->get($key);
1014
1015Retrieves a key from the memcache.  Returns the value (automatically
1016thawed with Storable, if necessary) or undef.
1017
1018The $key can optionally be an arrayref, with the first element being the
1019hash value, if you want to avoid making this module calculate a hash
1020value.  You may prefer, for example, to keep all of a given user's
1021objects on the same memcache server, so you could use the user's
1022unique id as the hash value.
1023
1024=item C<get_multi>
1025
1026my $hashref = $memd->get_multi(@keys);
1027
1028Retrieves multiple keys from the memcache doing just one query.
1029Returns a hashref of key/value pairs that were available.
1030
1031This method is recommended over regular 'get' as it lowers the number
1032of total packets flying around your network, reducing total latency,
1033since your app doesn't have to wait for each round-trip of 'get'
1034before sending the next one.
1035
1036=item C<set>
1037
1038$memd->set($key, $value[, $exptime]);
1039
1040Unconditionally sets a key to a given value in the memcache.  Returns true
1041if it was stored successfully.
1042
1043The $key can optionally be an arrayref, with the first element being the
1044hash value, as described above.
1045
1046The $exptime (expiration time) defaults to "never" if unspecified.  If
1047you want the key to expire in memcached, pass an integer $exptime.  If
1048value is less than 60*60*24*30 (30 days), time is assumed to be relative
1049from the present.  If larger, it's considered an absolute Unix time.
1050
1051=item C<add>
1052
1053$memd->add($key, $value[, $exptime]);
1054
1055Like C<set>, but only stores in memcache if the key doesn't already exist.
1056
1057=item C<replace>
1058
1059$memd->replace($key, $value[, $exptime]);
1060
1061Like C<set>, but only stores in memcache if the key already exists.  The
1062opposite of C<add>.
1063
1064=item C<delete>
1065
1066$memd->delete($key[, $time]);
1067
1068Deletes a key.  You may optionally provide an integer time value (in seconds) to
1069tell the memcached server to block new writes to this key for that many seconds.
1070(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1071was found and deleted, and false otherwise.
1072
1073You may also use the alternate method name B<remove>, so
1074Cache::Memcached looks like the L<Cache::Cache> API.
1075
1076=item C<incr>
1077
1078$memd->incr($key[, $value]);
1079
1080Sends a command to the server to atomically increment the value for
1081$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1082doesn't exist on server, otherwise it returns the new value after
1083incrementing.  Value should be zero or greater.  Overflow on server
1084is not checked.  Be aware of values approaching 2**32.  See decr.
1085
1086=item C<decr>
1087
1088$memd->decr($key[, $value]);
1089
1090Like incr, but decrements.  Unlike incr, underflow is checked and new
1091values are capped at 0.  If server value is 1, a decrement of 2
1092returns 0, not -1.
1093
1094=item C<stats>
1095
1096$memd->stats([$keys]);
1097
1098Returns a hashref of statistical data regarding the memcache server(s),
1099the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1100single key wanted, or absent (in which case the default value is malloc,
1101sizes, self, and the empty string).  These keys are the values passed
1102to the 'stats' command issued to the memcached server(s), except for
1103'self' which is internal to the $memd object.  Allowed values are:
1104
1105=over 4
1106
1107=item C<misc>
1108
1109The stats returned by a 'stats' command:  pid, uptime, version,
1110bytes, get_hits, etc.
1111
1112=item C<malloc>
1113
1114The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1115
1116=item C<sizes>
1117
1118The stats returned by a 'stats sizes'.
1119
1120=item C<self>
1121
1122The stats for the $memd object itself (a copy of $memd->{'stats'}).
1123
1124=item C<maps>
1125
1126The stats returned by a 'stats maps'.
1127
1128=item C<cachedump>
1129
1130The stats returned by a 'stats cachedump'.
1131
1132=item C<slabs>
1133
1134The stats returned by a 'stats slabs'.
1135
1136=item C<items>
1137
1138The stats returned by a 'stats items'.
1139
1140=back
1141
1142=item C<disconnect_all>
1143
1144$memd->disconnect_all;
1145
1146Closes all cached sockets to all memcached servers.  You must do this
1147if your program forks and the parent has used this module at all.
1148Otherwise the children will try to use cached sockets and they'll fight
1149(as children do) and garble the client/server protocol.
1150
1151=item C<flush_all>
1152
1153$memd->flush_all;
1154
1155Runs the memcached "flush_all" command on all configured hosts,
1156emptying all their caches.  (or rather, invalidating all items
1157in the caches in an O(1) operation...)  Running stats will still
1158show the item existing, they're just be non-existent and lazily
1159destroyed next time you try to detch any of them.
1160
1161=back
1162
1163=head1 BUGS
1164
1165When a server goes down, this module does detect it, and re-hashes the
1166request to the remaining servers, but the way it does it isn't very
1167clean.  The result may be that it gives up during its rehashing and
1168refuses to get/set something it could've, had it been done right.
1169
1170=head1 COPYRIGHT
1171
1172This module is Copyright (c) 2003 Brad Fitzpatrick.
1173All rights reserved.
1174
1175You may distribute under the terms of either the GNU General Public
1176License or the Artistic License, as specified in the Perl README file.
1177
1178=head1 WARRANTY
1179
1180This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1181
1182=head1 FAQ
1183
1184See the memcached website:
1185   http://www.danga.com/memcached/
1186
1187=head1 AUTHORS
1188
1189Brad Fitzpatrick <brad@danga.com>
1190
1191Anatoly Vorobey <mellon@pobox.com>
1192
1193Brad Whitaker <whitaker@danga.com>
1194
1195Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.