root/trunk/lib/Perlbal/Plugin/AtomStream.pm @ 829

Revision 580, 3.5 kB (checked in by aimbert, 3 years ago)

add a path filter to the AtomStream:

- the AtomInject plugin receives a PUT request on a given path ("$put_path") with the content of an entry.
- the AtomStream plugin receives a GET request on another path ("$get_path/atom-stream.xml") (the arg 'since' is still allowed)

$put_path and $get_path can be '/' or '/a' or '/a/b', ....

then the AtomStream plugin produces a result filtered like this:

GET /atom-stream.xml returns everything ('/' or '/a' or '/a/b')
GET /a/atom-stream.xml returns the entries put with '/a' or '/a/b'
GET /a/b/atom-stream.xml returns the entries put with '/a/b'

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1package Perlbal::Plugin::AtomStream;
2
3use URI;
4
5use Perlbal;
6use strict;
7use warnings;
8
# Currently connected streaming clients. Entries whose {closed} flag is set
# are pruned lazily by InjectFeed.
our @subs;

# Rolling playback buffer. Each element has the shape
#   [ $epoch, $atom_ref, $path_segments_arrayref ]
# and InjectFeed drops elements once they are 60 seconds old.
our @recent;

# Epoch second of the most recent <time> marker sent to the subscribers.
our $last_timestamp = 0;

# A subscriber whose write_buf_size exceeds this value (presumably bytes --
# confirm against Perlbal::Socket) has new entries skipped instead of queued.
use constant MAX_LAG => 262_144;
15
# Class method: publish one Atom entry to the stream.
#
#   $atomref - reference to the serialized entry; it is handed unchanged to
#              each subscriber's write()
#   $path    - path the entry was published on; only subscribers whose GET
#              path is a segment-wise prefix of it (see filter) receive it
#
# The entry is also kept in @recent for 60 seconds so that new subscribers
# can ask for a playback.
sub InjectFeed {
    my ($class, $atomref, $path) = @_;

    my $now          = time();
    my @put_segments = URI->new($path)->path_segments;

    # remember the entry, then expire everything 60 seconds old or older
    push @recent, [ $now, $atomref, \@put_segments ];
    while (@recent && $recent[0][0] <= $now - 60) {
        shift @recent;
    }

    if ($now > $last_timestamp) {
        emit_timestamp($now);
    }

    my $saw_closed = 0;

    SUBSCRIBER:
    for my $client (@subs) {
        if ($client->{closed}) {
            $saw_closed = 1;
            next SUBSCRIBER;
        }

        next SUBSCRIBER
            unless filter(\@put_segments, $client->{scratch}{get_segments});

        # too far behind: count the miss rather than buffering even more
        if ($client->{write_buf_size} > MAX_LAG) {
            $client->{scratch}{skipped_atom}++;
            next SUBSCRIBER;
        }

        # caught up again: tell the client how many entries it missed
        my $missed = $client->{scratch}{skipped_atom};
        if ($missed) {
            $client->{scratch}{skipped_atom} = 0;
            $client->write(\ "<sorryTooSlow youMissed=\"$missed\" />\n");
        }

        $client->watch_write(0) if $client->write($atomref);
    }

    # drop closed subscribers only when at least one was seen
    @subs = grep { !$_->{closed} } @subs if $saw_closed;
}
54
# Record $time as the latest emitted timestamp and send a <time> marker to
# every subscriber that is not closed, refreshing its alive_time as well.
sub emit_timestamp {
    my ($time) = @_;

    $last_timestamp = $time;

    for my $client (@subs) {
        unless ($client->{closed}) {
            $client->{alive_time} = $time;
            $client->write(\ "<time>$time</time>\n");
        }
    }
}
64
# Decide whether an entry published on path @$put belongs in a stream
# requested on path @$get.
#
# Both arguments are array references of path segments. Returns 1 when
# @$get is a segment-by-segment prefix of @$put, and 0 otherwise.
sub filter {
    my ($put, $get) = @_;

    # a longer subscription path can never be a prefix of a shorter one
    return 0 if @$get > @$put;

    for my $idx (0 .. $#{$get}) {
        return 0 unless $put->[$idx] eq $get->[$idx];
    }

    return 1;
}
73
# Called when the plugin is added to a service.
#
# Installs two things:
#   * a callback (via Perlbal::Socket::register_callback) that emits a
#     <time> marker whenever the clock has moved past $last_timestamp;
#   * a 'start_http_request' hook that turns requests whose last path
#     segment is atom-stream.xml into long-lived stream subscribers.
#
# The hook returns 0 to let Perlbal handle the request normally and 1 once
# it has taken the request over. Always returns 1 itself.
sub register {
    my ($class, $svc) = @_;

    Perlbal::Socket::register_callback(1, sub {
        my $now = time();
        emit_timestamp($now) if $now > $last_timestamp;
        return 1;
    });

    $svc->register_hook('AtomStream', 'start_http_request', sub {
        my Perlbal::ClientProxy $self = shift;
        my Perlbal::HTTPHeaders $hds = $self->{req_headers};
        return 0 unless $hds;

        my $uri = URI->new($hds->request_uri);
        my @get_segments = $uri->path_segments;

        # Stored by reference, so the pop below is visible through it: for a
        # subscriber it ends up holding only the subscription prefix.
        $self->{scratch}{get_segments} = \@get_segments;

        # Guard against a request URI without any path segment before
        # comparing, so undef is never string-compared.
        my $leaf = pop @get_segments;
        return 0 unless defined $leaf && $leaf eq 'atom-stream.xml';

        # 'since' is client-controlled and gets echoed into the response and
        # compared numerically, so accept only a plain run of ASCII digits.
        # Anything else (including a missing parameter) means "no playback".
        my %params    = $uri->query_form;
        my $raw_since = $params{since};
        my $since     = (defined $raw_since && $raw_since =~ /\A[0-9]+\z/)
                      ? $raw_since
                      : 0;

        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
        $res->header("Content-Type", "text/xml");
        $res->header('Connection', 'close');

        push @subs, $self;

        $self->write($res->to_string_ref);

        my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n");

        # if they'd like a playback, give them all items >= time requested
        if ($since) {
            foreach my $item (@recent) {
                next if $item->[0] < $since;
                next unless filter($item->[2], \@get_segments);
                $last_rv = $self->write($item->[1]);
            }
        }

        # same pattern as InjectFeed: turn write-watching off when the last
        # write returned true
        $self->watch_write(0) if $last_rv;
        return 1;
    });

    return 1;
}
120
# Called when the plugin is no longer active on a service.
#
# Removes the hooks that register() installed under the 'AtomStream' name so
# the service stops routing atom-stream.xml requests to this plugin.
# NOTE(review): the timer callback installed through
# Perlbal::Socket::register_callback is not removed here -- confirm whether
# Perlbal offers a way to cancel it.
#
# Returns 1.
sub unregister {
    my ($class, $svc) = @_;

    $svc->unregister_hooks('AtomStream');

    return 1;
}
127
# Load hook, called when the plugin is loaded. There is nothing to
# initialise, so it only reports success by returning 1.
sub load {
    return 1;
}
132
# Global unload hook. No teardown is performed here; it only reports
# success by returning 1.
sub unload {
    return 1;
}
137
1381;
Note: See TracBrowser for help on using the browser.