root/trunk/lib/Perlbal/ClientProxy.pm @ 766

Revision 766, 44.4 kB (checked in by bradfitz, 21 months ago)

SECURITY: patch from Jeremey James <jbj@…> to not crash
on zero byte chunked upload when buffered uploads are enabled.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1######################################################################
2# HTTP Connection from a reverse proxy client
3#
4# Copyright 2004, Danga Interactive, Inc.
5# Copyright 2005-2007, Six Apart, Ltd.
6#
7package Perlbal::ClientProxy;
8use strict;
9use warnings;
10use base "Perlbal::ClientHTTPBase";
11no  warnings qw(deprecated);
12
13use Perlbal::ChunkedUploadState;
14use Perlbal::Util;
15
16use fields (
17            'backend',             # Perlbal::BackendHTTP object (or undef if disconnected)
18            'backend_requested',   # true if we've requested a backend for this request
19            'reconnect_count',     # number of times we've tried to reconnect to backend
20            'high_priority',       # boolean; 1 if we are or were in the high priority queue
21            'low_priority',        # boolean; 1 if we are or were in the low priority queue
22            'reproxy_uris',        # arrayref; URIs to reproxy to, in order
23            'reproxy_expected_size', # int: size of response we expect to get back for reproxy
24            'currently_reproxying',  # arrayref; the host info and URI we're reproxying right now
25            'content_length_remain', # int: amount of data we're still waiting for
26            'responded',           # bool: whether we've already sent a response to the user or not
27            'last_request_time',   # int: time that we last received a request
28            'primary_res_hdrs',  # if defined, we are doing a transparent reproxy-URI
29                                 # and the headers we get back aren't necessarily
30                                 # the ones we want.  instead, get most headers
31                                 # from the provided res headers object here.
32            'is_buffering',        # bool; if we're buffering some/all of a request to memory/disk
33            'is_writing',          # bool; if on, we currently have an aio_write out
34            'start_time',          # hi-res time when we started getting data to upload
35            'bufh',                # buffered upload filehandle object
36            'bufilename',          # string; buffered upload filename
37            'bureason',            # string; if defined, the reason we're buffering to disk
38            'buoutpos',            # int; buffered output position
39            'backend_stalled',   # boolean:  if backend has shut off its reads because we're too slow.
40            'unread_data_waiting',  # boolean:  if we shut off reads while we know data is yet to be read from client
41            'chunked_upload_state', # bool/obj:  if processing a chunked upload, Perlbal::ChunkedUploadState object, else undef
42            'request_body_length',  # integer:  request's body length, either as-declared,
43                                    #           or calculated after chunked upload is complete
44
45            # for perlbal sending out UDP packets related to upload status (for xmlhttprequest upload bar)
46            'last_upload_packet',  # unixtime we last sent a UDP upload packet
47            'upload_session',      # client's self-generated upload session
48
49            # error-retrying stuff
50            'retry_count',         # number of times we've retried this request so far after getting 500 errors
51            );
52
53use constant READ_SIZE         => 131072;    # 128k, ~common TCP window size?
54use constant READ_AHEAD_SIZE   =>  32768;    # kinda arbitrary.  sum of these two is max stored per connection while waiting for backend.
55use Errno qw( EPIPE ENOENT ECONNRESET EAGAIN );
56use POSIX qw( O_CREAT O_TRUNC O_RDWR O_RDONLY );
57use Time::HiRes qw( gettimeofday tv_interval );
58
59my $udp_sock;
60
# Constructor.  Accepts a class name (a fresh restricted hash is built via
# fields::new) or an existing object, lets ClientHTTPBase set up the
# service/socket pair, registers the object with Perlbal::objctor, resets
# the proxy state and starts watching the client socket for reads.
sub new {
    my Perlbal::ClientProxy $self = shift;
    my ($service, $sock) = @_;

    $self = fields::new($self) if !ref $self;
    $self->SUPER::new($service, $sock);
    Perlbal::objctor($self);

    $self->init;
    $self->watch_read(1);

    return $self;
}
74
# Turn an existing ClientHTTPBase connection into a ClientProxy in place:
# re-bless it into $class, reset the proxy state, resume reading and
# process the request whose headers that connection already holds.
# Returns the (now re-blessed) connection object.
sub new_from_base {
    my $class = $_[0];
    my Perlbal::ClientHTTPBase $client = $_[1];

    Perlbal::Util::rebless($client, $class);

    $client->init;
    $client->watch_read(1);
    $client->handle_request;

    return $client;
}
84
# Put every proxy-specific field into its initial state.  Used by both
# constructors.  References and not-yet-known sizes become undef; flags,
# counters and positions become 0.
sub init {
    my Perlbal::ClientProxy $self = $_[0];

    # nothing is known yet about the backend, the body, buffering or reproxying
    $self->{$_} = undef for qw(
        backend
        content_length_remain
        start_time bufh bufilename bureason
        chunked_upload_state request_body_length
        reproxy_uris reproxy_expected_size currently_reproxying
    );

    # all flags and counters start cleared
    $self->{$_} = 0 for qw(
        last_request_time
        high_priority low_priority
        responded unread_data_waiting backend_requested
        is_buffering is_writing buoutpos
    );

    # retry counter for 500-error retries
    $self->{retry_count} = 0;
}
115
# Re-issue the current request against the service named $svc_name, even
# though a request was already made to the original service.  (Intended
# for GET/HEAD requests; this method does not itself check the method.)
# $primary_res_hdrs becomes this client's res_headers.  If no such service
# exists, a 404 is sent and 1 is returned; otherwise the result of the
# target service's adopt_base_client is returned.
sub start_reproxy_service {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
    my $svc_name = $_[2];

    my $target = $svc_name ? Perlbal->service($svc_name) : undef;
    if (!$target) {
        $self->_simple_response(404, "Vhost twiddling not configured for requested pair.");
        return 1;
    }

    # detach from the current backend and keep the primary response headers
    $self->{backend}           = undef;
    $self->{backend_requested} = 0;
    $self->{res_headers}       = $primary_res_hdrs;

    return $target->adopt_base_client($self);
}
135
# Start (or continue) reproxying the current request to a list of URIs.
#
# $primary_res_hdrs: the backend's response headers.  They are remembered
#   in primary_res_hdrs on the first call.  Later calls may pass undef and
#   reuse the stored ones.  If none are available at all, returns without
#   doing anything.
# $urls: optional whitespace-separated list of "http://host[:port][/path]"
#   URIs.  When given, it replaces the pending list.  Each entry is stored
#   as [ host, port (default 80), path (default '/') ], and entries that
#   do not match that form are silently skipped.  When omitted (as
#   try_next_uri does), the existing list is continued.
#
# The first pending URI is fetched through the reproxy manager; on failure
# the next one is tried, and so on.  With nothing left to try, the client
# gets a 503.
sub start_reproxy_uri {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::HTTPHeaders $primary_res_hdrs = $_[1];
    my $urls = $_[2];

    # at this point we need to disconnect from our backend
    $self->{backend} = undef;

    # failure if we have no primary response headers (neither passed in
    # now nor remembered from an earlier call)
    return unless $self->{primary_res_hdrs} ||= $primary_res_hdrs;

    # construct reproxy_uri list
    if (defined $urls) {
        $self->{currently_reproxying} = undef;
        $self->{reproxy_uris} = [];
        foreach my $uri (split /\s+/, $urls) {
            next unless $uri =~ m!^http://(.+?)(?::(\d+))?(/.*)?$!;
            push @{$self->{reproxy_uris}}, [ $1, $2 || 80, $3 || '/' ];
        }
    }

    # if we get in here and we have currently_reproxying defined, then something
    # happened and we want to retry that one
    if ($self->{currently_reproxying}) {
        unshift @{$self->{reproxy_uris}}, $self->{currently_reproxying};
        $self->{currently_reproxying} = undef;
    }

    # if we have no uris in our list now, tell the user 503 (service unavailable)
    return $self->_simple_response(503)
        unless @{$self->{reproxy_uris} || []};

    # set the expected size if the primary response headers carry one
    if ($primary_res_hdrs && (my $expected_size = $primary_res_hdrs->header('X-REPROXY-EXPECTED-SIZE'))) {
        $self->{reproxy_expected_size} = $expected_size;
    }

    # pass ourselves off to the reproxy manager
    $self->state('wait_backend');
    Perlbal::ReproxyManager::do_reproxy($self);
}
181
# Called by the reproxy manager when the backend for the current URI
# cannot be reached.  Drops the head of the pending URI list and continues
# with the rest (start_reproxy_uri without a $urls argument).
sub try_next_uri {
    my Perlbal::ClientProxy $self = $_[0];

    # forget the URI that just failed; nothing is in flight any more
    shift @{ $self->{reproxy_uris} };
    $self->{currently_reproxying} = undef;

    return $self->start_reproxy_uri;
}
190
# Returns true when more bytes are queued for writing to this client than
# the service allows to be buffered.  Returns 0 if there is no backend.
sub too_far_behind_backend {
    my Perlbal::ClientProxy $self    = $_[0];
    my Perlbal::BackendHTTP $backend = $self->{backend};
    return 0 unless $backend;

    # A backend without a service was created by the ReproxyManager, so the
    # 'buffer_size_reproxy_url' limit applies instead of 'buffer_size'.  This
    # lets buffers be tuned per kind of webserver: reproxied-to servers are
    # assumed to be event-based, so it is okay to tie them up longer in
    # exchange for perlbal using less buffer memory.
    my $max_buffer;
    if (defined $backend->{service}) {
        $max_buffer = $self->{service}->{buffer_size};
    } else {
        $max_buffer = $self->{service}->{buffer_size_reproxy_url};
    }

    return $self->{write_buf_size} > $max_buffer;
}
210
# Callback used once a backend connection for reproxying exists.  Takes the
# next pending URI (recording it as currently_reproxying), pairs client and
# backend, and sends the backend an HTTP/1.0 GET (or HEAD, if the client
# sent HEAD) for that URI's path.  Only the client's Range and Host headers
# are forwarded.  With no URI left, the backend is closed and the client
# gets a 503.
sub use_reproxy_backend {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    my $datref = shift @{$self->{reproxy_uris}};
    $self->{currently_reproxying} = $datref;
    if (!defined $datref) {
        # return error and close the backend
        $be->close('invalid_uris');
        return $self->_simple_response(503);
    }

    # link the two connections to each other
    $self->{backend} = $be;
    $be->{client}    = $self;

    my $req_hd = $self->{req_headers};

    # copy through only the headers the reproxied server needs
    my $extra_hdr = "";
    foreach my $name ("Range", "Host") {
        if (my $value = $req_hd->header($name)) {
            $extra_hdr .= "$name: $value\r\n";
        }
    }

    my $req_method = $req_hd->request_method eq 'HEAD' ? 'HEAD' : 'GET';
    my $headers = "$req_method $datref->[2] HTTP/1.0\r\nConnection: keep-alive\r\n${extra_hdr}\r\n";

    $be->{req_headers} = Perlbal::HTTPHeaders->new(\$headers);
    $be->state('sending_req');
    $self->state('backend_req_sent');
    $be->write($be->{req_headers}->to_string_ref);
    $be->watch_read(1);
    $be->watch_write(1);
}
247
# Called when a transient backend fetching a reproxied URI has received the
# response headers.  A response is rejected when its status is neither 2xx
# nor 416, or when an expected size is known (reproxy_expected_size) and
# the response's Content-Length is missing or differs from it.
#
# Returns 1 if the response was rejected.  In that case the backend is
# detached and closed, and the next URI is tried.  Returns 0 if the
# response is acceptable.
sub backend_response_received {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    # a response means that we are no longer currently waiting on a reproxy, and
    # don't want to retry this URI
    $self->{currently_reproxying} = undef;

    my $res_hd = $be->{res_headers};

    # accept any 2xx response, plus 416 (range not satisfiable)
    my $code = $res_hd->response_code + 0;
    my $bad_code = !(($code >= 200 && $code <= 299) || $code == 416);

    # Size check only matters for an otherwise acceptable response.  A
    # missing Content-Length is a mismatch.  It is tested explicitly so
    # that undef never reaches the numeric comparison (which would warn
    # under "use warnings").
    my $bad_size = 0;
    my $expected = $self->{reproxy_expected_size};
    if (!$bad_code && defined $expected) {
        my $got = $res_hd->header('Content-length');
        $bad_size = (!defined $got || $expected != $got) ? 1 : 0;
    }

    if ($bad_code || $bad_size) {
        # fall back to an alternate URL
        $be->{client} = undef;
        $be->close('non_200_reproxy');
        $self->try_next_uri;
        return 1;
    }
    return 0;
}
282
# Serve the response body from a local file (X-REPROXY-FILE) instead of
# from the backend.
#
# $file: path of the file to send.
# $hd:   the backend's response headers, cleaned up before being sent.
#        Content-Length/Content-Range are fixed to the real file size,
#        204 becomes 200, and X-REPROXY-FILE is removed.
#
# The file is stat()ed and opened asynchronously through Perlbal::AIO.
# Range requests are honoured (206 / 416).  Missing, empty or
# unexpectedly-sized files get a 404.
sub start_reproxy_file {
    my Perlbal::ClientProxy $self = shift;
    my $file = shift;                      # filename to reproxy
    my Perlbal::HTTPHeaders $hd = shift;   # headers from backend, in need of cleanup

    # at this point we need to disconnect from our backend
    $self->{backend} = undef;

    # call hook for pre-reproxy; a true return means a plugin took over.
    # (It receives a reference to $file, presumably so it may rewrite it.)
    return if $self->{service}->run_hook("start_file_reproxy", $self, \$file);

    # set our expected size
    if (my $expected_size = $hd->header('X-REPROXY-EXPECTED-SIZE')) {
        $self->{reproxy_expected_size} = $expected_size;
    }

    # start an async stat on the file
    $self->state('wait_stat');
    Perlbal::AIO::aio_stat($file, sub {

        # if the client's since disconnected by the time we get the stat,
        # just bail.
        return if $self->{closed};

        # "_" is Perl's last-stat buffer; this relies on aio_stat having
        # filled it before invoking this callback
        my $size = -s _;

        # missing and zero-length files are both answered with 404
        unless ($size) {
            # FIXME: POLICY: 404 or retry request to backend w/o reproxy-file capability?
            return $self->_simple_response(404);
        }
        if (defined $self->{reproxy_expected_size} && $self->{reproxy_expected_size} != $size) {
            # 404; the file size doesn't match what we expected
            return $self->_simple_response(404);
        }

        # if the thing we're reproxying is indeed a file, advertise that
        # we support byteranges on it
        if (-f _) {
            $hd->header("Accept-Ranges", "bytes");
        }

        my ($status, $range_start, $range_end) = $self->{req_headers}->range($size);
        my $not_satisfiable = 0;

        if ($status == 416) {
            $hd = Perlbal::HTTPHeaders->new_response(416);
            # $size is known to be non-zero here (checked above)
            $hd->header("Content-Range", "bytes */$size");
            $not_satisfiable = 1;
        }

        # change the status code to 200 if the backend gave us 204 No Content
        $hd->code(200) if $hd->response_code == 204;

        # fixup the Content-Length header with the correct size (application
        # doesn't need to provide a correct value if it doesn't want to stat())
        if ($status == 200) {
            $hd->header("Content-Length", $size);
        } elsif ($status == 206) {
            $hd->header("Content-Range", "bytes $range_start-$range_end/$size");
            $hd->header("Content-Length", $range_end - $range_start + 1);
            $hd->code(206);
        }

        # don't send this internal header to the client:
        $hd->header('X-REPROXY-FILE', undef);

        # rewrite some other parts of the header
        $self->setup_keepalive($hd);

        # just send the header, now that we cleaned it.
        $self->{res_headers} = $hd;
        $self->write($hd->to_string_ref);

        if ($self->{req_headers}->request_method eq 'HEAD' || $not_satisfiable) {
            $self->write(sub { $self->http_response_sent; });
            return;
        }

        # From here on the status line has already been queued to the client.
        # A later failure can no longer be reported with a different status:
        # sending a second (e.g. 500) response would land in the middle of
        # this one.  Close the connection instead, so the client sees a
        # truncated body.
        $self->state('wait_open');
        Perlbal::AIO::aio_open($file, 0, 0 , sub {
            my $fh = shift;

            # if client's gone, just close filehandle and abort
            if ($self->{closed}) {
                CORE::close($fh) if $fh;
                return;
            }

            # open failed after the headers went out
            return $self->close('reproxy_file_open_failed') unless $fh;

            # seek if partial content; sysseek returns undef on failure
            if ($status == 206) {
                unless (defined sysseek($fh, $range_start, &POSIX::SEEK_SET)) {
                    CORE::close($fh);
                    return $self->close('reproxy_file_seek_failed');
                }
                $size = $range_end - $range_start + 1;
            }

            $self->reproxy_fh($fh, $size);
            $self->watch_write(1);
        });
    });
}
388
# Client
# Accessor for the backend proxy connection.  With no argument it returns
# the current backend.  With an argument it stores it and returns it.
# Storing a false value (disconnecting) also switches this client into the
# 'draining_res' state.
sub backend {
    my Perlbal::ClientProxy $self = shift;
    if (!@_) {
        return $self->{backend};
    }

    my $backend = shift;
    if (!$backend) {
        $self->state('draining_res');
    }
    $self->{backend} = $backend;
    return $self->{backend};
}
399
# Invoked by the backend when it is ready to receive request data.
# Resumes reading from the client if reads were paused with data still
# pending.  It then either drains our in-memory read buffer to the backend,
# or, when the upload is being buffered (bureason set), switches the
# backend into buffered-upload mode.  That mode makes the backend report
# when it is writable.
sub backend_ready {
    my Perlbal::ClientProxy $self = $_[0];
    my Perlbal::BackendHTTP $be = $_[1];

    $self->watch_read(1) if $self->{unread_data_waiting};

    if ($self->{bureason}) {
        return $be->invoke_buffered_upload_mode;
    }

    return $self->drain_read_buf_to($be);
}
421
# Queued by our backend in our write buffer, so it runs once everything the
# backend produced has been written to the user.  Marks the request as
# responded and detaches the backend.  If no request data is still pending,
# it hands off to http_response_sent (keep-alive / reset).  Otherwise the
# client is closed.
sub backend_finished {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::backend_finished\n" if Perlbal::DEBUG >= 3;

    # presumably the backend has responded already if we're here
    $self->{responded} = 1;

    # the backend is done with us; drop our link to it
    $self->{backend} = undef;

    if ($self->{unread_data_waiting}) {
        # Rare in practice: the backend read was empty/disconnected (or
        # otherwise messed up) while the client still has data to send, so
        # the only thing left to do is close the client down.
        return $self->close("backend_finished_while_unread_data");
    }

    return $self->http_response_sent;
}
445
# Called once a response has been fully sent to the user.
#
# The keep-alive decision is made by ClientHTTPBase::http_response_sent.
# If the connection is not kept open, this returns 0.  Otherwise it
# detaches any remaining backend, resets all per-request state so the
# connection is ready for its next request, and returns 1.
sub http_response_sent {
    my Perlbal::ClientProxy $self = $_[0];

    # persistence logic is in ClientHTTPBase
    return 0 unless $self->SUPER::http_response_sent;

    print "ClientProxy::http_response_sent -- resetting state\n" if Perlbal::DEBUG >= 3;

    if (my $be = $self->{backend}) {
        $self->{backend} = undef;
        $be->forget_client;
    }

    # if we get here we're being persistent, reset our state
    $self->{backend_requested} = 0;
    $self->{high_priority} = 0;
    $self->{low_priority} = 0;
    $self->{reproxy_uris} = undef;
    $self->{reproxy_expected_size} = undef;
    $self->{currently_reproxying} = undef;
    $self->{content_length_remain} = undef;
    $self->{primary_res_hdrs} = undef;
    $self->{responded} = 0;
    $self->{is_buffering} = 0;
    $self->{is_writing} = 0;
    $self->{start_time} = undef;
    $self->{bufh} = undef;
    $self->{bufilename} = undef;
    $self->{buoutpos} = 0;
    $self->{bureason} = undef;
    $self->{upload_session} = undef;
    $self->{chunked_upload_state} = undef;
    $self->{request_body_length} = undef;

    # The "already responded" path in event_read finishes a request body
    # without clearing this flag, and handle_request only ever sets it to 1.
    # Left stale, it would make backend_finished close the connection on
    # the next (body-less) request instead of keeping it alive.
    $self->{unread_data_waiting} = 0;

    # the retry counter counts retries of *this* request (as in init)
    $self->{retry_count} = 0;

    return 1;
}
481
# Ask for a new backend connection even though one was already requested
# for this request, e.g. because the first backend answered with a 500.
sub rerequest_backend {
    my Perlbal::ClientProxy $self = shift;

    # forget the previous backend so request_backend won't short-circuit
    $self->{backend} = undef;
    $self->{backend_requested} = 0;

    return $self->request_backend;
}
491
# Ask our service for a backend connection, at most once per request
# (guarded by backend_requested).  Enters 'wait_backend' and corks writes
# to the client while waiting.
sub request_backend {
    my Perlbal::ClientProxy $self = shift;
    if ($self->{backend_requested}) {
        return;
    }
    $self->{backend_requested} = 1;

    $self->state('wait_backend');
    $self->{service}->request_backend_connection($self);

    # cork writes to self
    return $self->tcp_cork(1);
}
501
# Client (overrides ClientHTTPBase::close and calls it at the end)
#
# Closes the client connection.  Runs the 'end_proxy_request' hooks, then
# either closes our backend (if still attached) or tells the service we no
# longer want one.  Closing an already-closed client is a no-op apart from
# the debug message.
sub close {
    my Perlbal::ClientProxy $self = shift;
    my $reason = shift;

    if (Perlbal::DEBUG >= 2) {
        my $again = $self->{closed} ? "again " : "";
        my $why   = defined $reason ? "saying '$reason'" : "for an unknown reason";
        warn sprintf("Perlbal::ClientProxy closed %s%s.\n", $again, $why);
    }

    # don't close twice
    return if $self->{closed};

    # signal that we're done
    $self->{service}->run_hooks('end_proxy_request', $self);

    my $backend = $self->{backend};
    if ($backend) {
        print "Client ($self) closing backend ($backend)\n" if Perlbal::DEBUG >= 1;
        $self->backend(undef);
        my $be_reason = $reason ? "proxied_from_client_close:$reason" : "proxied_from_client_close";
        $backend->close($be_reason);
    } else {
        # no backend: tell our service that we don't care for one anymore
        $self->{service}->note_client_close($self);
    }

    return $self->SUPER::close($reason);
}
532
# Handle the client going away: stop watching for reads, purge any upload
# being buffered (bureason set), and close with reason 'user_disconnected'.
sub client_disconnected { # : void
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::client_disconnected\n" if Perlbal::DEBUG >= 2;

    $self->watch_read(0);
    if ($self->{bureason}) {
        $self->purge_buffered_upload;
    }

    return $self->close('user_disconnected');
}
545
# Client
# Writable-socket callback; runs the base-class write handling first.
# Writing to the user means the backend has processed the request, so the
# request is marked responded.  If the backend had been stalled (stopped
# reading because we were too slow), it is allowed to read again.
sub event_write {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_write\n" if Perlbal::DEBUG >= 3;

    $self->SUPER::event_write;

    $self->{responded} = 1;

    if ($self->{backend_stalled}) {
        my $backend = $self->{backend};
        if ($backend) {
            print "  unstalling backend\n" if Perlbal::DEBUG >= 3;
            $self->{backend_stalled} = 0;
            $backend->watch_read(1);
        }
    }
}
565
# ClientProxy
#
# Readable-socket callback for the client connection.
#
# Until request headers are parsed, it only reads headers and hands off to
# handle_request.  Afterwards it reads request-body data and, depending on
# state, does one of the following:
#   - feeds chunked uploads to the ChunkedUploadState object;
#   - discards the data if a response has already been sent;
#   - writes the data straight to an attached backend;
#   - otherwise buffers it in read_buf/read_ahead.  Once enough is held
#     (or the body is complete) it may continue/start buffering the upload
#     (buffered_upload_update / decide_to_buffer_to_disk), give the
#     'proxy_read_request' hook a chance, and finally request a backend.
sub event_read {
    my Perlbal::ClientProxy $self = shift;
    print "ClientProxy::event_read\n" if Perlbal::DEBUG >= 3;

    # mark alive so we don't get killed for being idle
    $self->{alive_time} = time;

    # if we have no headers, the only thing we can do is try to get some
    if (! $self->{req_headers}) {
        print "  no headers.  reading.\n" if Perlbal::DEBUG >= 3;
        $self->handle_request if $self->read_request_headers;
        return;
    }

    # if we're buffering to disk or haven't read too much from this client, keep reading,
    # otherwise shut off read notifications
    unless ($self->{is_buffering} || $self->{read_ahead} < READ_AHEAD_SIZE) {
        # our buffer is full, so turn off reads for now
        print "  disabling reads.\n" if Perlbal::DEBUG >= 3;
        $self->watch_read(0);
        return;
    }

    # deal with chunked uploads
    if (my $cus = $self->{chunked_upload_state}) {
        $cus->on_readable($self);

        # if we got more than 1MB not flushed to disk,
        # stop reading for a bit until disk catches up
        if ($self->{read_ahead} > 1024*1024) {
            $self->watch_read(0);
        }
        return;
    }

    # read the MIN(READ_SIZE, content_length_remain)
    my $read_size = READ_SIZE;
    my $remain = $self->{content_length_remain};

    $read_size = $remain if $remain && $remain < $read_size;
    print "  reading $read_size bytes (", (defined $remain ? $remain : "(undef)"), " bytes remain)\n" if Perlbal::DEBUG >= 3;

    my $bref = $self->read($read_size);

    # if the read returned undef, that means the connection was closed
    # (see: Danga::Socket::read)
    return $self->client_disconnected unless defined $bref;

    # if they didn't declare a content body length and we just got a
    # readable event that's not a disconnect, something's messed up.
    # they're overflowing us.  disconnect!
    if (! $remain) {
        $self->_simple_response(400, "Can't pipeline to HTTP/1.0");
        $self->close("pipelining_to_http10");
        return;
    }

    # now that we know we have a defined value, determine how long it is, and do
    # housekeeping to keep our tracking numbers up to date.
    my $len = length($$bref);
    print "  read $len bytes\n" if Perlbal::DEBUG >= 3;

    # when run under the program "trickle", epoll speaks the truth to
    # us, but then trickle interferes and steals our reads/writes, so
    # this fails.  normally this check isn't needed.
    return unless $len;

    # $remain is known to be true here (checked above)
    $self->{read_size} += $len;
    $self->{content_length_remain} -= $len;

    my $done_reading = defined $self->{content_length_remain} && $self->{content_length_remain} <= 0;
    my $backend = $self->backend;
    print("  done_reading = $done_reading, backend = ", ($backend || "<undef>"), "\n") if Perlbal::DEBUG >= 3;

    # upload tracking: at most one UDP status packet per second to each
    # configured upload-status listener
    if (my $session = $self->{upload_session}) {
        my $cl = $self->{req_headers}->content_length;
        my $left = $self->{content_length_remain};
        my $now = time();  # FIXME: more efficient?
        if ($cl && $left && ($self->{last_upload_packet} || 0) != $now) {
            my $done = $cl - $left;
            $self->{last_upload_packet} = $now;
            $udp_sock ||= IO::Socket::INET->new(Proto => 'udp');
            my $since = $self->{last_request_time};
            my $send = "UPLOAD:$session:$done:$cl:$since:$now";
            if ($udp_sock) {
                foreach my $ep (@{ $self->{service}{upload_status_listeners_sockaddr} }) {
                    $udp_sock->send($send, 0, $ep);
                }
            }
        }
    }

    # just dump the read into the nether if we're dangling. that is
    # the case when we send the headers to the backend and it responds
    # before we're done reading from the client; therefore further
    # reads from the client just need to be sent nowhere, because the
    # RFC2616 section 8.2.3 says: "the server SHOULD NOT close the
    # transport connection until it has read the entire request"
    if ($self->{responded}) {
        print "  already responded.\n" if Perlbal::DEBUG >= 3;
        # in addition, if we're now out of data (clr == 0), then we should
        # either close ourselves or get ready for another request
        return $self->http_response_sent if $done_reading;

        print "  already responded [2].\n" if Perlbal::DEBUG >= 3;
        # at this point, if the backend has responded then we just return
        # as we don't want to send it on to them or buffer it up, which is
        # what the code below does
        return;
    }

    # if we have no data left to read, stop reading.  all that can
    # come later is an extra \r\n which we handle later when parsing
    # new request headers.  and if it's something else, we'll bail on
    # the next request, not this one.
    if ($done_reading) {
        Carp::confess("content_length_remain less than zero: $self->{content_length_remain}")
            if $self->{content_length_remain} < 0;
        $self->{unread_data_waiting} = 0;
        $self->watch_read(0);
    }

    # now, if we have a backend, then we should be writing it to the backend
    # and not doing anything else
    if ($backend) {
        print "  got a backend.  sending write to it.\n" if Perlbal::DEBUG >= 3;
        $backend->write($bref);
        # TODO: monitor the backend's write buffer depth?
        return;
    }

    # now, we know we don't have a backend, so we have to push this data onto our
    # read buffer... it's not going anywhere yet
    push @{$self->{read_buf}}, $bref;
    $self->{read_ahead} += $len;
    print "  no backend.  read_ahead = $self->{read_ahead}.\n" if Perlbal::DEBUG >= 3;

    # if we know we've already started spooling a file to disk, then continue
    # to do that.
    print "  bureason = $self->{bureason}\n" if Perlbal::DEBUG >= 3 && $self->{bureason};
    return $self->buffered_upload_update if $self->{bureason};

    # if we are under our buffer-to-memory size, just continue buffering here and
    # don't fall through to the backend request call below
    return if
        ! $done_reading &&
        $self->{read_ahead} < $self->{service}->{buffer_backend_connect};

    # over the buffer-to-memory size, see if we should start spooling to disk.
    return if $self->{service}->{buffer_uploads} && $self->decide_to_buffer_to_disk;

    # give plugins a chance to act on the request before we request a backend
    # (added by Chris Hondl <chris@imvu.com>, March 2006)
    my $svc = $self->{service};
    return if $svc->run_hook('proxy_read_request', $self);

    # if we fall through to here, we need to ensure that a backend is on the
    # way, because no specialized handling took over above
    print "  finally requesting a backend\n" if Perlbal::DEBUG >= 3;
    return $self->request_backend;
}
733
# Process a request whose headers have been read.
#
# Closes the client if there are no headers.  Otherwise it checks the
# headers and runs the 'start_proxy_request' and 'start_http_request'
# hooks (a true return from either aborts).  It then records how much
# request body is expected (chunked uploads are set up by
# handle_chunked_upload instead) and lets upload tracking take over if
# configured.  Finally it either starts buffering the body or goes
# straight for a backend, possibly satisfying the request from the
# reproxy cache.
sub handle_request {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};

    if (!$req_hd) {
        $self->close("handle_request without headers");
        return;
    }

    $self->check_req_headers;

    my $svc = $self->{service};

    # give plugins a chance to force us to bail
    foreach my $hook ('start_proxy_request', 'start_http_request') {
        return if $svc->run_hook($hook, $self);
    }

    # non-chunked: we're waiting on Content-Length bytes of body (if any)
    if (!$self->handle_chunked_upload) {
        my $body_len = $req_hd->content_length;
        $self->{content_length_remain} = $body_len;
        $self->{request_body_length}   = $body_len;
        $self->{unread_data_waiting} = 1 if $body_len;
    }

    # upload-tracking stuff.  both starting a new upload track session,
    # and checking on status of ongoing one
    return if $svc->{upload_status_listeners} && $self->handle_upload_tracking;

    # note that we've gotten a request
    $self->{requests}++;
    $self->{last_request_time} = $self->{alive_time};

    if ($self->{chunked_upload_state}) {
        # chunked bodies are always buffered; their length is summed per chunk
        $self->{request_body_length} = 0;
        $self->{is_buffering} = 1;
        $self->{bureason} = 'chunked';
        return $self->buffered_upload_update;
    }

    if ($self->{content_length_remain} && $svc->{buffer_backend_connect}) {
        # the deeper path: buffer some of the body before connecting
        return $self->start_buffering_request;
    }

    # get the backend request process moving, since we aren't buffering
    $self->{is_buffering} = 0;

    # if reproxy-caching is enabled, we can often bypass needing to allocate a BackendHTTP connection:
    return if $svc->{reproxy_cache} && $self->satisfy_request_from_cache;

    return $self->request_backend;
}
790
# If the request body arrives with "Transfer-Encoding: chunked" and the
# service buffers uploads, take the upload over:
#   - strip the Transfer-Encoding header (and an Expect: 100-continue header,
#     after answering "100 Continue" ourselves) so neither reaches the backend;
#   - install a Perlbal::ChunkedUploadState whose callbacks queue each chunk
#     for spooling to disk.
# Returns 1 when the request was taken over.  Otherwise it returns empty, and
# handle_request falls back to Content-Length handling.
sub handle_chunked_upload {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};
    my $te = $req_hd->header("Transfer-Encoding");

    # transfer-coding names are case-insensitive (RFC 7230, section 4)
    return unless $te && lc($te) eq "chunked";
    return unless $self->{service}->{buffer_uploads};

    $req_hd->header("Transfer-Encoding", undef); # remove it (won't go to backend)

    # the "100-continue" expectation token is case-insensitive as well
    my $eh = $req_hd->header("Expect");
    if ($eh && $eh =~ /\b100-continue\b/i) {
        $self->write(\ "HTTP/1.1 100 Continue\r\n\r\n");
        $req_hd->header("Expect", undef); # remove it (won't go to backend)
    }

    # optional cap on the total body size; a false value means no limit
    my $max_size = $self->{service}{max_chunked_request_size};

    # NOTE(review): these closures capture $self and end up stored on $self
    # (via chunked_upload_state), forming a reference cycle; presumably it is
    # broken when the connection is torn down -- confirm.
    my $args = {
        # queue a chunk's data for the disk writer and account for its size
        on_new_chunk => sub {
            my $cref = shift;
            my $len = length($$cref);
            push @{$self->{read_buf}}, $cref;
            $self->{read_ahead}          += $len;
            $self->{request_body_length} += $len;

            # if too large, disconnect them...
            if ($max_size && $self->{request_body_length} > $max_size) {
                $self->purge_buffered_upload;
                $self->close;
                return;
            }
            $self->buffered_upload_update;
        },
        on_disconnect => sub {
            $self->client_disconnected;
        },
        # the terminating zero-length chunk: the body is complete
        on_zero_chunk => sub {
            $self->send_buffered_upload;
        },
    };

    $self->{chunked_upload_state} = Perlbal::ChunkedUploadState->new(%$args);
    return 1;
}
835
# Try to answer the request from the service's reproxy cache without
# allocating a BackendHTTP connection.
#
# The cache is keyed on "Host|request-uri".  An entry is an arrayref of
# [ expiry_time, headers_arrayref, reproxy_urls ].
#
# Returns 1 if the request was handled here, either with a 304 (the client's
# If-Modified-Since equals the cached Last-Modified) or by starting a reproxy
# to the cached URLs.  Returns 0 on a cache miss or an expired entry.
sub satisfy_request_from_cache {
    my Perlbal::ClientProxy $self = shift;

    my $req_hd = $self->{req_headers};
    my $svc    = $self->{service};
    my $cache  = $svc->{reproxy_cache};
    $svc->{_stat_requests}++;

    my $requri   = $req_hd->request_uri    || '';
    my $hostname = $req_hd->header("Host") || '';

    my $key      = "$hostname|$requri";

    my $reproxy  = $cache->get($key) or
        return 0;

    my ($timeout, $headers, $urls) = @$reproxy;
    return 0 if time() > $timeout;

    $svc->{_stat_cache_hits}++;

    # the cached header list may contain scalar refs; flatten to plain pairs
    my %headers = map { ref $_ eq 'SCALAR' ? $$_ : $_ } @{$headers || []};

    if (my $ims = $req_hd->header("If-Modified-Since")) {
        my ($lm_key) = grep { uc($_) eq "LAST-MODIFIED" } keys %headers;

        # no cached Last-Modified header: nothing to validate against (and
        # indexing %headers with an undef key would warn)
        my $lm = defined $lm_key ? $headers{$lm_key} : undef;

        # remove the IE length suffix
        $ims =~ s/; length=\d+//;

        # If 'Last-Modified' is same as 'If-Modified-Since', send a 304.
        # An empty value never matches, so a bare "; length=N" can't yield one.
        if (defined $lm && length $lm && $ims eq $lm) {
            my $res_hd = Perlbal::HTTPHeaders->new_response(304);
            $res_hd->header("Content-Length", "0");
            $self->tcp_cork(1);
            $self->state('xfer_resp');
            $self->write($res_hd->to_string_ref);
            $self->write(sub { $self->http_response_sent; });
            return 1;
        }
    }

    my $res_hd = Perlbal::HTTPHeaders->new_response(200);
    $res_hd->header("Date", HTTP::Date::time2str(time()));
    while (my ($key, $value) = each %headers) {
        $res_hd->header($key, $value);
    }

    $self->start_reproxy_uri($res_hd, $urls);
    return 1;
}
886
# Upload-progress tracking, keyed by a "client_up_sess" query parameter
# (5-50 word characters).
#   - Requests without that parameter: return 0 untouched.
#   - "/__upload_status?..." requests: write a small progress report for the
#     session and return 1, because this connection has been answered and
#     handle_request must not continue with it.
#   - Any other request carrying the parameter: record the session id in
#     upload_session and return 0 so handle_request proceeds normally.
sub handle_upload_tracking {
    my Perlbal::ClientProxy $self = shift;
    my $req_hd = $self->{req_headers};
    my $uri    = $req_hd->request_uri;

    my ($sess) = $uri =~ /[\?&]client_up_sess=(\w{5,50})\b/;
    return 0 unless defined $sess;

    # not a status query: just tag this upload with its session id
    unless ($uri =~ m!^/__upload_status\?!) {
        $self->{upload_session} = $sess;
        return 0;
    }

    # status query: describe the session's progress ("{}" when unknown)
    my $status = Perlbal::UploadListener::get_status($sess);
    my $now    = time();
    my $body   = "{}";
    if ($status) {
        $body = "{done:$status->{done},total:$status->{total},starttime:$status->{starttime},nowtime:$now}";
    }

    my $res = Perlbal::HTTPHeaders->new_response(200);
    $self->{res_headers} = $res;
    $res->header("Content-Type", "text/plain");
    $res->header('Content-Length', length $body);
    $self->setup_keepalive($res);

    # cork so the headers, body and completion callback go out together
    $self->tcp_cork(1);
    $self->write($res->to_string_ref);
    $self->write(\ $body);
    $self->write(sub { $self->http_response_sent; });
    return 1;
}
921
# Continuation of handle_request when the request body should be buffered
# before a backend is requested.
#
# When upload buffering is enabled and the declared Content-Length already
# reaches buffer_upload_threshold_size, the upload is spooled straight to
# disk (reason 'size').  Otherwise it is buffered in memory for now (state
# 'buffering_request').  The move to disk may still happen later, based on
# rate/time.
sub start_buffering_request {
    my Perlbal::ClientProxy $self = shift;
    my $svc = $self->{service};

    $self->{is_buffering} = 1;

    # false unless both buffering-to-disk and a size threshold are configured
    my $size_limit = $svc->{buffer_uploads} && $svc->{buffer_upload_threshold_size};

    if ($size_limit && $self->{req_headers}->content_length >= $size_limit) {
        $self->{bureason} = 'size';
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', 'size')
            if $ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS};
        $self->state('buffering_upload');
        $self->buffered_upload_update;
        return;
    }

    # in-memory buffering for now; disk is still possible later
    $self->state('buffering_request');

    # the rate/time estimate in decide_to_buffer_to_disk measures from here,
    # which only matters when spooling to disk is possible at all
    $self->{start_time} = [ gettimeofday() ] if $svc->{buffer_uploads};
}
954
# Looks at how fast the request body is arriving and decides whether to start
# writing the upload to disk or just blast it to the backend.
#
# Returns:
#   - empty/undef if we are not buffering at all;
#   - the existing reason if the decision was already made;
#   - 1 if it now decides to buffer to disk (reason 'rate' or 'time');
#   - 0 otherwise, after turning buffering off.
sub decide_to_buffer_to_disk {
    my Perlbal::ClientProxy $self = shift;
    return unless $self->{is_buffering};
    return $self->{bureason} if defined $self->{bureason};

    my $svc = $self->{service};

    # this is called when we have enough data to determine whether or not to
    # start buffering to disk; start_time is set by start_buffering_request
    my $dur  = tv_interval($self->{start_time}) || 1;
    my $rate = $self->{read_ahead} / $dur;

    # estimated seconds until the rest of the body has arrived.  A client that
    # has sent nothing yet has a zero rate.  Treat it as never finishing
    # (infinite estimate) rather than dying with "Illegal division by zero".
    my $etime;
    if ($rate > 0) {
        $etime = $self->{content_length_remain} / $rate;
    } else {
        $etime = $self->{content_length_remain} ? 9**9**9 : 0;
    }

    my $reason = undef;

    # see if they are slower than the minimum rate (unset/0 disables the check)
    my $min_rate = $svc->{buffer_upload_threshold_rate} || 0;
    if ($min_rate > 0 && $rate < $min_rate) {
        $reason = 'rate';
    }

    # and finally check estimated time exceeding (takes precedence over 'rate')
    my $max_time = $svc->{buffer_upload_threshold_time} || 0;
    if ($max_time > 0 && $etime > $max_time) {
        $reason = 'time';
    }

    unless ($reason) {
        $self->{is_buffering} = 0;
        return 0;
    }

    # Record the decision (and the debug header) before starting disk writes.
    # Anything those writes trigger, including a later call here, then
    # already sees it.
    $self->{bureason} = $reason;
    if ($ENV{PERLBAL_DEBUG_BUFFERED_UPLOADS}) {
        $self->{req_headers}->header('X-PERLBAL-BUFFERED-UPLOAD-REASON', $reason);
    }

    # start saving it to disk
    $self->state('buffering_upload');
    $self->buffered_upload_update;

    return 1;
}
1002
# Hand the fully spooled upload off to a backend.
#
# For chunked uploads the Content-Length forwarded to the backend is the
# total number of body bytes received.  The declared length must equal the
# number of bytes written to the spool file; otherwise the client gets a 500.
# On success the read position is rewound to the start of the file and a
# backend is requested.
sub send_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # A write is still in flight.  Its completion handler in
    # buffered_upload_update re-checks and calls us again when done.
    return if $self->{is_writing};

    my $hd = $self->{req_headers};
    $hd->header("Content-Length", $self->{request_body_length})
        if $self->{chunked_upload_state};

    my $clen = $hd->content_length;
    if ($clen != $self->{buoutpos}) {
        Perlbal::log('critical', "Content length of $clen declared but $self->{buoutpos} bytes written to disk");
        return $self->_simple_response(500);
    }

    # rewind so the sendfile loop streams the file from the beginning
    $self->{buoutpos} = 0;
    if ($self->{bufh}) {
        # a zero-byte chunked upload never creates a spool file
        sysseek($self->{bufh}, 0, 0);
    }

    # get the ball rolling towards a backend
    return $self->request_backend;
}
1028
# Called with the backend ($be) while it is in buffered_upload_mode.  Pushes
# the next part of the spool file to the backend socket with sendfile.
#
# Broken pipes, connection resets and other sendfile errors close this
# client.  Once request_body_length bytes have been sent, buffered-upload
# mode is switched off and the spool file is purged.
sub continue_buffered_upload {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be = shift;
    return unless $self && $be;

    my $total = $self->{request_body_length};

    if ($self->{buoutpos} < $total) {
        my $remaining = $total - $self->{buoutpos};
        my $sent = Perlbal::Socket::sendfile($be->{fd}, fileno($self->{bufh}), $remaining);

        if ($sent < 0) {
            return $self->close("epipe")     if $! == EPIPE;
            return $self->close("connreset") if $! == ECONNRESET;
            print STDERR "Error w/ sendfile: $!\n";
            return $self->close('sendfile_error');
        }

        $self->{buoutpos} += $sent;
    }

    # Everything is out: leave buffered-upload mode and drop the spool file.
    # Otherwise the backend calls us again while buffered_upload_mode is on.
    if ($self->{buoutpos} >= $total) {
        $be->{buffered_upload_mode} = 0;
        $self->purge_buffered_upload;
        return;
    }
}
1057
# write data to disk
#
# Drives the buffered-upload spool file, one step per call:
#   - if a write/open is already in flight (is_writing), or we aren't
#     buffering, or there is no queued data (read_ahead), do nothing;
#   - if no spool file exists yet, asynchronously open one and re-enter
#     once it is open;
#   - otherwise take the next buffer off read_buf and asynchronously write
#     it at offset buoutpos.  The completion callback re-queues any unwritten
#     tail, possibly resumes client reads (chunked uploads), and either calls
#     send_buffered_upload when the body is complete or re-enters to write
#     the next buffer.
sub buffered_upload_update {
    my Perlbal::ClientProxy $self = shift;
    return if $self->{is_writing};
    return unless $self->{is_buffering} && $self->{read_ahead};

    # so we're not writing now and we have data to write...
    unless ($self->{bufilename}) {
        # create a filename and see if it exists or not
        # (is_writing also guards the open: no other step runs until it completes)
        $self->{is_writing} = 1;
        my $fn = join('-', $self->{service}->name, $self->{service}->listenaddr, "client", $self->{fd}, int(rand(0xffffffff)));
        $fn = $self->{service}->{buffer_uploads_path} . '/' . $fn;

        # good, now we need to create the file
        # NOTE(review): despite the comment above, existence is not checked:
        # O_CREAT|O_TRUNC without O_EXCL will follow/clobber an existing file
        # (or symlink) at this path -- confirm buffer_uploads_path is private.
        Perlbal::AIO::aio_open($fn, O_CREAT | O_TRUNC | O_RDWR, 0644, sub {
            $self->{is_writing} = 0;
            $self->{bufh} = shift;

            # throw errors back to the user
            if (! $self->{bufh}) {
                Perlbal::log('critical', "Failure to open $fn for buffered upload output");
                return $self->_simple_response(500);
            }

            # save state and info and bounce it back to write data
            $self->{bufilename} = $fn;
            $self->buffered_upload_update;
        });

        return;
    }

    # can't proceed if we have no disk file to async write to
    # people reported seeing this crash rarely in production...
    # must be a race between previously in-flight's write
    # re-invoking a write immediately after something triggered
    # a buffered upload purge.
    # (purge_buffered_upload clears bufh but leaves bufilename set, which is
    # how we end up here rather than re-opening a file.)
    unless ($self->{bufh}) {
        $self->close;
        return;
    }

    # at this point, we want to do some writing
    # NOTE(review): assumes read_buf is non-empty whenever read_ahead is
    # non-zero; an empty queue would make $$bref die under strict refs.
    my $bref = shift(@{$self->{read_buf}});
    my $len = length $$bref;
    $self->{read_ahead} -= $len;

    # so at this point we have a valid filename and file handle and should write out
    # the buffer that we have
    $self->{is_writing} = 1;
    Perlbal::AIO::aio_write($self->{bufh}, $self->{buoutpos}, $len, $$bref, sub {
        my $bytes = shift;
        $self->{is_writing} = 0;

        # check for error
        # (a 0-byte write is treated as an error too; $! may not reflect the
        # async failure's cause -- hedge when reading these logs)
        unless ($bytes) {
            Perlbal::log('critical', "Error writing buffered upload: $!.  Tried to do $len bytes at $self->{buoutpos}.");
            return $self->_simple_response(500);
        }

        # update our count of data written
        $self->{buoutpos} += $bytes;

        # now check if we wrote less than we had in this chunk of buffer.  if that's
        # the case then we need to reenqueue the part of the chunk that wasn't
        # written out and update as appropriate.
        if ($bytes < $len) {
            my $diff = $len - $bytes;
            unshift @{$self->{read_buf}}, \ substr($$bref, $bytes, $diff);
            $self->{read_ahead} += $diff;
        }

        # if we're processing a chunked upload, ...
        if ($self->{chunked_upload_state}) {
            # turn reads back on, if we haven't hit the end yet.
            # (only once the in-memory backlog drops below 1 MiB)
            if ($self->{unread_data_waiting} && $self->{read_ahead} < 1024*1024) {
                $self->watch_read(1);
                $self->{unread_data_waiting} = 0;
            }

            # everything queued is on disk and the terminating chunk was seen
            if ($self->{read_ahead} == 0 && $self->{chunked_upload_state}->hit_zero_chunk) {
                $self->watch_read(0);
                $self->send_buffered_upload;
                return;
            }
        }

        # if we're done (no clr and no read ahead!) then send it
        elsif ($self->{read_ahead} <= 0 && $self->{content_length_remain} <= 0) {
            $self->send_buffered_upload;
            return;
        }

        # spawn another writer!
        # (returns immediately if nothing is queued yet)
        $self->buffered_upload_update;
    });
}
1155
# Destroy the spool file created for a buffered upload, if any.  The
# filehandle is closed synchronously and bufh is cleared; the file is then
# unlinked asynchronously.  Failures are only warned/logged, never raised,
# because this is best-effort cleanup.
sub purge_buffered_upload {
    my Perlbal::ClientProxy $self = shift;

    # Main reason for failure below is a 0-length chunked upload, where the file is never created.
    return unless $self->{bufh};

    # FIXME: it's reported that sometimes the two now-in-eval blocks
    # fail, hence the eval blocks and warnings.  the FIXME is to
    # figure this out, why it happens sometimes.

    # first close our filehandle... not async
    eval {
        CORE::close($self->{bufh});
    };
    if ($@) { warn "Error closing file in ClientProxy::purge_buffered_upload: $@\n"; }

    $self->{bufh} = undef;

    my $fn = $self->{bufilename};
    eval {
        # now asynchronously unlink the file
        Perlbal::AIO::aio_unlink($fn, sub {
            # The callback is handed the unlink status (negative on failure).
            # $! alone can be left over from an unrelated earlier syscall, so
            # only fall back to it when no status is passed.
            my $rv = shift;
            my $failed = defined $rv ? $rv < 0 : $!;
            if ($failed) {
                # note an error, but whatever, we'll either overwrite the file later (O_TRUNC | O_CREAT)
                # or a cleaner will come through and do it for us someday (if the user runs one)
                Perlbal::log('warning', "Unable to unlink $fn: $!");
            }
        });
    };
    if ($@) { warn "Error unlinking file in ClientProxy::purge_buffered_upload: $@\n"; }
}
1187
# Decide whether a 500 from backend $be should be hidden from the client and
# retried on a different backend.  Retries require the service's
# enable_error_retries.  Each call (when enabled) consumes one attempt, and
# at most one attempt is allowed per entry in the comma-separated
# error_retry_schedule.  Returns 1 to retry, 0 to let the 500 through.
sub should_retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::BackendHTTP $be   = shift;

    my $svc = $be->{service};
    return 0 if !$svc->{enable_error_retries};

    my @delays  = split /\s*,\s*/, $svc->{error_retry_schedule};
    my $attempt = ++$self->{retry_count};

    return $attempt > @delays ? 0 : 1;
}
1199
# Called by the backend after it got a 500 that should be retried.  Looks up
# this attempt's delay in the service's comma-separated error_retry_schedule,
# indexed by retry_count (already incremented by should_retry_after_500).
# A false delay re-requests a backend immediately.  Otherwise a Danga::Socket
# timer does it later, unless the client has closed by then.
sub retry_after_500 {
    my Perlbal::ClientProxy $self = shift;
    my Perlbal::Service     $svc  = shift;

    my $delay = (split /\s*,\s*/, $svc->{error_retry_schedule})[$self->{retry_count} - 1];

    if (!$delay) {
        $self->rerequest_backend;
    } else {
        Danga::Socket->AddTimer($delay, sub {
            # the client may have gone away while we were waiting
            return if $self->{closed};
            $self->rerequest_backend;
        });
    }
}
1218
# One-line status description of this connection: the parent class's string
# followed by "; ..." annotations.  These cover the backend address (or a
# pending write buffer when there is no backend), queue priority, whether a
# response was sent, outstanding body bytes, and reproxying.
sub as_string {
    my Perlbal::ClientProxy $self = shift;

    my $base = $self->SUPER::as_string;
    my @notes;

    if ($self->{backend}) {
        my $ipport = $self->{backend}->{ipport};
        push @notes, "; backend=$ipport";
    } elsif ($self->{write_buf_size} > 0) {
        push @notes, "; write_buf_size=$self->{write_buf_size}";
    }

    push @notes, "; highpri"   if $self->{high_priority};
    push @notes, "; lowpri"    if $self->{low_priority};
    push @notes, "; responded" if $self->{responded};
    push @notes, "; waiting_for=" . $self->{content_length_remain}
        if defined $self->{content_length_remain};
    push @notes, "; reproxying" if $self->{currently_reproxying};

    return $base . join('', @notes);
}
1239
# Flag this client as being (or having been) in the low priority queue.
# Returns nothing.
sub set_queue_low {
    my Perlbal::ClientProxy $self = $_[0];
    $self->{low_priority} = 1;
    return;
}
1245
# Flag this client as being (or having been) in the high priority queue.
# Returns nothing.
sub set_queue_high {
    my Perlbal::ClientProxy $self = $_[0];
    $self->{high_priority} = 1;
    return;
}
1251
1252
# Destructor: pass the object to Perlbal::objdtor (presumably object-count /
# leak bookkeeping -- confirm), then chain to the parent class's DESTROY.
sub DESTROY {
    my $self = shift;
    Perlbal::objdtor($self);
    $self->SUPER::DESTROY;
}
1257
# a Perl module file must end by returning a true value
1;
1259
1260
1261# Local Variables:
1262# mode: perl
1263# c-basic-indent: 4
1264# indent-tabs-mode: nil
1265# End:
Note: See TracBrowser for help on using the browser.