root/trunk/lib/Perlbal/ClientProxy.pm @ 763

Revision 763, 44.2 kB (checked in by bradfitz, 21 months ago)

Patch from Andy Armstrong <andy@…> to make Perlbal pass tests
on Perl 5.10, for upcoming Fedora Core release's deadline.

Thanks Andy!

(just cleaned up style to be consistent wrt spaces around parens,
siding in favor of removing them to match rest of Perlbal's style)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1######################################################################
2# HTTP Connection from a reverse proxy client
3#
4# Copyright 2004, Danga Interactive, Inc.
5# Copyright 2005-2007, Six Apart, Ltd.
6#
7package Perlbal::ClientProxy;
8use strict;
9use warnings;
10use base "Perlbal::ClientHTTPBase";
11no  warnings qw(deprecated);
12
13use Perlbal::ChunkedUploadState;
14use Perlbal::Util;
15
16use fields (
17            'backend',             # Perlbal::BackendHTTP object (or undef if disconnected)
18            'backend_requested',   # true if we've requested a backend for this request
19            'reconnect_count',     # number of times we've tried to reconnect to backend
20            'high_priority',       # boolean; 1 if we are or were in the high priority queue
21            'low_priority',        # boolean; 1 if we are or were in the low priority queue
22            'reproxy_uris',        # arrayref; URIs to reproxy to, in order
23            'reproxy_expected_size', # int: size of response we expect to get back for reproxy
24            'currently_reproxying',  # arrayref; the host info and URI we're reproxying right now
25            'content_length_remain', # int: amount of data we're still waiting for
26            'responded',           # bool: whether we've already sent a response to the user or not
27            'last_request_time',   # int: time that we last received a request
28            'primary_res_hdrs',  # if defined, we are doing a transparent reproxy-URI
29                                 # and the headers we get back aren't necessarily
30                                 # the ones we want.  instead, get most headers
31                                 # from the provided res headers object here.
32            'is_buffering',        # bool; if we're buffering some/all of a request to memory/disk
33            'is_writing',          # bool; if on, we currently have an aio_write out
34            'start_time',          # hi-res time when we started getting data to upload
35            'bufh',                # buffered upload filehandle object
36            'bufilename',          # string; buffered upload filename
37            'bureason',            # string; if defined, the reason we're buffering to disk
38            'buoutpos',            # int; buffered output position
39            'backend_stalled',   # boolean:  if backend has shut off its reads because we're too slow.
40            'unread_data_waiting',  # boolean:  if we shut off reads while we know data is yet to be read from client
41            'chunked_upload_state', # bool/obj:  if processing a chunked upload, Perlbal::ChunkedUploadState object, else undef
42            'request_body_length',  # integer:  request's body length, either as-declared,
43                                    #           or calculated after chunked upload is complete
44
45            # for perlbal sending out UDP packets related to upload status (for xmlhttprequest upload bar)
46            'last_upload_packet',  # unixtime we last sent a UDP upload packet
47            'upload_session',      # client's self-generated upload session
48
49            # error-retrying stuff
50            'retry_count',         # number of times we've retried this request so far after getting 500 errors
51            );
52
53use constant READ_SIZE         => 131072;    # 128k, ~common TCP window size?
54use constant READ_AHEAD_SIZE   =>  32768;    # kinda arbitrary.  sum of these two is max stored per connection while waiting for backend.
55use Errno qw( EPIPE ENOENT ECONNRESET EAGAIN );
56use POSIX qw( O_CREAT O_TRUNC O_RDWR O_RDONLY );
57use Time::HiRes qw( gettimeofday tv_interval );
58
59my $udp_sock;
60
61# ClientProxy
# Construct a ClientProxy for a newly accepted client connection.
#   $service - the service this connection belongs to
#   $sock    - the client socket
# When invoked as a class method a fresh fields-based object is created;
# when invoked on an existing object, that object is reused.  Per-request
# state is reset via init() and read notifications are enabled.
# Returns the object.
sub new {
    my Perlbal::ClientProxy $self = shift;
    my ($service, $sock) = @_;

    $self = fields::new($self) if !ref $self;
    $self->SUPER::new($service, $sock);
    Perlbal::objctor($self);

    # start from a clean per-request state and begin listening
    $self->init;
    $self->watch_read(1);

    return $self;
}
74
# Upgrade an existing Perlbal::ClientHTTPBase connection into a
# ClientProxy in place: rebless it into $class, reset proxy state,
# re-enable reads and immediately dispatch the pending request.
# (Presumably called once the base connection has read its request
# headers, since handle_request runs right away -- confirm with callers.)
# Returns the reblessed object.
sub new_from_base {
    my $class = $_[0];
    my Perlbal::ClientHTTPBase $cb = $_[1];

    Perlbal::Util::rebless($cb, $class);

    # reset proxy-specific state and resume reading from the client
    $cb->init;
    $cb->watch_read(1);

    # process the request we already have
    $cb->handle_request;

    return $cb;
}
84
# Reset all proxy-specific per-connection state to its starting values.
# Flags and counters go to 0; references, sizes and reasons go to undef.
sub init {
    my Perlbal::ClientProxy $self = $_[0];

    # flags / counters that start at zero
    $self->{$_} = 0 foreach qw(
        last_request_time
        high_priority
        low_priority
        responded
        unread_data_waiting
        backend_requested
        is_buffering
        is_writing
        buoutpos
    );

    # references, sizes and reasons that start out unset
    $self->{$_} = undef foreach qw(
        backend
        content_length_remain
        start_time
        bufh
        bufilename
        bureason
        chunked_upload_state
        request_body_length
        reproxy_uris
        reproxy_expected_size
        currently_reproxying
    );

    # retry counter last (its assignment is the sub's final expression)
    $self->{retry_count} = 0;
}
115
# Hand this client over to another (named) service so that it re-issues the
# request there, even though a request was already made to the original
# service (intended for GET/HEAD).
#   $primary_res_hdrs - response headers from the original backend; stored
#                       as this client's {res_headers}
#   $svc_name         - name of the service to adopt this client
# If the name is empty or no such service exists, a 404 is sent and 1 is
# returned; otherwise returns whatever adopt_base_client returns.
sub start_reproxy_service {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
    my $svc_name = $_[2];

    my $svc;
    $svc = Perlbal->service($svc_name) if $svc_name;

    if (!$svc) {
        $self->_simple_response(404, "Vhost twiddling not configured for requested pair.");
        return 1;
    }

    # forget our current backend so the new service can supply its own
    @{$self}{qw(backend_requested backend)} = (0, undef);
    $self->{res_headers} = $primary_res_hdrs;

    return $svc->adopt_base_client($self);
}
135
# Start (or continue) reproxying this request to one of a list of URIs,
# trying each in order until one works.
#   $primary_res_hdrs - response headers from the original backend; stored
#                       in {primary_res_hdrs} the first time (a later call
#                       without them reuses the stored ones)
#   $urls             - optional whitespace-separated list of
#                       "http://host[:port][/path]" URIs.  When undef, the
#                       existing {reproxy_uris} list is reused (this is how
#                       try_next_uri re-enters).
# Entries that are not http:// URIs are ignored; the port defaults to 80
# and the path to "/".
# Returns nothing if no primary response headers are available.  Returns
# the result of a 503 response if no URIs remain.  Otherwise it hands the
# client to Perlbal::ReproxyManager::do_reproxy.
sub start_reproxy_uri {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
    my $urls = $_[2];

    # at this point we need to disconnect from our backend
    $self->{backend} = undef;

    # failure if we have no primary response headers
    return unless $self->{primary_res_hdrs} ||= $primary_res_hdrs;

    # construct reproxy_uri list
    if (defined $urls) {
        my @uris = split /\s+/, $urls;
        $self->{currently_reproxying} = undef;
        $self->{reproxy_uris} = [];
        foreach my $uri (@uris) {
            next unless $uri =~ m!^http://(.+?)(?::(\d+))?(/.*)?$!;
            push @{$self->{reproxy_uris}}, [ $1, $2 || 80, $3 || '/' ];
        }
    }

    # if we get in here with currently_reproxying defined, something
    # interrupted the URI we were working on and we want to retry it first
    if ($self->{currently_reproxying}) {
        unshift @{$self->{reproxy_uris}}, $self->{currently_reproxying};
        $self->{currently_reproxying} = undef;
    }

    # nothing left to try: tell the user the service is unavailable (503)
    return $self->_simple_response(503)
        unless @{$self->{reproxy_uris} || []};

    # set the expected size if the primary headers told us what to expect
    if ($primary_res_hdrs && (my $expected_size = $primary_res_hdrs->header('X-REPROXY-EXPECTED-SIZE'))) {
        $self->{reproxy_expected_size} = $expected_size;
    }

    # pass ourselves off to the reproxy manager
    $self->state('wait_backend');
    Perlbal::ReproxyManager::do_reproxy($self);
}
181
# Used when the URI at the head of {reproxy_uris} could not be used (e.g.
# the reproxy manager couldn't reach it).  Drops that entry, forgets any
# in-progress URI, and restarts start_reproxy_uri with what remains.
sub try_next_uri {
    my Perlbal::ClientProxy $self = shift;

    # discard the head URI and anything marked as in flight
    shift @{ $self->{reproxy_uris} };
    $self->{currently_reproxying} = undef;

    # go round again with the remaining list
    return $self->start_reproxy_uri();
}
190
# Returns true when this client's pending write buffer exceeds the
# service's allowed buffer size, i.e. we are too many bytes behind the
# backend.  Returns 0 when there is no backend.
#
# A backend without a service is one created by the ReproxyManager.  Those
# are measured against 'buffer_size_reproxy_url' rather than the regular
# 'buffer_size'.  This lets people tune buffers per type of webserver
# (assumption being that reproxied-to webservers are event-based, so it's
# okay to tie them up longer in favor of using less buffer memory in
# perlbal).
sub too_far_behind_backend {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $backend = $self->{backend};
    return 0 unless $backend;

    my $limit_key = defined $backend->{service}
        ? 'buffer_size'
        : 'buffer_size_reproxy_url';

    return $self->{write_buf_size} > $self->{service}->{$limit_key};
}
210
# Callback for when a backend has been created for reproxying and is ready
# for use.  Takes the next URI from {reproxy_uris}, records it in
# {currently_reproxying}, wires client and backend together, and sends an
# HTTP/1.0 GET (or HEAD, if the client asked for HEAD) for that URI's path.
# The client's Range and Host headers are forwarded when present.
#   $be - the backend to use
# If no URI remains, the backend is closed and a 503 is sent.
sub use_reproxy_backend {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # take the next URI and remember it as in flight
    my $datref = shift @{$self->{reproxy_uris}};
    $self->{currently_reproxying} = $datref;

    if (!defined $datref) {
        # nothing to fetch: close the backend and report an error
        $be->close('invalid_uris');
        return $self->_simple_response(503);
    }

    # link client and backend to each other
    $self->{backend} = $be;
    $be->{client} = $self;

    # forward selected client headers
    my $req_hd = $self->{req_headers};
    my @extra;
    foreach my $name ("Range", "Host") {
        my $value = $req_hd->header($name);
        push @extra, "$name: $value\r\n" if $value;
    }
    my $extra_hdr = join '', @extra;

    my $req_method = $req_hd->request_method eq 'HEAD' ? 'HEAD' : 'GET';
    my $headers = "$req_method $datref->[2] HTTP/1.0\r\nConnection: keep-alive\r\n${extra_hdr}\r\n";

    $be->{req_headers} = Perlbal::HTTPHeaders->new(\$headers);
    $be->state('sending_req');
    $self->state('backend_req_sent');
    $be->write($be->{req_headers}->to_string_ref);
    $be->watch_read(1);
    $be->watch_write(1);
}
247
# Called when a transient backend fetching a reproxied URI (see
# use_reproxy_backend) has received response headers.  It decides whether
# the response is usable.
#   $be - the backend that received the response
# A response is rejected when its status is neither 2xx nor 416.  It is
# also rejected when an expected size is known and the backend's
# Content-Length differs from it; a missing Content-Length counts as a
# mismatch.  On rejection the backend is detached and closed, and the
# remaining URIs are tried.
# Returns 1 if the response was rejected, 0 if the caller should relay it.
sub backend_response_received {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # a response means that we are no longer currently waiting on a reproxy, and
    # don't want to retry this URI
    $self->{currently_reproxying} = undef;

    my $res  = $be->{res_headers};
    my $code = $res->response_code + 0;

    my $rejected = !(($code >= 200 && $code <= 299) || $code == 416);

    if (!$rejected && defined(my $expected = $self->{reproxy_expected_size})) {
        my $got = $res->header('Content-length');
        # explicit undef check: same verdict as before, minus the warning
        $rejected = !defined($got) || $expected != $got;
    }

    return 0 unless $rejected;

    # fall back to an alternate URL.  The URI this backend fetched was
    # already shifted off {reproxy_uris} by use_reproxy_backend, and
    # {currently_reproxying} was cleared above.  So restart directly with
    # what remains.  Going through try_next_uri would shift again and
    # silently skip the next, untried URI.
    $be->{client} = undef;
    $be->close('non_200_reproxy');
    $self->start_reproxy_uri();
    return 1;
}
282
# Serve a local file to the client in place of the backend's response body.
#   $file - path of the file to send (the start_file_reproxy hook receives
#           a reference to it)
#   $hd   - response headers from the backend, cleaned up here and sent
#           to the client ahead of the file body
# Steps:
#   1. Detach from the backend and bail if the start_file_reproxy hook
#      returns true.
#   2. Async-stat the file and fix up the status, Content-Length and
#      Content-Range headers, honouring a Range request.
#   3. Write the headers, then async-open the file and hand it to
#      reproxy_fh() for the body.
# HEAD requests and unsatisfiable ranges (416) get headers only.  A
# zero-size or missing file, or one whose size doesn't match
# X-REPROXY-EXPECTED-SIZE, gets a 404.  Once the headers have gone out,
# open/seek failures can only be signalled by closing the connection.
sub start_reproxy_file {
    my Perlbal::ClientProxy $self = shift;
    my $file = shift;                      # filename to reproxy
    my Perlbal::HTTPHeaders $hd = shift;   # headers from backend, in need of cleanup

    # at this point we need to disconnect from our backend
    $self->{backend} = undef;

    # call hook for pre-reproxy; bail out if it returns true
    return if $self->{service}->run_hook("start_file_reproxy", $self, \$file);

    # set our expected size
    if (my $expected_size = $hd->header('X-REPROXY-EXPECTED-SIZE')) {
        $self->{reproxy_expected_size} = $expected_size;
    }

    # start an async stat on the file
    $self->state('wait_stat');
    Perlbal::AIO::aio_stat($file, sub {

        # if the client's since disconnected by the time we get the stat,
        # just bail.
        return if $self->{closed};

        # read the result through the special "_" filehandle; this relies
        # on aio_stat having populated Perl's stat buffer
        my $size = -s _;

        unless ($size) {
            # FIXME: POLICY: 404 or retry request to backend w/o reproxy-file capability?
            return $self->_simple_response(404);
        }
        if (defined $self->{reproxy_expected_size} && $self->{reproxy_expected_size} != $size) {
            # 404; the file size doesn't match what we expected
            return $self->_simple_response(404);
        }

        # if the thing we're reproxying is indeed a file, advertise that
        # we support byteranges on it
        if (-f _) {
            $hd->header("Accept-Ranges", "bytes");
        }

        my ($status, $range_start, $range_end) = $self->{req_headers}->range($size);
        my $not_satisfiable = 0;

        if ($status == 416) {
            $hd = Perlbal::HTTPHeaders->new_response(416);
            $hd->header("Content-Range", $size ? "bytes */$size" : "*");
            # no body follows a 416 here, so say so explicitly; without a
            # length a keep-alive client can't tell where the response ends
            $hd->header("Content-Length", 0);
            $not_satisfiable = 1;
        }

        # change the status code to 200 if the backend gave us 204 No Content
        $hd->code(200) if $hd->response_code == 204;

        # fixup the Content-Length header with the correct size (application
        # doesn't need to provide a correct value if it doesn't want to stat())
        if ($status == 200) {
            $hd->header("Content-Length", $size);
        } elsif ($status == 206) {
            $hd->header("Content-Range", "bytes $range_start-$range_end/$size");
            $hd->header("Content-Length", $range_end - $range_start + 1);
            $hd->code(206);
        }

        # don't send this internal header to the client:
        $hd->header('X-REPROXY-FILE', undef);

        # rewrite some other parts of the header
        $self->setup_keepalive($hd);

        # just send the header, now that we cleaned it.
        $self->{res_headers} = $hd;
        $self->write($hd->to_string_ref);

        if ($self->{req_headers}->request_method eq 'HEAD' || $not_satisfiable) {
            $self->write(sub { $self->http_response_sent; });
            return;
        }

        $self->state('wait_open');
        # flags 0, mode 0
        Perlbal::AIO::aio_open($file, 0, 0 , sub {
            my $fh = shift;

            # if client's gone, just close filehandle and abort
            if ($self->{closed}) {
                CORE::close($fh) if $fh;
                return;
            }

            # the response headers were already written above, so a second
            # (error) response can't be sent; drop the connection instead
            if (! $fh) {
                return $self->close('reproxy_file_open_failed');
            }

            # seek if partial content
            if ($status == 206) {
                unless (defined sysseek($fh, $range_start, POSIX::SEEK_SET())) {
                    CORE::close($fh);
                    return $self->close('reproxy_file_seek_failed');
                }
                $size = $range_end - $range_start + 1;
            }

            $self->reproxy_fh($fh, $size);
            $self->watch_write(1);
        });
    });
}
388
# Client
# Accessor for the backend proxy connection.
# With no arguments, returns the current backend.  With an argument, stores
# it as the backend and returns it.  Storing a false value also moves this
# client into the 'draining_res' state.
sub backend {
    my Perlbal::ClientProxy $self = shift;

    # getter
    return $self->{backend} if !@_;

    # setter
    my $backend = shift;
    if (!$backend) {
        $self->state('draining_res');
    }
    $self->{backend} = $backend;
    return $backend;
}
399
# Invoked by a backend when it wants us to resume reading from the client
# and feeding it data.
#   $be - the backend that is ready
# If reads were paused while data was still pending, they are re-enabled.
# When the upload is not being buffered to disk ({bureason} unset), the
# in-memory read buffer is drained straight to the backend.  Otherwise the
# backend is switched into buffered-upload mode so it reports when it is
# writable.
sub backend_ready {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # resume reading if we stopped while data was still waiting
    $self->watch_read(1) if $self->{unread_data_waiting};

    if ($self->{bureason}) {
        # buffered-to-disk: the backend will pull as it becomes writable
        return $be->invoke_buffered_upload_mode;
    }

    # normal, not-buffered-to-disk case
    return $self->drain_read_buf_to($be);
}
421
# Queued by our backend in our write buffer, so it runs once everything the
# backend produced has been written to the user.  Marks the request as
# responded and detaches the backend.  If there is no unread client data,
# the connection is recycled via http_response_sent (which decides whether
# to keep it alive).  Otherwise it is closed.
sub backend_finished {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::backend_finished\n" if Perlbal::DEBUG >= 3;

    # presumably the backend has responded if we're here; detach from it
    $self->{responded} = 1;
    $self->{backend}   = undef;

    if ($self->{unread_data_waiting}) {
        # this happens (rarely, in practice) when the backend read was
        # empty/disconnected or otherwise messed up; all we can really do
        # is close the client down.
        return $self->close("backend_finished_while_unread_data");
    }

    # nothing left to read from the client, so recycle this clientproxy
    return $self->http_response_sent;
}
445
# Called once a response has been fully sent to the user.  Returns 0 if
# the base class decided not to keep the connection (persistence logic
# lives in ClientHTTPBase).  Otherwise it detaches any backend, resets all
# per-request proxy state for the next request on this connection, and
# returns 1.
sub http_response_sent {
    my Perlbal::ClientProxy $self = $_[0];

    # persistence logic is in ClientHTTPBase
    return 0 unless $self->SUPER::http_response_sent;

    print "ClientProxy::http_response_sent -- resetting state\n" if Perlbal::DEBUG >= 3;

    if (my $be = $self->{backend}) {
        $self->{backend} = undef;
        $be->forget_client;
    }

    # if we get here we're being persistent, reset our per-request state.
    # low_priority, unread_data_waiting and retry_count are reset here as
    # well (as init() does).  Otherwise a stale unread_data_waiting, e.g.
    # left set when event_read finishes an already-answered body, would make
    # the next request's backend_finished close a healthy keep-alive
    # connection.
    $self->{backend_requested} = 0;
    $self->{high_priority} = 0;
    $self->{low_priority} = 0;
    $self->{unread_data_waiting} = 0;
    $self->{retry_count} = 0;
    $self->{reproxy_uris} = undef;
    $self->{reproxy_expected_size} = undef;
    $self->{currently_reproxying} = undef;
    $self->{content_length_remain} = undef;
    $self->{primary_res_hdrs} = undef;
    $self->{responded} = 0;
    $self->{is_buffering} = 0;
    $self->{is_writing} = 0;
    $self->{start_time} = undef;
    $self->{bufh} = undef;
    $self->{bufilename} = undef;
    $self->{buoutpos} = 0;
    $self->{bureason} = undef;
    $self->{upload_session} = undef;
    $self->{chunked_upload_state} = undef;
    $self->{request_body_length} = undef;
    return 1;
}
481
# Request a fresh backend connection after one has already been requested,
# e.g. when the first backend's result was unacceptable (like a 500).
# Drops the current backend and clears the "already requested" guard, so
# request_backend actually asks the service again.
sub rerequest_backend {
    my Perlbal::ClientProxy $self = shift;

    $self->{backend}           = undef;
    $self->{backend_requested} = 0;

    return $self->request_backend;
}
491
# Ask our service for a backend connection, at most once per request
# (guarded by {backend_requested}).  Moves to the 'wait_backend' state and
# corks writes to the client.
sub request_backend {
    my Perlbal::ClientProxy $self = shift;

    if ($self->{backend_requested}) {
        return;
    }
    $self->{backend_requested} = 1;

    $self->state('wait_backend');
    $self->{service}->request_backend_connection($self);

    # cork writes to self
    return $self->tcp_cork(1);
}
501
# Client (overrides and calls super)
# Close this client connection.
#   $reason - optional reason string, passed on to the backend and base class
# Does nothing (apart from debug output) if already closed.  Otherwise it
# runs the 'end_proxy_request' hooks, then closes our backend if we have
# one; if not, it tells the service we no longer need one.  Finally it
# delegates to ClientHTTPBase::close.
sub close {
    my Perlbal::ClientProxy $self = shift;
    my $reason = shift;

    if (Perlbal::DEBUG >= 2) {
        my $again = $self->{closed} ? "again " : "";
        my $why   = defined $reason ? "saying '$reason'" : "for an unknown reason";
        warn sprintf("Perlbal::ClientProxy closed %s%s.\n", $again, $why);
    }

    # don't close twice
    return if $self->{closed};

    # signal that we're done
    $self->{service}->run_hooks('end_proxy_request', $self);

    my $backend = $self->{backend};
    if (!$backend) {
        # no backend: tell our service that we don't care for one anymore
        $self->{service}->note_client_close($self);
    }
    else {
        # kill our backend, since we still have one
        print "Client ($self) closing backend ($backend)\n" if Perlbal::DEBUG >= 1;
        $self->backend(undef);
        my $be_reason = "proxied_from_client_close";
        $be_reason .= ":$reason" if $reason;
        $backend->close($be_reason);
    }

    # call ClientHTTPBase's close
    return $self->SUPER::close($reason);
}
532
# Handle the client going away: stop watching for reads, discard any
# upload being buffered to disk, and close with reason 'user_disconnected'.
sub client_disconnected { # : void
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::client_disconnected\n" if Perlbal::DEBUG >= 2;

    $self->watch_read(0);
    if ($self->{bureason}) {
        $self->purge_buffered_upload;
    }

    return $self->close('user_disconnected');
}
545
# Client
# Writable-event handler.  Flushes via the base class, then marks the
# request as responded: if we're writing, the backend has processed our
# request.  If the backend had stalled its reads waiting for us and is
# still connected, it is un-stalled.
sub event_write {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_write\n" if Perlbal::DEBUG >= 3;

    $self->SUPER::event_write;

    $self->{responded} = 1;

    # trigger our backend to keep reading, if it's still connected
    my $backend = $self->{backend};
    if ($self->{backend_stalled} && $backend) {
        print "  unstalling backend\n" if Perlbal::DEBUG >= 3;

        $self->{backend_stalled} = 0;
        $backend->watch_read(1);
    }
}
565
# ClientProxy
# Readable-event handler for the client socket.
#
# Until request headers are complete, this reads them and dispatches the
# request.  After that it reads request-body bytes (at most READ_SIZE, and
# never more than the declared remaining Content-Length) and routes them:
#   - if we already responded, the data is discarded;
#   - if we have a backend, it is written straight to it;
#   - otherwise it is queued in {read_buf}.  Disk spooling continues if a
#     buffer reason is set.  A backend is requested once the in-memory
#     threshold is reached or the body is complete (unless disk buffering
#     or a proxy_read_request hook takes over).
# Chunked uploads are delegated to their Perlbal::ChunkedUploadState.
# When an upload session is active, at most one UDP progress packet per
# second is sent to each configured listener.
sub event_read {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_read\n" if Perlbal::DEBUG >= 3;

    # mark alive so we don't get killed for being idle
    $self->{alive_time} = time;

    # if we have no headers, the only thing we can do is try to get some
    if (! $self->{req_headers}) {
        print "  no headers.  reading.\n" if Perlbal::DEBUG >= 3;
        $self->handle_request if $self->read_request_headers;
        return;
    }

    # if we're buffering to disk or haven't read too much from this client, keep reading,
    # otherwise shut off read notifications
    unless ($self->{is_buffering} || $self->{read_ahead} < READ_AHEAD_SIZE) {
        # our buffer is full, so turn off reads for now
        print "  disabling reads.\n" if Perlbal::DEBUG >= 3;
        $self->watch_read(0);
        return;
    }

    # deal with chunked uploads
    if (my $cus = $self->{chunked_upload_state}) {
        $cus->on_readable($self);

        # if we got more than 1MB not flushed to disk,
        # stop reading for a bit until disk catches up
        if ($self->{read_ahead} > 1024*1024) {
            $self->watch_read(0);
        }
        return;
    }

    # read the MIN(READ_SIZE, content_length_remain)
    my $read_size = READ_SIZE;
    my $remain = $self->{content_length_remain};

    $read_size = $remain if $remain && $remain < $read_size;
    print "  reading $read_size bytes (", (defined $remain ? $remain : "(undef)"), " bytes remain)\n" if Perlbal::DEBUG >= 3;

    my $bref = $self->read($read_size);

    # if the read returned undef, that means the connection was closed
    # (see: Danga::Socket::read)
    return $self->client_disconnected unless defined $bref;

    # if they didn't declare a content body length and we just got a
    # readable event that's not a disconnect, something's messed up.
    # they're overflowing us.  disconnect!
    if (! $remain) {
        $self->_simple_response(400, "Can't pipeline to HTTP/1.0");
        $self->close("pipelining_to_http10");
        return;
    }

    # now that we know we have a defined value, determine how long it is, and do
    # housekeeping to keep our tracking numbers up to date.
    my $len = length($$bref);
    print "  read $len bytes\n" if Perlbal::DEBUG >= 3;

    # when run under the program "trickle", epoll speaks the truth to
    # us, but then trickle interferes and steals our reads/writes, so
    # this fails.  normally this check isn't needed.
    return unless $len;

    $self->{read_size} += $len;
    $self->{content_length_remain} -= $len if $remain;

    my $done_reading = defined $self->{content_length_remain} && $self->{content_length_remain} <= 0;
    my $backend = $self->backend;
    print("  done_reading = $done_reading, backend = ", ($backend || "<undef>"), "\n") if Perlbal::DEBUG >= 3;

    # upload tracking: at most one UDP status packet per second
    if (my $session = $self->{upload_session}) {
        my $cl = $self->{req_headers}->content_length;
        my $left = $self->{content_length_remain};
        my $now = time();  # FIXME: more efficient?
        if ($cl && $left && ($self->{last_upload_packet} || 0) != $now) {
            my $done = $cl - $left;
            $self->{last_upload_packet} = $now;
            $udp_sock ||= IO::Socket::INET->new(Proto => 'udp');
            my $since = $self->{last_request_time};
            my $send = "UPLOAD:$session:$done:$cl:$since:$now";
            if ($udp_sock) {
                foreach my $ep (@{ $self->{service}{upload_status_listeners_sockaddr} }) {
                    # best-effort; the send result is deliberately ignored
                    $udp_sock->send($send, 0, $ep);
                }
            }
        }
    }

    # just dump the read into the nether if we're dangling. that is
    # the case when we send the headers to the backend and it responds
    # before we're done reading from the client; therefore further
    # reads from the client just need to be sent nowhere, because the
    # RFC2616 section 8.2.3 says: "the server SHOULD NOT close the
    # transport connection until it has read the entire request"
    if ($self->{responded}) {
        print "  already responded.\n" if Perlbal::DEBUG >= 3;
        # in addition, if we're now out of data (clr == 0), then we should
        # either close ourselves or get ready for another request
        return $self->http_response_sent if $done_reading;

        print "  already responded [2].\n" if Perlbal::DEBUG >= 3;
        # at this point, if the backend has responded then we just return
        # as we don't want to send it on to them or buffer it up, which is
        # what the code below does
        return;
    }

    # if we have no data left to read, stop reading.  all that can
    # come later is an extra \r\n which we handle later when parsing
    # new request headers.  and if it's something else, we'll bail on
    # the next request, not this one.
    if ($done_reading) {
        Carp::confess("content_length_remain less than zero: $self->{content_length_remain}")
            if $self->{content_length_remain} < 0;
        $self->{unread_data_waiting} = 0;
        $self->watch_read(0);
    }

    # now, if we have a backend, then we should be writing it to the backend
    # and not doing anything else
    if ($backend) {
        print "  got a backend.  sending write to it.\n" if Perlbal::DEBUG >= 3;
        $backend->write($bref);
        # TODO: monitor the backend's write buffer depth?
        return;
    }

    # now, we know we don't have a backend, so we have to push this data onto our
    # read buffer... it's not going anywhere yet
    push @{$self->{read_buf}}, $bref;
    $self->{read_ahead} += $len;
    print "  no backend.  read_ahead = $self->{read_ahead}.\n" if Perlbal::DEBUG >= 3;

    # if we know we've already started spooling a file to disk, then continue
    # to do that.
    print "  bureason = $self->{bureason}\n" if Perlbal::DEBUG >= 3 && $self->{bureason};
    return $self->buffered_upload_update if $self->{bureason};

    # if we are under our buffer-to-memory size, just continue buffering here and
    # don't fall through to the backend request call below
    return if
        ! $done_reading &&
        $self->{read_ahead} < $self->{service}->{buffer_backend_connect};

    # over the buffer-to-memory size, see if we should start spooling to disk.
    return if $self->{service}->{buffer_uploads} && $self->decide_to_buffer_to_disk;

    # give plugins a chance to act on the request before we request a backend
    # (added by Chris Hondl <chris@imvu.com>, March 2006)
    my $svc = $self->{service};
    return if $svc->run_hook('proxy_read_request', $self);

    # if we fall through to here, we need to ensure that a backend is on the
    # way, because no specialized handling took over above
    print "  finally requesting a backend\n" if Perlbal::DEBUG >= 3;
    return $self->request_backend;
}
733
# Process a request whose headers have been read.
# Closes the connection if there are no headers.  Returns early if a
# 'start_proxy_request' or 'start_http_request' hook returns true, or if
# upload tracking handles the request.  Otherwise it sets up body
# bookkeeping (chunked upload or declared Content-Length), counts the
# request, and then:
#   - buffers a chunked body,
#   - starts buffering a declared body when buffer_backend_connect is set,
#   - or tries the reproxy cache and then requests a backend.
sub handle_request {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};

    if (!$req_hd) {
        $self->close("handle_request without headers");
        return;
    }

    $self->check_req_headers;

    my $svc = $self->{service};

    # give plugins a chance to force us to bail
    foreach my $hook ('start_proxy_request', 'start_http_request') {
        return if $svc->run_hook($hook, $self);
    }

    unless ($self->handle_chunked_upload) {
        # a declared Content-Length means we're waiting on that much body.
        # NOTE(review): no adjustment for body bytes that arrived in the
        # same packet as the headers is made here -- presumably handled
        # where the headers are parsed; confirm.
        $self->{content_length_remain} = $req_hd->content_length;
        $self->{request_body_length}   = $self->{content_length_remain};
        $self->{unread_data_waiting} = 1 if $self->{content_length_remain};
    }

    # upload-tracking stuff.  both starting a new upload track session,
    # and checking on status of ongoing one
    return if $svc->{upload_status_listeners} && $self->handle_upload_tracking;

    # note that we've gotten a request
    $self->{requests}++;
    $self->{last_request_time} = $self->{alive_time};

    if ($self->{chunked_upload_state}) {
        # chunked body: always buffered, starting from an empty body
        $self->{request_body_length} = 0;
        $self->{is_buffering} = 1;
        $self->{bureason} = 'chunked';
        return $self->buffered_upload_update;
    }

    if ($self->{content_length_remain} && $svc->{buffer_backend_connect}) {
        # the deeper path: buffer some of the body before asking for a backend
        return $self->start_buffering_request;
    }

    # get the backend request process moving, since we aren't buffering
    $self->{is_buffering} = 0;

    # if reproxy-caching is enabled, we can often bypass needing to allocate a BackendHTTP connection:
    return if $svc->{reproxy_cache} && $self->satisfy_request_from_cache;

    return $self->request_backend;
}
790
# If the request body uses chunked Transfer-Encoding and this service has
# buffer_uploads enabled, set up a Perlbal::ChunkedUploadState that queues
# each decoded chunk onto read_buf for spooling to disk, and return 1.
# Returns false, leaving the request untouched, otherwise.
#
# The Transfer-Encoding header (and an Expect: 100-continue, after we answer
# it) is removed so it is not forwarded to the backend.  Uploads larger than
# the service's max_chunked_request_size (when set) are purged and the
# connection closed.
sub handle_chunked_upload {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};
    my $svc    = $self->{service};

    # transfer-coding names are case-insensitive (RFC 7230, section 4)
    my $te = $req_hd->header("Transfer-Encoding");
    return unless defined $te && $te =~ /\A\s*chunked\s*\z/i;
    return unless $svc->{buffer_uploads};

    $req_hd->header("Transfer-Encoding", undef); # remove it (won't go to backend)

    # the Expect field value is case-insensitive as well (RFC 7231, 5.1.1)
    my $eh = $req_hd->header("Expect");
    if ($eh && $eh =~ /\b100-continue\b/i) {
        $self->write(\ "HTTP/1.1 100 Continue\r\n\r\n");
        $req_hd->header("Expect", undef); # remove it (won't go to backend)
    }

    my $max_size = $svc->{max_chunked_request_size};

    $self->{chunked_upload_state} = Perlbal::ChunkedUploadState->new(
        # receives a ref to each decoded chunk of body data
        on_new_chunk => sub {
            my $cref = shift;
            my $len  = length($$cref);
            push @{$self->{read_buf}}, $cref;
            $self->{read_ahead}          += $len;
            $self->{request_body_length} += $len;

            # if too large, disconnect them...
            if ($max_size && $self->{request_body_length} > $max_size) {
                $self->purge_buffered_upload;
                $self->close;
                return;
            }
            $self->buffered_upload_update;
        },
        on_disconnect => sub {
            $self->client_disconnected;
        },
        # the terminating zero-length chunk: the body is complete
        on_zero_chunk => sub {
            $self->send_buffered_upload;
        },
    );
    return 1;
}
835
# Try to answer this request from the service's reproxy cache, so no
# BackendHTTP connection is needed.  Entries are looked up by
# "Host|request-URI" and hold [expiry time, header name/value list, URLs to
# reproxy].  On a live hit we send a 304 when If-Modified-Since equals the
# cached Last-Modified, otherwise we start reproxying the cached URLs with
# the cached headers.  Returns 1 if the request was handled, 0 on a cache
# miss or an expired entry.
sub satisfy_request_from_cache {
    my Perlbal::ClientProxy $self = shift;

    my $req_hd = $self->{req_headers};
    my $svc    = $self->{service};
    my $cache  = $svc->{reproxy_cache};
    $svc->{_stat_requests}++;

    my $requri   = $req_hd->request_uri    || '';
    my $hostname = $req_hd->header("Host") || '';

    my $key = "$hostname|$requri";

    my $reproxy = $cache->get($key)
        or return 0;

    my ($timeout, $headers, $urls) = @$reproxy;
    return 0 if time() > $timeout;

    $svc->{_stat_cache_hits}++;

    # cached header list elements may be plain strings or refs to strings
    my %headers = map { ref $_ eq 'SCALAR' ? $$_ : $_ } @{$headers || []};

    if (my $ims = $req_hd->header("If-Modified-Since")) {
        # header names are case-insensitive, and the entry may not have one
        # at all (indexing %headers with undef would warn)
        my ($lm_key) = grep { uc($_) eq "LAST-MODIFIED" } keys %headers;
        my $lm = defined $lm_key ? ($headers{$lm_key} || "") : "";

        # remove the IE length suffix
        $ims =~ s/; length=\d+//;

        # If 'Last-Modified' is same as 'If-Modified-Since', send a 304.
        # a missing/empty Last-Modified never counts as a match.
        if ($lm ne "" && $ims eq $lm) {
            my $res_hd = Perlbal::HTTPHeaders->new_response(304);
            $res_hd->header("Content-Length", "0");
            $self->tcp_cork(1);
            $self->state('xfer_resp');
            $self->write($res_hd->to_string_ref);
            $self->write(sub { $self->http_response_sent; });
            return 1;
        }
    }

    my $res_hd = Perlbal::HTTPHeaders->new_response(200);
    $res_hd->header("Date", HTTP::Date::time2str(time()));
    for my $name (keys %headers) {
        $res_hd->header($name, $headers{$name});
    }

    $self->start_reproxy_uri($res_hd, $urls);
    return 1;
}
886
# Upload-status tracking for requests whose URI carries a
# client_up_sess=<session> parameter:
#   * a status query (URI starting with /__upload_status?) is answered here
#     with a small text/plain body describing that session's progress, and
#     we return 1 to take over the connection;
#   * any other such request is tagged as that session's upload, and we
#     return 0 so handle_request carries on.
# Requests without the parameter are left alone (returns 0).
sub handle_upload_tracking {
    my Perlbal::ClientProxy $self = shift;
    my $uri = $self->{req_headers}->request_uri;

    my ($sess) = $uri =~ /[\?&]client_up_sess=(\w{5,50})\b/
        or return 0;

    unless ($uri =~ m!^/__upload_status\?!) {
        # not a status query: remember which upload session this is
        $self->{upload_session} = $sess;
        return 0;
    }

    my $status = Perlbal::UploadListener::get_status($sess);
    my $now    = time();

    my $body = "{}";
    if ($status) {
        $body = "{done:$status->{done},total:$status->{total},starttime:$status->{starttime},nowtime:$now}";
    }

    my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
    $res->header("Content-Type", "text/plain");
    $res->header('Content-Length', length $body);
    $self->setup_keepalive($res);

    $self->tcp_cork(1);  # cork writes to self
    $self->write($res->to_string_ref);
    $self->write(\ $body);
    $self->write(sub { $self->http_response_sent; });
    return 1;
}
921
# Continuation of handle_request when the request body should be buffered
# before a backend is chosen.  If buffered uploads are enabled with a size
# threshold and the declared Content-Length meets it, spool to disk at once
# (reason 'size').  Otherwise keep buffering in memory (state
# 'buffering_request'); decide_to_buffer_to_disk may still move the upload
# to disk later.
sub start_buffering_request {
    my Perlbal::ClientProxy $self = shift;
    my $svc = $self->{service};

    # buffering case:
    $self->{is_buffering} = 1;

    # the size shortcut only applies when disk buffering is enabled
    my $size_limit = $svc->{buffer_uploads} ? $svc->{buffer_upload_threshold_size} : 0;

    if ($size_limit && $self->{req_headers}->content_length >= $size_limit) {
        # we already know it is big enough for disk: start spooling now
        $self->{bureason} = 'size';
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', 'size')
            if $ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS};
        $self->state('buffering_upload');
        $self->buffered_upload_update;
        return;
    }

    # in memory for now; we may still decide to go to disk
    $self->state('buffering_request');

    # start_time feeds the rate/time checks, which only matter for disk buffering
    $self->{start_time} = [ gettimeofday() ] if $svc->{buffer_uploads};
}
954
# Decide, once some of the body has been buffered in memory, whether to
# spool the rest to disk or send it straight on to the backend.
#
# Only applies while is_buffering is set (returns false otherwise).  If a
# reason was already chosen it is returned as-is.  Otherwise the observed
# rate (read_ahead bytes per second since start_time) is checked against
# buffer_upload_threshold_rate, and the estimated time to receive the rest
# against buffer_upload_threshold_time; a threshold of 0 or unset disables
# that check.  Returns 1 (after switching to state 'buffering_upload') to
# spool to disk, or 0 (clearing is_buffering) to send directly.
sub decide_to_buffer_to_disk {
    my Perlbal::ClientProxy $self = shift;
    return unless $self->{is_buffering};
    return $self->{bureason} if defined $self->{bureason};

    my $svc        = $self->{service};
    my $rate_limit = $svc->{buffer_upload_threshold_rate} || 0;
    my $time_limit = $svc->{buffer_upload_threshold_time} || 0;

    # observed rate so far; guard against a zero-length interval
    my $dur  = tv_interval($self->{start_time}) || 1;
    my $rate = ($self->{read_ahead} || 0) / $dur;

    # estimated time for the remainder.  with nothing buffered yet the rate
    # is 0: treat that as infinitely slow rather than dividing by zero
    # (which would die).
    my $remain = $self->{content_length_remain} || 0;
    my $etime  = $rate > 0     ? $remain / $rate
               : $remain > 0   ? 9**9**9
               :                 0;

    my $reason;

    # slower than the minimum acceptable rate?
    $reason = 'rate' if $rate_limit > 0 && $rate < $rate_limit;

    # and finally check estimated time exceeding (takes precedence over 'rate')
    $reason = 'time' if $time_limit > 0 && $etime > $time_limit;

    unless ($reason) {
        $self->{is_buffering} = 0;
        return 0;
    }

    # record the decision before kicking off any disk writes, in the same
    # order start_buffering_request uses for its 'size' shortcut
    $self->{bureason} = $reason;
    if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) {
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', $reason);
    }

    # start saving it to disk
    $self->state('buffering_upload');
    $self->buffered_upload_update;

    return 1;
}
1002
# The whole body is on disk: rewind the spool file and ask for a backend to
# send it to.  Does nothing while a disk write is still in flight.  For
# chunked uploads the accumulated body length becomes the Content-Length
# sent to the backend.  If the bytes written don't match Content-Length,
# log it and answer with a 500 instead.
sub send_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # a write is still outstanding; its completion path calls back in here
    return if $self->{is_writing};

    my $req_hd = $self->{req_headers};

    # chunked requests had no Content-Length; supply the decoded size
    $req_hd->header("Content-Length", $self->{request_body_length})
        if $self->{chunked_upload_state};

    my $clen    = $req_hd->content_length;
    my $written = $self->{buoutpos};
    unless ($clen == $written) {
        Perlbal::log('critical', "Content length of $clen declared but $self->{buoutpos} bytes written to disk");
        return $self->_simple_response(500);
    }

    # rewind so the upload is streamed from the start of the file
    $self->{buoutpos} = 0;
    sysseek($self->{bufh}, 0, 0);

    # queue for a backend to get the ball rolling
    $self->request_backend;
}
1028
# Stream the next piece of the spooled body from disk to backend $be using
# sendfile.  Once request_body_length bytes have gone out, turn off the
# backend's buffered_upload_mode and delete the spool file.  Send errors
# close this client connection.
sub continue_buffered_upload {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be   = shift;
    return unless $self && $be;

    my $total = $self->{request_body_length};
    my $todo  = $total - $self->{buoutpos};

    my $sent = Perlbal::Socket::sendfile($be->{fd}, fileno($self->{bufh}), $todo);
    if ($sent < 0) {
        # the peer going away gets its own close reason
        return $self->close("epipe")     if $! == EPIPE;
        return $self->close("connreset") if $! == ECONNRESET;
        print STDERR "Error w/ sendfile: $!\n";
        return $self->close('sendfile_error');
    }
    $self->{buoutpos} += $sent;

    # all delivered: leave buffered-upload mode and drop the file.  until
    # then the backend calls us again, since buffered_upload_mode is on.
    if ($self->{buoutpos} >= $total) {
        $be->{buffered_upload_mode} = 0;
        $self->purge_buffered_upload;
        return;
    }
}
1055
# Write queued body data (read_buf) to the spool file, one chunk per async
# write.  On first use it asynchronously creates a new file under the
# service's buffer_uploads_path.  Only one open/write is in flight at a time
# (is_writing); each completed write re-invokes this sub, and once the whole
# body is on disk control passes to send_buffered_upload.  Open/write
# failures are logged and answered with a 500.
sub buffered_upload_update {
    my Perlbal::ClientProxy $self = shift;
    return if $self->{is_writing};
    return unless $self->{is_buffering} && $self->{read_ahead};

    # so we're not writing now and we have data to write...
    unless ($self->{bufilename}) {
        # create a filename and see if it exists or not
        $self->{is_writing} = 1;
        my $fn = join('-', $self->{service}->name, $self->{service}->listenaddr, "client", $self->{fd}, int(rand(0xffffffff)));
        $fn = $self->{service}->{buffer_uploads_path} . '/' . $fn;

        # good, now we need to create the file
        Perlbal::AIO::aio_open($fn, O_CREAT | O_TRUNC | O_RDWR, 0644, sub {
            $self->{is_writing} = 0;
            $self->{bufh} = shift;

            # throw errors back to the user
            if (! $self->{bufh}) {
                Perlbal::log('critical', "Failure to open $fn for buffered upload output");
                return $self->_simple_response(500);
            }

            # save state and info and bounce it back to write data
            $self->{bufilename} = $fn;
            $self->buffered_upload_update;
        });

        return;
    }

    # can't proceed if we have no disk file to async write to.  this was
    # reported to crash rarely in production -- presumably an in-flight
    # write re-invoking us after something triggered purge_buffered_upload
    # (which clears bufh).
    unless ($self->{bufh}) {
        $self->close;
        return;
    }

    # at this point, we want to do some writing
    my $bref = shift(@{$self->{read_buf}});
    my $len = length $$bref;
    $self->{read_ahead} -= $len;

    # so at this point we have a valid filename and file handle and should write out
    # the buffer that we have
    $self->{is_writing} = 1;
    Perlbal::AIO::aio_write($self->{bufh}, $self->{buoutpos}, $len, $$bref, sub {
        my $bytes = shift;
        $self->{is_writing} = 0;

        # check for error.  NOTE(review): assumes the callback gets the byte
        # count with -1 on failure, as IO::AIO's aio_write does (undef is also
        # handled, for a syswrite-style fallback).  a plain truth test let -1
        # through and corrupted buoutpos/read_buf.
        if (!defined $bytes || $bytes <= 0) {
            Perlbal::log('critical', "Error writing buffered upload: $!.  Tried to do $len bytes at $self->{buoutpos}.");
            return $self->_simple_response(500);
        }

        # update our count of data written
        $self->{buoutpos} += $bytes;

        # now check if we wrote less than we had in this chunk of buffer.  if that's
        # the case then we need to reenqueue the part of the chunk that wasn't
        # written out and update as appropriate.
        if ($bytes < $len) {
            my $diff = $len - $bytes;
            unshift @{$self->{read_buf}}, \ substr($$bref, $bytes, $diff);
            $self->{read_ahead} += $diff;
        }

        # if we're processing a chunked upload, ...
        if ($self->{chunked_upload_state}) {
            # turn reads back on, if we haven't hit the end yet.
            if ($self->{unread_data_waiting} && $self->{read_ahead} < 1024*1024) {
                $self->watch_read(1);
                $self->{unread_data_waiting} = 0;
            }

            # every chunk received and written: hand off to the backend
            if ($self->{read_ahead} == 0 && $self->{chunked_upload_state}->hit_zero_chunk) {
                $self->watch_read(0);
                $self->send_buffered_upload;
                return;
            }
        }

        # if we're done (no clr and no read ahead!) then send it
        elsif ($self->{read_ahead} <= 0 && $self->{content_length_remain} <= 0) {
            $self->send_buffered_upload;
            return;
        }

        # spawn another writer!
        $self->buffered_upload_update;
    });
}
1153
# Best-effort removal of this request's spool file.  Closes the handle
# synchronously, then unlinks the file asynchronously.  Failures are only
# warned about or logged, never fatal.
sub purge_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # close our filehandle... not async.  there may be no handle yet (e.g. an
    # oversized first chunk of a chunked upload is purged before aio_open has
    # produced one), so only close a handle we actually have.  the eval is
    # kept as a safety net: these calls were reported to fail occasionally.
    if (my $fh = $self->{bufh}) {
        eval { CORE::close($fh); };
        warn "Error closing file in ClientProxy::purge_buffered_upload: $@\n" if $@;
    }
    $self->{bufh} = undef;

    # no file was ever created, so there is nothing to unlink
    my $file = $self->{bufilename};
    return unless defined $file;

    eval {
        # now asynchronously unlink the file
        Perlbal::AIO::aio_unlink($file, sub {
            # NOTE(review): assumes the callback gets the unlink status (-1 on
            # failure) as with IO::AIO; $! alone can be stale after a success,
            # so it is only consulted when no status is passed.  confirm
            # against Perlbal::AIO.
            my $rv     = shift;
            my $failed = defined $rv ? $rv < 0 : $!;
            if ($failed) {
                # note an error, but whatever, we'll either overwrite the file later (O_TRUNC | O_CREAT)
                # or a cleaner will come through and do it for us someday (if the user runs one)
                Perlbal::log('warning', "Unable to unlink $file: $!");
            }
        });
    };
    warn "Error unlinking file in ClientProxy::purge_buffered_upload: $@\n" if $@;
}
1182
# Asked by the backend after it gets a 500: should the error be hidden from
# the client and the request retried on another backend?  Returns 1 to
# retry, 0 to let the 500 through.  Retries must be enabled for the service,
# and each call consumes one entry of its comma-separated
# error_retry_schedule.
sub should_retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be   = shift;
    my $svc = $be->{service};

    return 0 unless $svc->{enable_error_retries};

    my @delays = split(/\s*,\s*/, $svc->{error_retry_schedule});

    # count this attempt; retry only while the schedule still has entries
    $self->{retry_count}++;
    return $self->{retry_count} <= @delays ? 1 : 0;
}
1194
# Called by the backend once it has decided to retry after a 500: request a
# fresh backend, first waiting (via a Danga::Socket timer) for this
# attempt's entry in error_retry_schedule when that entry is non-zero.  A
# pending timer does nothing if the client has closed in the meantime.
sub retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::Service     $svc  = shift;

    my @delays = split(/\s*,\s*/, $svc->{error_retry_schedule});
    my $delay  = $delays[$self->{retry_count} - 1];

    # no (or zero) delay for this attempt: go again right away
    return $self->rerequest_backend unless $delay;

    Danga::Socket->AddTimer($delay, sub {
        return if $self->{closed};   # client went away while we waited
        $self->rerequest_backend;
    });
}
1213
# One-line debugging description: the parent class's summary followed by
# "; "-separated details about the backend (or pending write buffer),
# queue priority, response/body progress and reproxying.
sub as_string {
    my Perlbal::ClientProxy $self = shift;

    my @parts = (scalar $self->SUPER::as_string);

    if (my $be = $self->{backend}) {
        push @parts, "backend=$be->{ipport}";
    } elsif ($self->{write_buf_size} > 0) {
        push @parts, "write_buf_size=$self->{write_buf_size}";
    }

    push @parts, "highpri"   if $self->{high_priority};
    push @parts, "lowpri"    if $self->{low_priority};
    push @parts, "responded" if $self->{responded};
    push @parts, "waiting_for=" . $self->{content_length_remain}
        if defined $self->{content_length_remain};
    push @parts, "reproxying" if $self->{currently_reproxying};

    return join('; ', @parts);
}
1234
# Record that this client is (or was) in the low-priority queue.
# Returns nothing.
sub set_queue_low {
    my Perlbal::ClientProxy $self = $_[0];
    $self->{low_priority} = 1;
    return;
}
1240
# Record that this client is (or was) in the high-priority queue.
# Returns nothing.
sub set_queue_high {
    my Perlbal::ClientProxy $self = $_[0];
    $self->{high_priority} = 1;
    return;
}
1246
1247
# Destructor: hand the object to Perlbal::objdtor, then continue with the
# parent class's DESTROY.
sub DESTROY {
    my $self = shift;
    Perlbal::objdtor($self);
    $self->SUPER::DESTROY;
}
1252
12531;
1254
1255
1256# Local Variables:
1257# mode: perl
1258# c-basic-indent: 4
1259# indent-tabs-mode: nil
1260# End:
Note: See TracBrowser for help on using the browser.