Show
Ignore:
Timestamp:
08/25/05 00:21:24 (4 years ago)
Author:
bradfitz
Message:

resume functionality

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/lib/Perlbal/Plugin/AtomStream.pm

    r408 r430  
    99use warnings; 
    1010 
    11 our @subs;  # subscribers 
     11our @subs;    # subscribers 
     12our @recent;  # recent items in format [$epoch, $atom_ref] 
     13 
     14our $last_timestamp = 0; 
    1215 
    1316use constant MAX_LAG => 262144; 
     
    1619    my $class = shift; 
    1720    my $atomref = shift; 
     21 
     22    # maintain queue of last 60 seconds worth of posts 
     23    my $now = time(); 
     24    push @recent, [ $now, $atomref ]; 
     25    shift @recent while @recent && $recent[0][0] <= $now - 60; 
     26 
     27    emit_timestamp($now) if $now > $last_timestamp; 
    1828 
    1929    my $need_clean = 0; 
     
    3343                $s->write(\ "<sorryTooSlow youMissed=\"$skip_count\" />\n"); 
    3444            } 
    35             $s->write($atomref); 
     45            $s->watch_write(0) if $s->write($atomref); 
    3646        } 
    3747    } 
     
    3949    if ($need_clean) { 
    4050        @subs = grep { ! $_->{closed} } @subs; 
     51    } 
     52} 
     53 
     54sub emit_timestamp { 
     55    my $time = shift; 
     56    $last_timestamp = $time; 
     57    foreach my $s (@subs) { 
     58        next if $s->{closed}; 
     59        $s->{alive_time} = $time; 
     60        $s->write(\ "<time>$time</time>\n"); 
    4161    } 
    4262} 
     
    4868    Perlbal::Socket::register_callback(1, sub { 
    4969        my $now = time(); 
    50         foreach my $s (@subs) { 
    51             next if $s->{closed}; 
    52             $s->{alive_time} = $now; 
    53             $s->write(\ "<time>$now</time>\n"); 
    54         } 
     70        emit_timestamp($now) if $now > $last_timestamp; 
    5571        return 1; 
    5672    }); 
     
    6177        return 0 unless $hds; 
    6278        my $uri = $hds->request_uri; 
    63         return 0 unless $uri =~ m!^/atom-stream\.xml$!; 
     79        return 0 unless $uri =~ m!^/atom-stream\.xml(?:\?since=(\d+))?$!; 
     80        my $since = $1 || 0; 
    6481 
    6582        my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200); 
     
    7087 
    7188        $self->write($res->to_string_ref); 
    72         $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream>\n"); 
     89 
     90        my $last_rv = $self->write(\ "<?xml version='1.0' encoding='utf-8' ?>\n<atomStream><!-- since=$since -->\n"); 
     91 
     92        # if they'd like a playback, give them all items >= time requested 
     93        if ($since) { 
     94            foreach my $item (@recent) { 
     95                next if $item->[0] < $since; 
     96                $last_rv = $self->write($item->[1]); 
     97            } 
     98        } 
     99 
     100        $self->watch_write(0) if $last_rv; 
    73101        return 1; 
    74102    });