#!/usr/bin/perl
#
# Connects to a hard-coded list of nodes, sends each one the "evwatch"
# command, and then reads comma-separated event lines from them. Roughly once
# a second it prints, per label, how many begun-but-unfinished events are
# outstanding, followed by a running average duration for each event.

use strict;
use warnings;

use lib "$ENV{LJHOME}/cgi-bin";
require 'ljlib.pl';

$|++;    # unbuffered STDOUT so the periodic stats show up immediately

use Errno qw(EAGAIN EWOULDBLOCK);
use LJ::Blockwatch;
use IO::Socket::INET;
use Time::HiRes qw(tv_interval);

my %files;                        # label -> { event -> [ [time, utime], ... ] } for events begun but not ended
my %filetimes;                    # label -> local time() at which a line for it was last processed
my $last_time_checked = time();   # local time() of the last expiry sweep / stats dump
my %time_averages;                # event -> running average of begin..end intervals

my %sockets;                      # fileno -> IO::Socket::INET socket
my %socket_destinations;          # fileno -> "hostname:port"

#############################################################################
# This block handles initial connections to the nodes we want to listen to.
# The list is hard coded in the @destinations list for the moment.
#############################################################################
{
    my %connecting_sockets;       # fileno -> socket whose non-blocking connect is still pending
    my @destinations = qw(localhost:7600 127.0.0.1:7600);

    foreach my $dest (@destinations) {
        # IO::Socket::INET reports constructor failures in $@; fall back to
        # $! in case only the OS error is available.
        my $sock = IO::Socket::INET->new( PeerHost => $dest, Blocking => 0 )
            or die "Couldn't connect to $dest: " . ($@ || $!);
        $connecting_sockets{$sock->fileno}  = $sock;
        $socket_destinations{$sock->fileno} = $dest;
    }

    # Give the non-blocking connects a moment to complete.
    sleep 3;

    my $win = '';
    foreach my $fd (keys %connecting_sockets) {
        vec($win, $fd, 1) = 1;
    }

    # Zero timeout: just poll which sockets are writable right now.
    select(undef, my $wout = $win, undef, 0);

    while (my ($fd, $sock) = each %connecting_sockets) {
        next unless vec($wout, $fd, 1);

        # A failed non-blocking connect is reported as writable as well, so
        # also require that the socket actually has a peer.
        next unless $sock->connected;

        $sockets{$fd} = $sock;
        $sock->write("evwatch\n");
    }
}

die "Nothing allowed us to connect" unless keys %sockets;

my %socket_buffers = map { ($_, '') } keys %sockets;    # fileno -> buffer

#############################################################################
# This block handles listening to each of the sockets for reading and handing
# the incoming data off to sub process_line anytime there has been a full
# line read.
#############################################################################
while (keys %sockets) {
    my $rin = '';
    foreach my $fd (keys %sockets) {
        vec($rin, $fd, 1) = 1;
    }

    # Block until at least one socket has something to read.
    select(my $rout = $rin, undef, undef, undef);

    # Read data from the sockets that are ready
    SOCK: foreach my $fd (keys %sockets) {
        next SOCK unless vec($rout, $fd, 1);

        my $sock   = $sockets{$fd};
        my $bufref = \$socket_buffers{$fd};
        my $closed = 0;

        READ: while (1) {
            my $length = sysread($sock, my $read_buffer, 1024);

            if ($length) {
                $$bufref .= $read_buffer;
                next READ;    # Read again, till we get a read error.
            }

            # sysread returns undef on error and 0 at end of file. $! is only
            # meaningful in the error case, so don't consult it on EOF.
            if (!defined $length && ($! == EAGAIN || $! == EWOULDBLOCK)) {
                last READ;    # We've read all we can on this loop.
            }

            # EOF or any other error: this connection is finished.
            $closed = 1;
            last READ;
        }

        # Hand off every complete line, including ones that arrived in the
        # same burst as the EOF/error.
        my $dest = $socket_destinations{$fd};
        while ($$bufref =~ s/(.*?)\r?\n//) {
            my $line = $1;
            next unless $line;

            my ($filename, $time, $utime, $direction, $event) = split /,/, $line;
            process_line("${dest}${filename}", $time, $utime, $direction, $event);
        }

        if ($closed) {
            delete $sockets{$fd};
            delete $socket_buffers{$fd};
            close $sock;
        }
    }
}

# Without this, select() on an empty set with no timeout would block forever.
die "All connections have closed\n";

#############################################################################
# Process a line of incoming data, arguments are:
#   label       - hostname and filename concatenated together
#   time, utime - pair of integers that report when this event happened
#   direction   - boolean indicating the direction of this event
#                   begin is 0
#                   end is 1
#   event       - integer representing the event that occurred
#
# A begin pushes [time, utime] onto the label's stack for that event. An end
# pops the most recent begin and folds the elapsed interval into the event's
# running average. About once a second, labels not seen for more than 10
# seconds are dropped and the stats are dumped.
#############################################################################
sub process_line {
    my ($label, $time, $utime, $direction, $event) = @_;

    my $filename     = $label;
    my $current_time = time();

    $filetimes{$filename} = $current_time;

    my $filedata  = $files{$filename}   ||= {};
    my $eventdata = $filedata->{$event} ||= [];

    if ($direction) {
        # backing out one operation
        my $start_times = pop @$eventdata;
        delete $filedata->{$event} unless @$eventdata;

        # An end with no matching begin: nothing to measure. Note that this
        # also skips the periodic sweep below for this call.
        return unless $start_times;

        my $interval = tv_interval($start_times, [$time, $utime]);
        my $average  = \$time_averages{$event};

        if (defined $$average) {
            # Weight the existing average at 95% and the new sample at 5%.
            $$average *= .95;
            $$average += ($interval * .05);
        }
        else {
            $$average = $interval;
        }
    }
    else {
        # adding an event
        push @$eventdata, [$time, $utime];
    }

    if ($last_time_checked + 1 <= $current_time) {
        $last_time_checked = $current_time;

        foreach my $key (keys %filetimes) {
            if ($filetimes{$key} < $current_time - 10) {
                print "Removing $key.\n";
                delete $filetimes{$key};
                delete $files{$key};
            }
        }

        dump_stats();
    }
}

#############################################################################
# Print the current state to STDOUT: for each label with outstanding events,
# the count per event, then every event's running average in ascending order.
# Event names are looked up with LJ::Blockwatch->get_event_name.
#############################################################################
sub dump_stats {
    while (my ($filename, $filedata) = each %files) {
        next unless keys %$filedata;

        print "For '$filename'\n";
        while (my ($event, $times) = each %$filedata) {
            my $event_name = LJ::Blockwatch->get_event_name($event);
            print "  $event_name has " . @$times . " outstanding.\n";
        }
    }
    continue {
        # Runs after every iteration, including those skipped by `next`.
        print "\n";
    }

    my @events_by_average =
        sort { $time_averages{$a} <=> $time_averages{$b} } keys %time_averages;

    foreach my $event (@events_by_average) {
        my $time       = $time_averages{$event};
        my $event_name = LJ::Blockwatch->get_event_name($event);

        # Plain print: the values are data, not a printf format string.
        print "$time\t$event_name\n";
    }

    print "\n";
}