Changeset 430 for trunk/lib/Perlbal/Plugin/AtomStream.pm
- Timestamp:
- 08/25/05 00:21:24 (4 years ago)
- Files:
-
- 1 modified
-
trunk/lib/Perlbal/Plugin/AtomStream.pm (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/Perlbal/Plugin/AtomStream.pm
r408 r430 9 9 use warnings; 10 10 11 our @subs; # subscribers 11 our @subs; # subscribers 12 our @recent; # recent items in format [$epoch, $atom_ref] 13 14 our $last_timestamp = 0; 12 15 13 16 use constant MAX_LAG => 262144; … … 16 19 my $class = shift; 17 20 my $atomref = shift; 21 22 # maintain queue of last 60 seconds worth of posts 23 my $now = time(); 24 push @recent, [ $now, $atomref ]; 25 shift @recent while @recent && $recent[0][0] <= $now - 60; 26 27 emit_timestamp($now) if $now > $last_timestamp; 18 28 19 29 my $need_clean = 0; … … 33 43 $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n"); 34 44 } 35 $s->w rite($atomref);45 $s->watch_write(0) if $s->write($atomref); 36 46 } 37 47 } … … 39 49 if ($need_clean) { 40 50 @subs = grep { ! $_->{closed} } @subs; 51 } 52 } 53 54 sub emit_timestamp { 55 my $time = shift; 56 $last_timestamp = $time; 57 foreach my $s (@subs) { 58 next if $s->{closed}; 59 $s->{alive_time} = $time; 60 $s->write(\ "<time>$time</time>\n"); 41 61 } 42 62 } … … 48 68 Perlbal::Socket::register_callback(1, sub { 49 69 my $now = time(); 50 foreach my $s (@subs) { 51 next if $s->{closed}; 52 $s->{alive_time} = $now; 53 $s->write(\ "<time>$now</time>\n"); 54 } 70 emit_timestamp($now) if $now > $last_timestamp; 55 71 return 1; 56 72 }); … … 61 77 return 0 unless $hds; 62 78 my $uri = $hds->request_uri; 63 return 0 unless $uri =~ m!^/atom-stream\.xml$!; 79 return 0 unless $uri =~ m!^/atom-stream\.xml(?:\?since=(\d+))?$!; 80 my $since = $1 || 0; 64 81 65 82 my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200); … … 70 87 71 88 $self->write($res->to_string_ref); 72 $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream>\n"); 89 90 my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n"); 91 92 # if they'd like a playback, give them all items >= time requested 93 if ($since) { 94 foreach my $item (@recent) { 95 next if $item->[0] < $since; 96 $last_rv = $self->write($item->[1]); 97 } 98 } 99 100 $self->watch_write(0) if $last_rv; 73 101 return 1; 74 102 });
