#!/usr/bin/perl -w eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}' if 0; # not running under some shell # # MogileFS storage node daemon # (perlbal front-end) # # (c) 2004, Brad Fitzpatrick, # (c) 2006, Six Apart, Ltd. use strict; use lib 'lib'; use Perlbal; use IO::Socket::INET; # test to see if we have a new enough Perlbal.pm. because until # Perlbal 1.43 there was no proper '$VERSION' in the module. eval { Perlbal::AIO::aio_stat(".", sub {}); }; if ($@) { die "CPAN module Perlbal too old. Need 1.3 or later. $@"; } my $opt_daemonize; my $opt_config; my $max_conns = 10000; my $http_listen = "0.0.0.0:7500"; my $mgmt_listen = "0.0.0.0:7501"; my $docroot = "/var/mogdata"; my %config_opts = ( 'daemonize|d' => \$opt_daemonize, 'config=s' => \$opt_config, 'httplisten=s' => \$http_listen, 'mgmtlisten=s' => \$mgmt_listen, 'docroot=s' => \$docroot, 'maxconns=i' => \$max_conns, ); usage() unless Getopt::Long::GetOptions(%config_opts); my $default_config = "/etc/mogilefs/mogstored.conf"; $opt_config = $default_config if ! $opt_config && -e $default_config; sub usage { my $note = shift; $note = $note ? "NOTE: $note\n\n" : ""; die "${note}Usage: mogstored [OPTS] OPTS: --daemonize -d Daemonize --config= Set config file (default is /etc/mogilefs/mogstored.conf) --httplisten= IP/Port HTTP server listens on --mgmtlisten= IP/Port management/sidechannel listens on --docroot= Docroot above device mount points. Defaults to /var/mogdata "; } # parse the mogstored config file, which is just lines of comments and # "key = value" lines, where keys are just the same as commandline # options. if ($opt_config) { die "Config file $opt_config doesn't exist.\n" unless -e $opt_config; open F, $opt_config or die "Couldn't open config file for reading: $!"; while () { s/\#.*//; next unless /\S/; if (/SERVER max_connect/i || /CREATE SERVICE/i) { usage("Your $opt_config file is the old syntax. The new format is simply lines of = where keys are the same as mogstored's command line options."); } die "Unknown config syntax: $_\n" unless /^\s*(\w+)\s*=\s*(.+?)\s*$/; my ($key, $val) = ($1, $2); my $dest = $config_opts{$key} || $config_opts{"$key=s"}; die "Unknown config setting: $key\n" unless $dest; $$dest = $val; } } # this is the perlbal configuration only. not the mogstored configuration. my $pb_conf ||= " SERVER max_connections = $max_conns CREATE SERVICE mogstored SET role = web_server SET listen = $http_listen SET docroot = $docroot SET dirindexing = 0 SET enable_put = 1 SET enable_delete = 1 SET min_put_directory = 1 SET persist_client = 1 ENABLE mogstored "; Perlbal::run_manage_commands($pb_conf, sub { print STDERR "$_[0]\n"; }); unless (Perlbal::Socket->WatchedSockets() > 0) { die "Invalid configuration. (shouldn't happen?) Stopping.\n"; } if ($opt_daemonize) { Perlbal::daemonize(); } else { print "Running.\n"; } # register our disk usage callback to get disk usage and keep an eye on how we're doing Perlbal::Socket::register_callback(1, sub { my $err = sub { Perlbal::log('crit', $_[0]); return 60; }; my $path = Perlbal->service('mogstored')->{docroot}; $path =~ s!/$!!; # find all devices below us my @devnum; if (opendir(D, $path)) { @devnum = grep { /^dev\d+$/ } readdir(D); closedir(D); } else { $err->("Failed to open $path: $!"); } foreach my $devnum (@devnum) { my $rval = `df -k -l -P $path/$devnum`; foreach my $l (split /\r?\n/, $rval) { next unless $l =~ /^(.+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.+)\s+(.+)$/; my ($dev, $total, $used, $avail, $useper, $disk) = ($1, $2, $3, $4, $5, $6); unless ($disk =~ m!$devnum/?$!) { $disk = "$path/$devnum"; } # create string to print my $now = time; my $output = { time => time(), device => $dev, # /dev/sdh1 total => $total, # integer: total KiB blocks used => $used, # integer: used KiB blocks available => $avail, # integer: available KiB blocks 'use' => $useper, # "45%" disk => $disk, # mount point of disk (/var/mogdata/dev8), or path if not a mount }; # open a file on that disk location called 'usage' my $rv = open(FILE, ">$disk/usage"); unless ($rv) { $err->("Unable to open '$disk/usage' for writing: $!"); next; } foreach (sort keys %$output) { print FILE "$_: $output->{$_}\n"; } close FILE; } } return 60; }); # setup a new socket for handling size requests my $server = IO::Socket::INET->new(LocalAddr => $mgmt_listen, Type => SOCK_STREAM, Proto => 'tcp', Blocking => 0, Reuse => 1, Listen => 10 ) or die "Error creating management socket: $@\n"; # in Perl 5.6, we weren't always seeing this turned off by IO::Socket # so we have to do it manually here just to be sure. IO::Handle::blocking($server, 0); # accept handler for new workers my $accept_handler = sub { my $csock = $server->accept or return; IO::Handle::blocking($csock, 0); my $client = SideChannelClient->new($csock); $client->watch_read(1); }; # add to fd list so this one gets processed Perlbal::Socket->AddOtherFds(fileno($server) => $accept_handler); # now start the main loop Perlbal::run(); ############################################################################# ### simple package for handling the stream request port package SideChannelClient; use base qw{Danga::Socket}; use fields ( 'count', # how many requests we've serviced 'read_buf', # unprocessed read buffer 'mogsvc', # the mogstored Perlbal::Service object ); # needed since we're pretending to be a Perlbal::Socket... never idle out sub max_idle_time { return 0; } sub new { my SideChannelClient $self = shift; $self = fields::new($self) unless ref $self; $self->SUPER::new(@_); $self->{count} = 0; $self->{read_buf} = ''; $self->{mogsvc} = Perlbal->service('mogstored'); return $self; } sub event_read { my SideChannelClient $self = shift; my $bref = $self->read(1024); return $self->close unless defined $bref; $self->{read_buf} .= $$bref; my $path = $self->{mogsvc}->{docroot}; while ($self->{read_buf} =~ s/^(.+?)\r?\n//) { my $cmd = $1; if ($cmd =~ /^size (\S+)$/) { # increase our count $self->{count}++; # validate uri my $uri = $1; if ($uri =~ /\.\./) { $self->write("ERROR: uri invalid (contains ..)\r\n"); return; } # now stat the file to get the size and such Perlbal::AIO::aio_stat("$path$uri", sub { return if $self->{closed}; my $size = -e _ ? -s _ : -1; $self->write("$uri $size\r\n"); }); } else { # we don't understand this so pass it on to manage command interface my @out; Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; }); $self->write(join("\r\n", @out) . "\r\n"); } } } # override Danga::Socket's event handlers which die sub event_err { $_[0]->close; } sub event_hup { $_[0]->close; } # as_string handler sub as_string { my SideChannelClient $self = shift; my $ret = $self->SUPER::as_string; $ret .= "; size_requests=$self->{count}"; return $ret; } # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: