Changeset 384

Show
Ignore:
Timestamp:
08/29/06 19:30:39 (2 years ago)
Author:
bradfitz
Message:

merging the server-newrepl branch into trunk, now that it's pretty
stable and in use on livejournal.com.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/server/TODO

    r320 r384  
     1-- if create_open before monitor runs yet, block and wait for a round?  better than 'no_devices' 
     2   might need some marker for 'end of monitoring'?  or just wait until we have 1 or 3? 
     3   or if they asked for 3, only return 1 in that rare case, preferring latency to redundancy. 
     4   plus, we'd know that 1 is writable in last few seconds! 
     5 
     6-- update the 'repl' command for new file_to_replicate table 
     7 
    18-- replication policy error storm when a device is known to be observably down: 
    29 
  • trunk/server/lib/MogileFS/Config.pm

    r323 r384  
    3434    $reaper_jobs, 
    3535    $monitor_jobs, 
     36    $checker_jobs, 
    3637    $mog_root, 
    3738    $min_free_space, 
     
    6566                             'default_mindevcount=i' => \$cmdline{default_mindevcount}, 
    6667                             'node_timeout=i' => \$cmdline{node_timeout}, 
     68                             'no_schema_check' => \$cmdline{no_schema_check}, 
    6769                             ); 
    6870 
     
    117119    $reaper_jobs    = choose_value( 'reaper_jobs', 1 ); 
    118120    $monitor_jobs   = choose_value( 'monitor_jobs', 1 ); 
     121    $checker_jobs   = choose_value( 'checker_jobs', 1 ); 
    119122    $min_free_space = choose_value( 'min_free_space', 100 ); 
    120123    $max_disk_age   = choose_value( 'max_disk_age', 5 ); 
    121     $DEBUG          = choose_value( 'debug', 0, 1 ); 
     124    $DEBUG          = choose_value( 'debug', $ENV{DEBUG} || 0, 1 ); 
    122125    $USE_HTTP       = ! choose_value( 'no_http', 0, 1); 
    123126    $default_mindevcount = choose_value( 'default_mindevcount', 2 ); 
    124127    $node_timeout   = choose_value( 'node_timeout', 2 ); 
     128 
     129    choose_value( 'no_schema_check', 0 ); 
    125130 
    126131    # now load plugins 
     
    130135} 
    131136 
    132 ### FUNCTION: choose_value( $name, $default[, $boolean]
    133 sub choose_value ($$;$)
    134     my ( $name, $default, $boolean ) = @_; 
     137### FUNCTION: choose_value( $name, $default
     138sub choose_value
     139    my ( $name, $default ) = @_; 
    135140    return set_config($name, $cmdline{$name}) if defined $cmdline{$name}; 
    136141    return set_config($name, $cfgfile{$name}) if defined $cfgfile{$name}; 
  • trunk/server/lib/MogileFS/ProcManager.pm

    r328 r384  
    5050    my ($class, $job) = @_; 
    5151    return { 
     52        checker     => "Checker", 
    5253        queryworker => "Query", 
    5354        delete      => "Delete", 
     
    398399    return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client; 
    399400 
     401    # FIXME: HOW IS THIS EVER CALLED?  things with colons never go to HandleQueryWorkerResponse. 
     402    #        see MogileFS::Connection::Worker 
    400403    # out-of-band messages (not replies to requests) start with a colon: 
    401404    if ($line =~ /^:state_change (\w+) (\d+) (\w+)/) { 
     
    461464        push @$clref, Time::HiRes::gettimeofday(); 
    462465        $Mappings{$worker->{fd}} = $clref; 
     466        $Stats{queries}++; 
    463467 
    464468        # increment our counter so we know what request counter this is going out 
     
    547551 
    548552    } elsif ($cmd eq ":still_alive") { 
    549  
    550553        # a no-op 
     554 
     555    } elsif ($cmd eq ":monitor_just_ran") { 
     556        send_monitor_has_run($child); 
     557 
     558    } elsif ($cmd =~ /^:invalidate_meta (\w+)/) { 
     559        send_invalidate($1, $child); 
    551560 
    552561    } else { 
     
    628637    #warn "STATE CHANGE: $what<$whatid> = $state\n"; 
    629638    # TODO: can probably send this to all children now, not just certain types 
    630     for my $type (qw(queryworker replicate delete)) { 
     639    for my $type (qw(queryworker replicate delete monitor checker)) { 
    631640        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":state_change $what $whatid $state", $child); 
     641    } 
     642} 
     643 
     644sub send_invalidate { 
     645    my ($what, $child) = @_; 
     646    # TODO: can probably send this to all children now, not just certain types 
     647    for my $type (qw(queryworker replicate delete monitor checker)) { 
     648        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":invalidate_meta_once $what", $child); 
     649    } 
     650} 
     651 
     652sub send_monitor_has_run { 
     653    my $child = shift; 
     654    for my $type (qw(replicate checker)) { 
     655        MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child); 
    632656    } 
    633657} 
  • trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm

    r285 r384  
    88sub replicate_to { 
    99    my ($class, %args) = @_; 
     10 
    1011    my $fid      = delete $args{fid};      # fid scalar to copy 
    1112    my $on_devs  = delete $args{on_devs};  # arrayref of device objects 
     
    2021    my $already_on = @$on_devs; 
    2122 
     23    # FIXME: this is NOT true.  make sure it's on 2+ hosts at least, if min > 1. 
     24 
    2225    # replication good. 
    2326    return 0 if $already_on >= $min; 
    2427 
    2528    # see which and how many unique hosts we're already on. 
     29    my %on_dev; 
    2630    my %on_host; 
    2731    foreach my $dev (@$on_devs) { 
    2832        $on_host{$dev->{hostid}} = 1; 
     33        $on_dev{$dev->{devid}} = 1; 
    2934    } 
    3035    my $uniq_hosts_on    = scalar keys %on_host; 
     
    3843    } 
    3944 
    40     my @good_devids = grep { ! $failed->{$_}
     45    my @good_devids = grep { ! $failed->{$_} && ! $on_dev{$_}
    4146            Mgd::find_deviceid( 
    4247                               random         => 1, 
  • trunk/server/lib/MogileFS/Util.pm

    r272 r384  
    1212    while (1) { 
    1313        my $start = Time::HiRes::time(); 
     14        my $explicit_sleep = undef; 
    1415 
    1516        # run the code in a loop, so "next" will get out of it. 
    1617        foreach (1) { 
    17             $code->(); 
     18            $code->(sub { 
     19                $explicit_sleep = shift; 
     20            }); 
    1821        } 
    1922 
    2023        my $took = Time::HiRes::time() - $start; 
    2124        my $sleep_for = $delay - $took; 
    22         Time::HiRes::sleep($sleep_for) if $sleep_for > 0; 
     25        if (defined $explicit_sleep) { 
     26            Time::HiRes::sleep($explicit_sleep); 
     27        } else { 
     28            Time::HiRes::sleep($sleep_for) if $sleep_for > 0; 
     29        } 
    2330    } 
    2431} 
  • trunk/server/lib/MogileFS/Worker.pm

    r322 r384  
    44            'last_bcast_state',   # "{device|host}-$devid" => [$time, {alive|dead}] 
    55            'readbuf',            # unparsed data from parent 
     6            'monitor_has_run',    # true once we've heard of the monitor job being alive 
    67            ); 
    78 
     
    1920    $self->{readbuf}          = ''; 
    2021    $self->{last_bcast_state} = {}; 
     22    $self->{monitor_has_run}  = 0; 
    2123 
    2224    IO::Handle::blocking($psock, 0); 
     
    3032sub get_dbh { 
    3133    return Mgd::get_dbh(); 
     34} 
     35 
     36sub monitor_has_run { 
     37    my $self = shift; 
     38    return $self->{monitor_has_run} ? 1 : 0; 
    3239} 
    3340 
     
    4653    return 1 if $rv == $totallen; 
    4754    die "Error writing: $!" if $!; 
    48      
     55 
    4956    my $remain = $totallen - $rv; 
    5057    my $offset = $rv; 
     
    8289        my $rv = sysread($psock, $buf, 1024); 
    8390        $self->{readbuf} .= $buf; 
    84          
     91 
    8592        while ($self->{readbuf} =~ s/^(.+?)\r?\n//) { 
    8693            my $line = $1; 
     
    148155} 
    149156 
     157sub invalidate_meta { 
     158    my ($self, $what) = @_; 
     159    return if $Mgd::INVALIDATE_NO_PROPOGATE;  # anti recursion 
     160    $self->send_to_parent(":invalidate_meta $what"); 
     161} 
     162 
    150163# tries to parse generic (not job-specific) commands sent from parent 
    151164# to child.  returns 1 on success, or 0 if comman given isn't generic, 
     
    172185    } 
    173186 
     187    if ($$lineref =~ /^:invalidate_meta_once (\w+)/) { 
     188        local $Mgd::INVALIDATE_NO_PROPOGATE = 1; 
     189        eval("Mgd::invalidate_${1}_cache()"); 
     190        return 1; 
     191    } 
     192 
     193    if ($$lineref =~ /^:monitor_has_run/) { 
     194        $self->{monitor_has_run} = 1; 
     195        return 1; 
     196    } 
     197 
    174198    # TODO: warn on unknown commands? 
    175199 
  • trunk/server/lib/MogileFS/Worker/Monitor.pm

    r323 r384  
    11package MogileFS::Worker::Monitor; 
    2 # deletes files 
    32 
    43use strict; 
     
    4645 
    4746            my $host = $Mgd::cache_host{$dev->{hostid}}; 
    48             my $port = $host->{http_get_port} || $host->{http_port}; 
    49             my $url = "http://$host->{hostip}:$port/dev$dev->{devid}/usage"; 
     47            my $port = $host->{http_port}; 
     48            my $get_port = $host->{http_get_port} || $port; 
     49            my $url = "http://$host->{hostip}:$get_port/dev$dev->{devid}/usage"; 
    5050 
    5151            # now try to get the data with a short timeout 
     
    6161                if ($failed_after < 0.5) { 
    6262                    $self->broadcast_device_unreachable($dev->{devid}); 
    63                     error("Port $port not listening on otherwise-alive machine $host->{hostip}?  Error was: " . $response->status_line); 
     63                    error("Port $get_port not listening on otherwise-alive machine $host->{hostip}?  Error was: " . $response->status_line); 
    6464                } else { 
    6565                    $failed_after = sprintf("%.02f", $failed_after); 
     
    7070                next; 
    7171            } 
     72 
     73            # at this point we can reach the host 
     74            $self->broadcast_host_reachable($dev->{hostid}); 
    7275 
    7376            my %stats; 
     
    107110EOREQUEST 
    108111 
    109             # TODO: hosts aren't writable.  they're "available" 
    110112            # TODO: re-check the file was written as put. 
    111113            # TODO: put something unique in the file 
     
    115117            my $resp = $ua->request($req); 
    116118            if ($resp->is_success) { 
    117                 $self->broadcast_host_reachable($dev->{hostid}); 
    118119                $self->broadcast_device_writeable($dev->{devid}); 
    119120                error("dev$dev->{devid}: used = $used, total = $total, writeable = 1") 
     
    121122            } else { 
    122123                # merely readable 
    123                 $self->broadcast_host_reachable($dev->{hostid}); 
    124124                $self->broadcast_device_readable($dev->{devid}); 
    125125                error("dev$dev->{devid}: used = $used, total = $total, writeable = 0") 
     
    127127            } 
    128128        } 
     129 
     130        # announce to the parent that we've run 
     131        $self->send_to_parent(":monitor_just_ran"); 
    129132    }); 
    130133 
  • trunk/server/lib/MogileFS/Worker/Query.pm

    r325 r384  
    5151            next; 
    5252        } 
    53              
     53 
    5454        my $newread; 
    5555        my $rv = sysread($psock, $newread, 1024); 
     
    127127    my $args = shift; 
    128128 
    129     if ($args->{devices} || $args->{all}) { 
    130         Mgd::invalidate_device_cache(); 
    131     } 
    132     if ($args->{hosts} || $args->{all}) { 
    133         Mgd::invalidate_host_cache(); 
    134     } 
     129    Mgd::invalidate_device_cache()  if $args->{devices} || $args->{all}; 
     130    Mgd::invalidate_host_cache()    if $args->{hosts}   || $args->{all}; 
     131    Mgd::invalidate_class_cache()   if $args->{class}   || $args->{all}; 
     132    Mgd::invalidate_domain_cache()  if $args->{domain}  || $args->{all}; 
    135133 
    136134    return $self->ok_line; 
     
    160158 
    161159    # figure out what classid this file is for 
    162     my $class = $args->{class}
     160    my $class = $args->{class} || ""
    163161    my $classid = 0; 
    164162    if (length($class)) { 
     
    173171    my (@dests, @hosts); 
    174172    my $devs = Mgd::get_device_summary(); 
     173 
    175174    while (scalar(@dests) < ($multi ? 3 : 1)) { 
    176175        my $devid = Mgd::find_deviceid( 
    177             random => 1, 
    178             weight_by_free => 1, 
    179             not_on_hosts => \@hosts
    180         ); 
    181  
     176                                       random          => 1, 
     177                                       must_be_writeable => 1, 
     178                                       weight_by_free   => 1
     179                                       not_on_hosts     => \@hosts, 
     180                                       ); 
    182181        last unless defined $devid; 
    183182 
     
    292291    # if a temp file is closed without a provided-key, that means to 
    293292    # delete it. 
    294     unless (length($key)) { 
     293    unless (defined $key && length($key)) { 
    295294        # add to to-delete list 
    296295        $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid); 
     
    326325                      $fid, $dmid, $key, $size, $trow->{classid}); 
    327326    return $self->err_line("db_error") unless $rv; 
     327 
     328    # mark it as needing replicating: 
     329    $dbh->do("INSERT IGNORE INTO file_to_replicate ". 
     330             "SET fid=?, fromdevid=?, nexttry=0", undef, $fid, $devid); 
     331    return $self->err_line("db_error") if $dbh->err; 
    328332 
    329333    $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid); 
     
    549553        return $self->err_line("unknown_hostid") unless $Mgd::cache_host{$hostid}; 
    550554    } elsif (my $host = $args->{hostname}) { 
    551         while (my ($hid, $row) = each %Mgd::cache_host) { 
    552             if ($row->{hostname} eq $host) { 
    553                 $hostid = $hid; 
    554                 last; 
    555             } 
    556         } 
     555        $hostid = Mgd::host_id($host); 
    557556        return $self->err_line("unknown_host") unless $hostid; 
    558557    } 
     
    563562        return $self->err_line("existing_devid"); 
    564563    } 
     564    Mgd::invalidate_device_cache(); 
    565565    return $self->ok_line; 
    566566} 
     
    588588 
    589589    # return the domain id we created 
     590    Mgd::invalidate_domain_cache(); 
    590591    return $self->ok_line({ domain => $domain }); 
    591592} 
     
    620621 
    621622    # return the domain we nuked 
     623    Mgd::invalidate_domain_cache(); 
    622624    return $self->ok_line({ domain => $domain }); 
    623625} 
     
    668670 
    669671    # return success 
     672    Mgd::invalidate_class_cache(); 
    670673    return $self->ok_line({ class => $class, mindevcount => $mindevcount, domain => $domain }); 
    671674} 
     
    709712 
    710713    # return the class we nuked 
     714    Mgd::invalidate_class_cache(); 
    711715    return $self->ok_line({ domain => $domain, class => $class }); 
    712716} 
     
    755759    } else { 
    756760        # get the max host id in use (FIXME: racy!) 
    757         $hid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') + 1; 
     761        $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1; 
    758762 
    759763        # now insert the new host 
     
    943947 
    944948    # success, weight changed 
     949    Mgd::invalidate_device_cache(); 
    945950    return $self->ok_line($ret); 
    946951} 
     
    979984 
    980985    # success, state changed 
     986    Mgd::invalidate_device_cache(); 
    981987    return $self->ok_line($ret); 
    982988} 
     
    10261032        } 
    10271033        $ret->{"replicationcount"} = $count; 
     1034 
     1035        # now we want to do the "new" replication stats 
     1036        my $db_time = $dbh->selectrow_array('SELECT UNIX_TIMESTAMP()'); 
     1037        $stats = $dbh->selectall_arrayref('SELECT nexttry, COUNT(*) FROM file_to_replicate GROUP BY 1'); 
     1038        foreach my $stat (@$stats) { 
     1039            if ($stat->[0] < 1000) { 
     1040                # anything under 1000 is a specific state, so let's define those.  here's the list 
     1041                # of short names to describe them. 
     1042                my $name = { 
     1043                    0 => 'newfile', # new files that need to be replicated 
     1044                    1 => 'redo',    # files that need to go through replication again 
     1045                }->{$stat->[0]} || "unknown"; 
     1046 
     1047                # now put it in the output hashref.  note that we do += because we might 
     1048                # have more than one group of unknowns. 
     1049                $ret->{"to_replicate_$name"} += $stat->[1]; 
     1050 
     1051            } elsif ($stat->[0] == MogileFS::Worker::Replicate::end_of_time()) { 
     1052                $ret->{"to_replicate_manually"} = $stat->[1]; 
     1053 
     1054            } elsif ($stat->[0] < $db_time) { 
     1055                $ret->{"to_replicate_overdue"} += $stat->[1]; 
     1056 
     1057            } else { 
     1058                $ret->{"to_replicate_deferred"} += $stat->[1]; 
     1059            } 
     1060        } 
    10281061    } 
    10291062 
     
    10721105} 
    10731106 
     1107sub cmd_replicate_now { 
     1108    my MogileFS::Worker::Query $self = shift; 
     1109 
     1110    my $dbh = Mgd::get_dbh() 
     1111        or return $self->err_line('nodb'); 
     1112    my $rv = $dbh->do("UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() WHERE nexttry > UNIX_TIMESTAMP()"); 
     1113 
     1114    return $self->err_line('db', $dbh->errstr) if $dbh->err; 
     1115    return $self->ok_line({ count => int($rv) }); 
     1116} 
     1117 
     1118sub cmd_checker { 
     1119    my MogileFS::Worker::Query $self = shift; 
     1120    my $args = shift; 
     1121 
     1122    my $new_setting; 
     1123    if ($args->{disable}) { 
     1124        $new_setting = 'off'; 
     1125    } elsif ($args->{level}) { 
     1126        # they want to turn it on or change the level, so let's ensure they 
     1127        # specified a valid level 
     1128        if (MogileFS::Worker::Checker::is_valid_level($args->{level})) { 
     1129            $new_setting = $args->{level}; 
     1130        } else { 
     1131            return $self->err_line('invalid_checker_level'); 
     1132        } 
     1133    } 
     1134 
     1135    if (defined $new_setting) { 
     1136        Mgd::set_server_setting('fsck_enable', $new_setting); 
     1137        return $self->ok_line; 
     1138    } 
     1139 
     1140    $self->err_line('failure'); 
     1141} 
     1142 
    10741143sub ok_line { 
    10751144    my MogileFS::Worker::Query $self = shift; 
     
    11001169        'class_has_files' => "Class still has files, uanble to delete", 
    11011170        'class_not_found' => "Class not found", 
     1171        'db' => "Database error", 
    11021172        'domain_has_files' => "Domain still has files, uanble to delete", 
    11031173        'domain_exists' => "That domain already exists", 
     
    11101180        'host_not_found' => "Host not found", 
    11111181        'invalid_chars' => "Patterns must not contain backslashes (\\) or percent signs (%).", 
     1182        'invalid_checker_level' => "Checker level invalid.  Please see documentation on this command.", 
    11121183        'invalid_mindevcount' => "The mindevcount must be at least 1", 
    11131184        'key_exists' => "Target key name already exists; can't overwrite.", 
  • trunk/server/lib/MogileFS/Worker/Replicate.pm

    r312 r384  
    1010use List::Util (); 
    1111use MogileFS::Util qw(error every); 
     12use MogileFS::Class; 
    1213use POSIX ":sys_wait_h"; # argument for waitpid 
    1314use POSIX; 
     15 
     16# setup the value used in a 'nexttry' field to indicate that this item will never 
     17# actually be tried again and require some sort of manual intervention. 
     18use constant ENDOFTIME => 2147483647; 
     19 
     20sub end_of_time { ENDOFTIME; } 
    1421 
    1522sub new { 
     
    3239} 
    3340 
     41# { fid => lastcheck }; instructs us not to replicate this fid... we will clear 
     42# out fids from this list that are expired 
     43my %fidfailure; 
     44 
     45# { fid => 1 }; used to keep track of fids we find in the unreachable_fids table 
     46my %unreachable; 
     47my $dbh; 
     48 
    3449sub work { 
    3550    my $self = shift; 
    3651 
    37     # { fid => lastcheck }; instructs us not to replicate this fid... we will clear 
    38     # out fids from this list that are expired 
    39     my %fidfailure; 
    40  
    41     # { fid => 1 }; used to keep track of fids we find in the unreachable_fids table 
    42     my %unreachable; 
     52    # give the monitor job 15 seconds to give us an update 
     53    my $warn_after = time() + 15; 
    4354 
    4455    every(2.0, sub { 
    4556        $self->parent_ping; 
    4657 
     58        # replication doesn't go well if the monitor job hasn't actively started 
     59        # marking things as being available 
     60        unless ($self->monitor_has_run) { 
     61            error("waiting for monitor job to complete a cycle before beginning replication") 
     62                if time() > $warn_after; 
     63            return; 
     64        } 
     65 
    4766        $self->validate_dbh; 
    48         my $dbh = $self->get_dbh or return 0; 
     67        $dbh = $self->get_dbh or return 0; 
    4968 
    5069        # update our unreachable fid list... we consider them good for 15 minutes 
     
    6281        } 
    6382 
    64         foreach_class(sub { 
    65             my ($dmid, $classid, $min, $policy_class) = @_; 
    66  
    67             error("Checking replication for dmid=$dmid, classid=$classid, min=$min") 
     83        # this finds stuff to replicate based on its record in the needs_replication table 
     84        $self->replicate_using_torepl_table; 
     85 
     86        # this finds stuff to replicate based on the devcounts.  (old style) 
     87        $self->replicate_using_devcounts; 
     88 
     89    }); 
     90
     91 
     92sub replicate_using_torepl_table { 
     93    my $self = shift; 
     94 
     95    # find some fids to replicate, prioritize based on when they should be tried 
     96    my $LIMIT = 1000; 
     97    my $to_repl_map = $dbh->selectall_hashref(qq{ 
     98        SELECT fid, fromdevid, failcount, flags, nexttry 
     99        FROM file_to_replicate 
     100        WHERE nexttry <= UNIX_TIMESTAMP() 
     101        ORDER BY nexttry 
     102        LIMIT $LIMIT 
     103    }, "fid"); 
     104    if ($dbh->err) { 
     105        error("Database error selecting fids to replicate: " . $dbh->errstr); 
     106        return; 
     107    } 
     108 
     109    # get random list of hashref of things to do: 
     110    my $to_repl = [ List::Util::shuffle(values %$to_repl_map) ]; 
     111    return unless @$to_repl; 
     112 
     113    # sort our priority list in terms of 0s (immediate, only 1 copy), 1s (immediate replicate, 
     114    # but we already have 2 copies), and big numbers (unixtimestamps) of things that failed. 
     115    # but because sort is stable, these are random within their 0/1/big classes. 
     116    @$to_repl = sort { 
     117        ($a->{nexttry} < 1000 || $b->{nexttry} < 1000) ? ($a->{nexttry} <=> $b->{nexttry}) : 0 
     118    } @$to_repl; 
     119 
     120    foreach my $todo (@$to_repl) { 
     121        my $fid = $todo->{fid}; 
     122 
     123        my $errcode; 
     124        my ($status, $unlock) = replicate($dbh, $fid, 
     125                                          errref       => \$errcode, 
     126                                          no_unlock    => 1,   # to make it return an $unlock subref 
     127                                          source_devid => $todo->{fromdevid}, 
     128                                          ); 
     129        if ($status) { 
     130            # $status is either 0 (failure, handled below), 1 (success, we actually 
     131            # replicated this file), or 2 (success, but someone else replicated it). 
     132 
     133            # when $staus == 2, this delete is unnecessary normally 
     134            # (somebody else presumably already deleted it if they 
     135            # also replicated it), but in the case of running with old 
     136            # replicators from previous versions, -or- simply if the 
     137            # other guy's delete failed, this cleans it up.... 
     138            $dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fid); 
     139            $unlock->() if $unlock; 
     140            next; 
     141        } 
     142 
     143        # ERROR CASES: 
     144 
     145        # README: please keep this up to date if you update the replicate() function so we ensure 
     146        # that this code always does the right thing 
     147        # 
     148        # -- HARMLESS -- 
     149        # failed_getting_lock        => harmless.  skip.  somebody else probably doing. 
     150        # 
     151        # -- TEMPORARY; DO EXPONENTIAL BACKOFF -- 
     152        # source_down                => only source available is observed down. 
     153        # policy_error_doing_failed  => policy plugin fucked up.  it's looping. 
     154        # policy_error_already_there => policy plugin fucked up.  it's dumb. 
     155        # policy_no_suggestions      => no copy was attempted.  policy is just not happy. 
     156        # copy_error                 => policy said to do 1+ things, we failed, it ran out of suggestions. 
     157        # 
     158        # -- FATAL; DON'T TRY AGAIN -- 
     159        # no_source                  => it simply exists nowhere.  not that something's down, but file_on is empty. 
     160        # no_devices                 => no devices are configured.  at all.  why are we replicating something? 
     161        #                               how did something come into being since you can't delete devices? 
     162 
     163        # bail if we failed getting the lock, that means someone else probably 
     164        # already did it, so we should just move on 
     165        if ($errcode eq 'failed_getting_lock') { 
     166            $unlock->() if $unlock; 
     167            next; 
     168        } 
     169 
     170        # logic for setting the next try time appropriately 
     171        my $update_nexttry = sub { 
     172            my ($type, $delay) = @_; 
     173            if ($type eq 'end_of_time') { 
     174                # special; update to a time that won't happen again, as we've encountered a scenario 
     175                # in which case we're really hosed 
     176                $dbh->do("UPDATE file_to_replicate SET nexttry = " . ENDOFTIME . ", failcount = failcount + 1 WHERE fid = ?", 
     177                         undef, $fid); 
     178            } else { 
     179                my $extra = $type eq 'offset' ? 'UNIX_TIMESTAMP() +' : ''; 
     180                $dbh->do("UPDATE file_to_replicate SET nexttry = $extra ?, failcount = failcount + 1 WHERE fid = ?", 
     181                         undef, $delay+0, $fid); 
     182            } 
     183            error("Failed setting nexttry of fid $fid to $type $delay: " . $dbh->errstr) 
     184                if $dbh->err; 
     185        }; 
     186 
     187        # now let's handle any error we want to consider a total failure; do not 
     188        # retry at any point.  push this file off to the end so someone has to come 
     189        # along and figure out what went wrong. 
     190        if ($errcode eq 'no_source' || $errcode eq 'no_devices') { 
     191            $update_nexttry->( end_of_time => 1 ); 
     192            $unlock->() if $unlock; 
     193            next; 
     194        } 
     195 
     196        # at this point, the rest of the errors require exponential backoff.  define what this means 
     197        # as far as failcount -> delay to next try. 
     198        # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ... 
     199        my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 ); 
     200        $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) ); 
     201        $unlock->() if $unlock; 
     202    } 
     203 
     204
     205 
     206sub replicate_using_devcounts { 
     207    my $self = shift; 
     208 
     209    MogileFS::Class->foreach(sub { 
     210        my $mclass = shift; 
     211        my ($dmid, $classid, $min, $policy_class) = map { $mclass->$_ } qw(domainid classid mindevcount policy_class); 
     212 
     213        error("Checking replication for dmid=$dmid, classid=$classid, min=$min") 
     214            if $Mgd::DEBUG >= 1; 
     215 
     216        my $LIMIT = 1000; 
     217 
     218        # try going from devcount of 1 up to devcount of $min-1 
     219        $self->{fidtodo} = {}; 
     220        my $fixed = 0; 
     221        my $attempted = 0; 
     222        my $devcount = 1; 
     223        while ($fixed < $LIMIT && $devcount < $min) { 
     224            my $now = time(); 
     225            my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ". 
     226                                                "AND devcount = ? AND length IS NOT NULL ". 
     227                                                "LIMIT $LIMIT", undef, $dmid, $classid, $devcount); 
     228            die $dbh->errstr if $dbh->err; 
     229            $self->{fidtodo}{$_} = 1 foreach @$fids; 
     230 
     231            # increase devcount so we try to replicate the files at the next devcount 
     232            $devcount++; 
     233 
     234            # see if we have any files to replicate 
     235            my $count = $fids ? scalar @$fids : 0; 
     236            error("  found $count for dmid=$dmid/classid=$classid/min=$min") 
    68237                if $Mgd::DEBUG >= 1; 
    69  
    70             my $LIMIT = 1000; 
    71  
    72             # try going from devcount of 1 up to devcount of $min-1 
    73             $self->{fidtodo} = {}; 
    74             my $fixed = 0; 
    75             my $attempted = 0; 
    76             my $devcount = 1; 
    77             while ($fixed < $LIMIT && $devcount < $min) { 
    78                 my $now = time(); 
    79                 my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ". 
    80                                                     "AND devcount = ? AND length IS NOT NULL ". 
    81                                                     "LIMIT $LIMIT", undef, $dmid, $classid, $devcount); 
    82                 die $dbh->errstr if $dbh->err; 
    83                 $self->{fidtodo}{$_} = 1 foreach @$fids; 
    84  
    85                 # increase devcount so we try to replicate the files at the next devcount 
    86                 $devcount++; 
    87  
    88                 # see if we have any files to replicate 
    89                 my $count = $fids ? scalar @$fids : 0; 
    90                 error("  found $count for dmid=$dmid/classid=$classid/min=$min") 
    91                     if $Mgd::DEBUG >= 1; 
    92                 next unless $count; 
    93  
    94                 # randomize the list so multiple daemons/threads working on 
    95                 # replicate at the same time don't all fight over the 
    96                 # same fids to move 
    97                 my @randfids = List::Util::shuffle(@$fids); 
    98  
    99                 error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2; 
    100                 foreach my $fid (@randfids) { 
    101                     # now replicate this fid 
    102                     $attempted++; 
    103                     next unless $self->{fidtodo}{$fid}; 
    104  
    105                     if ($fidfailure{$fid}) { 
    106                         if ($fidfailure{$fid} < $now) { 
    107                             delete $fidfailure{$fid}; 
    108                         } else { 
    109                             next; 
    110                         } 
    111                     } 
    112  
    113                     $self->read_from_parent; 
    114  
    115                     if (my $status = replicate($dbh, $fid, $min, $policy_class)) { 
    116                         # $status is either 0 (failure, handled below), 1 (success, we actually 
    117                         # replicated this file), or 2 (success, but someone else replicated it). 
    118                         # so if it's 2, we just want to go to the next fid.  this file is done. 
    119                         next if $status == 2; 
    120  
    121                         # if it was no longer reachable, mark it reachable 
    122                         if (delete $unreachable{$fid}) { 
    123                             $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid); 
    124                             die $dbh->errstr if $dbh->err; 
    125                         } 
    126  
    127                         # housekeeping 
    128                         $fixed++; 
    129                         $self->send_to_parent("repl_i_did $fid"); 
    130  
    131                         # status update 
    132                         if ($fixed % 20 == 0) { 
    133                             my $ratio = $fixed/$attempted*100; 
    134                             error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio)) 
    135                                 if $fixed % 20 == 0; 
    136                         } 
     238            next unless $count; 
     239 
     240            # randomize the list so multiple daemons/threads working on 
     241            # replicate at the same time don't all fight over the 
     242            # same fids to move 
     243            my @randfids = List::Util::shuffle(@$fids); 
     244 
     245            error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2; 
     246            foreach my $fid (@randfids) { 
     247                # now replicate this fid 
     248                $attempted++; 
     249                next unless $self->{fidtodo}{$fid}; 
     250 
     251                if ($fidfailure{$fid}) { 
     252                    if ($fidfailure{$fid} < $now) { 
     253                        delete $fidfailure{$fid}; 
    137254                    } else { 
    138                         # failed in replicate, don't retry for a minute 
    139                         $fidfailure{$fid} = $now + 60; 
     255                        next; 
    140256                    } 
    141257                } 
     258 
     259                $self->read_from_parent; 
     260 
     261                if (my $status = replicate($dbh, $fid, class => $mclass)) { 
     262                    # $status is either 0 (failure, handled below), 1 (success, we actually 
     263                    # replicated this file), or 2 (success, but someone else replicated it). 
     264                    # so if it's 2, we just want to go to the next fid.  this file is done. 
     265                    next if $status == 2; 
     266 
     267                    # if it was no longer reachable, mark it reachable 
     268                    if (delete $unreachable{$fid}) { 
     269                        $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid); 
     270                        die $dbh->errstr if $dbh->err; 
     271                    } 
     272 
     273                    # housekeeping 
     274                    $fixed++; 
     275                    $self->send_to_parent("repl_i_did $fid"); 
     276 
     277                    # status update 
     278                    if ($fixed % 20 == 0) { 
     279                        my $ratio = $fixed/$attempted*100; 
     280                        error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio)) 
     281                            if $fixed % 20 == 0; 
     282                    } 
     283                } else { 
     284                    # failed in replicate, don't retry for a minute 
     285                    $fidfailure{$fid} = $now + 60; 
     286                } 
    142287            } 
    143         }); 
     288        } 
    144289    }); 
    145290} 
    146291 
    147 # replicates $fid if its devcount is less than $min. 
     292# replicates $fid if its devcount is less than $min.  (eh, not quite) 
     293
     294# $policy_class is optional (perl classname representing replication policy).  if present, used.  if not, looked up based on $fid. 
     295
     296# README: if you update this sub to return a new error code, please update the 
     297# appropriate callers to know how to deal with the errors returned. 
    148298sub replicate { 
    149     my ($dbh, $fid, $min, $policy_class) = @_; 
    150  
     299    my ($dbh, $fid, %opts) = @_; 
     300    my $errref    = delete $opts{'errref'}; 
     301    my $mclass    = delete $opts{'class'}; 
     302    my $no_unlock = delete $opts{'no_unlock'}; 
     303    my $sdevid    = delete $opts{'source_devid'}; 
     304    die if %opts; 
     305 
     306    # bool:  if source was explicitly requested by caller 
     307    my $fixed_source = $sdevid ? 1 : 0; 
     308 
     309    $mclass ||= MogileFS::Class->of_fid($fid); 
     310 
     311    my $policy_class = $mclass->policy_class; 
    151312    eval "use $policy_class; 1;"; 
    152313    if ($@) { 
     
    154315    } 
    155316 
     317    my $lock;  # bool: whether we got the lock or not 
    156318    my $lockname = "mgfs:fid:$fid:replicate"; 
    157     my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef, 
    158                                      $lockname); 
    159     return error("Unable to obtain lock $lockname") 
    160         unless $lock; 
     319    my $unlock = sub { 
     320        $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname) 
     321            if $lock; 
     322    }; 
     323 
     324    my $retunlock = sub { 
     325        my $rv = shift; 
     326