#!/usr/bin/perl # # MogileFS daemon # # Copyright 2004, Danga Interactive # Copyright 2005-2006, Six Apart Ltd. # # Authors: # Brad Fitzpatrick # Brad Whitaker # Mark Smith # # License: # Artistic/GPLv2, at your choosing. # package Mgd; use strict; use Getopt::Long; use IO::Socket; use Symbol; use POSIX; use DBI; use DBD::mysql; use File::Copy (); use Carp; use File::Basename (); use File::Path (); use Sys::Syslog; use Time::HiRes qw(gettimeofday tv_interval); use Net::Netmask; use LWP::UserAgent; use List::Util; use Socket qw(PF_INET IPPROTO_TCP SOCK_STREAM); use lib 'lib'; use MogileFS::Sys; use MogileFS::Util qw(error daemonize); use MogileFS::Connection::Client; use MogileFS::Connection::Worker; use MogileFS::Worker::Query; use MogileFS::Worker::Delete; use MogileFS::Worker::Replicate; use MogileFS::Worker::Reaper; use MogileFS::Worker::Monitor; use MogileFS::Worker::Checker; use MogileFS::ProcManager; use MogileFS::Config; use MogileFS::ReplicationPolicy::MultipleHosts; # this is incremented whenever the schema changes. 
# The server will refuse to start up with an old schema version.
use constant SCHEMA_VERSION => 6;

my %streamcache;            # host -> IO::Socket::INET to mogstored

our $starttime       = time();  # time we got going
our %domaincache;               # namespace => dmid (populated by domain_id())
our $domaincachetime = 0;
our $client_ip       = undef;   # client ip address
our $force_alt_zone  = 0;       # if on, force to use alternate zone (if it's defined)

MogileFS::Config->load_config;

# Running as root is refused unless the configured 'user' is literally "root".
if ($< == 0 && MogileFS->config('user') ne "root") {
    die "mogilefsd cannot be run as root\n";
}

check_database();
daemonize() if MogileFS->config("daemonize");

# Minimum worker count per job type; each count comes from the paired
# config option.  Order matches the original sequence of calls.
foreach my $job_and_option (
    [ 'queryworker' => 'query_jobs'     ],
    [ 'delete'      => 'delete_jobs'    ],
    [ 'replicate'   => 'replicate_jobs' ],
    [ 'reaper'      => 'reaper_jobs'    ],
    [ 'monitor'     => 'monitor_jobs'   ],
    [ 'checker'     => 'checker_jobs'   ],
) {
    my ($job_type, $option) = @$job_and_option;
    MogileFS::ProcManager->set_min_workers($job_type => MogileFS->config($option));
}

# open up our log
openlog('mogilefsd', 'pid', 'daemon');
Mgd::log('info', 'beginning run');
# Signal handling: TERM and INT are forwarded to every worker child, then
# the parent exits.
sub _signal_children_and_exit {
    my ($signame) = @_;
    my @children = MogileFS::ProcManager->child_pids;
    print STDERR scalar @children, " children to kill.\n" if $DEBUG;
    my $count = kill( $signame => @children );
    print STDERR "Sent SIG$signame to $count children.\n" if $DEBUG;
    exit 0;
}

$SIG{TERM} = sub { _signal_children_and_exit('TERM') };
$SIG{INT}  = sub { _signal_children_and_exit('INT') };
$SIG{PIPE} = 'IGNORE';  # catch them by hand

# setup server socket to listen for client connections
my $server = IO::Socket::INET->new(
    LocalPort => MogileFS->config('conf_port'),
    Type      => SOCK_STREAM,
    Proto     => 'tcp',
    Blocking  => 0,
    Reuse     => 1,
    Listen    => 10,
) or die "Error creating socket: $@\n";

# accept handler for new clients
my $accept_handler = sub {
    my $csock = $server->accept or return;
    MogileFS::Connection::Client->new($csock);
};

# so children can close these once they fork
sub close_listeners {
    close($server);
}

# setup Danga::Socket to start handling connections
Danga::Socket->DebugLevel( 3 );
Danga::Socket->OtherFds( fileno($server) => $accept_handler );

# setup the post event loop callback to spawn jobs, and the timeout
Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);

# and now, actually start listening for events
eval {
    print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
    Danga::Socket->EventLoop();
};
if ( $@ ) {
    Mgd::log('err', "crash log: $@");
}

Mgd::log('info', 'ending run');
closelog();

# database checking/connecting
#
# One DBI handle is kept per process.  $dbh_pid records which pid created
# it, so a handle inherited across fork() is detected and replaced.
{
    my ($dbh, $dbh_pid);

    # Discard a handle that was created by another process (our parent).
    # InactiveDestroy stops DBI from disconnecting the shared connection when
    # the handle is destroyed here; without it the child would close the
    # parent's database connection.
    sub _abandon_inherited_dbh {
        $dbh->{InactiveDestroy} = 1 if $dbh;
        undef $dbh;
        $dbh_pid = 0;
    }

    # Drop the cached handle if it belongs to another pid or no longer
    # answers a trivial query, so the next get_dbh() reconnects.
    sub validate_dbh {
        return unless $dbh;
        if ($$ != $dbh_pid) {
            _abandon_inherited_dbh();
            return;
        }
        my $id = $dbh->selectrow_array("SELECT CONNECTION_ID()");
        if (! $id) {
            # handle's dead. don't use it. (MySQL-ism above)
            undef $dbh;
        }
    }

    # Return this process's DBI handle, connecting with the configured
    # db_dsn/db_user/db_pass if needed.  Returns undef if the connect fails.
    sub get_dbh {
        return $dbh if $dbh && $dbh_pid == $$;

        # a handle from another pid was inherited across fork(); let it go
        # without tearing down the parent's connection.
        _abandon_inherited_dbh() if $dbh;

        $dbh_pid = $$;
        return $dbh = DBI->connect(MogileFS->config('db_dsn'),
                                   MogileFS->config('db_user'),
                                   MogileFS->config('db_pass'),
                                   { PrintError => 0, });
    }
}

# log stuff to syslog or the screen
#
# Arguments are (priority, format, args...).  When not daemonized the
# priority is dropped and the rest is printed printf-style with a newline.
sub log {
    # simple logging functionality
    if (! $MogileFS::Config::daemonize) {
        # syslog acts like printf so we have to use printf and append a \n
        shift; # ignore the first parameter (info, warn, critical, etc)
        printf(shift(@_) . "\n", @_);
    } else {
        # just pass the parameters to syslog
        syslog(@_);
    }
}

# Build dmid -> classid -> mindevcount, capping every count at the number
# of known hosts.  Every domain gets an entry for classid 0 (the implicit
# default class) using default_mindevcount unless a row for it exists.
# Returns a hashref.
sub get_mindevcounts {
    # make sure we have good info
    Mgd::check_host_cache();
    my $host_ct = keys %Mgd::cache_host;

    # find the classes for each domainid (including domains without explict classes)
    my %min; # dmid -> classid -> mindevcount
    validate_dbh();
    my $dbh = get_dbh();
    my $sth = $dbh->prepare("SELECT d.dmid, c.classid, c.mindevcount ".
                            "FROM domain d LEFT JOIN class c ON d.dmid=c.dmid");
    $sth->execute;
    while (my ($dmid, $classid, $mct) = $sth->fetchrow_array) {
        $min{$dmid} ||= {}; # note the existence of this dmid

        # classid may be NULL (undef), in which case there are no classes defined
        # and we don't note the mindevcount (yet)
        $min{$dmid}{$classid} = int($host_ct < $mct ? $host_ct : $mct) if defined $classid;
    }

    # now iterate over %min again to set the implicit class
    my $default_min = MogileFS->config('default_mindevcount');
    foreach my $dmid (keys %min) {
        # each domain's classid=0, if not defined, has an implied mindevcount of $default_mindevcount
        # which most people will probably use.
        $min{$dmid}{0} = $host_ct < $default_min ? $host_ct : $default_min
            unless exists $min{$dmid}{0};
    }

    # return ref to hash
    return \%min;
}

#####################################################################
### S E R V E R   A P I   F U N C T I O N S
#####################################################################

# Device/host caches.  $cache_device_summary is a hashref of
# devid -> $device_row_href, cached for 15 seconds; %cache_host is
# hostid -> host row, cached for 5 seconds.
our ($cache_device_summary, $cache_device_summary_time, %cache_host, $cache_host_time);

# general purpose device locator.  example:
#
# my $devid = Mgd::find_deviceid(
#     random => 1,            # get random device (else find first suitable)
#     min_free_space => 100,  # with at least 100MB free
#     weight_by_free => 1,    # find result weighted by free space
#     max_disk_age => 5,      # minutes of age the last usage report can be before we ignore the disk
#     not_on_hosts => [ 1, 2 ], # no devices on hosts 1 and 2
#     must_be_writeable => 1, # host must be observed reachable and device writeable
#     must_be_readable => 1,  # host must be observed reachable and device readable
# );
#
# Only devices whose (host-adjusted) status is 'alive' are considered.
# returns undef if no suitable device was found.  else, if you wanted an
# array will return an array of the suitable devices--if you want just a
# single item, you get just the first one found.  With weight_by_free a
# single devid is returned even in list context.
sub find_deviceid {
    my %opts = ( @_ );

    # validate we're getting called with known parameters
    my %valid_keys = map { $_ => 1 }
        qw( random min_free_space weight_by_free max_disk_age not_on_hosts must_be_writeable must_be_readable );
    warn "invalid key $_ in call to find_deviceid\n"
        foreach grep { ! $valid_keys{$_} } keys %opts;

    # copy down global minimum free space if not specified
    $opts{min_free_space} ||= MogileFS->config("min_free_space");
    $opts{max_disk_age}   ||= MogileFS->config("max_disk_age");
    if ($opts{max_disk_age}) {
        # turn "minutes of age" into an absolute cutoff timestamp
        $opts{max_disk_age} = time() - ($opts{max_disk_age} * 60);
    }

    # setup for iterating over devices
    my $devs = Mgd::get_device_summary();
    my @devids = keys %{$devs || {}};
    my $devcount = scalar(@devids);
    my $start = $opts{random} ? int(rand($devcount)) : 0;
    my %not_on_host = ( map { $_ => 1 } @{$opts{not_on_hosts} || []} );
    my $total_free = 0;

    # now find a device that matches what they want
    my @list;
    for my $i (0 .. $devcount - 1) {
        # rotate the starting point so a random start spreads the load
        my $idx = ($i + $start) % $devcount;
        my $dev = $devs->{$devids[$idx]};

        # series of suitability checks
        next unless $dev->{status} eq 'alive';
        next if $not_on_host{$dev->{hostid}};
        next if $opts{max_disk_age} && $dev->{mb_asof} &&
                $dev->{mb_asof} < $opts{max_disk_age};
        next if $opts{min_free_space} && $dev->{mb_total} &&
                $dev->{mb_free} < $opts{min_free_space};
        next if $opts{must_be_writeable} &&
            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
             MogileFS->observed_state("device", $dev->{devid}) ne "writeable");
        next if $opts{must_be_readable} &&
            (MogileFS->observed_state("host", $dev->{hostid}) ne "reachable" ||
             MogileFS->observed_state("device", $dev->{devid}) ne "readable");

        # we get here, this is a suitable device
        push @list, $dev->{devid};
        $total_free += $dev->{mb_free};
    }

    # now we have a list ordered randomly, do free space weighting
    if ($opts{weight_by_free}) {
        my $rand = int(rand($total_free));
        my $cur = 0;
        foreach my $devid (@list) {
            $cur += $devs->{$devid}->{mb_free};
            return $devid if $cur >= $rand;
        }
    }

    # return whole list if wanting array, else just first item
    return wantarray ? @list : shift(@list);
}

# Return the cached devid -> device row hashref, refreshing it from the
# device table when older than 15 seconds.  Each row gains mb_free, and its
# status is downgraded to the host's status where the host is worse.
sub get_device_summary {
    my $now = time;
    return $cache_device_summary if $cache_device_summary_time > $now - 15;

    my $dbh = get_dbh();

    # learn devices
    my %dev;
    my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                            "mb_used, mb_asof, status, weight FROM device");
    $sth->execute;
    while (my $row = $sth->fetchrow_hashref) {
        $dev{$row->{devid}} = $row;
    }

    # now override device status with host status if the host status is less than the device status
    Mgd::check_host_cache();
    foreach my $devid (keys %dev) {
        # makes others have an easier time of finding devices by free space
        $dev{$devid}->{mb_free} = $dev{$devid}->{mb_total} - $dev{$devid}->{mb_used};

        my $host = $cache_host{$dev{$devid}->{hostid}};
        die "No host for dev $devid (host $dev{$devid}->{hostid})" unless $host;

        my $host_status = $host->{status};
        die "No status" unless $host_status =~ /^\w+$/;

        if ($dev{$devid}->{status} eq 'alive' && $host_status ne 'alive') {
            $dev{$devid}->{status} = $host_status;
        } elsif ($dev{$devid}->{status} eq 'down' && $host_status eq 'dead') {
            $dev{$devid}->{status} = $host_status;
        }
    }

    $cache_device_summary_time = $now;
    return $cache_device_summary = \%dev;
}

sub invalidate_class_cache {
    # FIXME: no cache yet exists
}

# Forget cached domains; in a worker child, also tell the parent.
sub invalidate_domain_cache {
    $domaincachetime = 0;
    %domaincache = ();
    if (my $worker = MogileFS::ProcManager->is_child) {
        $worker->invalidate_meta("domain");
    }
}

# Forget the cached device summary; in a worker child, also tell the parent.
sub invalidate_device_cache {
    # so next time it's invalid and won't be used old
    $cache_device_summary_time = 0;
    $cache_device_summary = undef;
    if (my $worker = MogileFS::ProcManager->is_child) {
        $worker->invalidate_meta("device");
    }
}

# FIXME: this should propogate to parent, and send messages down to
# query workers.  otherwise there's coherency issues between threads
# for a bit.
sub invalidate_host_cache {
    # so next time it's invalid and won't be used old
    $cache_host_time = 0;
    %cache_host = ();
    if (my $worker = MogileFS::ProcManager->is_child) {
        $worker->invalidate_meta("host");
    }
}

# Reload %cache_host from the host table.  Hosts with both altip and
# altmask get a Net::Netmask object under {mask}.  Returns \%cache_host.
sub reload_host_cache {
    %cache_host = ();

    my $dbh = get_dbh();
    my $sth = $dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                            "hostip, http_port, http_get_port, remoteroot, altip, altmask FROM host");
    $sth->execute;
    while (my $host = $sth->fetchrow_hashref) {
        die unless $host->{status} =~ /^\w+$/;
        $cache_host{$host->{hostid}} = $host;
        $cache_host{$host->{hostid}}->{mask} = Net::Netmask->new2($host->{altmask})
            if $host->{altip} && $host->{altmask};
    }

    $cache_host_time = time();
    return \%cache_host;
}

# Reload the host cache if it is more than 5 seconds old.
sub check_host_cache {
    my $now = time();
    return if $cache_host_time > $now - 5;
    reload_host_cache();
}

# Fetch the file row for ($dmid, $key) as a hashref (undef if absent).
sub key_filerow {
    my ($dbh, $dmid, $key) = @_;
    my $row = $dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                      "FROM file WHERE dmid=? AND dkey=?",
                                      undef, $dmid, $key);
    return $row;
}

# given a file descriptor number and a timeout (seconds, may be fractional;
# 0 means poll), wait for that descriptor to become readable; returns 1 if
# it did, 0 otherwise.  An undef timeout is rejected rather than passed to
# select(), where it would mean "block forever".
sub wait_for_readability {
    my ($fileno, $timeout) = @_;
    return 0 unless defined $fileno && defined $timeout && $timeout >= 0;

    my $rin = '';
    vec($rin, $fileno, 1) = 1;
    my $nfound = select($rin, undef, undef, $timeout);

    # nfound can be undef or 0, both failures, or 1, a success
    return $nfound ? 1 : 0;
}

# Same contract as wait_for_readability(), but waits for writeability
# (e.g. completion of a non-blocking connect).
sub wait_for_writeability {
    my ($fileno, $timeout) = @_;
    return 0 unless defined $fileno && defined $timeout && $timeout >= 0;

    my $rout = '';
    vec($rout, $fileno, 1) = 1;
    my $nfound = select(undef, $rout, undef, $timeout);

    # nfound can be undef or 0, both failures, or 1, a success
    return $nfound ? 1 : 0;
}

# get size of file, return 0 on error.
# tries to finish in 2.5 seconds, under the client's default 3 second timeout.  (configurable)
my %last_stream_connect_error;  # host => $hirestime.

# (caching the connection used for HEAD requests)
my %head_socket;                # host:port => [$pid, $time, $socket]

# Determine the size of $path.  Non-HTTP paths are stat'd under the
# configured root.  For http://host[:port]/uri paths the mogstored "size"
# stream protocol is tried first (with a cached socket per host); if that
# is unavailable or slow, an HTTP HEAD request is made in parallel and its
# Content-length is used.  $dev (optional) is a device row used to
# broadcast host/device unreachability from worker children.
sub get_file_size {
    my ($path, $dev) = @_;  # dev is optional.  will be used to send out errors if present.

    # quick case -- just a file on disk
    unless ($path =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
        my $root = MogileFS->config("root");
        return -s "$root/$path";
    }
    # a URL without an explicit port means HTTP's default port
    my ($host, $port, $uri) = ($1, $2 || 80, $3);

    # don't sigpipe us
    my $flag_nosignal = MogileFS::Sys->flag_nosignal;
    local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;

    # setup for sending size request to cached host
    my $req = "size $uri\r\n";
    my $reqlen = length $req;
    my $rv = 0;
    my $sock = $streamcache{$host};

    my $start_time = Time::HiRes::time();

    my $httpsock;
    my $start_connecting_to_http = sub {
        return if $httpsock; # don't allow starting connecting twice

        # try to reuse cached socket
        if (my $cached = $head_socket{"$host:$port"}) {
            my ($pid, $conntime, $cachesock) = @{ $cached };
            # see if it's still connected
            if ($pid == $$ && getpeername($cachesock) &&
                $conntime > $start_time - 15 &&
                # readability would indicated conn closed, or garbage:
                ! Mgd::wait_for_readability(fileno($cachesock), 0.00)) {
                $httpsock = $cachesock;
                return;
            }
        }

        # start a non-blocking connect; completion is checked later via
        # wait_for_writeability() + getpeername()
        socket $httpsock, PF_INET, SOCK_STREAM, IPPROTO_TCP;
        IO::Handle::blocking($httpsock, 0);
        connect $httpsock, Socket::sockaddr_in($port, Socket::inet_aton($host));
    };

    # sub to parse the response from $sock.  returns undef on error,
    # or otherwise the size of the $path in bytes.
    my $node_timeout = MogileFS->config("node_timeout");
    my $stream_response_timeout = 1.0;
    my $read_timed_out = 0;

    my $parse_response = sub {
        # give the socket 1 second to become readable until we get
        # scared of no reply and start connecting to HTTP to do a HEAD
        # request.  if both timeout, we know the machine is gone, but
        # we don't want to wait 2 seconds + 2 seconds... prefer to do
        # connects in parallel to reduce overall latency.
        unless (Mgd::wait_for_readability(fileno($sock), $stream_response_timeout)) {
            $start_connecting_to_http->();
            # give the socket its final time to get to 2 seconds
            # before we really give up on it
            unless (Mgd::wait_for_readability(fileno($sock), $node_timeout - $stream_response_timeout)) {
                $read_timed_out = 1;
                close($sock);
                return undef;
            }
        }

        # now we know there's readable data
        my $line = <$sock>;
        return undef unless defined $line;
        return undef unless $line =~ /^(\S+)\s+(-?\d+)/; # expected format: "uri size"
        return error("get_file_size() requested size of $path, got back size of $1 ($2 bytes)")
            if $1 ne $uri;
        return 0 if $2 < 0; # backchannel sends back -1 on errors, which we need to map to 0
        return $2+0;
    };

    my $conn_timeout = 2;

    # NOTE: send() returns undef on failure.  $! must not be tested on its
    # own: it is only meaningful after a failure and may hold a stale errno
    # (e.g. EINPROGRESS left behind by IO::Socket's timed connect).
    if ($sock) {
        # try using the cached socket
        $rv = send($sock, $req, $flag_nosignal);
        if (!defined $rv) {
            # cached stream is dead; forget it and fall back to HTTP below
            undef $streamcache{$host};
        } elsif ($rv != $reqlen) {
            # a partial request leaves the stream out of sync; don't reuse it
            undef $streamcache{$host};
            return error("send() didn't return expected length ($rv, not $reqlen) for $path");
        } else {
            # success
            my $size = $parse_response->();
            return $size if defined $size;
            undef $streamcache{$host};
        }
    }
    elsif (($last_stream_connect_error{$host} ||= 0) < $start_time - 15.0) {
        # try creating a connection to the stream
        $sock = IO::Socket::INET->new(PeerAddr => $host,
                                      PeerPort => MogileFS->config("mogstored_stream_port"),
                                      Timeout  => $conn_timeout);

        $streamcache{$host} = $sock;
        if ($sock) {
            $rv = send($sock, $req, $flag_nosignal);
            if (!defined $rv) {
                undef $streamcache{$host};
                return error("error talking to mogstored stream ($path): $!");
            } elsif ($rv != $reqlen) {
                undef $streamcache{$host};
                return error("send() didn't return expected length ($rv, not $reqlen) for $path");
            } else {
                # success
                my $size = $parse_response->();
                return $size if defined $size;
                undef $streamcache{$host};
            }
        } else {
            # see if we timed out connecting.
            my $elapsed = Time::HiRes::time() - $start_time;
            if ($elapsed > $conn_timeout - 0.2) {
                return error("node $host seems to be down in get_file_size");
            } else {
                # cache that we can't connect to the mogstored stream
                # port for people using only apache/lighttpd (dav) on
                # the storage nodes
                $last_stream_connect_error{$host} = Time::HiRes::time();
            }
        }
    }

    # try HTTP
    $start_connecting_to_http->(); # this will only work once anyway, if we already started above.

    # failure case: use a HEAD request to get the size of the file:
    # give them 2 seconds to connect to server, unless we'd already timed out earlier
    my $time_remain = 2.5 - (Time::HiRes::time() - $start_time);
    return 0 if $time_remain <= 0;

    # did we timeout?
    unless (Mgd::wait_for_writeability(fileno($httpsock), $time_remain)) {
        if (my $worker = MogileFS::ProcManager->is_child) {
            $worker->broadcast_host_unreachable($dev->{hostid}) if $dev;
        }
        return error("get_file_size() connect timeout for HTTP HEAD for size of $path");
    }

    # did we fail to connect?  (got a RST, etc)
    unless (getpeername($httpsock)) {
        if (my $worker = MogileFS::ProcManager->is_child) {
            $worker->broadcast_device_unreachable($dev->{devid}) if $dev;
        }
        return error("get_file_size() connect failure for HTTP HEAD for size of $path");
    }

    $rv = syswrite($httpsock, "HEAD $uri HTTP/1.0\r\nConnection: keep-alive\r\n\r\n");
    unless (defined $rv) {
        # the (possibly reused) socket is unusable; don't keep it around
        delete $head_socket{"$host:$port"};
        return error("get_file_size() write failure for HTTP HEAD for size of $path: $!");
    }

    $time_remain = 2.5 - (Time::HiRes::time() - $start_time);
    return 0 if $time_remain <= 0;
    return error("get_file_size() read timeout ($time_remain) for HTTP HEAD for size of $path")
        unless Mgd::wait_for_readability(fileno($httpsock), $time_remain);

    my $first = <$httpsock>;
    unless ($first && $first =~ m!^HTTP/1\.\d 200!) {
        # the rest of this response is unread; the socket can't be reused
        delete $head_socket{"$host:$port"};
        return error("get_file_size()'s HEAD request wasn't a 200 OK");
    }

    # FIXME: this could block too probably, if we don't get a whole
    # line.  in practice, all headers will come at once, though in same packet/read.
    my $cl = undef;
    my $keep_alive = 0;
    while (defined (my $line = <$httpsock>)) {
        if ($line eq "\r\n") {
            if ($keep_alive) {
                $head_socket{"$host:$port"} = [ $$, Time::HiRes::time(), $httpsock ];
            } else {
                delete $head_socket{"$host:$port"};
            }
            return $cl;
        }
        $cl = $1 if $line =~ /^Content-length: (\d+)/i;
        $keep_alive = 1 if $line =~ /^Connection:.+\bkeep-alive\b/i;
    }
    delete $head_socket{"$host:$port"};

    # no content length found?
    return error("get_file_size() found no content-length header in response for $path");
}

# Look up the classid for class name $class in domain $dmid (undef if none).
sub class_id {
    my ($dmid, $class) = @_;
    return undef unless $dmid > 0 && length $class;

    my $dbh = Mgd::get_dbh;
    my $classid = $dbh->selectrow_array
        ("SELECT classid FROM class WHERE dmid=? AND classname=?",
         undef, $dmid, $class) or return undef;
    return undef unless $classid;
    return $classid;
}

# Return classid -> { classid, classname, mindevcount } for all classes of
# domain $dmid, or undef.
sub hostid_classes {
    my $dmid = shift;
    return undef unless $dmid > 0;

    my $dbh = Mgd::get_dbh;
    my $classes = $dbh->selectall_arrayref
        ("SELECT classid, classname, mindevcount FROM class WHERE dmid=?",
         undef, $dmid) or return undef;
    return undef unless $classes;

    my $res = {};
    foreach my $row (@$classes) {
        $res->{$row->[0]} = {
            classid     => $row->[0],
            classname   => $row->[1],
            mindevcount => $row->[2],
        };
    }
    return $res;
}

# Find the hostid whose hostname is $host, forcing a host-cache reload if
# the first (possibly cached) lookup misses.
sub host_id {
    my $host = shift;
    return undef unless $host;

    my $find = sub {
        foreach my $id (keys %Mgd::cache_host) {
            return $id if $Mgd::cache_host{$id}->{hostname} eq $host;
        }
        return undef;
    };

    # reload if it's been awhile
    Mgd::check_host_cache();
    my $hostid = $find->();
    return $hostid if $hostid;

    # force a reload
    Mgd::reload_host_cache();
    return $find->();
}

# Map a domain namespace to its dmid, reloading the whole domain table if
# the cache is over 5 seconds old or lacks the requested namespace.
sub domain_id {
    my $domain = $_[0];

    # reload the cache if time is up, or if cache is empty for requested item
    my $now = time();
    if ($domaincachetime + 5 < $now || ! $domaincache{$domain}) {
        %domaincache = ();

        # now get updated list
        my $dbh = Mgd::get_dbh;
        my $domains = $dbh->selectall_arrayref('SELECT dmid, namespace FROM domain');
        foreach my $row (@{$domains || []}) {
            # namespace -> dmid
            $domaincache{$row->[1]} = $row->[0];
        }

        $domaincachetime = $now;
    }

    # just use cached version
    return $domaincache{$domain};
}

# Look up the class name for ($dmid, $classid), or undef.
sub class_name {
    my ($dmid, $classid) = @_;
    return undef unless $dmid > 0 && length $classid;

    # FIXME: cache this
    # lookup class
    my $dbh = Mgd::get_dbh;
    my $classname = $dbh->selectrow_array
        ("SELECT classname FROM class WHERE dmid=? AND classid=?",
         undef, $dmid, $classid) or return undef;
    return undef unless $classname;
    return $classname;
}

# Look up the namespace of domain $dmid, or undef.
sub domain_name {
    my $dmid = shift;

    # FIXME: cache this
    # lookup domain
    my $dbh = Mgd::get_dbh;
    my $namespace = $dbh->selectrow_array
        ("SELECT namespace FROM domain WHERE dmid=?", undef, $dmid);
    return $namespace;
}

sub hostid_name {
    my $hostid = shift;
    check_host_cache();
    my $h = $cache_host{$hostid};
    return $h ? $h->{hostname} : undef;
}

sub set_force_altzone {
    my $val = shift;
    $force_alt_zone = $val;
}

sub set_client_ip {
    my $ip = shift;
    $client_ip = $ip;
}

# Return the IP clients should use for $hostid: the alternate IP when the
# host has altip+altmask and either alt-zone is forced or the current
# client IP matches the mask; otherwise the primary hostip.
sub hostid_ip {
    my $hostid = shift;
    check_host_cache();
    my $h = $cache_host{$hostid};
    return undef unless $h;

    # if we have a client ip and an object for alt matching...
    if ($h->{mask} && $h->{altip} &&
        ($force_alt_zone || ($client_ip && $h->{mask}->match($client_ip)))) {
        return $h->{altip};
    } else {
        return $h->{hostip};
    }
}

sub hostid_http_port {
    my $hostid = shift;
    check_host_cache();
    my $h = $cache_host{$hostid};
    return $h ? $h->{http_port} : undef;
}

sub hostid_http_get_port {
    my $hostid = shift;
    check_host_cache();
    my $h = $cache_host{$hostid};
    return $h ? $h->{http_get_port} : undef;
}

my %dir_made;               # /dev/path -> [$depth, $time]
my $dir_made_lastclean = 0;

# Issue WebDAV MKCOL requests for the parent directories of an HTTP .fid
# path, remembering which ones were already made.  Returns 0 early when the
# path is not an HTTP fid path or its device/host cannot be resolved.
sub vivify_directories {
    my $path = shift;

    # $path is something like:
    #    http://10.0.0.26:7500/dev2/0/000/148/0000148056.fid

    # three directories we'll want to make:
    #    http://10.0.0.26:7500/dev2/0
    #    http://10.0.0.26:7500/dev2/0/000
    #    http://10.0.0.26:7500/dev2/0/000/148

    return 0 unless $path =~ /^http/;  # TODO: file-based mode?  naah.  should deprecate non-HTTP mode.
    return 0 unless $path =~ m!/dev(\d+)/(\d+)/(\d\d\d)/(\d\d\d)/\d+\.fid$!;
    my ($devid, $p1, $p2, $p3) = ($1, $2, $3, $4);

    my @need = (
        "/dev$devid/$p1",
        "/dev$devid/$p1/$p2",
        "/dev$devid/$p1/$p2/$p3",
    );

    my $devs = Mgd::get_device_summary();
    my $dev = $devs->{$devid} or return 0;
    my $host = Mgd::hostid_ip($dev->{hostid}) or return 0;
    my $port = Mgd::hostid_http_port($dev->{hostid}) or return 0;
    my $peer = "$host:$port";

    my $now = time();
    my $depth = 0;
    foreach my $dir (@need) {
        $depth++;
        next if $dir_made{$dir};
        my $sock = IO::Socket::INET->new(PeerAddr => $peer, Timeout => 1)
            or next;
        print $sock "MKCOL $dir HTTP/1.0\r\n".
            "Content-Length: 0\r\n\r\n";
        my $ans = <$sock>;  # wait for the status line; its content is not inspected
        $dir_made{$dir} = [$depth, $now];
    }

    # cleanup %dir_made occasionally.
    my $clean_interval = 300;  # every 5 minutes.
    if ($dir_made_lastclean < $now - $clean_interval) {
        $dir_made_lastclean = $now;
        foreach my $k (keys %dir_made) {
            my ($made_depth, $ctime) = @{$dir_made{$k}};
            # compare the recorded creation time (not the arrayref itself)
            delete $dir_made{$k} if $made_depth == 3 && $ctime < $now - 3600;
        }
    }
}

# Build the host-relative HTTP path /devN/B/MMM/TTT/NNNNNNNNNN.fid for
# ($devid, $fid), or undef if the device is unknown.
sub make_http_path {
    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    my $dinfo = $dsum->{$devid};
    return undef unless $dinfo;

    my $nfid = sprintf '%010d', $fid;
    my ($top, $mmm, $ttt) = ( $nfid =~ m{(\d)(\d{3})(\d{3})\d{3}} );

    return "/dev$devid/$top/$mmm/$ttt/$nfid.fid";
}

# Build http://host:port/path for ($devid, $fid); a true $use_get_port
# prefers the host's http_get_port, falling back to http_port.
sub make_full_url {
    # set use_get_port to be true to specify to use the get port
    my ($devid, $fid, $use_get_port) = @_;

    # get some information we'll need
    my $devs = Mgd::get_device_summary();
    my $dev = $devs->{$devid} or return undef;
    my $path = Mgd::make_http_path($devid, $fid) or return undef;
    my $host = Mgd::hostid_ip($dev->{hostid}) or return undef;
    my $port = $use_get_port ? Mgd::hostid_http_get_port($dev->{hostid}) : undef;
    $port ||= Mgd::hostid_http_port($dev->{hostid}) or return undef;
    return "http://$host:$port$path";
}

# if given an HTTP URL, break it down into [ host, port, URI ], else
# returns undef
sub is_url {
    my $path = shift;
    if ($path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!) {
        return [ $1, $2 || 80, $3 ];
    }
    return undef;
}

# Return the storage path for ($devid, $fid): a full URL in HTTP mode,
# otherwise hostname/devN/... relative to the configured root (creating its
# parent directories).  Returns undef on failure.
sub make_path {
    # jump out if we should be using HTTP stuff
    return Mgd::make_full_url(@_)
        if MogileFS::Config::http_mode();

    my ($devid, $fid) = @_;

    my $dsum = get_device_summary();
    my $dinfo = $dsum->{$devid};
    return undef unless $dinfo;

    my $hostname = hostid_name($dinfo->{hostid});
    my $nfid = sprintf '%010d', $fid;
    my ($top, $mmm, $ttt) = ( $nfid =~ m{(\d)(\d{3})(\d{3})\d{3}} );
    my $path = "$hostname/dev$devid/$top/$mmm/$ttt/$nfid.fid";
    make_dirs( MogileFS->config('root') . "/$path" ) or return undef;
    return $path;
}

sub make_get_path {
    # the get path only changes for HTTP mode
    return Mgd::make_path(@_) unless MogileFS::Config::http_mode();
    return Mgd::make_full_url(@_, 1);
}

# Create the parent directories of $filename (mode 0775); 1 on success.
sub make_dirs {
    my $filename = shift;
    my $dir = File::Basename::dirname($filename);
    eval { File::Path::mkpath($dir, 0, 0775); };
    return $@ ? 0 : 1;
}

# Recompute file.devcount for $fid from file_on, under the MySQL named
# lock "mgfs:fid:$fid" unless $no_lock is true.  Returns 1, or 0 if no
# handle or the lock could not be taken within 10 seconds.
sub update_fid_devcount {
    my ($fid, $no_lock) = @_;

    my $dbh = Mgd::get_dbh() or return 0;

    my $lockname = "mgfs:fid:$fid";
    unless ($no_lock) {
        my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 10)", undef,
                                         $lockname);
        return 0 unless $lock;
    }

    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fid);

    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
              $ct, $fid);

    unless ($no_lock) {
        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname);
    }

    return 1;
}

# Die unless the database is reachable and its schema_version matches
# SCHEMA_VERSION (the version check is skipped with no_schema_check).
sub check_database {
    my $dbh = Mgd::get_dbh();
    unless ($dbh) {
        die qq{
Error: unable to establish connection with your MogileFS database.

Please verify that you have correctly setup a configuration file or are
providing the correct information in order to reach the database and try
running the MogileFS server again.

If you haven\'t setup your database yet, run 'mogdbsetup'.
};
    }

    my $sversion = get_server_setting('schema_version') || 0;
    unless ($sversion == SCHEMA_VERSION || MogileFS::Config->config('no_schema_check')) {
        my $exp = SCHEMA_VERSION;
        die "Server's database schema version of $sversion doesn't match expected value of $exp.  Halting.\n\n".
            "Please run mogdbsetup to upgrade your schema.\n";
    }
}

# set_server_setting( key, value )
# set value to undef to remove whatever is presently stored; returns 1 on success or
# undef on error
sub set_server_setting {
    my ($key, $val) = @_;
    return unless $key;

    my $dbh = Mgd::get_dbh() or return undef;

    if (defined $val) {
        $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }

    return undef if $dbh->err;
    return 1;
}

# get_server_setting( key )
# get value of server setting, undef on error (or no result)
sub get_server_setting {
    my $dbh = Mgd::get_dbh() or return undef;
    my $ret = $dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                    undef, shift);
    return undef if $dbh->err;
    return $ret;
}

package MogileFS;
# just so MogileFS->config($key) will work:
use MogileFS::Config qw(config);

my %obs_state;  # "$what-$id" => observed state string
my %hooks;      # hook name => coderef

# Record the observed state of a host (reachable/unreachable) or a device
# (writeable/readable/unreachable); dies on any other combination.
sub set_observed_state {
    my ($class, $what, $id, $state) = @_;
    die "set_observed_state() with invalid what '$what', valid: host, device"
        unless $what =~ /^(?:host|device)$/;
    die "set_observed_state() with invalid device state '$state', valid: writeable, readable, unreachable"
        if $what eq 'device' && $state !~ /^(?:writeable|readable|unreachable)$/;
    die "set_observed_state() with invalid host state '$state', valid: reachable, unreachable"
        if $what eq 'host' && $state !~ /^(?:reachable|unreachable)$/;

    $obs_state{"$what-$id"} = $state;
}

# Return the last observed state for ($what, $id), or "" if none recorded.
sub observed_state {
    my ($class, $what, $id) = @_;
    return $obs_state{"$what-$id"} || "";
}

sub register_worker_command {
    # just pass this through to the Worker class
    return MogileFS::Worker::Query::register_command(@_);
}

sub register_global_hook {
    $hooks{$_[0]} = $_[1];
    return 1;
}

sub unregister_global_hook {
    delete $hooks{$_[0]};
    return 1;
}

# Invoke the hook registered under $hookname with the remaining args;
# returns its result, or undef if no hook is registered.
sub run_global_hook {
    my $hookname = shift;
    my $ref = $hooks{$hookname};
    return $ref->(@_) if defined $ref;
    return undef;
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: