root/trunk/lib/Perlbal/Plugin/AtomStream.pm @ 430

Revision 430, 3.0 kB (checked in by bradfitz, 4 years ago)

resume functionality

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1###########################################################################
2# basic Perlbal statistics gatherer
3###########################################################################
4
5package Perlbal::Plugin::AtomStream;
6
7use Perlbal;
8use strict;
9use warnings;
10
11our @subs;    # subscribers
12our @recent;  # recent items in format [$epoch, $atom_ref]
13
14our $last_timestamp = 0;
15
16use constant MAX_LAG => 262144;
17
18sub InjectFeed {
19    my $class = shift;
20    my $atomref = shift;
21
22    # maintain queue of last 60 seconds worth of posts
23    my $now = time();
24    push @recent, [ $now, $atomref ];
25    shift @recent while @recent && $recent[0][0] <= $now - 60;
26
27    emit_timestamp($now) if $now > $last_timestamp;
28
29    my $need_clean = 0;
30    foreach my $s (@subs) {
31        if ($s->{closed}) {
32            $need_clean = 1;
33            next;
34        }
35
36        my $lag = $s->{write_buf_size};
37
38        if ($lag > MAX_LAG) {
39            $s->{scratch}{skipped_atom}++;
40        } else {
41            if (my $skip_count = $s->{scratch}{skipped_atom}) {
42                $s->{scratch}{skipped_atom} = 0;
43                $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n");
44            }
45            $s->watch_write(0) if $s->write($atomref);
46        }
47    }
48
49    if ($need_clean) {
50        @subs = grep { ! $_->{closed} } @subs;
51    }
52}
53
54sub emit_timestamp {
55    my $time = shift;
56    $last_timestamp = $time;
57    foreach my $s (@subs) {
58        next if $s->{closed};
59        $s->{alive_time} = $time;
60        $s->write(\ "<time>$time</time>\n");
61    }
62}
63
64# called when we're being added to a service
65sub register {
66    my ($class, $svc) = @_;
67
68    Perlbal::Socket::register_callback(1, sub {
69        my $now = time();
70        emit_timestamp($now) if $now > $last_timestamp;
71        return 1;
72    });
73
74    $svc->register_hook('AtomStream', 'start_http_request', sub {
75        my Perlbal::ClientProxy $self = shift;
76        my Perlbal::HTTPHeaders $hds = $self->{req_headers};
77        return 0 unless $hds;
78        my $uri = $hds->request_uri;
79        return 0 unless $uri =~ m!^/atom-stream\.xml(?:\?since=(\d+))?$!;
80        my $since = $1 || 0;
81
82        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
83        $res->header("Content-Type", "text/xml");
84        $res->header('Connection', 'close');
85
86        push @subs, $self;
87
88        $self->write($res->to_string_ref);
89
90        my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n");
91
92        # if they'd like a playback, give them all items >= time requested
93        if ($since) {
94            foreach my $item (@recent) {
95                next if $item->[0] < $since;
96                $last_rv = $self->write($item->[1]);
97            }
98        }
99
100        $self->watch_write(0) if $last_rv;
101        return 1;
102    });
103
104    return 1;
105}
106
107# called when we're no longer active on a service
108sub unregister {
109    my ($class, $svc) = @_;
110
111    return 1;
112}
113
114# called when we are loaded
115sub load {
116    return 1;
117}
118
119# called for a global unload
120sub unload {
121    return 1;
122}
123
1241;
Note: See TracBrowser for help on using the browser.