#!/usr/bin/perl
eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}'
    if 0; # not running under some shell
#
# MogileFS storage node daemon
#  (perlbal front-end)
#
# (c) 2004, Brad Fitzpatrick, <brad@danga.com>
# (c) 2006, Six Apart, Ltd.

use strict;
use lib 'lib';
use IO::Socket::INET;
use POSIX qw(ENOENT EACCES EBADF WNOHANG);
use Fcntl qw(SEEK_CUR SEEK_SET SEEK_END O_RDWR O_CREAT O_TRUNC);
use Getopt::Long ();    # GetOptions() is called fully qualified below
use Perlbal 1.52;
use FindBin qw($Bin $RealScript);

# Verify that Linux::AIO or IO::AIO actually works.  Perlbal 1.51 does this
# too, but the check is copied here so people don't need to upgrade just for
# it, and because the rules are different: in Perlbal it's understandable
# not to have working AIO, but in mogstored it's essentially required,
# except for development and light testing.
BEGIN {
    my $OPTMOD_IO_AIO    = eval "use IO::AIO 1.6 (); 1;";
    my $OPTMOD_LINUX_AIO = eval "use Linux::AIO 1.71 (); 1;";

    if ($OPTMOD_LINUX_AIO) {
        # Probe: opening a file inside a nonexistent directory must fail with
        # ENOENT.  If the callback never observes that, Linux::AIO is broken.
        my $good = 0;
        Linux::AIO::aio_open("/tmp/$$-" . rand() . "-bogusdir/bogusfile-$$",
                             O_RDWR|O_CREAT|O_TRUNC, 0, sub {
                                 $good = 1 if $_[0] < 0 && $! == ENOENT;
                             });

        # Drive the probe to completion synchronously: block in select() on
        # the AIO notification fd, then run whatever callbacks are ready.
        while (Linux::AIO::nreqs()) {
            my $rfd = "";
            vec ($rfd, Linux::AIO::poll_fileno(), 1) = 1;
            select $rfd, undef, undef, undef;
            Linux::AIO::poll_cb();
        }

        unless ($good) {
            # pretend that they don't have Linux::AIO, but only complain to
            # them if they don't have IO::AIO either ...
            if ($OPTMOD_IO_AIO) {
                $Perlbal::AIO_MODE = "ioaio";
            } else {
                warn("WARNING: Your installation of Linux::AIO doesn't work.\n".
                     " You seem to have installed it without 'make test',\n".
                     " or you ignored the failing tests. I'm going to ignore\n".
                     " that you have it and proceed without async IO. The\n".
                     " modern replacement to Linux::AIO is IO::AIO.\n");
            }
            $OPTMOD_LINUX_AIO = 0;
        }
    }

    # No usable AIO module at all: refuse to run unless explicitly told to.
    unless ($OPTMOD_LINUX_AIO || $OPTMOD_IO_AIO) {
        if ($ENV{'MOGSTORED_RUN_WITHOUT_AIO'}) {
            warn("WARNING: Running without async IO. Won't run well with many clients.\n");
        } else {
            die("ERROR: IO::AIO not installed, so async IO not available. Refusing to run\n".
                " unless you set the environment variable MOGSTORED_RUN_WITHOUT_AIO=1\n");
        }
    }
}

# Helper programs (mogstored-diskusage, mogstored-iostat) are located by
# rewriting this path.
my $selfexe = "$Bin/$RealScript";

# State:
my %on_death;                        # pid -> subref (to run when pid dies)
my %devnum_to_device;                # mogile device number (eg. 'dev1' would be '1') -> os device path (eg. '/dev/rd0')
my %osdevnum_to_device;              # os device number (fetched via stat(file)[0]) -> os device path (eg. '/dev/rd0')
my %iostat_listeners;                # fd => SideChannel client: clients interested in iostat data.
my $iostat_available = 1;            # bool: iostat working.  assume working to start.
my ($iostat_pipe_r, $iostat_pipe_w); # pipes for talking to iostat process

# Config:
my $opt_daemonize;
my $opt_config;
my $opt_iostat     = 1;  # default to on; disable with --noiostat or "iostat = 0" in the config file
my $max_conns      = 10000;
my $http_listen    = "0.0.0.0:7500";
my $mgmt_listen    = "0.0.0.0:7501";
my $docroot        = "/var/mogdata";
my $default_config = "/etc/mogilefs/mogstored.conf";

# Getopt::Long specs -> destination variables.  The same table is handed to
# load_config_file(), so config-file keys are these option names.
my %config_opts = (
    'iostat!'      => \$opt_iostat,     # '!' also accepts --noiostat
    'daemonize|d'  => \$opt_daemonize,
    'config=s'     => \$opt_config,
    'httplisten=s' => \$http_listen,
    'mgmtlisten=s' => \$mgmt_listen,
    'docroot=s'    => \$docroot,
    'maxconns=i'   => \$max_conns,
);
usage() unless Getopt::Long::GetOptions(%config_opts);

# The config file is read after the command line has been parsed, so any
# setting it contains overrides the same option given on the command line.
$opt_config = $default_config if ! $opt_config && -e $default_config;
load_config_file($opt_config => \%config_opts) if $opt_config;

# use AIO channels in Perlbal: a file whose path contains "/devN" is put on
# AIO channel "devN"; for any other path the hook returns undef.
Perlbal::AIO::set_file_to_chan_hook(sub {
    my $filename = shift;
    $filename =~ m{/dev(\d+)\b} or return undef;
    return "dev$1";
});

# Enable the XS header parser when it is installed, unless
# PERLBAL_XS_HEADERS is defined and set to a false value.
my $xs_conf = "";
if (eval "use Perlbal::XS::HTTPHeaders (); 1") {
    $xs_conf .= "xs enable headers\n" unless defined $ENV{PERLBAL_XS_HEADERS} && ! $ENV{PERLBAL_XS_HEADERS};
}

# this is the perlbal configuration only.  not the mogstored configuration.
my $pb_conf = "
$xs_conf
SERVER max_connections = $max_conns

CREATE SERVICE mogstored
  SET role = web_server
  SET listen = $http_listen
  SET docroot = $docroot
  SET dirindexing = 0
  SET enable_put = 1
  SET enable_delete = 1
  SET min_put_directory = 1
  SET persist_client = 1
ENABLE mogstored

# don't listen... this is just a stub service.
CREATE SERVICE mgmt
  SET role = management
ENABLE mgmt
";

Perlbal::run_manage_commands($pb_conf, sub { print STDERR "$_[0]\n"; });

unless (Perlbal::Socket->WatchedSockets > 0) {
    die "Invalid configuration. (shouldn't happen?) Stopping.\n";
}

if ($opt_daemonize) {
    die "mogstored won't daemonize with \$ENV{MOGSTORED_RUN_WITHOUT_AIO} set.\n"
        if $ENV{'MOGSTORED_RUN_WITHOUT_AIO'};
    Perlbal::daemonize();
} else {
    print "Running.\n";
}

# set number of AIO threads, between 10-100 (for some reason, have to
# set aio threads after daemonizing)
{
    my $aio_threads = aio_threads(disks($docroot));
    Perlbal::run_manage_commands("SERVER aio_threads = $aio_threads",
                                 sub { print STDERR "$_[0]\n"; });
}

# kill our children processes on exit:
my $parent_pid = $$;

$SIG{TERM} = $SIG{INT} = sub {
    return unless $$ == $parent_pid; # don't let this be inherited
    kill 'TERM', grep { $_ } keys %on_death;
    POSIX::_exit(0);
};

# Partial output from the iostat child, carried over between reads.  This is
# declared before Perlbal::run() because that call enters the event loop
# (presumably for good); an initializer placed after it would never execute.
my $iostat_read_buf = "";

setup_iostat_pipes();
start_disk_usage_process();
start_iostat_process() if $opt_iostat;
harvest_dead_children();  # every 2 seconds, it reschedules itself
setup_sidechannel_listener();

# now start the main loop
Perlbal::run();


############################################################################
# main:: functions
############################################################################

# Die with the command-line synopsis, preceded by an optional NOTE line.
sub usage {
    my $note = shift;
    $note = $note ? "NOTE: $note\n\n" : "";

    die "${note}Usage: mogstored [OPTS]

OPTS:
 --daemonize -d          Daemonize
 --config=<file>         Set config file (default is /etc/mogilefs/mogstored.conf)
 --httplisten=<ip:port>  IP/Port HTTP server listens on
 --mgmtlisten=<ip:port>  IP/Port management/sidechannel listens on
 --docroot=<path>        Docroot above device mount points.  Defaults to /var/mogdata
 --maxconns=<n>          Maximum number of client connections (default 10000)
 --iostat                Report disk I/O activity to sidechannel watchers (on by
                         default; set 'iostat = 0' in the config file to disable)

";
}

# Read the mogstored config file: '#' comments, blank lines, and
# "key = value" lines whose keys are the command-line option names.
#   $conffile - path of the file to read (dies if it's missing/unreadable)
#   $opts     - Getopt::Long spec => scalar ref table; each value is stored
#               through the ref belonging to the spec that names the key.
# Dies on unknown keys, malformed lines, or the old Perlbal-style syntax.
sub load_config_file {
    my ($conffile, $opts) = @_;

    die "Config file $conffile doesn't exist.\n" unless -e $conffile;
    open my $fh, '<', $conffile
        or die "Couldn't open config file $conffile for reading: $!";

    while (my $line = <$fh>) {
        $line =~ s/\#.*//;
        next unless $line =~ /\S/;

        if ($line =~ /SERVER max_connect/i || $line =~ /CREATE SERVICE/i) {
            usage("Your $conffile file is the old syntax.  The new format is simply lines of <key> = <value> where keys are the same as mogstored's command line options.");
        }

        die "Unknown config syntax: $line\n"
            unless $line =~ /^\s*(\w+)\s*=\s*(.+?)\s*$/;
        my ($key, $val) = ($1, $2);

        my $dest = _config_option_ref($opts, $key)
            or die "Unknown config setting: $key\n";
        $$dest = $val;
    }
    close $fh;
}

# Find the destination ref for config key $key.  Every '|'-separated name in a
# Getopt::Long spec (e.g. 'daemonize|d', 'config=s', 'iostat!') is accepted,
# so aliases work too.  Returns nothing when no spec names the key.
sub _config_option_ref {
    my ($opts, $key) = @_;
    foreach my $spec (keys %$opts) {
        my ($names) = $spec =~ /^([\w|]+)/ or next;
        return $opts->{$spec} if grep { $_ eq $key } split /\|/, $names;
    }
    return;
}

# Reap every child that has exited since the last run (not just one) and
# fire its %on_death hook, then reschedule itself 2 seconds out.
sub harvest_dead_children {
    while ((my $dead = waitpid(-1, WNOHANG)) > 0) {
        my $code = delete $on_death{$dead};
        $code->() if $code;
    }
    Danga::Socket->AddTimer(2, \&harvest_dead_children);
}

# (runs in a freshly forked child) Report $msg and leave at once through
# POSIX::_exit, so nothing inherited from the parent -- END blocks,
# destructors, the event loop -- runs in the child.
sub _child_exit {
    my $msg = shift;
    warn "$msg\n";
    POSIX::_exit(1);
}

# (runs in a freshly forked child) Export MOG_DOCROOT and replace this
# process with the helper program named $helper, found by swapping the
# trailing "mogstored" of our own path.  Never returns.
sub _exec_helper {
    my $helper = shift;

    $ENV{MOG_DOCROOT} = Perlbal->service('mogstored')->{docroot};

    my $exe = $selfexe;
    $exe =~ s/mogstored$/$helper/
        or _child_exit("$0 not what we were expecting!");

    exec($exe) or _child_exit("Couldn't exec $exe: $!");
}

# Fork the disk usage tracker.  In the parent, returns the child's pid (undef
# if fork failed) and arranges for a replacement to be started when it dies.
# In the child, execs mogstored-diskusage and never returns.
sub start_disk_usage_process {
    my $child = fork;
    unless (defined $child) {
        Perlbal::log('crit', "Fork error creating disk usage tracking process: $!");
        return undef;
    }

    # if we're the parent.
    if ($child) {
        $on_death{$child} = sub {
            start_disk_usage_process();  # start a new one
        };
        return $child;
    }

    $SIG{TERM} = 'DEFAULT';  # override custom one from earlier
    _exec_helper('mogstored-diskusage');
}

# Register a sidechannel client to receive iostat lines (keyed by its fd).
sub iostat_subscribe {
    my $sock = shift;
    $iostat_listeners{fileno($sock->sock)} = $sock;
}

# Forget a sidechannel client; harmless if its socket no longer has an fd.
sub iostat_unsubscribe {
    my $sock = shift;
    my $fdno = fileno($sock->sock);
    return unless defined $fdno;
    delete $iostat_listeners{$fdno};
}

sub setup_sidechannel_listener {
    Mogstored::SideChannelListener->new($mgmt_listen);
}

# Create the non-blocking pipe the iostat child writes to, and have the event
# loop call read_from_iostat_child() whenever its read end is readable.
sub setup_iostat_pipes {
    pipe($iostat_pipe_r, $iostat_pipe_w)
        or die "Couldn't create pipe for iostat process: $!";
    IO::Handle::blocking($iostat_pipe_r, 0);
    IO::Handle::blocking($iostat_pipe_w, 0);
    Danga::Socket->AddOtherFds(fileno($iostat_pipe_r), sub {
        read_from_iostat_child();
    });
}

# Fork the iostat collector with STDOUT connected to the iostat pipe.  The
# parent restarts it whenever it dies; the child execs mogstored-iostat.
sub start_iostat_process {
    my $pid = fork;
    unless (defined $pid) {
        warn "Fork for iostat failed: $!";
        return;
    }

    if ($pid) {
        # Parent
        $on_death{$pid} = sub {
            start_iostat_process();
        };
        return;
    }

    close STDIN;
    close STDOUT;
    close STDERR;
    # We may not be able to see errors beyond this point
    open STDIN,  '<',  '/dev/null'    or _child_exit("Couldn't open STDIN for reading from /dev/null");
    open STDOUT, '>&', $iostat_pipe_w or _child_exit("Couldn't dup pipe for use as STDOUT");
    open STDERR, '>',  '/dev/null'    or _child_exit("Couldn't open STDERR for writing to /dev/null");

    _exec_helper('mogstored-iostat');
}

# (runs in parent event-loop process)
# Pull available output from the iostat child and forward each complete,
# non-empty line to every subscribed sidechannel client.
sub read_from_iostat_child {
    my $data;
    my $rv = sysread($iostat_pipe_r, $data, 10240);
    return unless $rv && $rv > 0;

    $iostat_read_buf .= $data;

    # only write complete lines to sockets (in case for some reason we get
    # a partial read and child process dies...).  Anchored and non-greedy so
    # a trailing "\r" is stripped and blank lines are drained, not stranded.
    while ($iostat_read_buf =~ s/^(.*?)\r?\n//) {
        my $line = $1;
        next unless length $line;
        # $line looks like "dev53\t53.23", or "." to signal the end of a
        # group of devices.
        $_->write("$line\n") foreach values %iostat_listeners;
    }
}

# Count the "devN" entries directly under $root (dies if it can't be read).
sub disks {
    my $root = shift;
    opendir(my $dh, $root) or die "Failed to open docroot: $root: $!";
    my $count = grep { /^dev\d+$/ } readdir($dh);
    closedir($dh);
    return $count;
}

# returns aio threads to use, given a disk count: 10 per disk (at least one
# disk is assumed), capped at 100.
sub aio_threads {
    my $disks = shift;
    my $threads = ($disks || 1) * 10;
    return 100 if $threads > 100;
    return $threads;
}

#############################################################################
package Mogstored::SideChannelListener;
use strict;
use base 'Perlbal::TCPListener';

# Listen on $hostport for sidechannel connections.
sub new {
    my ($class, $hostport) = @_;
    # we don't _really_ need this, but TCPListener kinda does, to keep it from
    # exploding/warning.  so we created this stub service above in our static
    # config, just for this.
    my $svc = Perlbal->service("mgmt") or die "Where is mgmt service?";
    return $class->SUPER::new($hostport, $svc);
}

# Accept every pending connection, wrapping each one in a non-blocking
# Mogstored::SideChannelClient that is watched for reads.
sub event_read {
    my $self = shift;

    # accept as many connections as we can
    while (my ($csock, $peeraddr) = $self->{sock}->accept) {
        IO::Handle::blocking($csock, 0);
        my $client = Mogstored::SideChannelClient->new($csock);
        $client->watch_read(1);
    }
}

#############################################################################
### simple package for handling the stream request port
package Mogstored::SideChannelClient;

use strict;
use base qw{Perlbal::Socket};
use fields (
            'count',     # how many requests we've serviced
            'read_buf',  # unprocessed read buffer
            'mogsvc',    # the mogstored Perlbal::Service object
            );
# needed since we're pretending to be a Perlbal::Socket... never idle out
sub max_idle_time { return 0; }

# Wrap an accepted socket; the arguments are passed straight through to
# Perlbal::Socket's constructor.
sub new {
    my Mogstored::SideChannelClient $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new(@_);

    $self->{count}    = 0;
    $self->{read_buf} = '';
    $self->{mogsvc}   = Perlbal->service('mogstored');

    return $self;
}

# Handle newline-terminated (LF or CRLF) commands:
#   size <uri>  -> replies "<uri> <bytes>\r\n" (-1 if the file doesn't exist)
#   watch       -> stop reading input and subscribe to iostat output
#   otherwise   -> run through Perlbal's management command interface
sub event_read {
    my Mogstored::SideChannelClient $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;
    $self->{read_buf} .= $$bref;

    my $path = $self->{mogsvc}->{docroot};

    # Consume every complete line.  Empty lines match too, so a stray blank
    # line can't lodge at the front of the buffer and block later commands.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $cmd = $1;
        next unless length $cmd;

        if ($cmd =~ /^size (\S+)$/) {
            my $uri = $1;

            # increase our count
            $self->{count}++;

            # validate uri
            if ($uri =~ /\.\./) {
                $self->write("ERROR: uri invalid (contains ..)\r\n");
                # reject only this command; anything pipelined after it in
                # read_buf still gets answered
                next;
            }

            # now stat the file to get the size and such
            Perlbal::AIO::aio_stat("$path$uri", sub {
                return if $self->{closed};
                # -s gives a false (empty-string) value rather than 0 for an
                # empty file, so normalize it to keep the reply numeric.
                my $size = -e _ ? (-s _ || 0) : -1;
                $self->write("$uri $size\r\n");
            });
        } elsif ($cmd =~ /^watch$/i) {
            unless ($iostat_available) {
                $self->write("ERR iostat unavailable\r\n");
                next;
            }
            $self->watch_read(0);
            main::iostat_subscribe($self);
        } else {
            # we don't understand this so pass it on to manage command interface
            my @out;
            Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; });
            $self->write(join("\r\n", @out) . "\r\n");
        }
    }
}

# override Danga::Socket's event handlers which die
sub event_err { $_[0]->close; }
sub event_hup { $_[0]->close; }

# as_string handler: the base description plus the number of size requests.
sub as_string {
    my Mogstored::SideChannelClient $self = shift;
    my $ret = $self->SUPER::as_string;
    $ret .= "; size_requests=$self->{count}";
    return $ret;
}

# Drop any iostat subscription, then close the socket, passing along any
# close reason the caller supplied.
sub close {
    my Mogstored::SideChannelClient $self = shift;
    main::iostat_unsubscribe($self);
    $self->SUPER::close(@_);
}

# NOTE(review): this sub is compiled into package Mogstored::SideChannelClient,
# so it overrides that class's die_gracefully method rather than defining a
# main:: hook -- presumably Perlbal calls it per socket during a graceful
# shutdown; confirm.  In the parent process it TERMs the helper children; it
# does not close this connection.
sub die_gracefully {
    if ($$ == $parent_pid) {
        kill 'TERM', grep { $_ } keys %on_death;
    }
}

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

__END__

=head1 NAME

mogstored -- MogileFS storage daemon

=head1 USAGE

This is the MogileFS storage daemon, which is just an HTTP server that
supports PUT, DELETE, etc.  It's actually a wrapper around L<Perlbal>,
doing all the proper Perlbal config for you.

In addition, it monitors disk usage, I/O activity, etc., which are
checked from the MogileFS tracker (L<mogilefsd>).

=head1 AUTHORS

Brad Fitzpatrick E<lt>brad@danga.comE<gt>

Mark Smith E<lt>junior@danga.comE<gt>

Jonathan Steinert E<lt>jsteinert@sixapart.comE<gt>

=head1 ENVIRONMENT

=over 4

=item PERLBAL_XS_HEADERS

If defined and 0, Perlbal::XS::HTTPHeaders will not be used, if present.
Otherwise, it will be enabled by default, if installed and loadable.

=item MOGSTORED_RUN_WITHOUT_AIO

mogstored refuses to start when neither IO::AIO nor a working Linux::AIO
is available, unless this is set to a true value; it then runs without
async IO and warns that it won't run well with many clients.  mogstored
also refuses to daemonize while this is set.

=back

=head1 COPYRIGHT

Copyright 2004, Danga Interactive

Copyright 2005-2006, Six Apart Ltd.

=head1 LICENSE

Same terms as Perl itself.  Artistic/GPLv2, at your choosing.

=head1 SEE ALSO

L<MogileFS::Overview> -- high level overview of MogileFS

L<mogilefsd> -- MogileFS daemon

L<http://danga.com/mogilefs/>