root/trunk/lib/Perlbal/Socket.pm

Revision 774, 11.9 kB (checked in by ask, 17 months ago)

Lots of typo corrections in documentation and comments from Nick Andrew

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# Base class for all socket types
2#
3# Copyright 2004, Danga Interactive, Inc.
4# Copyright 2005-2007, Six Apart, Ltd.
5
6package Perlbal::Socket;
7use strict;
8use warnings;
9no  warnings qw(deprecated);
10
11use Perlbal::HTTPHeaders;
12
13use Sys::Syscall;
14use POSIX ();
15
16use Danga::Socket 1.44;
17use base 'Danga::Socket';
18
19use fields (
20            'headers_string',  # headers as they're being read
21
22            'req_headers',     # the final Perlbal::HTTPHeaders object inbound
23            'res_headers',     # response headers outbound (Perlbal::HTTPHeaders object)
24
25            'create_time',     # creation time
26            'alive_time',      # last time noted alive
27            'state',           # general purpose state; used by descendants.
28            'do_die',          # if on, die and do no further requests
29
30            'read_buf',        # arrayref of scalarref read from client
31            'read_ahead',      # bytes sitting in read_buf
32            'read_size',       # total bytes read from client, ever
33
34            'ditch_leading_rn', # if true, the next header parsing will ignore a leading \r\n
35
36            'observed_ip_string', # if defined, contains the observed IP string of the peer
37                                  # we're serving. this is intended for hoding the value of
38                                  # the X-Forwarded-For and using it to govern ACLs.
39            );
40
41use constant MAX_HTTP_HEADER_LENGTH => 102400;  # 100k, arbitrary
42
43use constant TRACK_OBJECTS => 0;            # see @created_objects below
44if (TRACK_OBJECTS) {
45    use Scalar::Util qw(weaken isweak);
46}
47
48# kick-off one cleanup
49_do_cleanup();
50
51our %state_changes = (); # { "objref" => [ state, state, state, ... ] }
52our $last_callbacks = 0; # time last ran callbacks
53our $callbacks = []; # [ [ time, subref ], [ time, subref ], ... ]
54
55# this one deserves its own section.  we keep track of every Perlbal::Socket object
56# created if the TRACK_OBJECTS constant is on.  we use weakened references, though,
57# so this list will hopefully contain mostly undefs.  users can ask for this list if
58# they want to work with it via the get_created_objects_ref function.
59our @created_objects; # ( $ref, $ref, $ref ... )
60our $last_co_cleanup = 0; # clean the list every few seconds
61
62sub get_statechange_ref {
63    return \%state_changes;
64}
65
66sub get_created_objects_ref {
67    return \@created_objects;
68}
69
70sub write_debuggy {
71    my $self = shift;
72
73    my $cref = $_[0];
74    my $content = ref $cref eq "SCALAR" ? $$cref : $cref;
75    my $clen = defined $content ? length($content) : "undef";
76    $content = substr($content, 0, 17) . "..." if defined $content && $clen > 30;
77    my ($pkg, $filename, $line) = caller;
78    print "write($self, <$clen>\"$content\") from ($pkg, $filename, $line)\n" if Perlbal::DEBUG >= 4;
79    $self->SUPER::write(@_);
80}
81
82if (Perlbal::DEBUG >= 4) {
83    *write = \&write_debuggy;
84}
85
86sub new {
87    my Perlbal::Socket $self = shift;
88    $self = fields::new( $self ) unless ref $self;
89
90    Perlbal::objctor($self);
91
92    $self->SUPER::new( @_ );
93    $self->{headers_string} = '';
94    $self->{state} = undef;
95    $self->{do_die} = 0;
96
97    $self->{read_buf} = [];        # arrayref of scalar refs of bufs read from client
98    $self->{read_ahead} = 0;       # bytes sitting in read_buf
99    $self->{read_size} = 0;        # total bytes read from client
100
101    my $now = time;
102    $self->{alive_time} = $self->{create_time} = $now;
103
104    # now put this item in the list of created objects
105    if (TRACK_OBJECTS) {
106        # clean the created objects list if necessary
107        if ($last_co_cleanup < $now - 5) {
108            # remove out undefs, because those are natural byproducts of weakening
109            # references
110            @created_objects = grep { $_ } @created_objects;
111
112            # however, the grep turned our weak references back into strong ones, so
113            # we have to re-weaken them
114            weaken($_) foreach @created_objects;
115
116            # we've cleaned up at this point
117            $last_co_cleanup = $now;
118        }
119
120        # now add this one to our cleaned list and weaken it
121        push @created_objects, $self;
122        weaken($created_objects[-1]);
123    }
124
125    return $self;
126}
127
128# FIXME: this doesn't scale in theory, but it might use less CPU in
129# practice than using the Heap:: modules and manipulating the
130# expirations all the time, thus doing things properly
131# algorithmically.  and this is definitely less work, so it's worth
132# a try.
133sub _do_cleanup {
134    my $sf = Perlbal::Socket->get_sock_ref;
135
136    my $now = time;
137
138    my @to_close;
139    while (my $k = each %$sf) {
140        my Perlbal::Socket $v = $sf->{$k};
141
142        my $max_age = eval { $v->max_idle_time } || 0;
143        next unless $max_age;
144
145        if ($v->{alive_time} < $now - $max_age) {
146            push @to_close, $v;
147        }
148    }
149
150    foreach my $sock (@to_close) {
151        $sock->close("perlbal_timeout")
152    }
153
154    Danga::Socket->AddTimer(5, \&_do_cleanup);
155}
156
157# CLASS METHOD: given a delay (in seconds) and a subref, this will call
158# that subref in AT LEAST delay seconds. if the subref returns 0, the
159# callback is discarded, but if it returns a positive number, the callback
160# is pushed onto the callback stack to be called again in at least that
161# many seconds.
162sub register_callback {
163    # adds a new callback to our list
164    my ($delay, $subref) = @_;
165    push @$callbacks, [ time + $delay, $subref ];
166    return 1;
167}
168
169# CLASS METHOD: runs through the list of registered callbacks and executes
170# any that need to be executed
171# FIXME: this doesn't scale.  need a heap.
172sub run_callbacks {
173    my $now = time;
174    return if $last_callbacks == $now;
175    $last_callbacks = $now;
176
177    my @destlist = ();
178    foreach my $ref (@$callbacks) {
179        # if their time is <= now...
180        if ($ref->[0] <= $now) {
181            # find out if they want to run again...
182            my $rv = $ref->[1]->();
183
184            # and if they do, push onto list...
185            push @destlist, [ $rv + $now, $ref->[1] ]
186                if defined $rv && $rv > 0;
187        } else {
188            # not time for this one, just shove it
189            push @destlist, $ref;
190        }
191    }
192    $callbacks = \@destlist;
193}
194
195# CLASS METHOD:
196# default is for sockets to never time out.  classes
197# can override.
198sub max_idle_time { 0; }
199
200# Socket: specific to HTTP socket types (only here and not in
201# ClientHTTPBase because ClientManage wants it too)
202sub read_request_headers  { read_headers($_[0], 0); }
203sub read_response_headers { read_headers($_[0], 1); }
204sub read_headers {
205    my Perlbal::Socket $self = shift;
206    my $is_res = shift;
207    print "Perlbal::Socket::read_headers($self) is_res=$is_res\n" if Perlbal::DEBUG >= 2;
208
209    my $sock = $self->{sock};
210
211    my $to_read = MAX_HTTP_HEADER_LENGTH - length($self->{headers_string});
212
213    my $bref = $self->read($to_read);
214    unless (defined $bref) {
215        # client disconnected
216        print "  client disconnected\n" if Perlbal::DEBUG >= 3;
217        return $self->close('remote_closure');
218    }
219
220    $self->{headers_string} .= $$bref;
221    my $idx = index($self->{headers_string}, "\r\n\r\n");
222    my $delim_len = 4;
223
224    # can't find the header delimiter? check for LFLF header delimiter.
225    if ($idx == -1) {
226        $idx = index($self->{headers_string}, "\n\n");
227        $delim_len = 2;
228    }
229    # still can't find the header delimiter?
230    if ($idx == -1) {
231
232        # usually we get the headers all in one packet (one event), so
233        # if we get in here, that means it's more than likely the
234        # extra \r\n and if we clean it now (throw it away), then we
235        # can avoid a regexp later on.
236        if ($self->{ditch_leading_rn} && $self->{headers_string} eq "\r\n") {
237            print "  throwing away leading \\r\\n\n" if Perlbal::DEBUG >= 3;
238            $self->{ditch_leading_rn} = 0;
239            $self->{headers_string}   = "";
240            return 0;
241        }
242
243        print "  can't find end of headers\n" if Perlbal::DEBUG >= 3;
244        $self->close('long_headers')
245            if length($self->{headers_string}) >= MAX_HTTP_HEADER_LENGTH;
246        return 0;
247    }
248
249    my $hstr = substr($self->{headers_string}, 0, $idx);
250    print "  pre-parsed headers: [$hstr]\n" if Perlbal::DEBUG >= 3;
251
252    my $extra = substr($self->{headers_string}, $idx+$delim_len);
253    if (my $len = length($extra)) {
254        print "  pushing back $len bytes after header\n" if Perlbal::DEBUG >= 3;
255        $self->push_back_read(\$extra);
256    }
257
258    # some browsers send an extra \r\n after their POST bodies that isn't
259    # in their content-length.  a base class can tell us when they're
260    # on their 2nd+ request after a POST and tell us to be ready for that
261    # condition, and we'll clean it up
262    $hstr =~ s/^\r\n// if $self->{ditch_leading_rn};
263
264    unless (($is_res ? $self->{res_headers} : $self->{req_headers}) =
265                Perlbal::HTTPHeaders->new(\$hstr, $is_res)) {
266        # bogus headers?  close connection.
267        print "  bogus headers\n" if Perlbal::DEBUG >= 3;
268        return $self->close("parse_header_failure");
269    }
270
271    print "  got valid headers\n" if Perlbal::DEBUG >= 3;
272
273    $Perlbal::reqs++ unless $is_res;
274    $self->{ditch_leading_rn} = 0;
275
276    return $is_res ? $self->{res_headers} : $self->{req_headers};
277}
278
279### METHOD: drain_read_buf_to( $destination )
280### Write read-buffered data (if any) from the receiving object to the
281### I<destination> object.
282sub drain_read_buf_to {
283    my ($self, $dest) = @_;
284    return unless $self->{read_ahead};
285
286    while (my $bref = shift @{$self->{read_buf}}) {
287        print "draining readbuf from $self to $dest: [$$bref]\n" if Perlbal::DEBUG >= 3;
288        $dest->write($bref);
289        $self->{read_ahead} -= length($$bref);
290    }
291}
292
293### METHOD: die_gracefully()
294### By default, if we're in persist_wait state, close.  Else, ignore.  Children
295### can override if they want to do some other processing.
296sub die_gracefully {
297    my Perlbal::Socket $self = $_[0];
298    if ($self->state eq 'persist_wait') {
299        $self->close('graceful_shutdown');
300    }
301    $self->{do_die} = 1;
302}
303
304### METHOD: write()
305### Overridden from Danga::Socket to update our alive time on successful writes
306### Stops sockets from being closed on long-running write operations
307sub write {
308    my $self = shift;
309
310    my $ret;
311    if ($ret = $self->SUPER::write(@_)) {
312        # Mark this socket alive so we don't time out
313        $self->{alive_time} = $Perlbal::tick_time;
314    }
315   
316    return $ret;
317}
318
319### METHOD: close()
320### Set our state when we get closed.
321sub close {
322    my Perlbal::Socket $self = $_[0];
323    $self->state('closed');
324    return $self->SUPER::close($_[1]);
325}
326
327### METHOD: state()
328### If you pass a parameter, sets the state, else returns it.
329sub state {
330    my Perlbal::Socket $self = shift;
331    return $self->{state} unless @_;
332
333    push @{$state_changes{"$self"} ||= []}, $_[0] if Perlbal::TRACK_STATES;
334    return $self->{state} = $_[0];
335}
336
337sub observed_ip_string {
338    my Perlbal::Socket $self = shift;
339
340    if (@_) {
341        return $self->{observed_ip_string} = $_[0];
342    } else {
343        return $self->{observed_ip_string};
344    }
345}
346
347sub as_string_html {
348    my Perlbal::Socket $self = shift;
349    return $self->SUPER::as_string;
350}
351
352sub DESTROY {
353    my Perlbal::Socket $self = shift;
354    delete $state_changes{"$self"} if Perlbal::TRACK_STATES;
355    Perlbal::objdtor($self);
356}
357
358# package function (not a method).  returns bytes sent, or -1 on error.
359our $sf_defined = Sys::Syscall::sendfile_defined;
360our $max_sf_readwrite = 128 * 1024;
361sub sendfile {
362    my ($sfd, $fd, $bytes) = @_;
363    return Sys::Syscall::sendfile($sfd, $fd, $bytes) if $sf_defined;
364
365    # no support for sendfile.  ghetto version:  read and write.
366    my $buf;
367    $bytes = $max_sf_readwrite if $bytes > $max_sf_readwrite;
368
369    my $rv = POSIX::read($fd, $buf, $bytes);
370    return -1 unless defined $rv;
371    return -1 unless $rv == $bytes;
372
373    my $wv = POSIX::write($sfd, $buf, $rv);
374    return -1 unless defined $wv;
375
376    if (my $over_read = $rv - $wv) {
377        POSIX::lseek($fd, -$over_read, &POSIX::SEEK_CUR);
378    }
379
380    return $wv;
381}
382
3831;
384
385
386# Local Variables:
387# mode: perl
388# c-basic-indent: 4
389# indent-tabs-mode: nil
390# End:
Note: See TracBrowser for help on using the browser.