root/trunk/lib/Perlbal/Plugin/AtomStream.pm

Revision 580, 3.5 kB (checked in by aimbert, 2 years ago)

Add a path filter to the AtomStream:

- the AtomInject plugin receives a PUT request on a given path ("$put_path") with the content of an entry.
- the AtomStream plugin receives a GET request on another path ("$get_path/atom-stream.xml") (the arg 'since' is still allowed)

$put_path and $get_path can be '/' or '/a' or '/a/b', ....

then the AtomStream plugin produces a result filtered like this:

GET /atom-stream.xml returns everything ('/' or '/a' or '/a/b')
GET /a/atom-stream.xml returns the entries put with '/a' or '/a/b'
GET /a/b/atom-stream.xml returns the entries put with '/a/b'

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1 package Perlbal::Plugin::AtomStream;
2
3 use URI;
4
5 use Perlbal;
6 use strict;
7 use warnings;
8
9 our @subs;    # subscribers: client objects pushed by the start_http_request hook
10 our @recent;  # recent items in format [$epoch, $atom_ref, $path_segments_arrayref]
11
12 our $last_timestamp = 0;  # epoch of the last <time> heartbeat written by emit_timestamp
13
14 use constant MAX_LAG => 262144;  # subscriber write_buf_size above which InjectFeed skips atoms
15
# Publish one atom entry: remember it for "since" playback and write it to
# every open subscriber whose stream path is a prefix of the entry's path.
#
# Arguments:
#   $class   - invocant (called as a class method); not used
#   $atomref - scalar ref to the serialized atom entry
#   $path    - path the entry was PUT on ('/', '/a', '/a/b', ...); a missing
#              or empty path is treated as '/'
#
# The return value is not meaningful.
sub InjectFeed {
    my $class = shift;
    my ($atomref, $path) = @_;

    # A missing/empty path splits into no segments at all, and filter() then
    # rejects every subscriber (even the root stream has one segment), so the
    # entry would be dropped silently.  Treat it as the root path instead;
    # this also keeps the older one-argument call InjectFeed($atomref) useful.
    $path = '/' unless defined $path && length $path;

    # maintain queue of last 60 seconds worth of posts
    my $now = time();
    my @put_segments = URI->new($path)->path_segments;
    push @recent, [ $now, $atomref, \@put_segments ];
    shift @recent while @recent && $recent[0][0] <= $now - 60;

    # make sure subscribers see the current <time> before the entry itself
    emit_timestamp($now) if $now > $last_timestamp;

    my $need_clean = 0;
    foreach my $s (@subs) {
        if ($s->{closed}) {
            # don't modify @subs while iterating; prune after the loop
            $need_clean = 1;
            next;
        }

        # skip subscribers listening on a path that doesn't cover this entry
        next unless filter(\@put_segments, $s->{scratch}{get_segments});

        if ($s->{write_buf_size} > MAX_LAG) {
            # too much unsent data queued for this client: drop the entry
            # for them and just count it
            $s->{scratch}{skipped_atom}++;
            next;
        }

        # client has caught up again: tell it how many entries it missed
        if (my $skip_count = $s->{scratch}{skipped_atom}) {
            $s->{scratch}{skipped_atom} = 0;
            $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n");
        }

        # NOTE(review): write() presumably returns true once everything was
        # flushed, in which case write-readiness polling is turned off --
        # confirm against Perlbal::Socket.
        $s->watch_write(0) if $s->write($atomref);
    }

    if ($need_clean) {
        @subs = grep { ! $_->{closed} } @subs;
    }
}
54
# Record $time as the latest heartbeat and send a <time> element carrying it
# to every subscriber that has not been closed.  Each subscriber's
# alive_time is set to the same value.
sub emit_timestamp {
    my ($time) = @_;

    $last_timestamp = $time;

    for my $client (@subs) {
        unless ($client->{closed}) {
            $client->{alive_time} = $time;
            $client->write(\ "<time>$time</time>\n");
        }
    }
}
64
# Decide whether an entry put on one path belongs in a stream requested on
# another: true (1) when the segments in @$get are a leading prefix of the
# segments in @$put, false (0) otherwise.
#
#   $put - array ref of path segments the entry was put with
#   $get - array ref of path segments of the requested stream
sub filter {
    my ($put, $get) = @_;

    my $depth = @$get;

    # an entry shallower than the stream path can never be underneath it
    return 0 if @$put < $depth;

    foreach my $idx (0 .. $depth - 1) {
        return 0 unless $put->[$idx] eq $get->[$idx];
    }

    return 1;
}
73
74 # called when we're being added to a service
# Wires the plugin into service $svc:
#   - schedules a recurring callback that emits a <time> heartbeat whenever
#     the clock has advanced past the last emitted timestamp;
#   - installs a 'start_http_request' hook that turns a GET for
#     "<path>/atom-stream.xml" into a streaming subscription, optionally
#     replaying recent entries newer than the "since" query argument.
#
# Arguments: ($class, $svc) - plugin class name and the service object.
# Returns 1.
sub register {
    my ($class, $svc) = @_;

    # NOTE(review): returning 1 presumably re-arms the callback for another
    # second -- confirm against Perlbal::Socket::register_callback.
    Perlbal::Socket::register_callback(1, sub {
        my $now = time();
        emit_timestamp($now) if $now > $last_timestamp;
        return 1;
    });

    # The hook returns 0 to decline the request and 1 once it has taken it
    # over as a stream subscription.
    $svc->register_hook('AtomStream', 'start_http_request', sub {
        my Perlbal::ClientProxy $self = shift;
        my Perlbal::HTTPHeaders $hds = $self->{req_headers};
        return 0 unless $hds;

        my $uri = URI->new($hds->request_uri);
        my @get_segments = $uri->path_segments;

        # Only ".../atom-stream.xml" is ours.  The leaf is popped so that
        # @get_segments holds just the directory part used for filtering;
        # an empty segment list yields undef, hence the defined() guard.
        my $leaf = pop @get_segments;
        return 0 unless defined $leaf && $leaf eq 'atom-stream.xml';

        # Stash the filter path only for real stream requests; InjectFeed
        # reads it back for every subscriber.
        $self->{scratch}{get_segments} = \@get_segments;

        # "since" is echoed into the response and compared numerically, so
        # accept it only when it is purely digits; anything else (including
        # a missing argument) means "no playback".  Anchoring prevents text
        # such as "1--><x>" from being injected into the XML comment below.
        my %params = $uri->query_form;
        my $since = (defined $params{since} && $params{since} =~ /\A(\d+)\z/) ? $1 : 0;

        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200);
        $res->header("Content-Type", "text/xml");
        $res->header('Connection', 'close');

        push @subs, $self;

        $self->write($res->to_string_ref);

        my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n");

        # if they'd like a playback, give them all items >= time requested
        if ($since) {
            foreach my $item (@recent) {
                next if $item->[0] < $since;
                next unless filter($item->[2], \@get_segments);
                $last_rv = $self->write($item->[1]);
            }
        }

        # NOTE(review): a true result from the last write() presumably means
        # nothing is left buffered, so write polling is switched off --
        # confirm against Perlbal::Socket.
        $self->watch_write(0) if $last_rv;
        return 1;
    });

    return 1;
}
120
121 # called when we're no longer active on a service
# Undo what register() installed on the service: remove every hook that was
# registered under the 'AtomStream' plugin name, so the service stops
# routing requests through this plugin.
#
# Arguments: ($class, $svc) - plugin class name and the service object.
# Returns 1.
sub unregister {
    my ($class, $svc) = @_;

    $svc->unregister_hooks('AtomStream');

    return 1;
}
127
128 # called when we are loaded
sub load {
    # no load-time setup is performed; always report success
    return 1;
}
132
133 # called for a global unload
sub unload {
    # no unload-time teardown is performed; always report success
    return 1;
}
137
138 1;
Note: See TracBrowser for help on using the browser.