root/trunk/lib/Perlbal/Plugin/AtomStream.pm @ 408

Revision 408, 2.2 kB (checked in by bradfitz, 4 years ago)

add in the whole "sorryTooSlow" XML element

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1###########################################################################
2# basic Perlbal statistics gatherer
3###########################################################################
4
5package Perlbal::Plugin::AtomStream;
6
7use Perlbal;
8use strict;
9use warnings;
10
11our @subs;  # subscribers
12
13use constant MAX_LAG => 262144;
14
15sub InjectFeed {
16    my $class = shift;
17    my $atomref = shift;
18
19    my $need_clean = 0;
20    foreach my $s (@subs) {
21        if ($s->{closed}) {
22            $need_clean = 1;
23            next;
24        }
25
26        my $lag = $s->{write_buf_size};
27
28        if ($lag > MAX_LAG) {
29            $s->{scratch}{skipped_atom}++;
30        } else {
31            if (my $skip_count = $s->{scratch}{skipped_atom}) {
32                $s->{scratch}{skipped_atom} = 0;
33                $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n");
34            }
35            $s->write($atomref);
36        }
37    }
38
39    if ($need_clean) {
40        @subs = grep { ! $_->{closed} } @subs;
41    }
42}
43
44# called when we're being added to a service
45sub register {
46    my ($class, $svc) = @_;
47
48    Perlbal::Socket::register_callback(1, sub {
49        my $now = time();
50        foreach my $s (@subs) {
51            next if $s->{closed};
52            $s->{alive_time} = $now;
53            $s->write(\ "<time>$now</time>\n");
54        }
55        return 1;
56    });
57
58    $svc->register_hook('AtomStream', 'start_http_request', sub {
59        my Perlbal::ClientProxy $self = shift;
60        my Perlbal::HTTPHeaders $hds = $self->{req_headers};
61        return 0 unless $hds;
62        my $uri = $hds->request_uri;
63        return 0 unless $uri =~ m!^/atom-stream\.xml$!;
64
65        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
66        $res->header("Content-Type", "text/xml");
67        $res->header('Connection', 'close');
68
69        push @subs, $self;
70
71        $self->write($res->to_string_ref);
72        $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream>\n");
73        return 1;
74    });
75
76    return 1;
77}
78
79# called when we're no longer active on a service
80sub unregister {
81    my ($class, $svc) = @_;
82
83    return 1;
84}
85
86# called when we are loaded
87sub load {
88    return 1;
89}
90
91# called for a global unload
92sub unload {
93    return 1;
94}
95
961;
Note: See TracBrowser for help on using the browser.