root/trunk/lib/Perlbal/ClientProxy.pm @ 612

Revision 612, 39.7 kB (checked in by bradfitz, 3 years ago)

-- fix a crash, as reported on the mailing lists, in which backends did not
release their references to ClientProxies when requests came through
service selectors; those backends then timed out and crashed the server.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1######################################################################
2# HTTP Connection from a reverse proxy client
3#
4# Copyright 2004, Danga Interactive, Inc.
5# Copyright 2005-2006, Six Apart, Ltd.
6#
7package Perlbal::ClientProxy;
8use strict;
9use warnings;
10use base "Perlbal::ClientHTTPBase";
11no  warnings qw(deprecated);
12
13use fields (
14            'backend',             # Perlbal::BackendHTTP object (or undef if disconnected)
15            'backend_requested',   # true if we've requested a backend for this request
16            'reconnect_count',     # number of times we've tried to reconnect to backend
17            'high_priority',       # boolean; 1 if we are or were in the high priority queue
18            'low_priority',        # boolean; 1 if we are or were in the low priority queue
19            'reproxy_uris',        # arrayref; URIs to reproxy to, in order
20            'reproxy_expected_size', # int: size of response we expect to get back for reproxy
21            'currently_reproxying',  # arrayref; the host info and URI we're reproxying right now
22            'content_length_remain', # int: amount of data we're still waiting for
23            'responded',           # bool: whether we've already sent a response to the user or not
24            'last_request_time',   # int: time that we last received a request
25            'primary_res_hdrs',  # if defined, we are doing a transparent reproxy-URI
26                                 # and the headers we get back aren't necessarily
27                                 # the ones we want.  instead, get most headers
28                                 # from the provided res headers object here.
29            'is_buffering',        # bool; if we're buffering some/all of a request to memory/disk
30            'is_writing',          # bool; if on, we currently have an aio_write out
31            'start_time',          # hi-res time when we started getting data to upload
32            'bufh',                # buffered upload filehandle object
33            'bufilename',          # string; buffered upload filename
34            'bureason',            # string; if defined, the reason we're buffering to disk
35            'buoutpos',            # int; buffered output position
36            'backend_stalled',   # boolean:  if backend has shut off its reads because we're too slow.
37            'unread_data_waiting',  # boolean:  if we shut off reads while we know data is yet to be read from client
38
39            # for perlbal sending out UDP packets related to upload status (for xmlhttprequest upload bar)
40            'last_upload_packet',  # unixtime we last sent a UDP upload packet
41            'upload_session',      # client's self-generated upload session
42
43            # error-retrying stuff
44            'retry_count',         # number of times we've retried this request so far after getting 500 errors
45            );
46
47use constant READ_SIZE         => 131072;    # 128k, ~common TCP window size?
48use constant READ_AHEAD_SIZE   =>  32768;    # kinda arbitrary.  sum of these two is max stored per connection while waiting for backend.
49use Errno qw( EPIPE ENOENT ECONNRESET EAGAIN );
50use POSIX qw( O_CREAT O_TRUNC O_RDWR O_RDONLY );
51use Time::HiRes qw( gettimeofday tv_interval );
52
53my $udp_sock;
54
55# ClientProxy
56sub new {
57    my ($class, $service, $sock) = @_;
58
59    my $self = $class;
60    $self = fields::new($class) unless ref $self;
61    $self->SUPER::new($service, $sock);       # init base fields
62
63    Perlbal::objctor($self);
64    bless $self, ref $class || $class;
65
66    $self->init;
67    $self->watch_read(1);
68    return $self;
69}
70
71sub new_from_base {
72    my $class = shift;
73    my Perlbal::ClientHTTPBase $cb = shift;
74    bless $cb, $class;
75    $cb->init;
76    $cb->watch_read(1);
77    $cb->handle_request;
78    return $cb;
79}
80
81sub init {
82    my Perlbal::ClientProxy $self = $_[0];
83
84    $self->{last_request_time} = 0;
85
86    $self->{backend} = undef;
87    $self->{high_priority} = 0;
88
89    $self->{responded} = 0;
90    $self->{unread_data_waiting} = 0;
91    $self->{content_length_remain} = undef;
92    $self->{backend_requested} = 0;
93
94    $self->{is_buffering} = 0;
95    $self->{is_writing} = 0;
96    $self->{start_time} = undef;
97    $self->{bufh} = undef;
98    $self->{bufilename} = undef;
99    $self->{buoutpos} = 0;
100    $self->{bureason} = undef;
101
102    $self->{reproxy_uris} = undef;
103    $self->{reproxy_expected_size} = undef;
104    $self->{currently_reproxying} = undef;
105
106    $self->{retry_count} = 0;
107}
108
109# given a service name, re-request (GET/HEAD only) to that service, even though
110# you've already done a request to your original service
111sub start_reproxy_service {
112    my Perlbal::ClientProxy $self = $_[0];
113    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
114    my $svc_name = $_[2];
115
116    my $svc = $svc_name ? Perlbal->service($svc_name) : undef;
117    unless ($svc) {
118        $self->_simple_response(404, "Vhost twiddling not configured for requested pair.");
119        return 1;
120    }
121
122    $self->{backend_requested} = 0;
123    $self->{backend} = undef;
124    $self->{res_headers} = $primary_res_hdrs;
125
126    $svc->adopt_base_client($self);
127}
128
129# call this with a string of space separated URIs to start a process
130# that will fetch the item at the first and return it to the user,
131# on failure it will try the second, then third, etc
132sub start_reproxy_uri {
133    my Perlbal::ClientProxy $self = $_[0];
134    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
135    my $urls = $_[2];
136
137    # at this point we need to disconnect from our backend
138    $self->{backend} = undef;
139
140    # failure if we have no primary response headers
141    return unless $self->{primary_res_hdrs} ||= $primary_res_hdrs;
142
143    # construct reproxy_uri list
144    if (defined $urls) {
145        my @uris = split /\s+/, $urls;
146        $self->{currently_reproxying} = undef;
147        $self->{reproxy_uris} = [];
148        foreach my $uri (@uris) {
149            next unless $uri =~ m!^http://(.+?)(?::(\d+))?(/.*)?$!;
150            push @{$self->{reproxy_uris}}, [ $1, $2 || 80, $3 || '/' ];
151        }
152    }
153
154    # if we get in here and we have currently_reproxying defined, then something
155    # happened and we want to retry that one
156    if ($self->{currently_reproxying}) {
157        unshift @{$self->{reproxy_uris}}, $self->{currently_reproxying};
158        $self->{currently_reproxying} = undef;
159    }
160
161    # if we have no uris in our list now, tell the user 404
162    return $self->_simple_response(503)
163        unless @{$self->{reproxy_uris} || []};
164
165    # set the expected size if we got a content length in our headers
166    if ($primary_res_hdrs && (my $expected_size = $primary_res_hdrs->header('X-REPROXY-EXPECTED-SIZE'))) {
167        $self->{reproxy_expected_size} = $expected_size;
168    }
169
170    # pass ourselves off to the reproxy manager
171    $self->state('wait_backend');
172    Perlbal::ReproxyManager::do_reproxy($self);
173}
174
175# called by the reproxy manager when we can't get to our requested backend
176sub try_next_uri {
177    my Perlbal::ClientProxy $self = $_[0];
178
179    shift @{$self->{reproxy_uris}};
180    $self->{currently_reproxying} = undef;
181    $self->start_reproxy_uri();
182}
183
184# returns true if this ClientProxy is too many bytes behind the backend
185sub too_far_behind_backend {
186    my Perlbal::ClientProxy $self    = $_[0];
187    my Perlbal::BackendHTTP $backend = $self->{backend}   or return 0;
188
189    # if a backend doesn't have a service, it's a
190    # ReproxyManager-created backend, and thus it should use the
191    # 'buffer_size_reproxy_url' parameter for acceptable buffer
192    # widths, and not the regular 'buffer_size'.  this lets people
193    # tune buffers depending on the types of webservers.  (assumption
194    # being that reproxied-to webservers are event-based and it's okay
195    # to tie the up longer in favor of using less buffer memory in
196    # perlbal)
197    my $max_buffer = defined $backend->{service} ?
198        $self->{service}->{buffer_size} :
199        $self->{service}->{buffer_size_reproxy_url};
200
201    return $self->{write_buf_size} > $max_buffer;
202}
203
204# this is a callback for when a backend has been created and is
205# ready for us to do something with it
206sub use_reproxy_backend {
207    my Perlbal::ClientProxy $self = $_[0];
208    my Perlbal::BackendHTTP $be = $_[1];
209
210    # get a URI
211    my $datref = $self->{currently_reproxying} = shift @{$self->{reproxy_uris}};
212    unless (defined $datref) {
213        # return error and close the backend
214        $be->close('invalid_uris');
215        return $self->_simple_response(503);
216    }
217
218    # now send request
219    $self->{backend} = $be;
220    $be->{client} = $self;
221
222    my $extra_hdr = "";
223    if (my $range = $self->{req_headers}->header("Range")) {
224        $extra_hdr .= "Range: $range\r\n";
225    }
226    if (my $host = $self->{req_headers}->header("Host")) {
227        $extra_hdr .= "Host: $host\r\n";
228    }
229
230    my $headers = "GET $datref->[2] HTTP/1.0\r\nConnection: keep-alive\r\n${extra_hdr}\r\n";
231
232    $be->{req_headers} = Perlbal::HTTPHeaders->new(\$headers);
233    $be->state('sending_req');
234    $self->state('backend_req_sent');
235    $be->write($be->{req_headers}->to_string_ref);
236    $be->watch_read(1);
237    $be->watch_write(1);
238}
239
240# this is called when a transient backend getting a reproxied URI has received
241# a response from the server and is ready for us to deal with it
242sub backend_response_received {
243    my Perlbal::ClientProxy $self = $_[0];
244    my Perlbal::BackendHTTP $be = $_[1];
245
246    # a response means that we are no longer currently waiting on a reproxy, and
247    # don't want to retry this URI
248    $self->{currently_reproxying} = undef;
249
250    # we fail if we got something that's NOT a 2xx code, OR, if we expected
251    # a certain size and got back something different
252    my $code = $be->{res_headers}->response_code + 0;
253
254    my $bad_code = sub {
255        return 0 if $code >= 200 && $code <= 299;
256        return 0 if $code == 416;
257        return 1;
258    };
259
260    my $bad_size = sub {
261        return 0 unless defined $self->{reproxy_expected_size};
262        return $self->{reproxy_expected_size} != $be->{res_headers}->header('Content-length');
263    };
264
265    if ($bad_code->() || $bad_size->()) {
266        # fall back to an alternate URL
267        $be->{client} = undef;
268        $be->close('non_200_reproxy');
269        $self->try_next_uri;
270        return 1;
271    }
272    return 0;
273}
274
275sub start_reproxy_file {
276    my Perlbal::ClientProxy $self = shift;
277    my $file = shift;                      # filename to reproxy
278    my Perlbal::HTTPHeaders $hd = shift;   # headers from backend, in need of cleanup
279
280    # at this point we need to disconnect from our backend
281    $self->{backend} = undef;
282
283    # call hook for pre-reproxy
284    return if $self->{service}->run_hook("start_file_reproxy", $self, \$file);
285
286    # set our expected size
287    if (my $expected_size = $hd->header('X-REPROXY-EXPECTED-SIZE')) {
288        $self->{reproxy_expected_size} = $expected_size;
289    }
290
291    # start an async stat on the file
292    $self->state('wait_stat');
293    Perlbal::AIO::aio_stat($file, sub {
294
295        # if the client's since disconnected by the time we get the stat,
296        # just bail.
297        return if $self->{closed};
298
299        my $size = -s _;
300
301        unless ($size) {
302            # FIXME: POLICY: 404 or retry request to backend w/o reproxy-file capability?
303            return $self->_simple_response(404);
304        }
305        if (defined $self->{reproxy_expected_size} && $self->{reproxy_expected_size} != $size) {
306            # 404; the file size doesn't match what we expected
307            return $self->_simple_response(404);
308        }
309
310        # if the thing we're reproxying is indeed a file, advertise that
311        # we support byteranges on it
312        if (-f _) {
313            $hd->header("Accept-Ranges", "bytes");
314        }
315
316        my ($status, $range_start, $range_end) = $self->{req_headers}->range($size);
317        my $not_satisfiable = 0;
318
319        if ($status == 416) {
320            $hd = Perlbal::HTTPHeaders->new_response(416);
321            $hd->header("Content-Range", $size ? "bytes */$size" : "*");
322            $not_satisfiable = 1;
323        }
324
325        # change the status code to 200 if the backend gave us 204 No Content
326        $hd->code(200) if $hd->response_code == 204;
327
328        # fixup the Content-Length header with the correct size (application
329        # doesn't need to provide a correct value if it doesn't want to stat())
330        if ($status == 200) {
331            $hd->header("Content-Length", $size);
332        } elsif ($status == 206) {
333            $hd->header("Content-Range", "bytes $range_start-$range_end/$size");
334            $hd->header("Content-Length", $range_end - $range_start + 1);
335            $hd->code(206);
336        }
337
338        # don't send this internal header to the client:
339        $hd->header('X-REPROXY-FILE', undef);
340
341        # rewrite some other parts of the header
342        $self->setup_keepalive($hd);
343
344        # just send the header, now that we cleaned it.
345        $self->{res_headers} = $hd;
346        $self->write($hd->to_string_ref);
347
348        if ($self->{req_headers}->request_method eq 'HEAD' || $not_satisfiable) {
349            $self->write(sub { $self->http_response_sent; });
350            return;
351        }
352
353        $self->state('wait_open');
354        Perlbal::AIO::aio_open($file, 0, 0 , sub {
355            my $fh = shift;
356
357            # if client's gone, just close filehandle and abort
358            if ($self->{closed}) {
359                CORE::close($fh) if $fh;
360                return;
361            }
362
363            # handle errors
364            if (! $fh) {
365                # FIXME: do 500 vs. 404 vs whatever based on $! ?
366                return $self->_simple_response(500);
367            }
368
369            # seek if partial content
370            if ($status == 206) {
371                sysseek($fh, $range_start, &POSIX::SEEK_SET);
372                $size = $range_end - $range_start + 1;
373            }
374
375            $self->reproxy_fh($fh, $size);
376            $self->watch_write(1);
377        });
378    });
379}
380
381# Client
382# get/set backend proxy connection
383sub backend {
384    my Perlbal::ClientProxy $self = shift;
385    return $self->{backend} unless @_;
386
387    my $backend = shift;
388    $self->state('draining_res') unless $backend;
389    return $self->{backend} = $backend;
390}
391
392# invoked by backend when it wants us to start watching for reads again
393# and feeding it data (if we have any)
394sub backend_ready {
395    my Perlbal::ClientProxy $self = $_[0];
396    my Perlbal::BackendHTTP $be = $_[1];
397
398    # if we'd turned ourselves off while we waited for a backend, turn
399    # ourselves back on, because the backend is ready for data now.
400    if ($self->{unread_data_waiting}) {
401        $self->watch_read(1);
402    }
403
404    # normal, not-buffered-to-disk case:
405    return $self->drain_read_buf_to($be) unless $self->{bureason};
406
407    # buffered-to-disk case.
408
409    # tell the backend it has to go into buffered_upload_mode,
410    # which makes it inform us of its writable availability
411    $be->invoke_buffered_upload_mode;
412}
413
414# our backend enqueues a call to this method in our write buffer, so this is called
415# right after we've finished sending all of the results to the user.  at this point,
416# if we were doing keep-alive, we don't close and setup for the next request.
417sub backend_finished {
418    my Perlbal::ClientProxy $self = shift;
419    print "ClientProxy::backend_finished\n" if Perlbal::DEBUG >= 3;
420
421    # mark ourselves as having responded (presumeably if we're here,
422    # the backend has responded already)
423    $self->{responded} = 1;
424
425    # our backend is done with us, so we disconnect ourselves from it
426    $self->{backend} = undef;
427
428    # backend is done sending data to us, so we can recycle this clientproxy
429    # if we don't have any data yet to read
430    return $self->http_response_sent unless $self->{unread_data_waiting};
431
432    # if we get here (and we do, rarely, in practice) then that means
433    # the backend read was empty/disconected (or otherwise messed up),
434    # and the only thing we can really do is close the client down.
435    $self->close("backend_finished_while_unread_data");
436}
437
438# called when we've sent a response to a user fully and we need to reset state
439sub http_response_sent {
440    my Perlbal::ClientProxy $self = $_[0];
441
442    # persistence logic is in ClientHTTPBase
443    return 0 unless $self->SUPER::http_response_sent;
444
445    print "ClientProxy::http_response_sent -- resetting state\n" if Perlbal::DEBUG >= 3;
446
447    if (my $be = $self->{backend}) {
448        $self->{backend} = undef;
449        $be->forget_client;
450    }
451
452    # if we get here we're being persistent, reset our state
453    $self->{backend_requested} = 0;
454    $self->{high_priority} = 0;
455    $self->{reproxy_uris} = undef;
456    $self->{reproxy_expected_size} = undef;
457    $self->{currently_reproxying} = undef;
458    $self->{content_length_remain} = undef;
459    $self->{primary_res_hdrs} = undef;
460    $self->{responded} = 0;
461    $self->{is_buffering} = 0;
462    $self->{is_writing} = 0;
463    $self->{start_time} = undef;
464    $self->{bufh} = undef;
465    $self->{bufilename} = undef;
466    $self->{buoutpos} = 0;
467    $self->{bureason} = undef;
468    $self->{upload_session} = undef;
469    return 1;
470}
471
472# to request a backend connection AFTER you've already done so, if you
473# didn't like the results from the first one.  (like after a 500 error)
474sub rerequest_backend {
475    my Perlbal::ClientProxy $self = shift;
476
477    $self->{backend_requested} = 0;
478    $self->{backend} = undef;
479    $self->request_backend;
480}
481
482sub request_backend {
483    my Perlbal::ClientProxy $self = shift;
484    return if $self->{backend_requested};
485    $self->{backend_requested} = 1;
486
487    $self->state('wait_backend');
488    $self->{service}->request_backend_connection($self);
489    $self->tcp_cork(1);  # cork writes to self
490}
491
492# Client (overrides and calls super)
493sub close {
494    my Perlbal::ClientProxy $self = shift;
495    my $reason = shift;
496
497    # don't close twice
498    return if $self->{closed};
499
500    # signal that we're done
501    $self->{service}->run_hooks('end_proxy_request', $self);
502
503    # kill our backend if we still have one
504    if (my $backend = $self->{backend}) {
505        print "Client ($self) closing backend ($backend)\n" if Perlbal::DEBUG >= 1;
506        $self->backend(undef);
507        $backend->close($reason ? "proxied_from_client_close:$reason" : "proxied_from_client_close");
508    } else {
509        # if no backend, tell our service that we don't care for one anymore
510        $self->{service}->note_client_close($self);
511    }
512
513    # call ClientHTTPBase's close
514    $self->SUPER::close($reason);
515}
516
517sub client_disconnected { # : void
518    my Perlbal::ClientProxy $self = shift;
519    print "ClientProxy::client_disconnected\n" if Perlbal::DEBUG >= 2;
520
521    # if client disconnected, then we need to turn off watching for
522    # further reads and purge the existing upload if any. also, we
523    # should just return and do nothing else.
524
525    $self->watch_read(0);
526    $self->purge_buffered_upload if $self->{bureason};
527    return $self->close('user_disconnected');
528}
529
530# Client
531sub event_write {
532    my Perlbal::ClientProxy $self = shift;
533    print "ClientProxy::event_write\n" if Perlbal::DEBUG >= 3;
534
535    $self->SUPER::event_write;
536
537    # obviously if we're writing the backend has processed our request
538    # and we are responding/have responded to the user, so mark it so
539    $self->{responded} = 1;
540
541    # trigger our backend to keep reading, if it's still connected
542    if ($self->{backend_stalled} && (my $backend = $self->{backend})) {
543        print "  unstalling backend\n" if Perlbal::DEBUG >= 3;
544
545        $self->{backend_stalled} = 0;
546        $backend->watch_read(1);
547    }
548}
549
550# ClientProxy
551sub event_read {
552    my Perlbal::ClientProxy $self = shift;
553    print "ClientProxy::event_read\n" if Perlbal::DEBUG >= 3;
554
555    # mark alive so we don't get killed for being idle
556    $self->{alive_time} = time;
557
558    # if we have no headers, the only thing we can do is try to get some
559    if (! $self->{req_headers}) {
560        print "  no headers.  reading.\n" if Perlbal::DEBUG >= 3;
561        $self->handle_request if $self->read_request_headers;
562        return;
563    }
564
565    # if we're buffering to disk or haven't read too much from this client, keep reading,
566    # otherwise shut off read notifications
567    unless ($self->{is_buffering} || $self->{read_ahead} < READ_AHEAD_SIZE) {
568        # our buffer is full, so turn off reads for now
569        print "  disabling reads.\n" if Perlbal::DEBUG >= 3;
570        $self->watch_read(0);
571        return;
572    }
573
574    # read more data if we're still buffering or if our current read buffer
575    # is not full to the max READ_AHEAD_SIZE which is how much data we will
576    # buffer in from the user before passing on to the backend
577
578    # read the MIN(READ_SIZE, content_length_remain)
579    my $read_size = READ_SIZE;
580    my $remain = $self->{content_length_remain};
581
582    $read_size = $remain if $remain && $remain < $read_size;
583    print "  reading $read_size bytes (", (defined $remain ? $remain : "(undef)"), " bytes remain)\n" if Perlbal::DEBUG >= 3;
584
585    my $bref = $self->read($read_size);
586
587    # if the read returned undef, that means the connection was closed
588    # (see: Danga::Socket::read)
589    return $self->client_disconnected unless defined $bref;
590
591    # if they didn't declare a content body length and we just got a
592    # readable event that's not a disconnect, something's messed up.
593    # they're overflowing us.  disconnect!
594    if (! $remain) {
595        $self->_simple_response(400, "Can't pipeline to HTTP/1.0");
596        $self->close("pipelining_to_http10");
597        return;
598    }
599
600    # now that we know we have a defined value, determine how long it is, and do
601    # housekeeping to keep our tracking numbers up to date.
602    my $len = length($$bref);
603    print "  read $len bytes\n" if Perlbal::DEBUG >= 3;
604
605    # when run under the program "trickle", epoll speaks the truth to
606    # us, but then trickle interferes and steals our reads/writes, so
607    # this fails.  normally this check isn't needed.
608    return unless $len;
609
610    $self->{read_size} += $len;
611    $self->{content_length_remain} -= $len if $remain;
612
613    my $done_reading = defined $self->{content_length_remain} && $self->{content_length_remain} <= 0;
614    my $backend = $self->backend;
615    print("  done_reading = $done_reading, backend = ", ($backend || "<undef>"), "\n") if Perlbal::DEBUG >= 3;
616
617    # upload tracking
618    if (my $session = $self->{upload_session}) {
619        my $cl = $self->{req_headers}->content_length;
620        my $remain = $self->{content_length_remain};
621        my $now = time();  # FIXME: more efficient?
622        if ($cl && $remain && ($self->{last_upload_packet} || 0) != $now) {
623            my $done = $cl - $remain;
624            $self->{last_upload_packet} = $now;
625            $udp_sock ||= IO::Socket::INET->new(Proto => 'udp');
626            my $since = $self->{last_request_time};
627            my $send = "UPLOAD:$session:$done:$cl:$since:$now";
628            if ($udp_sock) {
629                foreach my $ep (@{ $self->{service}{upload_status_listeners_sockaddr} }) {
630                    my $rv = $udp_sock->send($send, 0, $ep);
631                }
632            }
633        }
634    }
635
636    # just dump the read into the nether if we're dangling. that is
637    # the case when we send the headers to the backend and it responds
638    # before we're done reading from the client; therefore further
639    # reads from the client just need to be sent nowhere, because the
640    # RFC2616 section 8.2.3 says: "the server SHOULD NOT close the
641    # transport connection until it has read the entire request"
642    if ($self->{responded}) {
643        print "  already responded.\n" if Perlbal::DEBUG >= 3;
644        # in addition, if we're now out of data (clr == 0), then we should
645        # either close ourselves or get ready for another request
646        return $self->http_response_sent if $done_reading;
647
648        print "  already responded [2].\n" if Perlbal::DEBUG >= 3;
649        # at this point, if the backend has responded then we just return
650        # as we don't want to send it on to them or buffer it up, which is
651        # what the code below does
652        return;
653    }
654
655    # if we have no data left to read, stop reading.  all that can
656    # come later is an extra \r\n which we handle later when parsing
657    # new request headers.  and if it's something else, we'll bail on
658    # the next request, not this one.
659    if ($done_reading) {
660        Carp::confess("content_length_remain less than zero: self->{content_length_remain}")
661            if $self->{content_length_remain} < 0;
662        $self->{unread_data_waiting} = 0;
663        $self->watch_read(0);
664    }
665
666    # now, if we have a backend, then we should be writing it to the backend
667    # and not doing anything else
668    if ($backend) {
669        print "  got a backend.  sending write to it.\n" if Perlbal::DEBUG >= 3;
670        $backend->write($bref);
671        # TODO: monitor the backend's write buffer depth?
672        return;
673    }
674
675    # now, we know we don't have a backend, so we have to push this data onto our
676    # read buffer... it's not going anywhere yet
677    push @{$self->{read_buf}}, $bref;
678    $self->{read_ahead} += $len;
679    print "  no backend.  read_ahead = $self->{read_ahead}.\n" if Perlbal::DEBUG >= 3;
680
681    # if we know we've already started spooling a file to disk, then continue
682    # to do that.
683    print "  bureason = $self->{bureason}\n" if Perlbal::DEBUG >= 3 && $self->{bureason};
684    return $self->buffered_upload_update if $self->{bureason};
685
686    # if we are under our buffer-to-memory size, just continue buffering here and
687    # don't fall through to the backend request call below
688    return if
689        ! $done_reading &&
690        $self->{read_ahead} < $self->{service}->{buffer_backend_connect};
691
692    # over the buffer-to-memory size, see if we should start spooling to disk.
693    return if $self->{service}->{buffer_uploads} && $self->decide_to_buffer_to_disk;
694
695    # give plugins a chance to act on the request before we request a backend
696    # (added by Chris Hondl <chris@imvu.com>, March 2006)
697    my $svc = $self->{service};
698    return if $svc->run_hook('proxy_read_request', $self);
699
700    # if we fall through to here, we need to ensure that a backend is on the
701    # way, because no specialized handling took over above
702    print "  finally requesting a backend\n" if Perlbal::DEBUG >= 3;
703    return $self->request_backend;
704}
705
706sub handle_request {
707    my Perlbal::ClientProxy $self = shift;
708    my $req_hd = $self->{req_headers};
709
710    my $svc = $self->{service};
711    # give plugins a chance to force us to bail
712    return if $svc->run_hook('start_proxy_request', $self);
713    return if $svc->run_hook('start_http_request',  $self);
714
715    # if defined we're waiting on some amount of data.  also, we have to
716    # subtract out read_size, which is the amount of data that was
717    # extra in the packet with the header that's part of the body.
718    $self->{content_length_remain} = $req_hd->content_length;
719    $self->{unread_data_waiting} = 1 if $self->{content_length_remain};
720
721    # upload-tracking stuff.  both starting a new upload track session,
722    # and checking on status of ongoing one
723    return if $svc->{upload_status_listeners} && $self->handle_upload_tracking;
724
725    # note that we've gotten a request
726    $self->{requests}++;
727    $self->{last_request_time} = $self->{alive_time};
728
729    # either start buffering some of the request to memory, or
730    # immediately request a backend connection.
731    if ($self->{content_length_remain} && $self->{service}->{buffer_backend_connect}) {
732        # the deeper path
733        $self->start_buffering_request;
734    } else {
735        # get the backend request process moving, since we aren't buffering
736        $self->{is_buffering} = 0;
737
738        # if reproxy-caching is enabled, we can often bypass needing to allocate a BackendHTTP connection:
739        return if $svc->{reproxy_cache} && $self->satisfy_request_from_cache;
740
741        $self->request_backend;
742    }
743}
744
745sub satisfy_request_from_cache {
746    my Perlbal::ClientProxy $self = shift;
747
748    my $req_hd = $self->{req_headers};
749    my $svc    = $self->{service};
750    my $cache  = $svc->{reproxy_cache};
751    $svc->{_stat_requests}++;
752
753    my $requri   = $req_hd->request_uri    || '';
754    my $hostname = $req_hd->header("Host") || '';
755
756    my $key      = "$hostname|$requri";
757
758    my $reproxy  = $cache->get($key) or
759        return 0;
760
761    my ($timeout, $headers, $urls) = @$reproxy;
762    return 0 if time() > $timeout;
763
764    $svc->{_stat_cache_hits}++;
765    my %headers = map { ref $_ eq 'SCALAR' ? $$_ : $_ } @{$headers || []};
766
767    if (my $ims = $req_hd->header("If-Modified-Since")) {
768        my ($lm_key) = grep { uc($_) eq "LAST-MODIFIED" } keys %headers;
769        my $lm = $headers{$lm_key} || "";
770
771        # remove the IE length suffix
772        $ims =~ s/; length=(\d+)//;
773
774        # If 'Last-Modified' is same as 'If-Modified-Since', send a 304
775        if ($ims eq $lm) {
776            my $res_hd = Perlbal::HTTPHeaders->new_response(304);
777            $res_hd->header("Content-Length", "0");
778            $self->tcp_cork(1);
779            $self->state('xfer_resp');
780            $self->write($res_hd->to_string_ref);
781            $self->write(sub { $self->http_response_sent; });
782            return 1;
783        }
784    }
785
786    my $res_hd = Perlbal::HTTPHeaders->new_response(200);
787    $res_hd->header("Date", HTTP::Date::time2str(time()));
788    while (my ($key, $value) = each %headers) {
789        $res_hd->header($key, $value);
790    }
791
792    $self->start_reproxy_uri($res_hd, $urls);
793    return 1;
794}
795
# Intercept requests that belong to an upload-progress session.
#
# A request whose URI carries client_up_sess=<5..50 word chars> is either:
#   * a status poll to /__upload_status?... : answered right here with a
#     small JS-object-literal body built from
#     Perlbal::UploadListener::get_status ("{}" when that returns nothing).
#     Returns 1 so handle_request leaves the connection to us; or
#   * any other request: it is tagged with {upload_session} and 0 is
#     returned so handle_request carries on as usual.
# Requests without a session parameter are left untouched (returns 0).
sub handle_upload_tracking {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};
    my $uri    = $req_hd->request_uri;

    my ($sess) = $uri =~ /[\?&]client_up_sess=(\w{5,50})\b/
        or return 0;

    # Not a status poll: just remember which upload session this is.
    unless ($uri =~ m!^/__upload_status\?!) {
        $self->{upload_session} = $sess;
        return 0;
    }

    my $status = Perlbal::UploadListener::get_status($sess);
    my $now    = time();
    my $body   = "{}";
    if ($status) {
        $body = "{done:$status->{done},total:$status->{total},starttime:$status->{starttime},nowtime:$now}";
    }

    my $res = Perlbal::HTTPHeaders->new_response(200);
    $self->{res_headers} = $res;
    $res->header("Content-Type", "text/plain");
    $res->header('Content-Length', length $body);
    $self->setup_keepalive($res);

    $self->tcp_cork(1);  # cork writes to self
    for my $chunk ($res->to_string_ref, \$body, sub { $self->http_response_sent; }) {
        $self->write($chunk);
    }
    return 1;
}
830
# Continuation of handle_request when the request body has to be buffered.
#
# Marks the client as buffering.  If the service buffers uploads by size and
# the declared Content-Length already reaches buffer_upload_threshold_size,
# the body goes straight to disk ('buffering_upload' state, reason 'size').
# Otherwise the body is first collected in memory ('buffering_request'
# state); when upload buffering is enabled the start time is recorded so the
# transfer rate can be judged later.
sub start_buffering_request {
    my Perlbal::ClientProxy $self = shift;
    my $svc = $self->{service};

    $self->{is_buffering} = 1;

    # Shortcut: an upload already known to be at least the size threshold
    # is spooled to disk right away.
    my $size_limit = $svc->{buffer_uploads} && $svc->{buffer_upload_threshold_size};
    if ($size_limit && $self->{req_headers}->content_length >= $size_limit) {
        $self->{bureason} = 'size';
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', 'size')
            if $ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS};
        $self->state('buffering_upload');
        $self->buffered_upload_update;
        return;
    }

    # Buffering in memory for now; we may still decide to go to disk.
    $self->state('buffering_request');

    # The start time only matters for the buffer-to-disk rate/time checks.
    $self->{start_time} = [ gettimeofday() ] if $svc->{buffer_uploads};
}
863
# Decide whether the request body currently buffered in memory should be
# spooled to disk instead of being sent straight to the backend.
#
# Called once enough of the body has been read to judge the client.  The
# observed rate is read_ahead (bytes currently buffered) divided by the
# seconds elapsed since start_time; the estimated time is the remaining
# content length divided by that rate.  These are compared with the
# service's buffer_upload_threshold_rate / buffer_upload_threshold_time (a
# threshold <= 0 disables that check; when both trip, 'time' is recorded).
#
# Returns: false if we are not buffering; the previously chosen reason if a
# decision was already made; 0 after deciding NOT to go to disk (buffering
# is switched off); 1 after switching to the 'buffering_upload' state.
sub decide_to_buffer_to_disk {
    my Perlbal::ClientProxy $self = shift;
    return unless $self->{is_buffering};
    return $self->{bureason} if defined $self->{bureason};

    # this is called when we have enough data to determine whether or not to
    # start buffering to disk
    my $dur    = tv_interval($self->{start_time}) || 1;
    my $rate   = $self->{read_ahead} / $dur;
    my $remain = $self->{content_length_remain};

    # With nothing buffered the rate is 0 and dividing by it would die with
    # "Illegal division by zero".  Treat the remaining time as unbounded in
    # that case (or zero when there is nothing left to receive).
    my $etime = $rate > 0   ? $remain / $rate
              : $remain > 0 ? 9**9**9
              :               0;

    # see if we have enough data to make the determination
    my $reason = undef;

    # see if we blow the rate away
    if ($self->{service}->{buffer_upload_threshold_rate} > 0 &&
            $rate < $self->{service}->{buffer_upload_threshold_rate}) {
        # they are slower than the minimum rate
        $reason = 'rate';
    }

    # and finally check estimated time exceeding
    if ($self->{service}->{buffer_upload_threshold_time} > 0 &&
            $etime > $self->{service}->{buffer_upload_threshold_time}) {
        # exceeds
        $reason = 'time';
    }

    unless ($reason) {
        $self->{is_buffering} = 0;
        return 0;
    }

    # start saving it to disk
    $self->state('buffering_upload');
    $self->buffered_upload_update;
    $self->{bureason} = $reason;

    if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) {
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', $reason);
    }

    return 1;
}
911
# Hand a request body that was spooled to disk over to a backend.
#
# Verifies that exactly Content-Length bytes were written to the spool file
# (answering 500 otherwise), rewinds the file so it can be streamed from the
# start, and then requests a backend.
sub send_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # make sure our buoutpos is the same as the content length...
    my $clen = $self->{req_headers}->content_length;
    if ($clen != $self->{buoutpos}) {
        Perlbal::log('critical', "Content length of $clen declared but $self->{buoutpos} bytes written to disk");
        return $self->_simple_response(500);
    }

    # Reset our position so we start reading from the right spot.  The
    # sendfile() call in continue_buffered_upload is given no offset, so it
    # presumably reads from the file's current position: a failed rewind
    # would silently send the wrong bytes, so fail the request instead.
    # (sysseek returns undef on failure and "0 but true" for position 0.)
    $self->{buoutpos} = 0;
    unless (defined sysseek($self->{bufh}, 0, 0)) {
        Perlbal::log('critical', "Unable to rewind buffered upload file $self->{bufilename}: $!");
        return $self->_simple_response(500);
    }

    # notify that we want the backend so we get the ball rolling
    $self->request_backend;
}
930
# Push the next piece of the on-disk spooled request body to backend $be.
#
# Sends up to the remaining (Content-Length - buoutpos) bytes of the spool
# file with sendfile and advances buoutpos.  Once everything has been sent,
# buffered_upload_mode is cleared on the backend and the spool file is
# purged.  Socket errors close this client with a descriptive reason.
sub continue_buffered_upload {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be = shift;
    return unless $self && $be;

    # now send the data
    my $clen = $self->{req_headers}->content_length;
    my $sent = Perlbal::Socket::sendfile($be->{fd}, fileno($self->{bufh}), $clen - $self->{buoutpos});
    if ($sent < 0) {
        return $self->close("epipe")     if $! == EPIPE;
        return $self->close("connreset") if $! == ECONNRESET;
        # Report through the server log like the other error paths in this
        # module, rather than printing to STDERR.
        Perlbal::log('warning', "Error w/ sendfile: $!");
        return $self->close('sendfile_error');
    }
    $self->{buoutpos} += $sent;

    # if we're done, purge the file and move on
    if ($self->{buoutpos} >= $clen) {
        $be->{buffered_upload_mode} = 0;
        $self->purge_buffered_upload;
        return;
    }

    # we will be called again by the backend since buffered_upload_mode is on
}
956
# Spool buffered request-body data from memory to disk, one chunk at a time.
#
# Does nothing while a previous AIO open/write is outstanding ({is_writing}),
# or when we are not buffering or nothing is buffered.  On first use it
# creates the spool file under the service's buffer_uploads_path and
# re-enters itself once the file is open.  Each later call writes the oldest
# chunk of {read_buf} at offset {buoutpos}.  The write callback re-queues any
# unwritten tail, then either hands the finished file to
# send_buffered_upload() (nothing left to read or buffer) or re-enters to
# write the next chunk.  An open/write failure answers the client with 500.
sub buffered_upload_update {
    my Perlbal::ClientProxy $self = shift;
    return if $self->{is_writing};
    return unless $self->{is_buffering} && $self->{read_ahead};

    # so we're not writing now and we have data to write...
    unless ($self->{bufilename}) {
        # create a filename and see if it exists or not
        $self->{is_writing} = 1;
        my $fn = join('-', $self->{service}->name, $self->{service}->listenaddr, "client", $self->{fd}, int(rand(0xffffffff)));
        $fn = $self->{service}->{buffer_uploads_path} . '/' . $fn;

        # good, now we need to create the file
        Perlbal::AIO::aio_open($fn, O_CREAT | O_TRUNC | O_RDWR, 0644, sub {
            $self->{is_writing} = 0;
            $self->{bufh} = shift;

            # throw errors back to the user
            if (! $self->{bufh}) {
                Perlbal::log('critical', "Failure to open $fn for buffered upload output");
                return $self->_simple_response(500);
            }

            # save state and info and bounce it back to write data
            $self->{bufilename} = $fn;
            $self->buffered_upload_update;
        });

        return;
    }

    # at this point, we want to do some writing
    my $bref = shift(@{$self->{read_buf}});
    my $len = length $$bref;
    $self->{read_ahead} -= $len;

    # so at this point we have a valid filename and file handle and should write out
    # the buffer that we have
    $self->{is_writing} = 1;
    Perlbal::AIO::aio_write($self->{bufh}, $self->{buoutpos}, $len, $$bref, sub {
        my $bytes = shift;
        $self->{is_writing} = 0;

        # Treat anything but a positive byte count as a failure.  A plain
        # truth test is not enough: if the AIO layer is backed by IO::AIO,
        # errors arrive as -1, which is true and would otherwise be added to
        # buoutpos as if it were progress.
        unless (defined $bytes && $bytes > 0) {
            Perlbal::log('critical', "Error writing buffered upload: $!.  Tried to do $len bytes at $self->{buoutpos}.");
            return $self->_simple_response(500);
        }

        # update our count of data written
        $self->{buoutpos} += $bytes;

        # now check if we wrote less than we had in this chunk of buffer.  if that's
        # the case then we need to reenqueue the part of the chunk that wasn't
        # written out and update as appropriate.
        if ($bytes < $len) {
            my $diff = $len - $bytes;
            unshift @{$self->{read_buf}}, \ substr($$bref, $bytes, $diff);
            $self->{read_ahead} += $diff;
        }

        # if we're done (no clr and no read ahead!) then send it
        if ($self->{read_ahead} <= 0 && $self->{content_length_remain} <= 0) {
            $self->send_buffered_upload;
            return;
        }

        # spawn another writer!
        $self->buffered_upload_update;
    });
}
1029
# Remove the on-disk spool file of a buffered upload, if one was created.
#
# Closes the spool filehandle synchronously and clears {bufh}, then asks
# Perlbal::AIO to unlink {bufilename} asynchronously.  Failures are only
# warned about or logged; nothing here dies.
sub purge_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # FIXME: it's reported that sometimes the close/unlink below fail, hence
    # the eval blocks and warnings.  One plausible cause is being called when
    # no spool file exists (CORE::close on an undefined handle dies), so each
    # step is now skipped when there is nothing to clean up.  The FIXME is to
    # figure out whether anything else triggers it.

    # first close our filehandle... not async
    if ($self->{bufh}) {
        eval {
            CORE::close($self->{bufh});
        };
        if ($@) { warn "Error closing file in ClientProxy::purge_buffered_upload: $@\n"; }
    }

    $self->{bufh} = undef;

    # Copy the name now: the unlink completes asynchronously, and the log
    # message should name the file we actually tried to remove.
    my $fn = $self->{bufilename};
    return unless defined $fn;

    eval {
        # now asynchronously unlink the file
        Perlbal::AIO::aio_unlink($fn, sub {
            # NOTE(review): this inspects $! rather than a result passed to
            # the callback, so a stale errno could cause a spurious warning;
            # confirm what Perlbal::AIO::aio_unlink hands its callback.
            if ($!) {
                # note an error, but whatever, we'll either overwrite the file later (O_TRUNC | O_CREAT)
                # or a cleaner will come through and do it for us someday (if the user runs one)
                Perlbal::log('warning', "Unable to unlink $fn: $!");
            }
        });
    };
    if ($@) { warn "Error unlinking file in ClientProxy::purge_buffered_upload: $@\n"; }
}
1058
# Decide whether a 500 from backend $be should be hidden from the client and
# retried on another backend.
#
# Retries happen only when the backend's service has enable_error_retries
# set, and at most once per entry in its comma-separated
# error_retry_schedule.  Every call that gets past the enable check counts
# one attempt in {retry_count}.  Returns 1 to retry, 0 to pass the 500 on.
sub should_retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be   = shift;

    my $svc = $be->{service};
    return 0 if !$svc->{enable_error_retries};

    my @delays = split /\s*,\s*/, $svc->{error_retry_schedule};
    $self->{retry_count}++;

    return $self->{retry_count} <= @delays ? 1 : 0;
}
1070
# Called by the backend after a 500 that should be retried: schedules
# another backend request for this client.
#
# The delay for this attempt is the retry_count'th (1-based) entry of the
# service's comma-separated error_retry_schedule.  A missing or zero delay
# retries immediately.  Otherwise a Danga::Socket timer is armed for $delay
# (presumably seconds), and when it fires a backend is re-requested unless
# this client has been closed in the meantime.
sub retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::Service     $svc  = shift;

    my @delays = split /\s*,\s*/, $svc->{error_retry_schedule};
    my $delay  = $delays[ $self->{retry_count} - 1 ];

    return $self->rerequest_backend unless $delay;

    Danga::Socket->AddTimer($delay, sub {
        $self->rerequest_backend unless $self->{closed};
    });
}
1089
# Human-readable one-line description of this client, for status output.
#
# Starts from the parent class's description and appends "; "-separated
# details: the backend's ipport (or, with no backend, a non-zero pending
# write buffer size), priority-queue flags, whether a response was already
# sent, how much request body is still awaited, and reproxying state.
sub as_string {
    my Perlbal::ClientProxy $self = shift;

    my @parts = (scalar $self->SUPER::as_string);

    if (my $be = $self->{backend}) {
        push @parts, "backend=$be->{ipport}";
    } elsif ($self->{write_buf_size} > 0) {
        push @parts, "write_buf_size=$self->{write_buf_size}";
    }

    push @parts, "highpri"   if $self->{high_priority};
    push @parts, "lowpri"    if $self->{low_priority};
    push @parts, "responded" if $self->{responded};
    push @parts, "waiting_for=$self->{content_length_remain}"
        if defined $self->{content_length_remain};
    push @parts, "reproxying" if $self->{currently_reproxying};

    return join '; ', @parts;
}
1110
# Destructor: notifies Perlbal::objdtor (whose purpose is not visible here),
# then chains to the parent class's DESTROY.
sub DESTROY {
    my $self = shift;
    Perlbal::objdtor($self);
    $self->SUPER::DESTROY;
}
1115
# A module must end by returning a true value so that use/require succeed.
1;
1117
1118
1119# Local Variables:
1120# mode: perl
1121# c-basic-indent: 4
1122# indent-tabs-mode: nil
1123# End:
Note: See TracBrowser for help on using the browser.