root/trunk/api/perl/lib/Cache/Memcached.pm @ 796

Revision 796, 34.2 kB (checked in by henrylyne, 12 months ago)

Remove change to default val to empty string.
Insert warning with memkey to help debug.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# $Id$
2#
3# Copyright (c) 2003, 2004  Brad Fitzpatrick <brad@danga.com>
4#
5# See COPYRIGHT section in pod text below for usage and distribution rights.
6#
7
8package Cache::Memcached;
9
10use strict;
11use warnings;
12
13no strict 'refs';
14use Storable ();
15use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
16use IO::Handle ();
17use Time::HiRes ();
18use String::CRC32;
19use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
20use Cache::Memcached::GetParser;
21use Carp qw(carp croak);
22use fields qw{
23    debug no_rehash stats compress_threshold compress_enable stat_callback
24    readonly select_timeout namespace namespace_len servers active buckets
25    pref_ip
26    bucketcount _single_sock _stime
27    connect_timeout cb_connect_fail
28    parser_class
29};
30
31# flag definitions
32use constant F_STORABLE => 1;
33use constant F_COMPRESS => 2;
34
35# size savings required before saving compressed value
36use constant COMPRESS_SAVINGS => 0.20; # percent
37
38use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
39$VERSION = "1.24";
40
41BEGIN {
42    $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
43}
44
45my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
46$HAVE_XS = 0 if $ENV{NO_XS};
47
48my $parser_class = $HAVE_XS ? "Cache::Memcached::GetParserXS" : "Cache::Memcached::GetParser";
49if ($ENV{XS_DEBUG}) {
50    print "using parser: $parser_class\n";
51}
52
53$FLAG_NOSIGNAL = 0;
54eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
55
56my %host_dead;   # host -> unixtime marked dead until
57my %cache_sock;  # host -> socket
58my @buck2sock;   # bucket number -> $sock
59
60my $PROTO_TCP;
61
62our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
63
64sub new {
65    my Cache::Memcached $self = shift;
66    $self = fields::new( $self ) unless ref $self;
67
68    my $args = (@_ == 1) ? shift : { @_ };  # hashref-ify args
69
70    $self->set_servers($args->{'servers'});
71    $self->{'debug'} = $args->{'debug'} || 0;
72    $self->{'no_rehash'} = $args->{'no_rehash'};
73    $self->{'stats'} = {};
74    $self->{'pref_ip'} = $args->{'pref_ip'} || {};
75    $self->{'compress_threshold'} = $args->{'compress_threshold'};
76    $self->{'compress_enable'}    = 1;
77    $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
78    $self->{'readonly'} = $args->{'readonly'};
79    $self->{'parser_class'} = $args->{'parser_class'} || $parser_class;
80
81    # TODO: undocumented
82    $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
83    $self->{'select_timeout'}  = $args->{'select_timeout'}  || 1.0;
84    $self->{namespace} = $args->{namespace} || '';
85    $self->{namespace_len} = length $self->{namespace};
86
87    return $self;
88}
89
90sub set_pref_ip {
91    my Cache::Memcached $self = shift;
92    $self->{'pref_ip'} = shift;
93}
94
95sub set_servers {
96    my Cache::Memcached $self = shift;
97    my ($list) = @_;
98    $self->{'servers'} = $list || [];
99    $self->{'active'} = scalar @{$self->{'servers'}};
100    $self->{'buckets'} = undef;
101    $self->{'bucketcount'} = 0;
102    $self->init_buckets;
103    @buck2sock = ();
104
105    $self->{'_single_sock'} = undef;
106    if (@{$self->{'servers'}} == 1) {
107        $self->{'_single_sock'} = $self->{'servers'}[0];
108    }
109
110    return $self;
111}
112
113sub set_cb_connect_fail {
114    my Cache::Memcached $self = shift;
115    $self->{'cb_connect_fail'} = shift;
116}
117
118sub set_connect_timeout {
119    my Cache::Memcached $self = shift;
120    $self->{'connect_timeout'} = shift;
121}
122
123sub set_debug {
124    my Cache::Memcached $self = shift;
125    my ($dbg) = @_;
126    $self->{'debug'} = $dbg || 0;
127}
128
129sub set_readonly {
130    my Cache::Memcached $self = shift;
131    my ($ro) = @_;
132    $self->{'readonly'} = $ro;
133}
134
135sub set_norehash {
136    my Cache::Memcached $self = shift;
137    my ($val) = @_;
138    $self->{'no_rehash'} = $val;
139}
140
141sub set_compress_threshold {
142    my Cache::Memcached $self = shift;
143    my ($thresh) = @_;
144    $self->{'compress_threshold'} = $thresh;
145}
146
147sub enable_compress {
148    my Cache::Memcached $self = shift;
149    my ($enable) = @_;
150    $self->{'compress_enable'} = $enable;
151}
152
153sub forget_dead_hosts {
154    %host_dead = ();
155    @buck2sock = ();
156}
157
158sub set_stat_callback {
159    my Cache::Memcached $self = shift;
160    my ($stat_callback) = @_;
161    $self->{'stat_callback'} = $stat_callback;
162}
163
164my %sock_map;  # stringified-$sock -> "$ip:$port"
165
166sub _dead_sock {
167    my ($sock, $ret, $dead_for) = @_;
168    if (my $ipport = $sock_map{$sock}) {
169        my $now = time();
170        $host_dead{$ipport} = $now + $dead_for
171            if $dead_for;
172        delete $cache_sock{$ipport};
173        delete $sock_map{$sock};
174    }
175    @buck2sock = ();
176    return $ret;  # 0 or undef, probably, depending on what caller wants
177}
178
179sub _close_sock {
180    my ($sock) = @_;
181    if (my $ipport = $sock_map{$sock}) {
182        close $sock;
183        delete $cache_sock{$ipport};
184        delete $sock_map{$sock};
185    }
186    @buck2sock = ();
187}
188
189sub _connect_sock { # sock, sin, timeout
190    my ($sock, $sin, $timeout) = @_;
191    $timeout = 0.25 if not defined $timeout;
192
193    # make the socket non-blocking from now on,
194    # except if someone wants 0 timeout, meaning
195    # a blocking connect, but even then turn it
196    # non-blocking at the end of this function
197
198    if ($timeout) {
199        IO::Handle::blocking($sock, 0);
200    } else {
201        IO::Handle::blocking($sock, 1);
202    }
203
204    my $ret = connect($sock, $sin);
205
206    if (!$ret && $timeout && $!==EINPROGRESS) {
207
208        my $win='';
209        vec($win, fileno($sock), 1) = 1;
210
211        if (select(undef, $win, undef, $timeout) > 0) {
212            $ret = connect($sock, $sin);
213            # EISCONN means connected & won't re-connect, so success
214            $ret = 1 if !$ret && $!==EISCONN;
215        }
216    }
217
218    unless ($timeout) { # socket was temporarily blocking, now revert
219        IO::Handle::blocking($sock, 0);
220    }
221
222    # from here on, we use non-blocking (async) IO for the duration
223    # of the socket's life
224
225    return $ret;
226}
227
228sub sock_to_host { # (host)
229    my Cache::Memcached $self = ref $_[0] ? shift : undef;
230    my $host = $_[0];
231    return $cache_sock{$host} if $cache_sock{$host};
232
233    my $now = time();
234    my ($ip, $port) = $host =~ /(.*):(\d+)/;
235    return undef if
236        $host_dead{$host} && $host_dead{$host} > $now;
237    my $sock;
238
239    my $connected = 0;
240    my $sin;
241    my $proto = $PROTO_TCP ||= getprotobyname('tcp');
242
243    if ( index($host, '/') != 0 )
244    {
245        # if a preferred IP is known, try that first.
246        if ($self && $self->{pref_ip}{$ip}) {
247            socket($sock, PF_INET, SOCK_STREAM, $proto);
248            my $prefip = $self->{pref_ip}{$ip};
249            $sin = Socket::sockaddr_in($port,Socket::inet_aton($prefip));
250            if (_connect_sock($sock,$sin,$self->{connect_timeout})) {
251                $connected = 1;
252            } else {
253                if (my $cb = $self->{cb_connect_fail}) {
254                    $cb->($prefip);
255                }
256                close $sock;
257            }
258        }
259
260        # normal path, or fallback path if preferred IP failed
261        unless ($connected) {
262            socket($sock, PF_INET, SOCK_STREAM, $proto);
263            $sin = Socket::sockaddr_in($port,Socket::inet_aton($ip));
264            my $timeout = $self ? $self->{connect_timeout} : 0.25;
265            unless (_connect_sock($sock,$sin,$timeout)) {
266                my $cb = $self ? $self->{cb_connect_fail} : undef;
267                $cb->($ip) if $cb;
268                return _dead_sock($sock, undef, 20 + int(rand(10)));
269            }
270        }
271    } else { # it's a unix domain/local socket
272        socket($sock, PF_UNIX, SOCK_STREAM, 0);
273        $sin = Socket::sockaddr_un($host);
274        my $timeout = $self ? $self->{connect_timeout} : 0.25;
275        unless (_connect_sock($sock,$sin,$timeout)) {
276            my $cb = $self ? $self->{cb_connect_fail} : undef;
277            $cb->($host) if $cb;
278            return _dead_sock($sock, undef, 20 + int(rand(10)));
279        }
280    }
281
282    # make the new socket not buffer writes.
283    my $old = select($sock);
284    $| = 1;
285    select($old);
286
287    $sock_map{$sock} = $host;
288    $cache_sock{$host} = $sock;
289
290    return $sock;
291}
292
293sub get_sock { # (key)
294    my Cache::Memcached $self = $_[0];
295    my $key = $_[1];
296    return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
297    return undef unless $self->{'active'};
298    my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
299
300    my $real_key = ref $key ? $key->[1] : $key;
301    my $tries = 0;
302    while ($tries++ < 20) {
303        my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
304        my $sock = $self->sock_to_host($host);
305        return $sock if $sock;
306        return undef if $self->{'no_rehash'};
307        $hv += _hashfunc($tries . $real_key);  # stupid, but works
308    }
309    return undef;
310}
311
312sub init_buckets {
313    my Cache::Memcached $self = shift;
314    return if $self->{'buckets'};
315    my $bu = $self->{'buckets'} = [];
316    foreach my $v (@{$self->{'servers'}}) {
317        if (ref $v eq "ARRAY") {
318            for (1..$v->[1]) { push @$bu, $v->[0]; }
319        } else {
320            push @$bu, $v;
321        }
322    }
323    $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
324}
325
326sub disconnect_all {
327    my $sock;
328    foreach $sock (values %cache_sock) {
329        close $sock;
330    }
331    %cache_sock = ();
332}
333
334# writes a line, then reads result.  by default stops reading after a
335# single line, but caller can override the $check_complete subref,
336# which gets passed a scalarref of buffer read thus far.
337sub _write_and_read {
338    my Cache::Memcached $self = shift;
339    my ($sock, $line, $check_complete) = @_;
340    my $res;
341    my ($ret, $offset) = (undef, 0);
342
343    $check_complete ||= sub {
344        return (rindex($ret, "\r\n") + 2 == length($ret));
345    };
346
347    # state: 0 - writing, 1 - reading, 2 - done
348    my $state = 0;
349
350    # the bitsets for select
351    my ($rin, $rout, $win, $wout);
352    my $nfound;
353
354    my $copy_state = -1;
355    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
356
357    # the select loop
358    while(1) {
359        if ($copy_state!=$state) {
360            last if $state==2;
361            ($rin, $win) = ('', '');
362            vec($rin, fileno($sock), 1) = 1 if $state==1;
363            vec($win, fileno($sock), 1) = 1 if $state==0;
364            $copy_state = $state;
365        }
366        $nfound = select($rout=$rin, $wout=$win, undef,
367                         $self->{'select_timeout'});
368        last unless $nfound;
369
370        if (vec($wout, fileno($sock), 1)) {
371            $res = send($sock, $line, $FLAG_NOSIGNAL);
372            next
373                if not defined $res and $!==EWOULDBLOCK;
374            unless ($res > 0) {
375                _close_sock($sock);
376                return undef;
377            }
378            if ($res == length($line)) { # all sent
379                $state = 1;
380            } else { # we only succeeded in sending some of it
381                substr($line, 0, $res, ''); # delete the part we sent
382            }
383        }
384
385        if (vec($rout, fileno($sock), 1)) {
386            $res = sysread($sock, $ret, 255, $offset);
387            next
388                if !defined($res) and $!==EWOULDBLOCK;
389            if ($res == 0) { # catches 0=conn closed or undef=error
390                _close_sock($sock);
391                return undef;
392            }
393            $offset += $res;
394            $state = 2 if $check_complete->(\$ret);
395        }
396    }
397
398    unless ($state == 2) {
399        _dead_sock($sock); # improperly finished
400        return undef;
401    }
402
403    return $ret;
404}
405
406sub delete {
407    my Cache::Memcached $self = shift;
408    my ($key, $time) = @_;
409    return 0 if ! $self->{'active'} || $self->{'readonly'};
410    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
411    my $sock = $self->get_sock($key);
412    return 0 unless $sock;
413
414    $self->{'stats'}->{"delete"}++;
415    $key = ref $key ? $key->[1] : $key;
416    $time = $time ? " $time" : "";
417    my $cmd = "delete $self->{namespace}$key$time\r\n";
418    my $res = _write_and_read($self, $sock, $cmd);
419
420    if ($self->{'stat_callback'}) {
421        my $etime = Time::HiRes::time();
422        $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
423    }
424
425    return defined $res && $res eq "DELETED\r\n";
426}
427*remove = \&delete;
428
429sub add {
430    _set("add", @_);
431}
432
433sub replace {
434    _set("replace", @_);
435}
436
437sub set {
438    _set("set", @_);
439}
440
441sub _set {
442    my $cmdname = shift;
443    my Cache::Memcached $self = shift;
444    my ($key, $val, $exptime) = @_;
445    carp "value for memkey:$key is not defined" unless (defined $val);
446    return 0 if ! $self->{'active'} || $self->{'readonly'};
447    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
448    my $sock = $self->get_sock($key);
449    return 0 unless $sock;
450
451    use bytes; # return bytes from length()
452
453    $self->{'stats'}->{$cmdname}++;
454    my $flags = 0;
455    $key = ref $key ? $key->[1] : $key;
456
457    if (ref $val) {
458        local $Carp::CarpLevel = 2;
459        $val = Storable::nfreeze($val);
460        $flags |= F_STORABLE;
461    }
462
463    my $len = length($val);
464
465    if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
466        $len >= $self->{'compress_threshold'}) {
467
468        my $c_val = Compress::Zlib::memGzip($val);
469        my $c_len = length($c_val);
470
471        # do we want to keep it?
472        if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
473            $val = $c_val;
474            $len = $c_len;
475            $flags |= F_COMPRESS;
476        }
477    }
478
479    $exptime = int($exptime || 0);
480
481    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
482    my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
483
484    my $res = _write_and_read($self, $sock, $line);
485
486    if ($self->{'debug'} && $line) {
487        chop $line; chop $line;
488        print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
489    }
490
491    if ($self->{'stat_callback'}) {
492        my $etime = Time::HiRes::time();
493        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
494    }
495
496    return defined $res && $res eq "STORED\r\n";
497}
498
499sub incr {
500    _incrdecr("incr", @_);
501}
502
503sub decr {
504    _incrdecr("decr", @_);
505}
506
507sub _incrdecr {
508    my $cmdname = shift;
509    my Cache::Memcached $self = shift;
510    my ($key, $value) = @_;
511    return undef if ! $self->{'active'} || $self->{'readonly'};
512    my $stime = Time::HiRes::time() if $self->{'stat_callback'};
513    my $sock = $self->get_sock($key);
514    return undef unless $sock;
515    $key = $key->[1] if ref $key;
516    $self->{'stats'}->{$cmdname}++;
517    $value = 1 unless defined $value;
518
519    my $line = "$cmdname $self->{namespace}$key $value\r\n";
520    my $res = _write_and_read($self, $sock, $line);
521
522    if ($self->{'stat_callback'}) {
523        my $etime = Time::HiRes::time();
524        $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
525    }
526
527    return undef unless defined $res && $res =~ /^(\d+)/;
528    return $1;
529}
530
531sub get {
532    my Cache::Memcached $self = $_[0];
533    my $key = $_[1];
534
535    # TODO: make a fast path for this?  or just keep using get_multi?
536    my $r = $self->get_multi($key);
537    my $kval = ref $key ? $key->[1] : $key;
538    return $r->{$kval};
539}
540
541sub get_multi {
542    my Cache::Memcached $self = shift;
543    return {} unless $self->{'active'};
544    $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
545    $self->{'stats'}->{"get_multi"}++;
546
547    my %val;        # what we'll be returning a reference to (realkey -> value)
548    my %sock_keys;  # sockref_as_scalar -> [ realkeys ]
549    my $sock;
550
551    if ($self->{'_single_sock'}) {
552        $sock = $self->sock_to_host($self->{'_single_sock'});
553        unless ($sock) {
554            return {};
555        }
556        foreach my $key (@_) {
557            my $kval = ref $key ? $key->[1] : $key;
558            push @{$sock_keys{$sock}}, $kval;
559        }
560    } else {
561        my $bcount = $self->{'bucketcount'};
562        my $sock;
563      KEY:
564        foreach my $key (@_) {
565            my ($hv, $real_key) = ref $key ?
566                (int($key->[0]),               $key->[1]) :
567                ((crc32($key) >> 16) & 0x7fff, $key);
568
569            my $tries;
570            while (1) {
571                my $bucket = $hv % $bcount;
572
573                # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
574                #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
575                #    and last;
576
577                # but this variant doesn't crash:
578                $sock = $buck2sock[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
579                if ($sock) {
580                    $buck2sock[$bucket] = $sock;
581                    last;
582                }
583
584                next KEY if $tries++ >= 20;
585                $hv += _hashfunc($tries . $real_key);
586            }
587
588            push @{$sock_keys{$sock}}, $real_key;
589        }
590    }
591
592    $self->{'stats'}->{"get_keys"} += @_;
593    $self->{'stats'}->{"get_socks"} += keys %sock_keys;
594
595    local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
596
597    _load_multi($self, \%sock_keys, \%val);
598
599    if ($self->{'debug'}) {
600        while (my ($k, $v) = each %val) {
601            print STDERR "MemCache: got $k = $v\n";
602        }
603    }
604    return \%val;
605}
606
607sub _load_multi {
608    use bytes; # return bytes from length()
609    my Cache::Memcached $self;
610    my ($sock_keys, $ret);
611
612    ($self, $sock_keys, $ret) = @_;
613
614    # all keyed by $sockstr:
615    my %reading; # $sockstr -> $sock.  bool, whether we're reading from this socket
616    my %writing; # $sockstr -> $sock.  bool, whether we're writing to this socket
617    my %buf;     # buffers, for writing
618
619    my %parser;  # $sockstr -> Cache::Memcached::GetParser
620
621    my $active_changed = 1; # force rebuilding of select sets
622
623    my $dead = sub {
624        my $sock = shift;
625        print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
626        delete $reading{$sock};
627        delete $writing{$sock};
628
629        if (my $p = $parser{$sock}) {
630            my $key = $p->current_key;
631            delete $ret->{$key} if $key;
632        }
633
634        if ($self->{'stat_callback'}) {
635            my $etime = Time::HiRes::time();
636            $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
637        }
638
639        close $sock;
640        _dead_sock($sock);
641    };
642
643    # $finalize->($key, $flags)
644    # $finalize->({ $key => $flags, $key => $flags });
645    my $finalize = sub {
646        my $map = $_[0];
647        $map = {@_} unless ref $map;
648
649        while (my ($k, $flags) = each %$map) {
650
651            # remove trailing \r\n
652            chop $ret->{$k}; chop $ret->{$k};
653
654            $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
655                if $HAVE_ZLIB && $flags & F_COMPRESS;
656            if ($flags & F_STORABLE) {
657                # wrapped in eval in case a perl 5.6 Storable tries to
658                # unthaw data from a perl 5.8 Storable.  (5.6 is stupid
659                # and dies if the version number changes at all.  in 5.8
660                # they made it only die if it unencounters a new feature)
661                eval {
662                    $ret->{$k} = Storable::thaw($ret->{$k});
663                };
664                # so if there was a problem, just treat it as a cache miss.
665                if ($@) {
666                    delete $ret->{$k};
667                }
668            }
669        }
670    };
671
672    foreach (keys %$sock_keys) {
673        my $ipport = $sock_map{$_}        or die "No map found matching for $_";
674        my $sock   = $cache_sock{$ipport} or die "No sock found for $ipport";
675        print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
676        $writing{$_} = $sock;
677        if ($self->{namespace}) {
678            $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
679        } else {
680            $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
681        }
682
683        $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
684    }
685
686    my $read = sub {
687        my $sockstr = "$_[0]";  # $sock is $_[0];
688        my $p = $parser{$sockstr} or die;
689        my $rv = $p->parse_from_sock($_[0]);
690        if ($rv > 0) {
691            # okay, finished with this socket
692            delete $reading{$sockstr};
693        } elsif ($rv < 0) {
694            $dead->($_[0]);
695        }
696        return $rv;
697    };
698
699    # returns 1 when it's done, for success or error.  0 if still working.
700    my $write = sub {
701        my ($sock, $sockstr) = ($_[0], "$_[0]");
702        my $res;
703
704        $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
705
706        return 0
707            if not defined $res and $!==EWOULDBLOCK;
708        unless ($res > 0) {
709            $dead->($sock);
710            return 1;
711        }
712        if ($res == length($buf{$sockstr})) { # all sent
713            $buf{$sockstr} = "";
714
715            # switch the socket from writing to reading
716            delete $writing{$sockstr};
717            $reading{$sockstr} = $sock;
718            return 1;
719        } else { # we only succeeded in sending some of it
720            substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
721        }
722        return 0;
723    };
724
725    # the bitsets for select
726    my ($rin, $rout, $win, $wout);
727    my $nfound;
728
729    # the big select loop
730    while(1) {
731        if ($active_changed) {
732            last unless %reading or %writing; # no sockets left?
733            ($rin, $win) = ('', '');
734            foreach (values %reading) {
735                vec($rin, fileno($_), 1) = 1;
736            }
737            foreach (values %writing) {
738                vec($win, fileno($_), 1) = 1;
739            }
740            $active_changed = 0;
741        }
742        # TODO: more intelligent cumulative timeout?
743        # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
744        $nfound = select($rout=$rin, $wout=$win, undef,
745                         $self->{'select_timeout'});
746        last unless $nfound;
747
748        # TODO: possible robustness improvement: we could select
749        # writing sockets for reading also, and raise hell if they're
750        # ready (input unread from last time, etc.)
751        # maybe do that on the first loop only?
752        foreach (values %writing) {
753            if (vec($wout, fileno($_), 1)) {
754                $active_changed = 1 if $write->($_);
755            }
756        }
757        foreach (values %reading) {
758            if (vec($rout, fileno($_), 1)) {
759                $active_changed = 1 if $read->($_);
760            }
761        }
762    }
763
764    # if there're active sockets left, they need to die
765    foreach (values %writing) {
766        $dead->($_);
767    }
768    foreach (values %reading) {
769        $dead->($_);
770    }
771
772    return;
773}
774
775sub _hashfunc {
776    return (crc32($_[0]) >> 16) & 0x7fff;
777}
778
779sub flush_all {
780    my Cache::Memcached $self = shift;
781
782    my $success = 1;
783
784    my @hosts = @{$self->{'buckets'}};
785    foreach my $host (@hosts) {
786        my $sock = $self->sock_to_host($host);
787        my @res = $self->run_command($sock, "flush_all\r\n");
788        $success = 0 unless (scalar @res == 1 && (($res[0] || "") eq "OK\r\n"));
789    }
790
791    return $success;
792}
793
794# returns array of lines, or () on failure.
795sub run_command {
796    my Cache::Memcached $self = shift;
797    my ($sock, $cmd) = @_;
798    return () unless $sock;
799    my $ret;
800    my $line = $cmd;
801    while (my $res = _write_and_read($self, $sock, $line)) {
802        undef $line;
803        $ret .= $res;
804        last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
805    }
806    chop $ret; chop $ret;
807    return map { "$_\r\n" } split(/\r\n/, $ret);
808}
809
810sub stats {
811    my Cache::Memcached $self = shift;
812    my ($types) = @_;
813    return 0 unless $self->{'active'};
814    return 0 unless !ref($types) || ref($types) eq 'ARRAY';
815    if (!ref($types)) {
816        if (!$types) {
817            # I don't much care what the default is, it should just
818            # be something reasonable.  Obviously "reset" should not
819            # be on the list :) but other types that might go in here
820            # include maps, cachedump, slabs, or items.
821            $types = [ qw( misc malloc sizes self ) ];
822        } else {
823            $types = [ $types ];
824        }
825    }
826
827    my $stats_hr = { };
828
829    # The "self" stat type is special, it only applies to this very
830    # object.
831    if (grep /^self$/, @$types) {
832        $stats_hr->{'self'} = \%{ $self->{'stats'} };
833    }
834
835    my %misc_keys = map { $_ => 1 }
836      qw/ bytes bytes_read bytes_written
837          cmd_get cmd_set connection_structures curr_items
838          get_hits get_misses
839          total_connections total_items
840        /;
841
842    # Now handle the other types, passing each type to each host server.
843    my @hosts = @{$self->{'buckets'}};
844  HOST: foreach my $host (@hosts) {
845        my $sock = $self->sock_to_host($host);
846      TYPE: foreach my $typename (grep !/^self$/, @$types) {
847            my $type = $typename eq 'misc' ? "" : " $typename";
848            my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
849                my $bref = shift;
850                return $$bref =~ /^(?:END|ERROR)\r?\n/m;
851            });
852            unless ($lines) {
853                _dead_sock($sock);
854                next HOST;
855            }
856
857            $lines =~ s/\0//g;  # 'stats sizes' starts with NULL?
858
859            # And, most lines end in \r\n but 'stats maps' (as of
860            # July 2003 at least) ends in \n. ??
861            my @lines = split(/\r?\n/, $lines);
862
863            # Some stats are key-value, some are not.  malloc,
864            # sizes, and the empty string are key-value.
865            # ("self" was handled separately above.)
866            if ($typename =~ /^(malloc|sizes|misc)$/) {
867                # This stat is key-value.
868                foreach my $line (@lines) {
869                    my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
870                    if ($key) {
871                        $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
872                    }
873                    $stats_hr->{'total'}{$key} += $value
874                        if $typename eq 'misc' && $key && $misc_keys{$key};
875                    $stats_hr->{'total'}{"malloc_$key"} += $value
876                        if $typename eq 'malloc' && $key;
877                }
878            } else {
879                # This stat is not key-value so just pull it
880                # all out in one blob.
881                $lines =~ s/^END\r?\n//m;
882                $stats_hr->{'hosts'}{$host}{$typename} ||= "";
883                $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
884            }
885        }
886    }
887
888    return $stats_hr;
889}
890
891sub stats_reset {
892    my Cache::Memcached $self = shift;
893    my ($types) = @_;
894    return 0 unless $self->{'active'};
895
896  HOST: foreach my $host (@{$self->{'buckets'}}) {
897        my $sock = $self->sock_to_host($host);
898        my $ok = _write_and_read($self, $sock, "stats reset");
899        unless (defined $ok && $ok eq "RESET\r\n") {
900            _dead_sock($sock);
901        }
902    }
903    return 1;
904}
905
9061;
907__END__
908
909=head1 NAME
910
911Cache::Memcached - client library for memcached (memory cache daemon)
912
913=head1 SYNOPSIS
914
915  use Cache::Memcached;
916
917  $memd = new Cache::Memcached {
918    'servers' => [ "10.0.0.15:11211", "10.0.0.15:11212", "/var/sock/memcached",
919                   "10.0.0.17:11211", [ "10.0.0.17:11211", 3 ] ],
920    'debug' => 0,
921    'compress_threshold' => 10_000,
922  };
923  $memd->set_servers($array_ref);
924  $memd->set_compress_threshold(10_000);
925  $memd->enable_compress(0);
926
927  $memd->set("my_key", "Some value");
928  $memd->set("object_key", { 'complex' => [ "object", 2, 4 ]});
929
930  $val = $memd->get("my_key");
931  $val = $memd->get("object_key");
932  if ($val) { print $val->{'complex'}->[2]; }
933
934  $memd->incr("key");
935  $memd->decr("key");
936  $memd->incr("key", 2);
937
938=head1 DESCRIPTION
939
940This is the Perl API for memcached, a distributed memory cache daemon.
941More information is available at:
942
943  http://www.danga.com/memcached/
944
945=head1 CONSTRUCTOR
946
947=over 4
948
949=item C<new>
950
951Takes one parameter, a hashref of options.  The most important key is
952C<servers>, but that can also be set later with the C<set_servers>
953method.  The servers must be an arrayref of hosts, each of which is
954either a scalar of the form C<10.0.0.10:11211> or an arrayref of the
955former and an integer weight value.  (The default weight if
956unspecified is 1.)  It's recommended that weight values be kept as low
957as possible, as this module currently allocates memory for bucket
958distribution proportional to the total host weights.
959
960Use C<compress_threshold> to set a compression threshold, in bytes.
961Values larger than this threshold will be compressed by C<set> and
962decompressed by C<get>.
963
964Use C<no_rehash> to disable finding a new memcached server when one
965goes down.  Your application may or may not need this, depending on
966your expirations and key usage.
967
968Use C<readonly> to disable writes to backend memcached servers.  Only
969get and get_multi will work.  This is useful in bizarre debug and
970profiling cases only.
971
972Use C<namespace> to prefix all keys with the provided namespace value.
973That is, if you set namespace to "app1:" and later do a set of "foo"
974to "bar", memcached is actually seeing you set "app1:foo" to "bar".
975
976The other useful key is C<debug>, which when set to true will produce
977diagnostics on STDERR.
978
979=back
980
981=head1 METHODS
982
983=over 4
984
985=item C<set_servers>
986
987Sets the server list this module distributes key gets and sets between.
988The format is an arrayref of identical form as described in the C<new>
989constructor.
990
991=item C<set_debug>
992
993Sets the C<debug> flag.  See C<new> constructor for more information.
994
995=item C<set_readonly>
996
997Sets the C<readonly> flag.  See C<new> constructor for more information.
998
999=item C<set_norehash>
1000
1001Sets the C<no_rehash> flag.  See C<new> constructor for more information.
1002
1003=item C<set_compress_threshold>
1004
1005Sets the compression threshold. See C<new> constructor for more information.
1006
1007=item C<enable_compress>
1008
1009Temporarily enable or disable compression.  Has no effect if C<compress_threshold>
1010isn't set, but has an overriding effect if it is.
1011
1012=item C<get>
1013
1014my $val = $memd->get($key);
1015
1016Retrieves a key from the memcache.  Returns the value (automatically
1017thawed with Storable, if necessary) or undef.
1018
1019The $key can optionally be an arrayref, with the first element being the
1020hash value, if you want to avoid making this module calculate a hash
1021value.  You may prefer, for example, to keep all of a given user's
1022objects on the same memcache server, so you could use the user's
1023unique id as the hash value.
1024
1025=item C<get_multi>
1026
1027my $hashref = $memd->get_multi(@keys);
1028
1029Retrieves multiple keys from the memcache doing just one query.
1030Returns a hashref of key/value pairs that were available.
1031
1032This method is recommended over regular 'get' as it lowers the number
1033of total packets flying around your network, reducing total latency,
1034since your app doesn't have to wait for each round-trip of 'get'
1035before sending the next one.
1036
1037=item C<set>
1038
1039$memd->set($key, $value[, $exptime]);
1040
1041Unconditionally sets a key to a given value in the memcache.  Returns true
1042if it was stored successfully.
1043
1044The $key can optionally be an arrayref, with the first element being the
1045hash value, as described above.
1046
1047The $exptime (expiration time) defaults to "never" if unspecified.  If
1048you want the key to expire in memcached, pass an integer $exptime.  If
1049value is less than 60*60*24*30 (30 days), time is assumed to be relative
1050from the present.  If larger, it's considered an absolute Unix time.
1051
1052=item C<add>
1053
1054$memd->add($key, $value[, $exptime]);
1055
1056Like C<set>, but only stores in memcache if the key doesn't already exist.
1057
1058=item C<replace>
1059
1060$memd->replace($key, $value[, $exptime]);
1061
1062Like C<set>, but only stores in memcache if the key already exists.  The
1063opposite of C<add>.
1064
1065=item C<delete>
1066
1067$memd->delete($key[, $time]);
1068
1069Deletes a key.  You may optionally provide an integer time value (in seconds) to
1070tell the memcached server to block new writes to this key for that many seconds.
1071(Sometimes useful as a hacky means to prevent races.)  Returns true if key
1072was found and deleted, and false otherwise.
1073
1074You may also use the alternate method name B<remove>, so
1075Cache::Memcached looks like the L<Cache::Cache> API.
1076
1077=item C<incr>
1078
1079$memd->incr($key[, $value]);
1080
1081Sends a command to the server to atomically increment the value for
1082$key by $value, or by 1 if $value is undefined.  Returns undef if $key
1083doesn't exist on server, otherwise it returns the new value after
1084incrementing.  Value should be zero or greater.  Overflow on server
1085is not checked.  Be aware of values approaching 2**32.  See decr.
1086
1087=item C<decr>
1088
1089$memd->decr($key[, $value]);
1090
1091Like incr, but decrements.  Unlike incr, underflow is checked and new
1092values are capped at 0.  If server value is 1, a decrement of 2
1093returns 0, not -1.
1094
1095=item C<stats>
1096
1097$memd->stats([$keys]);
1098
1099Returns a hashref of statistical data regarding the memcache server(s),
1100the $memd object, or both.  $keys can be an arrayref of keys wanted, a
1101single key wanted, or absent (in which case the default value is malloc,
1102sizes, self, and the empty string).  These keys are the values passed
1103to the 'stats' command issued to the memcached server(s), except for
1104'self' which is internal to the $memd object.  Allowed values are:
1105
1106=over 4
1107
1108=item C<misc>
1109
1110The stats returned by a 'stats' command:  pid, uptime, version,
1111bytes, get_hits, etc.
1112
1113=item C<malloc>
1114
1115The stats returned by a 'stats malloc':  total_alloc, arena_size, etc.
1116
1117=item C<sizes>
1118
1119The stats returned by a 'stats sizes'.
1120
1121=item C<self>
1122
1123The stats for the $memd object itself (a copy of $memd->{'stats'}).
1124
1125=item C<maps>
1126
1127The stats returned by a 'stats maps'.
1128
1129=item C<cachedump>
1130
1131The stats returned by a 'stats cachedump'.
1132
1133=item C<slabs>
1134
1135The stats returned by a 'stats slabs'.
1136
1137=item C<items>
1138
1139The stats returned by a 'stats items'.
1140
1141=back
1142
1143=item C<disconnect_all>
1144
1145$memd->disconnect_all;
1146
1147Closes all cached sockets to all memcached servers.  You must do this
1148if your program forks and the parent has used this module at all.
1149Otherwise the children will try to use cached sockets and they'll fight
1150(as children do) and garble the client/server protocol.
1151
1152=item C<flush_all>
1153
1154$memd->flush_all;
1155
1156Runs the memcached "flush_all" command on all configured hosts,
1157emptying all their caches.  (or rather, invalidating all items
1158in the caches in an O(1) operation...)  Running stats will still
1159show the item existing, they're just be non-existent and lazily
1160destroyed next time you try to detch any of them.
1161
1162=back
1163
1164=head1 BUGS
1165
1166When a server goes down, this module does detect it, and re-hashes the
1167request to the remaining servers, but the way it does it isn't very
1168clean.  The result may be that it gives up during its rehashing and
1169refuses to get/set something it could've, had it been done right.
1170
1171=head1 COPYRIGHT
1172
1173This module is Copyright (c) 2003 Brad Fitzpatrick.
1174All rights reserved.
1175
1176You may distribute under the terms of either the GNU General Public
1177License or the Artistic License, as specified in the Perl README file.
1178
1179=head1 WARRANTY
1180
1181This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1182
1183=head1 FAQ
1184
1185See the memcached website:
1186   http://www.danga.com/memcached/
1187
1188=head1 AUTHORS
1189
1190Brad Fitzpatrick <brad@danga.com>
1191
1192Anatoly Vorobey <mellon@pobox.com>
1193
1194Brad Whitaker <whitaker@danga.com>
1195
1196Jamie McCarthy <jamie@mccarthy.vg>
Note: See TracBrowser for help on using the browser.