root/trunk/lib/Perlbal/ClientProxy.pm

Revision 823, 45.0 kB (checked in by ask, 5 months ago)

Fix obscure race condition (spontaneously closed keepalives after POST requests)

Author: Andreas J Koenig

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1######################################################################
2# HTTP Connection from a reverse proxy client
3#
4# Copyright 2004, Danga Interactive, Inc.
5# Copyright 2005-2007, Six Apart, Ltd.
6#
7package Perlbal::ClientProxy;
8use strict;
9use warnings;
10use base "Perlbal::ClientHTTPBase";
11no  warnings qw(deprecated);
12
13use Perlbal::ChunkedUploadState;
14use Perlbal::Util;
15
# Per-connection state of a ClientProxy, declared with the 'fields' pragma.
# Fields inherited from Perlbal::ClientHTTPBase are not listed here.
use fields (
            'backend',             # Perlbal::BackendHTTP object (or undef if disconnected)
            'backend_requested',   # true if we've requested a backend for this request
            'reconnect_count',     # number of times we've tried to reconnect to backend
            'high_priority',       # boolean; 1 if we are or were in the high priority queue
            'low_priority',        # boolean; 1 if we are or were in the low priority queue
            'reproxy_uris',        # arrayref of [host, port, path] triples; URIs to reproxy to, in order
            'reproxy_expected_size', # int: size of response we expect to get back for reproxy
            'currently_reproxying',  # arrayref; the [host, port, path] entry we're reproxying right now
            'content_length_remain', # int: amount of request body data we're still waiting for
            'responded',           # bool: whether we've already sent a response to the user or not
            'last_request_time',   # int: time that we last received a request
            'primary_res_hdrs',  # if defined, we are doing a transparent reproxy-URI
                                 # and the headers we get back aren't necessarily
                                 # the ones we want.  instead, get most headers
                                 # from the provided res headers object here.
            'is_buffering',        # bool; if we're buffering some/all of a request to memory/disk
            'is_writing',          # bool; if on, we currently have an aio_write out
            'start_time',          # hi-res time when we started getting data to upload
            'bufh',                # buffered upload filehandle object
            'bufilename',          # string; buffered upload filename
            'bureason',            # string; if defined, the reason we're buffering to disk
            'buoutpos',            # int; buffered output position
            'backend_stalled',   # boolean:  if backend has shut off its reads because we're too slow.
            'unread_data_waiting',  # boolean:  if we shut off reads while we know data is yet to be read from client
            'chunked_upload_state', # undef, or a Perlbal::ChunkedUploadState object while processing a chunked upload
            'request_body_length',  # integer:  request's body length, either as-declared,
                                    #           or calculated after chunked upload is complete

            # for perlbal sending out UDP packets related to upload status (for xmlhttprequest upload bar)
            'last_upload_packet',  # unixtime we last sent a UDP upload packet
            'upload_session',      # client's self-generated upload session

            # error-retrying stuff
            'retry_count',         # number of times we've retried this request so far after getting 500 errors
            );
52
53use constant READ_SIZE         => 131072;    # 128k, ~common TCP window size?
54use constant READ_AHEAD_SIZE   =>  32768;    # kinda arbitrary.  sum of these two is max stored per connection while waiting for backend.
55use Errno qw( EPIPE ENOENT ECONNRESET EAGAIN );
56use POSIX qw( O_CREAT O_TRUNC O_RDWR O_RDONLY );
57use Time::HiRes qw( gettimeofday tv_interval );
58
59my $udp_sock;
60
# ClientProxy constructor.
#
# Invocant may be a class name or an already-allocated fields object.
# ($service, $sock) are handed to the parent constructor; afterwards the
# object is registered with Perlbal::objctor, its proxy state is reset via
# init(), and read notifications are switched on.  Returns the object.
sub new {
    my Perlbal::ClientProxy $self = shift;
    my ($service, $sock) = @_;

    # only allocate storage when we were called as a class method
    unless (ref $self) {
        $self = fields::new($self);
    }

    $self->SUPER::new($service, $sock);
    Perlbal::objctor($self);

    $self->init;
    $self->watch_read(1);

    return $self;
}
74
# Alternate constructor: turn an existing Perlbal::ClientHTTPBase object
# into an object of $class (through Perlbal::Util::rebless), reset the
# proxy state, enable read notifications and process the request right
# away.  Returns the reblessed object.
sub new_from_base {
    my ($class, $client) = @_;

    Perlbal::Util::rebless($client, $class);

    $client->init;
    $client->watch_read(1);
    $client->handle_request;

    return $client;
}
84
# Put every proxy-specific field that this method manages into its
# initial state.  Called from new() and new_from_base().
sub init {
    my Perlbal::ClientProxy $self = $_[0];

    # flags, counters and positions start at zero
    $self->{$_} = 0
        for qw(last_request_time
               high_priority low_priority
               responded unread_data_waiting backend_requested
               is_buffering is_writing buoutpos);

    # objects, handles, names and sizes start out undefined
    $self->{$_} = undef
        for qw(backend content_length_remain
               start_time bufh bufilename bureason
               chunked_upload_state request_body_length
               reproxy_uris reproxy_expected_size currently_reproxying);

    $self->{retry_count} = 0;
}
115
# Re-issue the current request (GET/HEAD only) against another service,
# identified by name, even though the original service already handled it.
#
#   $_[1]  Perlbal::HTTPHeaders  response headers from the first service
#   $_[2]  string                name of the service to hand the client to
#
# Sends a 404 and returns 1 when no such service exists; otherwise
# returns whatever the service's adopt_base_client() returns.
sub start_reproxy_service {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::HTTPHeaders $primary_res_hdrs = shift;
    my $svc_name = shift;

    my $svc;
    $svc = Perlbal->service($svc_name) if $svc_name;

    if (!$svc) {
        $self->_simple_response(404, "Vhost twiddling not configured for requested pair.");
        return 1;
    }

    # drop the old backend association and remember the headers we got
    $self->{backend}           = undef;
    $self->{backend_requested} = 0;
    $self->{res_headers}       = $primary_res_hdrs;

    $svc->adopt_base_client($self);
}
135
# Start (or continue) fetching a reproxied resource on behalf of the user.
#
#   $_[1]  Perlbal::HTTPHeaders  primary response headers (may be omitted on
#                                a retry; the stored ones are used then)
#   $_[2]  string                whitespace separated list of http:// URIs,
#                                tried in order; when undef the list that is
#                                already stored on the object is used
#
# Returns early (empty) when no primary response headers are known, and
# answers the client with a 503 when no usable URI is left.
sub start_reproxy_uri {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
    my $urls = $_[2];

    # from here on we are detached from our backend
    $self->{backend} = undef;

    # without primary response headers there is nothing we can do
    $self->{primary_res_hdrs} ||= $primary_res_hdrs;
    return unless $self->{primary_res_hdrs};

    # a fresh URI string replaces whatever list we had; entries that are
    # not plain http:// URIs are silently skipped
    if (defined $urls) {
        $self->{currently_reproxying} = undef;

        my @targets;
        for my $uri (split /\s+/, $urls) {
            my ($host, $port, $path) = $uri =~ m!^http://(.+?)(?::(\d+))?(/.*)?$!
                or next;
            push @targets, [ $host, $port || 80, $path || '/' ];
        }
        $self->{reproxy_uris} = \@targets;
    }

    # still having a "current" entry at this point means the attempt was
    # interrupted; put it back at the front so it gets retried
    if (my $interrupted = $self->{currently_reproxying}) {
        unshift @{$self->{reproxy_uris}}, $interrupted;
        $self->{currently_reproxying} = undef;
    }

    # nothing (left) to try: tell the user 503
    my $pending = $self->{reproxy_uris} || [];
    return $self->_simple_response(503) unless @$pending;

    # remember the size the primary response told us to expect, if any
    if ($primary_res_hdrs) {
        my $expected_size = $primary_res_hdrs->header('X-REPROXY-EXPECTED-SIZE');
        $self->{reproxy_expected_size} = $expected_size if $expected_size;
    }

    # hand ourselves over to the reproxy manager
    $self->state('wait_backend');
    Perlbal::ReproxyManager::do_reproxy($self);
}
181
# Called by the reproxy manager when the requested backend could not be
# reached: drops the URI at the head of the list and carries on with the
# rest of the list.
sub try_next_uri {
    my Perlbal::ClientProxy $self = shift;

    shift @{ $self->{reproxy_uris} };
    $self->{currently_reproxying} = undef;

    # no arguments: continue with the stored headers and remaining URIs
    return $self->start_reproxy_uri();
}
190
# Returns true when this client's write buffer holds more bytes than the
# service allows, i.e. the client is too far behind the backend.
# Returns 0 when there is no backend at all.
sub too_far_behind_backend {
    my Perlbal::ClientProxy $self = $_[0];

    my Perlbal::BackendHTTP $backend = $self->{backend};
    return 0 unless $backend;

    # A backend without a service was created by the ReproxyManager, so the
    # 'buffer_size_reproxy_url' limit applies to it rather than the regular
    # 'buffer_size'.  That lets people tune buffers per kind of webserver
    # (the assumption being that reproxied-to webservers are event-based,
    # so tying them up longer is fine if it saves buffer memory in perlbal).
    my $limit_name = defined $backend->{service}
        ? 'buffer_size'
        : 'buffer_size_reproxy_url';
    my $max_buffer = $self->{service}->{$limit_name};

    return $self->{write_buf_size} > $max_buffer;
}
210
# Callback invoked once a backend has been created and is ready for use.
# Takes the next URI off the reproxy list, binds client and backend to
# each other and sends an HTTP/1.0 GET (or HEAD) request for that URI,
# forwarding the client's Range and Host headers when present.
# With no URI left, the backend is closed and the client gets a 503.
sub use_reproxy_backend {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # pick the URI to fetch; it stays recorded as the one in flight
    my $target = shift @{$self->{reproxy_uris}};
    $self->{currently_reproxying} = $target;
    if (!defined $target) {
        # return error and close the backend
        $be->close('invalid_uris');
        return $self->_simple_response(503);
    }

    # tie client and backend together
    $self->{backend} = $be;
    $be->{client} = $self;

    # assemble the request: request line, fixed keep-alive header, the
    # forwarded headers, then the empty line terminating the header block
    my $req_hd = $self->{req_headers};
    my @hdr_lines = ("Connection: keep-alive");
    for my $name ("Range", "Host") {
        my $value = $req_hd->header($name);
        push @hdr_lines, "$name: $value" if $value;
    }

    my $req_method = $req_hd->request_method eq 'HEAD' ? 'HEAD' : 'GET';
    my $headers = join("\r\n", "$req_method $target->[2] HTTP/1.0", @hdr_lines, "", "");

    # now send request
    $be->{req_headers} = Perlbal::HTTPHeaders->new(\$headers);
    $be->state('sending_req');
    $self->state('backend_req_sent');
    $be->write($be->{req_headers}->to_string_ref);
    $be->watch_read(1);
    $be->watch_write(1);
}
247
# Called when a transient backend fetching a reproxied URI has received
# response headers from the server.
#
#   $_[1]  Perlbal::BackendHTTP  the backend that got the response
#
# Returns 0 when the response is acceptable (2xx or 416, and matching the
# expected size if one was announced).  Otherwise the backend is closed,
# the next URI on the list is tried, and 1 is returned.
sub backend_response_received {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # a response means that we are no longer currently waiting on a reproxy, and
    # don't want to retry this URI
    $self->{currently_reproxying} = undef;

    my $res_hd = $be->{res_headers};
    my $code   = $res_hd->response_code + 0;

    # acceptable: any 2xx, plus 416 (range not satisfiable)
    my $code_ok = ($code >= 200 && $code <= 299) || $code == 416;

    # when a size was announced, the response has to match it.  A missing
    # Content-Length counts as 0, which is what the numeric comparison did
    # before -- but without the "uninitialized value" warning.
    my $size_ok = 1;
    if (defined(my $expected = $self->{reproxy_expected_size})) {
        my $got = $res_hd->header('Content-length');
        $got = 0 unless defined $got;
        $size_ok = ($expected == $got);
    }

    return 0 if $code_ok && $size_ok;

    # fall back to an alternate URL
    $be->{client} = undef;
    $be->close('non_200_reproxy');

    # The URI that just failed was already removed from reproxy_uris by
    # use_reproxy_backend (it shifts the entry into currently_reproxying),
    # so the head of the list is the next *untried* URI.  try_next_uri
    # would shift once more and skip it; restart directly instead.
    # NOTE(review): assumes backends only reach this method after having
    # been handed out through use_reproxy_backend -- confirm in ReproxyManager.
    $self->start_reproxy_uri();
    return 1;
}
282
# Serve a local file to the client in place of the backend's response.
#
#   $file  filename to reproxy (the 'start_file_reproxy' hook gets a
#          reference to it and may take over the request entirely)
#   $hd    Perlbal::HTTPHeaders from the backend, cleaned up before sending
#
# Works asynchronously: stat the file, send the (fixed-up) headers, open
# the file and hand the filehandle to reproxy_fh.  Byte ranges requested by
# the client are honoured (206/416).
sub start_reproxy_file {
    my Perlbal::ClientProxy $self = shift;
    my $file = shift;
    my Perlbal::HTTPHeaders $hd = shift;

    # at this point we need to disconnect from our backend
    $self->{backend} = undef;

    # give the pre-reproxy hook a chance to handle everything itself
    return if $self->{service}->run_hook("start_file_reproxy", $self, \$file);

    # remember the size the backend told us to expect, if any
    my $expected_size = $hd->header('X-REPROXY-EXPECTED-SIZE');
    $self->{reproxy_expected_size} = $expected_size if $expected_size;

    # runs once the async stat has completed; the stat result is read
    # through the special "_" filehandle
    my $on_stat = sub {

        # the client may have gone away while we were waiting
        return if $self->{closed};

        my $size = -s _;

        # FIXME: POLICY: 404 or retry request to backend w/o reproxy-file capability?
        return $self->_simple_response(404) unless $size;

        # 404 as well when the size is not the one we were told to expect
        my $wanted = $self->{reproxy_expected_size};
        return $self->_simple_response(404)
            if defined $wanted && $wanted != $size;

        # a real file can be served in byte ranges; advertise that
        $hd->header("Accept-Ranges", "bytes") if -f _;

        my ($status, $range_start, $range_end) = $self->{req_headers}->range($size);

        # an unsatisfiable range gets a brand new 416 response header
        my $not_satisfiable = 0;
        if ($status == 416) {
            $hd = Perlbal::HTTPHeaders->new_response(416);
            $hd->header("Content-Range", $size ? "bytes */$size" : "*");
            $not_satisfiable = 1;
        }

        # change the status code to 200 if the backend gave us 204 No Content
        if ($hd->response_code == 204) {
            $hd->code(200);
        }

        # fixup the Content-Length header with the correct size (application
        # doesn't need to provide a correct value if it doesn't want to stat())
        if ($status == 200) {
            $hd->header("Content-Length", $size);
        }
        elsif ($status == 206) {
            $hd->header("Content-Range", "bytes $range_start-$range_end/$size");
            $hd->header("Content-Length", $range_end - $range_start + 1);
            $hd->code(206);
        }

        # don't send this internal header to the client:
        $hd->header('X-REPROXY-FILE', undef);

        # rewrite some other parts of the header
        $self->setup_keepalive($hd);

        # the header is clean now; send it
        $self->{res_headers} = $hd;
        $self->write($hd->to_string_ref);

        # no body follows for HEAD requests and 416 responses
        if ($self->{req_headers}->request_method eq 'HEAD' || $not_satisfiable) {
            $self->write(sub { $self->http_response_sent; });
            return;
        }

        # runs once the async open has completed
        my $on_open = sub {
            my $fh = shift;

            # if client's gone, just close filehandle and abort
            if ($self->{closed}) {
                CORE::close($fh) if $fh;
                return;
            }

            # FIXME: do 500 vs. 404 vs whatever based on $! ?
            return $self->_simple_response(500) unless $fh;

            # partial content: start at the range and send only its length
            if ($status == 206) {
                sysseek($fh, $range_start, &POSIX::SEEK_SET);
                $size = $range_end - $range_start + 1;
            }

            $self->reproxy_fh($fh, $size);
            $self->watch_write(1);
        };

        $self->state('wait_open');
        Perlbal::AIO::aio_open($file, 0, 0 , $on_open);
    };

    # start an async stat on the file
    $self->state('wait_stat');
    Perlbal::AIO::aio_stat($file, $on_stat);
}
388
# Client
# Accessor for the backend proxy connection.  Without an argument it
# returns the current backend.  With an argument it stores it and returns
# it; storing a false value also moves us to the 'draining_res' state.
sub backend {
    my Perlbal::ClientProxy $self = shift;

    # getter
    if (!@_) {
        return $self->{backend};
    }

    # setter
    my $new_backend = $_[0];
    $self->state('draining_res') if !$new_backend;
    $self->{backend} = $new_backend;
    return $self->{backend};
}
399
# Invoked by the backend when it wants us to resume watching for reads
# and to feed it whatever request data we have.
sub backend_ready {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # reads may have been switched off while we waited for a backend;
    # the backend can take data now, so switch them back on
    $self->watch_read(1) if $self->{unread_data_waiting};

    if ($self->{bureason}) {
        # buffered-to-disk case: put the backend into buffered upload mode,
        # which makes it inform us of its writable availability
        return $be->invoke_buffered_upload_mode;
    }

    # normal, not-buffered-to-disk case
    return $self->drain_read_buf_to($be);
}
421
# The backend queues a call to this method in our write buffer, so it runs
# right after the complete response has gone out to the user.  With
# keep-alive the connection is then prepared for the next request rather
# than closed.
sub backend_finished {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::backend_finished\n" if Perlbal::DEBUG >= 3;

    # getting here implies the backend has responded
    $self->{responded} = 1;

    # the backend is done with us; let go of it
    $self->{backend} = undef;

    if ($self->{unread_data_waiting}) {
        # rarely happens in practice: the backend read was
        # empty/disconnected (or otherwise messed up) while the client
        # still owes us data.  All we can do is close the client down.
        return $self->close("backend_finished_while_unread_data");
    }

    # nothing left to read from the client: recycle this clientproxy
    return $self->http_response_sent;
}
445
# Called when this client enters the persist_wait state (keep-alive, a
# proxy request has just completed) but before it is returned to base.
# Fires the service's 'end_proxy_request' hooks.
sub persist_wait {
    my Perlbal::ClientProxy $self = shift;

    my $svc = $self->{service};
    $svc->run_hooks('end_proxy_request', $self);
}
452
# Called when a response has been sent to the user completely and the
# per-request state has to be reset for the next keep-alive request.
#
# Returns 0 when the parent class decided not to persist the connection
# (nothing is reset then), 1 after the state has been reset.
sub http_response_sent {
    my Perlbal::ClientProxy $self = $_[0];

    # persistence logic is in ClientHTTPBase
    return 0 unless $self->SUPER::http_response_sent;

    print "ClientProxy::http_response_sent -- resetting state\n" if Perlbal::DEBUG >= 3;

    # detach from a backend that is still associated with us
    if (my $be = $self->{backend}) {
        $self->{backend} = undef;
        $be->forget_client;
    }

    # if we get here we're being persistent, reset our state
    $self->{backend_requested} = 0;
    $self->{high_priority} = 0;
    $self->{low_priority} = 0;      # per-request, like high_priority
    $self->{reproxy_uris} = undef;
    $self->{reproxy_expected_size} = undef;
    $self->{currently_reproxying} = undef;
    $self->{content_length_remain} = undef;
    $self->{primary_res_hdrs} = undef;
    $self->{responded} = 0;
    $self->{is_buffering} = 0;
    $self->{is_writing} = 0;
    $self->{start_time} = undef;
    $self->{bufh} = undef;
    $self->{bufilename} = undef;
    $self->{buoutpos} = 0;
    $self->{bureason} = undef;
    $self->{upload_session} = undef;
    $self->{chunked_upload_state} = undef;
    $self->{request_body_length} = undef;

    # event_read calls us directly once the request body is complete and a
    # response was already sent, without clearing this flag.  A stale value
    # would make backend_finished close the connection on the *next*
    # request ("backend_finished_while_unread_data").
    $self->{unread_data_waiting} = 0;

    # retries are counted per request, so the next request starts afresh
    $self->{retry_count} = 0;

    return 1;
}
488
# Ask for a backend connection once more AFTER one was already requested,
# for cases where the result of the first one was not acceptable (like
# after a 500 error).
sub rerequest_backend {
    my Perlbal::ClientProxy $self = shift;

    # forget the previous request and backend, then ask again
    $self->{backend}           = undef;
    $self->{backend_requested} = 0;

    return $self->request_backend;
}
498
# Ask our service for a backend connection.  Does nothing when a backend
# has already been requested for the current request.
sub request_backend {
    my Perlbal::ClientProxy $self = shift;

    if ($self->{backend_requested}) {
        return;
    }
    $self->{backend_requested} = 1;

    $self->state('wait_backend');

    my $svc = $self->{service};
    $svc->request_backend_connection($self);

    # cork writes to self
    $self->tcp_cork(1);
}
508
# Client (overrides and calls super)
# Close the client connection.  $reason is an optional short string.
# Fires the 'end_proxy_request' hooks, closes an attached backend (or, if
# there is none, tells the service we no longer want one) and finally
# calls the parent class' close.  A second call is a no-op.
sub close {
    my Perlbal::ClientProxy $self = shift;
    my $reason = shift;

    if (Perlbal::DEBUG >= 2) {
        my $again = $self->{closed} ? "again " : "";
        my $why   = defined $reason ? "saying '$reason'" : "for an unknown reason";
        warn sprintf("Perlbal::ClientProxy closed %s%s.\n", $again, $why);
    }

    # don't close twice
    return if $self->{closed};

    # signal that we're done
    $self->{service}->run_hooks('end_proxy_request', $self);

    my $backend = $self->{backend};
    if ($backend) {
        # kill our backend, passing the reason along when we have one
        print "Client ($self) closing backend ($backend)\n" if Perlbal::DEBUG >= 1;
        $self->backend(undef);

        my $be_reason = "proxied_from_client_close";
        $be_reason .= ":$reason" if $reason;
        $backend->close($be_reason);
    }
    else {
        # if no backend, tell our service that we don't care for one anymore
        $self->{service}->note_client_close($self);
    }

    # call ClientHTTPBase's close
    $self->SUPER::close($reason);
}
539
# The client went away: stop watching for reads, throw away a buffered
# upload if there is one, and close with reason 'user_disconnected'.
sub client_disconnected { # : void
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::client_disconnected\n" if Perlbal::DEBUG >= 2;

    $self->watch_read(0);

    if ($self->{bureason}) {
        $self->purge_buffered_upload;
    }

    return $self->close('user_disconnected');
}
552
# Client
# Writable-event handler: marks the request as responded to, lets the
# parent class do the writing, and afterwards lets a stalled backend read
# again.
sub event_write {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_write\n" if Perlbal::DEBUG >= 3;

    # being asked to write means the backend has processed our request and
    # we are responding (or have responded) to the user
    $self->{responded} = 1;

    # will eventually, finally reset the whole object on completion
    $self->SUPER::event_write;

    # a backend that stopped reading because of us may continue now,
    # provided it is still connected
    my $backend = $self->{backend};
    if ($self->{backend_stalled} && $backend) {
        print "  unstalling backend\n" if Perlbal::DEBUG >= 3;

        $self->{backend_stalled} = 0;
        $backend->watch_read(1);
    }
}
573
# ClientProxy
# Readable-event handler for the client socket.
#
# Depending on the state of the request this either
#   * reads and parses request headers (then calls handle_request),
#   * feeds a chunked upload to its Perlbal::ChunkedUploadState object, or
#   * reads request body data and passes it to the backend, buffers it in
#     memory / on disk, or discards it when a response was already sent.
# It also emits UDP upload-status packets for tracked uploads and requests
# a backend once enough of the body has been buffered.
sub event_read {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_read\n" if Perlbal::DEBUG >= 3;

    # mark alive so we don't get killed for being idle
    $self->{alive_time} = time;

    # if we have no headers, the only thing we can do is try to get some
    if (! $self->{req_headers}) {
        print "  no headers.  reading.\n" if Perlbal::DEBUG >= 3;
        $self->handle_request if $self->read_request_headers;
        return;
    }

    # if we're buffering to disk or haven't read too much from this client, keep reading,
    # otherwise shut off read notifications
    unless ($self->{is_buffering} || $self->{read_ahead} < READ_AHEAD_SIZE) {
        # our buffer is full, so turn off reads for now
        print "  disabling reads.\n" if Perlbal::DEBUG >= 3;
        $self->watch_read(0);
        return;
    }

    # deal with chunked uploads
    if (my $cus = $self->{chunked_upload_state}) {
        $cus->on_readable($self);

        # if we got more than 1MB not flushed to disk,
        # stop reading for a bit until disk catches up
        if ($self->{read_ahead} > 1024*1024) {
            $self->watch_read(0);
        }
        return;
    }

    # read more data if we're still buffering or if our current read buffer
    # is not full to the max READ_AHEAD_SIZE which is how much data we will
    # buffer in from the user before passing on to the backend

    # read the MIN(READ_SIZE, content_length_remain)
    my $read_size = READ_SIZE;
    my $remain = $self->{content_length_remain};

    $read_size = $remain if $remain && $remain < $read_size;
    print "  reading $read_size bytes (", (defined $remain ? $remain : "(undef)"), " bytes remain)\n" if Perlbal::DEBUG >= 3;

    my $bref = $self->read($read_size);

    # if the read returned undef, that means the connection was closed
    # (see: Danga::Socket::read)
    return $self->client_disconnected unless defined $bref;

    # if they didn't declare a content body length and we just got a
    # readable event that's not a disconnect, something's messed up.
    # they're overflowing us.  disconnect!
    if (! $remain) {
        $self->_simple_response(400, "Can't pipeline to HTTP/1.0");
        $self->close("pipelining_to_http10");
        return;
    }

    # now that we know we have a defined value, determine how long it is, and do
    # housekeeping to keep our tracking numbers up to date.
    my $len = length($$bref);
    print "  read $len bytes\n" if Perlbal::DEBUG >= 3;

    # when run under the program "trickle", epoll speaks the truth to
    # us, but then trickle interferes and steals our reads/writes, so
    # this fails.  normally this check isn't needed.
    return unless $len;

    $self->{read_size} += $len;
    $self->{content_length_remain} -= $len if $remain;

    my $done_reading = defined $self->{content_length_remain} && $self->{content_length_remain} <= 0;
    my $backend = $self->backend;
    print("  done_reading = $done_reading, backend = ", ($backend || "<undef>"), "\n") if Perlbal::DEBUG >= 3;

    # upload tracking: at most once per second, tell every configured
    # listener how far this upload has progressed
    if (my $session = $self->{upload_session}) {
        my $cl = $self->{req_headers}->content_length;
        my $left = $self->{content_length_remain};
        my $now = time();  # FIXME: more efficient?
        if ($cl && $left && ($self->{last_upload_packet} || 0) != $now) {
            my $done = $cl - $left;
            $self->{last_upload_packet} = $now;
            $udp_sock ||= IO::Socket::INET->new(Proto => 'udp');
            my $since = $self->{last_request_time};
            my $send = "UPLOAD:$session:$done:$cl:$since:$now";
            if ($udp_sock) {
                # best effort: the result of send() is deliberately ignored
                foreach my $ep (@{ $self->{service}{upload_status_listeners_sockaddr} }) {
                    $udp_sock->send($send, 0, $ep);
                }
            }
        }
    }

    # just dump the read into the nether if we're dangling. that is
    # the case when we send the headers to the backend and it responds
    # before we're done reading from the client; therefore further
    # reads from the client just need to be sent nowhere, because the
    # RFC2616 section 8.2.3 says: "the server SHOULD NOT close the
    # transport connection until it has read the entire request"
    if ($self->{responded}) {
        print "  already responded.\n" if Perlbal::DEBUG >= 3;
        # in addition, if we're now out of data (clr == 0), then we should
        # either close ourselves or get ready for another request
        return $self->http_response_sent if $done_reading;

        print "  already responded [2].\n" if Perlbal::DEBUG >= 3;
        # at this point, if the backend has responded then we just return
        # as we don't want to send it on to them or buffer it up, which is
        # what the code below does
        return;
    }

    # if we have no data left to read, stop reading.  all that can
    # come later is an extra \r\n which we handle later when parsing
    # new request headers.  and if it's something else, we'll bail on
    # the next request, not this one.
    if ($done_reading) {
        # the offending value is interpolated into the message so that it
        # actually shows up in the stack trace
        Carp::confess("content_length_remain less than zero: $self->{content_length_remain}")
            if $self->{content_length_remain} < 0;
        $self->{unread_data_waiting} = 0;
        $self->watch_read(0);
    }

    # now, if we have a backend, then we should be writing it to the backend
    # and not doing anything else
    if ($backend) {
        print "  got a backend.  sending write to it.\n" if Perlbal::DEBUG >= 3;
        $backend->write($bref);
        # TODO: monitor the backend's write buffer depth?
        return;
    }

    # now, we know we don't have a backend, so we have to push this data onto our
    # read buffer... it's not going anywhere yet
    push @{$self->{read_buf}}, $bref;
    $self->{read_ahead} += $len;
    print "  no backend.  read_ahead = $self->{read_ahead}.\n" if Perlbal::DEBUG >= 3;

    # if we know we've already started spooling a file to disk, then continue
    # to do that.
    print "  bureason = $self->{bureason}\n" if Perlbal::DEBUG >= 3 && $self->{bureason};
    return $self->buffered_upload_update if $self->{bureason};

    # if we are under our buffer-to-memory size, just continue buffering here and
    # don't fall through to the backend request call below
    return if
        ! $done_reading &&
        $self->{read_ahead} < $self->{service}->{buffer_backend_connect};

    # over the buffer-to-memory size, see if we should start spooling to disk.
    return if $self->{service}->{buffer_uploads} && $self->decide_to_buffer_to_disk;

    # give plugins a chance to act on the request before we request a backend
    # (added by Chris Hondl <chris@imvu.com>, March 2006)
    my $svc = $self->{service};
    return if $svc->run_hook('proxy_read_request', $self);

    # if we fall through to here, we need to ensure that a backend is on the
    # way, because no specialized handling took over above
    print "  finally requesting a backend\n" if Perlbal::DEBUG >= 3;
    return $self->request_backend;
}
741
# Process a request whose headers have just been parsed: run the start
# hooks, work out how much request body to expect, and then either start
# buffering the body, answer from the reproxy cache, or request a backend.
sub handle_request {
    my Perlbal::ClientProxy $self = shift;

    my $req_hd = $self->{req_headers};
    if (!$req_hd) {
        $self->close("handle_request without headers");
        return;
    }

    $self->check_req_headers;

    # give plugins a chance to force us to bail
    my $svc = $self->{service};
    for my $hook ('start_proxy_request', 'start_http_request') {
        return if $svc->run_hook($hook, $self);
    }

    # chunked uploads are set up entirely inside handle_chunked_upload;
    # everything else goes by the declared Content-Length
    unless ($self->handle_chunked_upload) {
        # if defined we're waiting on some amount of data.  also, we have to
        # subtract out read_size, which is the amount of data that was
        # extra in the packet with the header that's part of the body.
        my $length = $req_hd->content_length;
        $self->{content_length_remain} = $length;
        $self->{request_body_length}   = $length;

        if (defined $length && $length < 0) {
            $self->_simple_response(400, "Invalid request: Content-Length < 0");
            $self->close("negative_content_length");
            return;
        }

        $self->{unread_data_waiting} = 1 if $self->{content_length_remain};
    }

    # upload-tracking stuff.  both starting a new upload track session,
    # and checking on status of ongoing one
    return if $svc->{upload_status_listeners} && $self->handle_upload_tracking;

    # note that we've gotten a request
    $self->{requests}++;
    $self->{last_request_time} = $self->{alive_time};

    # chunked upload: always buffered
    if ($self->{chunked_upload_state}) {
        $self->{request_body_length} = 0;
        $self->{is_buffering} = 1;
        $self->{bureason} = 'chunked';
        return $self->buffered_upload_update;
    }

    # a body is coming and the service wants some of it buffered before a
    # backend gets involved: the deeper path
    if ($self->{content_length_remain} && $self->{service}->{buffer_backend_connect}) {
        return $self->start_buffering_request;
    }

    # get the backend request process moving, since we aren't buffering
    $self->{is_buffering} = 0;

    # if reproxy-caching is enabled, we can often bypass needing to allocate a BackendHTTP connection:
    return if $svc->{reproxy_cache} && $self->satisfy_request_from_cache;

    $self->request_backend;
}
805
# Prepares this connection to receive a chunked request body.
#
# Returns 1 when the request carries "Transfer-Encoding: chunked" and the
# service has buffer_uploads turned on.  In that case the Transfer-Encoding
# header (and an "Expect: 100-continue" header, after answering it) is removed
# from the request headers so it won't go to the backend, and a
# Perlbal::ChunkedUploadState is stored in $self->{chunked_upload_state}.
# Returns false (empty list / undef) when chunked handling does not apply.
sub handle_chunked_upload {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};
    my $te = $req_hd->header("Transfer-Encoding");

    # transfer-coding names are case-insensitive in HTTP, so "Chunked" and
    # "CHUNKED" must be recognised as well
    return unless $te && lc($te) eq "chunked";
    return unless $self->{service}->{buffer_uploads};

    $req_hd->header("Transfer-Encoding", undef); # remove it (won't go to backend)

    # the Expect field value is case-insensitive too
    my $eh = $req_hd->header("Expect");
    if ($eh && $eh =~ /\b100-continue\b/i) {
        $self->write(\ "HTTP/1.1 100 Continue\r\n\r\n");
        $req_hd->header("Expect", undef); # remove it (won't go to backend)
    }

    # optional cap on the total body size; a false value means "no limit"
    my $max_size = $self->{service}{max_chunked_request_size};

    my $args = {
        # a decoded chunk arrived: queue it for the disk writer and account
        # for it in both the read-ahead and total body counters
        on_new_chunk => sub {
            my $cref = shift;
            my $len = length($$cref);
            push @{$self->{read_buf}}, $cref;
            $self->{read_ahead}          += $len;
            $self->{request_body_length} += $len;

            # if too large, disconnect them...
            if ($max_size && $self->{request_body_length} > $max_size) {
                $self->purge_buffered_upload;
                $self->close;
                return;
            }
            $self->buffered_upload_update;
        },
        on_disconnect => sub {
            $self->client_disconnected;
        },
        # terminating zero-length chunk: hand the buffered upload onwards
        on_zero_chunk => sub {
            $self->send_buffered_upload;
        },
    };

    $self->{chunked_upload_state} = Perlbal::ChunkedUploadState->new(%$args);
    return 1;
}
850
# Tries to answer the current request from the service's reproxy cache,
# without involving a backend.
#
# The cache is keyed on "Host|request-uri".  An entry is an arrayref of
# (expiry time, arrayref of header name/value pairs, reproxy URLs); header
# elements may be plain scalars or scalar references.
#
# Returns 1 if the request was taken care of (a 304 was written, or a reproxy
# of the cached URLs was started), and 0 on a cache miss or an expired entry.
sub satisfy_request_from_cache {
    my Perlbal::ClientProxy $self = shift;

    my $req_hd = $self->{req_headers};
    my $svc    = $self->{service};
    my $cache  = $svc->{reproxy_cache};
    $svc->{_stat_requests}++;

    my $requri   = $req_hd->request_uri    || '';
    my $hostname = $req_hd->header("Host") || '';

    my $key      = "$hostname|$requri";

    my $reproxy  = $cache->get($key) or
        return 0;

    my ($timeout, $headers, $urls) = @$reproxy;
    return 0 if time() > $timeout;

    $svc->{_stat_cache_hits}++;

    # flatten the cached pairs into a hash, dereferencing scalar refs
    my %headers = map { ref $_ eq 'SCALAR' ? $$_ : $_ } @{$headers || []};

    if (my $ims = $req_hd->header("If-Modified-Since")) {
        # the cached header name may be in any case.  it may also be absent
        # altogether, in which case we must not index the hash with undef.
        my ($lm_key) = grep { uc($_) eq "LAST-MODIFIED" } keys %headers;
        my $lm = (defined $lm_key && $headers{$lm_key}) || "";

        # remove the IE length suffix
        $ims =~ s/; length=(\d+)//;

        # If 'Last-Modified' is same as 'If-Modified-Since', send a 304
        if ($ims eq $lm) {
            my $res_hd = Perlbal::HTTPHeaders->new_response(304);
            $res_hd->header("Content-Length", "0");
            $self->tcp_cork(1);
            $self->state('xfer_resp');
            $self->write($res_hd->to_string_ref);
            $self->write(sub { $self->http_response_sent; });
            return 1;
        }
    }

    # build a 200 response from the cached headers and reproxy the body
    my $res_hd = Perlbal::HTTPHeaders->new_response(200);
    $res_hd->header("Date", HTTP::Date::time2str(time()));
    while (my ($name, $value) = each %headers) {
        $res_hd->header($name, $value);
    }

    $self->start_reproxy_uri($res_hd, $urls);
    return 1;
}
901
# Upload-tracking step of handle_request.  Returns 1 when this connection has
# been stolen (the client asked for the status of an upload session and has
# been answered here), or 0 to give control back to handle_request.
sub handle_upload_tracking {
    my Perlbal::ClientProxy $self = shift;

    my $uri = $self->{req_headers}->request_uri;

    # nothing to do unless the URI names an upload session
    my ($sess) = $uri =~ /[\?&]client_up_sess=(\w{5,50})\b/
        or return 0;

    unless ($uri =~ m!^/__upload_status\?!) {
        # not a status query, so just tag this upload as a new upload session
        $self->{upload_session} = $sess;
        return 0;
    }

    # status query: report progress of the session, or an empty object when
    # nothing is known about it
    my $status = Perlbal::UploadListener::get_status($sess);
    my $now    = time();
    my $body   = "{}";
    if ($status) {
        $body = "{done:$status->{done},total:$status->{total},starttime:$status->{starttime},nowtime:$now}";
    }

    my $res = Perlbal::HTTPHeaders->new_response(200);
    $self->{res_headers} = $res;
    $res->header("Content-Type", "text/plain");
    $res->header('Content-Length', length $body);
    $self->setup_keepalive($res);

    $self->tcp_cork(1);  # cork writes to self
    $self->write($res->to_string_ref);
    $self->write(\ $body);
    $self->write(sub { $self->http_response_sent; });
    return 1;
}
936
# Continuation of handle_request for requests whose body we read ahead into
# memory first, either hoping that turns out to be all of it, or to decide
# later whether the whole upload should be spooled to disk.
sub start_buffering_request {
    my Perlbal::ClientProxy $self = shift;
    my $svc = $self->{service};

    $self->{is_buffering} = 1;

    # shortcut: when buffering by size is configured and the declared length
    # already reaches the threshold, go to disk straight away
    my $size_limit = $svc->{buffer_uploads} && $svc->{buffer_upload_threshold_size};
    if ($size_limit && $self->{req_headers}->content_length >= $size_limit) {
        $self->{bureason} = 'size';
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', 'size')
            if $ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS};
        $self->state('buffering_upload');
        $self->buffered_upload_update;
        return;
    }

    # buffering in memory for now; a later decision may still move us to disk
    $self->state('buffering_request');

    # the start time is only needed for the buffer-to-disk decision
    $self->{start_time} = [ gettimeofday() ]
        if $svc->{buffer_uploads};
}
969
# Looks at our state and decides whether the upload should be written to
# disk or simply sent on to the backend.
#
# Returns:
#   - empty/undef if we are not buffering at all
#   - the existing reason string if a decision to buffer was made earlier
#   - 1 if the decision was just made to buffer to disk ('rate' or 'time')
#   - 0 if we decided not to buffer (is_buffering is cleared)
sub decide_to_buffer_to_disk {
    my Perlbal::ClientProxy $self = shift;
    return unless $self->{is_buffering};
    return $self->{bureason} if defined $self->{bureason};

    my $svc = $self->{service};

    # this is called when we have enough data to determine whether or not to
    # start buffering to disk.  a zero interval is treated as one second.
    my $dur  = tv_interval($self->{start_time}) || 1;
    my $rate = $self->{read_ahead} / $dur;

    # estimated time to receive the rest of the body.  with nothing read
    # ahead the rate is zero; dividing by it would die, so treat outstanding
    # data at zero rate as taking forever (and no outstanding data as 0).
    my $remain = $self->{content_length_remain} || 0;
    my $etime  = $rate       ? $remain / $rate
               : $remain > 0 ? 9**9**9          # +infinity
               :               0;

    my $reason = undef;

    # slower than the configured minimum rate?
    if ($svc->{buffer_upload_threshold_rate} > 0 &&
            $rate < $svc->{buffer_upload_threshold_rate}) {
        $reason = 'rate';
    }

    # estimated time exceeds the configured maximum?  (takes precedence)
    if ($svc->{buffer_upload_threshold_time} > 0 &&
            $etime > $svc->{buffer_upload_threshold_time}) {
        $reason = 'time';
    }

    unless ($reason) {
        $self->{is_buffering} = 0;
        return 0;
    }

    # start saving it to disk
    $self->state('buffering_upload');
    $self->buffered_upload_update;
    $self->{bureason} = $reason;

    if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) {
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', $reason);
    }

    return 1;
}
1017
# The upload has been completely buffered: verify what was written, rewind
# the buffer, and ask for a backend so the buffered data can be sent along.
sub send_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # do nothing while the is_writing flag is set
    return if $self->{is_writing};

    my $req_hd = $self->{req_headers};

    # for chunked uploads, set the content-length that goes to the backend
    # from the number of body bytes we counted
    $req_hd->header("Content-Length", $self->{request_body_length})
        if $self->{chunked_upload_state};

    # what we wrote out must match the content length exactly
    my $clen = $req_hd->content_length;
    unless ($clen == $self->{buoutpos}) {
        Perlbal::log('crit', "Content length of $clen declared but $self->{buoutpos} bytes written to disk");
        return $self->_simple_response(500);
    }

    # rewind so sending starts from the beginning; the file handle may not
    # exist at all, in which case there is nothing to seek
    $self->{buoutpos} = 0;
    if ($self->{bufh}) {
        sysseek($self->{bufh}, 0, 0);
    }

    # requesting the backend is what gets the ball rolling
    $self->request_backend;
}
1043
# Pushes (more of) the disk-buffered request body to the given backend using
# sendfile.  Once everything has gone out, the backend's buffered_upload_mode
# is switched off and the buffer file is purged.
sub continue_buffered_upload {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be = shift;
    return unless $self && $be;

    my $total  = $self->{request_body_length};
    my $unsent = $total - $self->{buoutpos};

    # still something left to send
    if ($unsent > 0) {
        my $sent = Perlbal::Socket::sendfile($be->{fd}, fileno($self->{bufh}), $unsent);
        if ($sent < 0) {
            my $reason = $! == EPIPE      ? "epipe"
                       : $! == ECONNRESET ? "connreset"
                       :                    undef;
            return $self->close($reason) if defined $reason;

            print STDERR "Error w/ sendfile: $!\n";
            return $self->close('sendfile_error');
        }
        $self->{buoutpos} += $sent;
    }

    # all sent: leave buffered upload mode and get rid of the file
    if ($self->{buoutpos} >= $total) {
        $be->{buffered_upload_mode} = 0;
        $self->purge_buffered_upload;
        return;
    }

    # otherwise the backend calls us again, as buffered_upload_mode is still on
}
1072
# Write buffered request data to disk.
#
# Takes the next chunk off $self->{read_buf} and writes it asynchronously to
# the buffer file at offset $self->{buoutpos}, creating the file first if it
# does not exist yet.  Only one asynchronous operation is in flight at a time
# ($self->{is_writing}); each completion callback re-invokes this method
# until the read-ahead data is drained, at which point send_buffered_upload
# is called if the whole body has arrived.
sub buffered_upload_update {
    my Perlbal::ClientProxy $self = shift;
    return if $self->{is_writing};
    return unless $self->{is_buffering} && $self->{read_ahead};

    # so we're not writing now and we have data to write...
    unless ($self->{bufilename}) {
        # create a filename and see if it exists or not
        $self->{is_writing} = 1;
        my $fn = join('-', $self->{service}->name, $self->{service}->listenaddr, "client", $self->{fd}, int(rand(0xffffffff)));
        $fn = $self->{service}->{buffer_uploads_path} . '/' . $fn;

        # good, now we need to create the file
        Perlbal::AIO::aio_open($fn, O_CREAT | O_TRUNC | O_RDWR, 0644, sub {
            $self->{is_writing} = 0;
            $self->{bufh} = shift;

            # throw errors back to the user
            if (! $self->{bufh}) {
                Perlbal::log('crit', "Failure to open $fn for buffered upload output");
                return $self->_simple_response(500);
            }

            # save state and info and bounce it back to write data
            $self->{bufilename} = $fn;
            $self->buffered_upload_update;
        });

        return;
    }

    # can't proceed if we have no disk file to async write to
    # people reported seeing this crash rarely in production...
    # must be a race between previously in-flight's write
    # re-invoking a write immediately after something triggered
    # a buffered upload purge.
    unless ($self->{bufh}) {
        $self->close;
        return;
    }

    # at this point, we want to do some writing
    my $bref = shift(@{$self->{read_buf}});
    my $len = length $$bref;
    $self->{read_ahead} -= $len;

    # so at this point we have a valid filename and file handle and should write out
    # the buffer that we have
    $self->{is_writing} = 1;
    Perlbal::AIO::aio_write($self->{bufh}, $self->{buoutpos}, $len, $$bref, sub {
        my $bytes = shift;
        $self->{is_writing} = 0;

        # check for error.  the byte count may be undefined as well as
        # non-positive, so test definedness before comparing numerically.
        unless (defined $bytes && $bytes > 0) {
            Perlbal::log('crit', "Error writing buffered upload: $!.  Tried to do $len bytes at $self->{buoutpos}.");
            return $self->_simple_response(500);
        }

        # update our count of data written
        $self->{buoutpos} += $bytes;

        # now check if we wrote less than we had in this chunk of buffer.  if that's
        # the case then we need to re-enqueue the part of the chunk that wasn't
        # written out and update as appropriate.
        if ($bytes < $len) {
            my $diff = $len - $bytes;
            unshift @{$self->{read_buf}}, \ substr($$bref, $bytes, $diff);
            $self->{read_ahead} += $diff;
        }

        # if we're processing a chunked upload, ...
        if ($self->{chunked_upload_state}) {
            # turn reads back on, if we haven't hit the end yet.
            if ($self->{unread_data_waiting} && $self->{read_ahead} < 1024*1024) {
                $self->watch_read(1);
                $self->{unread_data_waiting} = 0;
            }

            # everything buffered is on disk and the final chunk was seen
            if ($self->{read_ahead} == 0 && $self->{chunked_upload_state}->hit_zero_chunk) {
                $self->watch_read(0);
                $self->send_buffered_upload;
                return;
            }
        }

        # if we're done (no clr and no read ahead!) then send it
        elsif ($self->{read_ahead} <= 0 && $self->{content_length_remain} <= 0) {
            $self->send_buffered_upload;
            return;
        }

        # spawn another writer!
        $self->buffered_upload_update;
    });
}
1170
# Destroy any buffer file we've created: close the handle synchronously and
# unlink the file asynchronously.  Does nothing when no buffer file handle
# is open.
sub purge_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # Main reason for failure below is a 0-length chunked upload, where the file is never created.
    return unless $self->{bufh};

    # FIXME: it's reported that sometimes the two now-in-eval blocks
    # fail, hence the eval blocks and warnings.  the FIXME is to
    # figure this out, why it happens sometimes.

    # first close our filehandle... not async
    eval {
        CORE::close($self->{bufh});
    };
    if ($@) { warn "Error closing file in ClientProxy::purge_buffered_upload: $@\n"; }

    $self->{bufh} = undef;

    # capture the name now, so the callback reports the file that was
    # actually unlinked and does not need to hold on to $self
    my $fn = $self->{bufilename};

    eval {
        # now asynchronously unlink the file
        Perlbal::AIO::aio_unlink($fn, sub {
            if ($!) {
                # note an error, but whatever, we'll either overwrite the file later (O_TRUNC | O_CREAT)
                # or a cleaner will come through and do it for us someday (if the user runs one)
                Perlbal::log('warning', "Unable to unlink $fn: $!");
            }
        });
    };
    if ($@) { warn "Error unlinking file in ClientProxy::purge_buffered_upload: $@\n"; }
}
1202
# Asked by the backend after it saw a 500: should the error be hidden from
# the client while we try another backend?  Returns 1 to retry, 0 to let the
# 500 through.  Each call on a service with retries enabled counts as one
# more attempt against the service's error_retry_schedule.
sub should_retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be   = shift;

    my $svc = $be->{service};
    unless ($svc->{enable_error_retries}) {
        return 0;
    }

    # the schedule is a comma-separated list; one retry is allowed per entry
    my @sched   = split(/\s*,\s*/, $svc->{error_retry_schedule});
    my $allowed = scalar @sched;

    $self->{retry_count}++;
    return $self->{retry_count} > $allowed ? 0 : 1;
}
1214
# Called by the backend to tell us it got a 500 error and that we should try
# again on another backend.  The delay for the current attempt is looked up
# in the service's error_retry_schedule; a false delay means retry at once.
sub retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::Service     $svc  = shift;

    my @sched   = split(/\s*,\s*/, $svc->{error_retry_schedule});
    my $attempt = $self->{retry_count} - 1;
    my $delay   = $sched[$attempt];

    if (!$delay) {
        $self->rerequest_backend;
    } else {
        # retry later, unless the client has been closed by then
        Danga::Socket->AddTimer($delay, sub {
            return if $self->{closed};
            $self->rerequest_backend;
        });
    }

}
1233
# Returns the parent class's description of this connection with the
# proxy-specific details appended: backend address (or pending write buffer
# size when there is no backend), queue priority flags, whether a response
# has been sent, outstanding body length, and whether we are reproxying.
sub as_string {
    my Perlbal::ClientProxy $self = shift;

    my $ret = $self->SUPER::as_string;

    if (my $be = $self->{backend}) {
        $ret .= "; backend=" . "$be->{ipport}";
    } elsif ($self->{write_buf_size} > 0) {
        $ret .= "; write_buf_size=$self->{write_buf_size}";
    }

    # simple boolean flags, in output order
    for my $flag ([ high_priority => "; highpri"   ],
                  [ low_priority  => "; lowpri"    ],
                  [ responded     => "; responded" ]) {
        my ($field, $label) = @$flag;
        $ret .= $label if $self->{$field};
    }

    if (defined $self->{content_length_remain}) {
        $ret .= "; waiting_for=" . $self->{content_length_remain};
    }
    if ($self->{currently_reproxying}) {
        $ret .= "; reproxying";
    }

    return $ret;
}
1254
# Sets the low_priority flag on this client.  Returns nothing.
sub set_queue_low {
    my Perlbal::ClientProxy $self = $_[0];

    $self->{low_priority} = 1;

    return;
}
1260
# Sets the high_priority flag on this client.  Returns nothing.
sub set_queue_high {
    my Perlbal::ClientProxy $self = $_[0];

    $self->{high_priority} = 1;

    return;
}
1266
1267
# Destructor: passes the object to Perlbal::objdtor, then chains to the
# parent class's destructor.
sub DESTROY {
    my $self = shift;

    Perlbal::objdtor($self);
    $self->SUPER::DESTROY;
}
1272
12731;
1274
1275
1276# Local Variables:
1277# mode: perl
1278# c-basic-indent: 4
1279# indent-tabs-mode: nil
1280# End:
Note: See TracBrowser for help on using the browser.