root/trunk/lib/Perlbal/Plugin/AtomStream.pm @ 431

Revision 431, 2.8 kB (checked in by bradfitz, 4 years ago)

start of work on access control plugin. doesn't work (or even
compile) yet, but I wanted to get it in so I could hack on it
elsewhere, and it doesn't break anything else, being just a plugin.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1package Perlbal::Plugin::AtomStream;
2
3use Perlbal;
4use strict;
5use warnings;
6
7our @subs;    # subscribers
8our @recent;  # recent items in format [$epoch, $atom_ref]
9
10our $last_timestamp = 0;
11
12use constant MAX_LAG => 262144;
13
14sub InjectFeed {
15    my $class = shift;
16    my $atomref = shift;
17
18    # maintain queue of last 60 seconds worth of posts
19    my $now = time();
20    push @recent, [ $now, $atomref ];
21    shift @recent while @recent && $recent[0][0] <= $now - 60;
22
23    emit_timestamp($now) if $now > $last_timestamp;
24
25    my $need_clean = 0;
26    foreach my $s (@subs) {
27        if ($s->{closed}) {
28            $need_clean = 1;
29            next;
30        }
31
32        my $lag = $s->{write_buf_size};
33
34        if ($lag > MAX_LAG) {
35            $s->{scratch}{skipped_atom}++;
36        } else {
37            if (my $skip_count = $s->{scratch}{skipped_atom}) {
38                $s->{scratch}{skipped_atom} = 0;
39                $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n");
40            }
41            $s->watch_write(0) if $s->write($atomref);
42        }
43    }
44
45    if ($need_clean) {
46        @subs = grep { ! $_->{closed} } @subs;
47    }
48}
49
50sub emit_timestamp {
51    my $time = shift;
52    $last_timestamp = $time;
53    foreach my $s (@subs) {
54        next if $s->{closed};
55        $s->{alive_time} = $time;
56        $s->write(\ "<time>$time</time>\n");
57    }
58}
59
60# called when we're being added to a service
61sub register {
62    my ($class, $svc) = @_;
63
64    Perlbal::Socket::register_callback(1, sub {
65        my $now = time();
66        emit_timestamp($now) if $now > $last_timestamp;
67        return 1;
68    });
69
70    $svc->register_hook('AtomStream', 'start_http_request', sub {
71        my Perlbal::ClientProxy $self = shift;
72        my Perlbal::HTTPHeaders $hds = $self->{req_headers};
73        return 0 unless $hds;
74        my $uri = $hds->request_uri;
75        return 0 unless $uri =~ m!^/atom-stream\.xml(?:\?since=(\d+))?$!;
76        my $since = $1 || 0;
77
78        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
79        $res->header("Content-Type", "text/xml");
80        $res->header('Connection', 'close');
81
82        push @subs, $self;
83
84        $self->write($res->to_string_ref);
85
86        my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n");
87
88        # if they'd like a playback, give them all items >= time requested
89        if ($since) {
90            foreach my $item (@recent) {
91                next if $item->[0] < $since;
92                $last_rv = $self->write($item->[1]);
93            }
94        }
95
96        $self->watch_write(0) if $last_rv;
97        return 1;
98    });
99
100    return 1;
101}
102
103# called when we're no longer active on a service
104sub unregister {
105    my ($class, $svc) = @_;
106
107    return 1;
108}
109
110# called when we are loaded
111sub load {
112    return 1;
113}
114
115# called for a global unload
116sub unload {
117    return 1;
118}
119
1201;
Note: See TracBrowser for help on using the browser.