root/trunk/lib/Perlbal/Socket.pm @ 687

Revision 687, 11.1 kB (checked in by jacques, 2 years ago)

Bump copyright and fixed typo in Interactive

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1# Base class for all socket types
2#
3# Copyright 2004, Danga Interactive, Inc.
4# Copyright 2005-2007, Six Apart, Ltd.
5
6package Perlbal::Socket;
7use strict;
8use warnings;
9no  warnings qw(deprecated);
10
11use Perlbal::HTTPHeaders;
12
13use Sys::Syscall;
14use POSIX ();
15
16use Danga::Socket 1.44;
17use base 'Danga::Socket';
18
19use fields (
20            'headers_string',  # headers as they're being read
21
22            'req_headers',     # the final Perlbal::HTTPHeaders object inbound
23            'res_headers',     # response headers outbound (Perlbal::HTTPHeaders object)
24
25            'create_time',     # creation time
26            'alive_time',      # last time noted alive
27            'state',           # general purpose state; used by descendants.
28            'do_die',          # if on, die and do no further requests
29
30            'read_buf',        # arrayref of scalarref read from client
31            'read_ahead',      # bytes sitting in read_buf
32            'read_size',       # total bytes read from client, ever
33
34            'ditch_leading_rn', # if true, the next header parsing will ignore a leading \r\n
35            );
36
37use constant MAX_HTTP_HEADER_LENGTH => 102400;  # 100k, arbitrary
38
39use constant TRACK_OBJECTS => 0;            # see @created_objects below
40if (TRACK_OBJECTS) {
41    use Scalar::Util qw(weaken isweak);
42}
43
44# kick-off one cleanup
45_do_cleanup();
46
47our %state_changes = (); # { "objref" => [ state, state, state, ... ] }
48our $last_callbacks = 0; # time last ran callbacks
49our $callbacks = []; # [ [ time, subref ], [ time, subref ], ... ]
50
51# this one deserves its own section.  we keep track of every Perlbal::Socket object
52# created if the TRACK_OBJECTS constant is on.  we use weakened references, though,
53# so this list will hopefully contain mostly undefs.  users can ask for this list if
54# they want to work with it via the get_created_objects_ref function.
55our @created_objects; # ( $ref, $ref, $ref ... )
56our $last_co_cleanup = 0; # clean the list every few seconds
57
58sub get_statechange_ref {
59    return \%state_changes;
60}
61
62sub get_created_objects_ref {
63    return \@created_objects;
64}
65
66sub write_debuggy {
67    my $self = shift;
68
69    my $cref = $_[0];
70    my $content = ref $cref eq "SCALAR" ? $$cref : $cref;
71    my $clen = defined $content ? length($content) : "undef";
72    $content = substr($content, 0, 17) . "..." if defined $content && $clen > 30;
73    my ($pkg, $filename, $line) = caller;
74    print "write($self, <$clen>\"$content\") from ($pkg, $filename, $line)\n" if Perlbal::DEBUG >= 4;
75    $self->SUPER::write(@_);
76}
77
78if (Perlbal::DEBUG >= 4) {
79    *write = \&write_debuggy;
80}
81
82sub new {
83    my Perlbal::Socket $self = shift;
84    $self = fields::new( $self ) unless ref $self;
85
86    Perlbal::objctor($self);
87
88    $self->SUPER::new( @_ );
89    $self->{headers_string} = '';
90    $self->{state} = undef;
91    $self->{do_die} = 0;
92
93    $self->{read_buf} = [];        # arrayref of scalar refs of bufs read from client
94    $self->{read_ahead} = 0;       # bytes sitting in read_buf
95    $self->{read_size} = 0;        # total bytes read from client
96
97    my $now = time;
98    $self->{alive_time} = $self->{create_time} = $now;
99
100    # now put this item in the list of created objects
101    if (TRACK_OBJECTS) {
102        # clean the created objects list if necessary
103        if ($last_co_cleanup < $now - 5) {
104            # remove out undefs, because those are natural byproducts of weakening
105            # references
106            @created_objects = grep { $_ } @created_objects;
107
108            # however, the grep turned our weak references back into strong ones, so
109            # we have to reweaken them
110            weaken($_) foreach @created_objects;
111
112            # we've cleaned up at this point
113            $last_co_cleanup = $now;
114        }
115
116        # now add this one to our cleaned list and weaken it
117        push @created_objects, $self;
118        weaken($created_objects[-1]);
119    }
120
121    return $self;
122}
123
124# FIXME: this doesn't scale in theory, but it might use less CPU in
125# practice than using the Heap:: modules and manipulating the
126# expirations all the time, thus doing things properly
127# algorithmically.  and this is definitely less work, so it's worth
128# a try.
129sub _do_cleanup {
130    my $sf = Perlbal::Socket->get_sock_ref;
131
132    my $now = time;
133
134    my %max_age;  # classname -> max age (0 means forever)
135    my @to_close;
136    while (my $k = each %$sf) {
137        my Perlbal::Socket $v = $sf->{$k};
138        my $ref = ref $v;
139        unless (defined $max_age{$ref}) {
140            # eval because not all Danga::Socket connections in Perlbal
141            # must be Perlbal::Socket-derived
142            $max_age{$ref} = eval { $ref->max_idle_time } || 0;
143        }
144        next unless $max_age{$ref};
145        if ($v->{alive_time} < $now - $max_age{$ref}) {
146            push @to_close, $v;
147        }
148    }
149
150    foreach my $sock (@to_close) {
151        $sock->close("perlbal_timeout")
152    }
153
154    Danga::Socket->AddTimer(5, \&_do_cleanup);
155}
156
157# CLASS METHOD: given a delay (in seconds) and a subref, this will call
158# that subref in AT LEAST delay seconds. if the subref returns 0, the
159# callback is discarded, but if it returns a positive number, the callback
160# is pushed onto the callback stack to be called again in at least that
161# many seconds.
162sub register_callback {
163    # adds a new callback to our list
164    my ($delay, $subref) = @_;
165    push @$callbacks, [ time + $delay, $subref ];
166    return 1;
167}
168
169# CLASS METHOD: runs through the list of registered callbacks and executes
170# any that need to be executed
171# FIXME: this doesn't scale.  need a heap.
172sub run_callbacks {
173    my $now = time;
174    return if $last_callbacks == $now;
175    $last_callbacks = $now;
176
177    my @destlist = ();
178    foreach my $ref (@$callbacks) {
179        # if their time is <= now...
180        if ($ref->[0] <= $now) {
181            # find out if they want to run again...
182            my $rv = $ref->[1]->();
183
184            # and if they do, push onto list...
185            push @destlist, [ $rv + $now, $ref->[1] ]
186                if defined $rv && $rv > 0;
187        } else {
188            # not time for this one, just shove it
189            push @destlist, $ref;
190        }
191    }
192    $callbacks = \@destlist;
193}
194
195# CLASS METHOD:
196# default is for sockets to never time out.  classes
197# can override.
198sub max_idle_time { 0; }
199
200# Socket: specific to HTTP socket types (only here and not in
201# ClientHTTPBase because ClientManage wants it too)
202sub read_request_headers  { read_headers($_[0], 0); }
203sub read_response_headers { read_headers($_[0], 1); }
204sub read_headers {
205    my Perlbal::Socket $self = shift;
206    my $is_res = shift;
207    print "Perlbal::Socket::read_headers($self) is_res=$is_res\n" if Perlbal::DEBUG >= 2;
208
209    my $sock = $self->{sock};
210
211    my $to_read = MAX_HTTP_HEADER_LENGTH - length($self->{headers_string});
212
213    my $bref = $self->read($to_read);
214    unless (defined $bref) {
215        # client disconnected
216        print "  client disconnected\n" if Perlbal::DEBUG >= 3;
217        return $self->close('remote_closure');
218    }
219
220    $self->{headers_string} .= $$bref;
221    my $idx = index($self->{headers_string}, "\r\n\r\n");
222
223    # can't find the header delimiter?
224    if ($idx == -1) {
225
226        # usually we get the headers all in one packet (one event), so
227        # if we get in here, that means it's more than likely the
228        # extra \r\n and if we clean it now (throw it away), then we
229        # can avoid a regexp later on.
230        if ($self->{ditch_leading_rn} && $self->{headers_string} eq "\r\n") {
231            print "  throwing away leading \\r\\n\n" if Perlbal::DEBUG >= 3;
232            $self->{ditch_leading_rn} = 0;
233            $self->{headers_string}   = "";
234            return 0;
235        }
236
237        print "  can't find end of headers\n" if Perlbal::DEBUG >= 3;
238        $self->close('long_headers')
239            if length($self->{headers_string}) >= MAX_HTTP_HEADER_LENGTH;
240        return 0;
241    }
242
243    my $hstr = substr($self->{headers_string}, 0, $idx);
244    print "  pre-parsed headers: [$hstr]\n" if Perlbal::DEBUG >= 3;
245
246    my $extra = substr($self->{headers_string}, $idx+4);
247    if (my $len = length($extra)) {
248        print "  pushing back $len bytes after header\n" if Perlbal::DEBUG >= 3;
249        $self->push_back_read(\$extra);
250    }
251
252    # some browsers send an extra \r\n after their POST bodies that isn't
253    # in their content-length.  a base class can tell us when they're
254    # on their 2nd+ request after a POST and tell us to be ready for that
255    # condition, and we'll clean it up
256    $hstr =~ s/^\r\n// if $self->{ditch_leading_rn};
257
258    unless (($is_res ? $self->{res_headers} : $self->{req_headers}) =
259                Perlbal::HTTPHeaders->new(\$hstr, $is_res)) {
260        # bogus headers?  close connection.
261        print "  bogus headers\n" if Perlbal::DEBUG >= 3;
262        return $self->close("parse_header_failure");
263    }
264
265    print "  got valid headers\n" if Perlbal::DEBUG >= 3;
266
267    $Perlbal::reqs++ unless $is_res;
268    $self->{ditch_leading_rn} = 0;
269
270    return $is_res ? $self->{res_headers} : $self->{req_headers};
271}
272
273### METHOD: drain_read_buf_to( $destination )
274### Write read-buffered data (if any) from the receiving object to the
275### I<destination> object.
276sub drain_read_buf_to {
277    my ($self, $dest) = @_;
278    return unless $self->{read_ahead};
279
280    while (my $bref = shift @{$self->{read_buf}}) {
281        print "draining readbuf from $self to $dest: [$$bref]\n" if Perlbal::DEBUG >= 3;
282        $dest->write($bref);
283        $self->{read_ahead} -= length($$bref);
284    }
285}
286
287### METHOD: die_gracefully()
288### By default, if we're in persist_wait state, close.  Else, ignore.  Children
289### can override if they want to do some other processing.
290sub die_gracefully {
291    my Perlbal::Socket $self = $_[0];
292    if ($self->state eq 'persist_wait') {
293        $self->close('graceful_shutdown');
294    }
295    $self->{do_die} = 1;
296}
297
298### METHOD: close()
299### Set our state when we get closed.
300sub close {
301    my Perlbal::Socket $self = $_[0];
302    $self->state('closed');
303    return $self->SUPER::close($_[1]);
304}
305
306### METHOD: state()
307### If you pass a parameter, sets the state, else returns it.
308sub state {
309    my Perlbal::Socket $self = shift;
310    return $self->{state} unless @_;
311
312    push @{$state_changes{"$self"} ||= []}, $_[0] if Perlbal::TRACK_STATES;
313    return $self->{state} = $_[0];
314}
315
316sub as_string_html {
317    my Perlbal::Socket $self = shift;
318    return $self->SUPER::as_string;
319}
320
321sub DESTROY {
322    my Perlbal::Socket $self = shift;
323    delete $state_changes{"$self"} if Perlbal::TRACK_STATES;
324    Perlbal::objdtor($self);
325}
326
327# package function (not a method).  returns bytes sent, or -1 on error.
328our $sf_defined = Sys::Syscall::sendfile_defined;
329our $max_sf_readwrite = 128 * 1024;
330sub sendfile {
331    my ($sfd, $fd, $bytes) = @_;
332    return Sys::Syscall::sendfile($sfd, $fd, $bytes) if $sf_defined;
333
334    # no support for sendfile.  ghetto version:  read and write.
335    my $buf;
336    $bytes = $max_sf_readwrite if $bytes > $max_sf_readwrite;
337
338    my $rv = POSIX::read($fd, $buf, $bytes);
339    return -1 unless defined $rv;
340    return -1 unless $rv == $bytes;
341
342    my $wv = POSIX::write($sfd, $buf, $rv);
343    return -1 unless defined $wv;
344
345    if (my $over_read = $rv - $wv) {
346        POSIX::lseek($fd, -$over_read, &POSIX::SEEK_CUR);
347    }
348
349    return $wv;
350}
351
3521;
353
354
355# Local Variables:
356# mode: perl
357# c-basic-indent: 4
358# indent-tabs-mode: nil
359# End:
Note: See TracBrowser for help on using the browser.