Changeset 384
- Timestamp:
- 08/29/06 19:30:39 (2 years ago)
- Files:
-
- trunk/server/TODO (modified) (1 diff)
- trunk/server/doc/fsck-notes.txt (copied) (copied from branches/server-newrepl/doc/fsck-notes.txt)
- trunk/server/lib/MogileFS/Class.pm (copied) (copied from branches/server-newrepl/lib/MogileFS/Class.pm)
- trunk/server/lib/MogileFS/Config.pm (modified) (4 diffs)
- trunk/server/lib/MogileFS/ProcManager.pm (modified) (5 diffs)
- trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm (modified) (3 diffs)
- trunk/server/lib/MogileFS/Util.pm (modified) (1 diff)
- trunk/server/lib/MogileFS/Worker.pm (modified) (7 diffs)
- trunk/server/lib/MogileFS/Worker/Checker.pm (copied) (copied from branches/server-newrepl/lib/MogileFS/Worker/Checker.pm)
- trunk/server/lib/MogileFS/Worker/Monitor.pm (modified) (8 diffs)
- trunk/server/lib/MogileFS/Worker/Query.pm (modified) (19 diffs)
- trunk/server/lib/MogileFS/Worker/Replicate.pm (modified) (16 diffs)
- trunk/server/mogdbsetup (modified) (2 diffs)
- trunk/server/mogilefsd (modified) (13 diffs)
- trunk/server/mogstored (modified) (3 diffs)
- trunk/server/t/00-startup.t (modified) (3 diffs)
- trunk/server/t/lib/mogtestlib.pl (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/server/TODO
r320 r384 1 -- if create_open before monitor runs yet, block and wait for a round? better than 'no_devices' 2 might need some marker for 'end of monitoring'? or just wait until we have 1 or 3? 3 or if they asked for 3, only return 1 in that rare case, preferring latency to redundancy. 4 plus, we'd know that 1 is writable in last few seconds! 5 6 -- update the 'repl' command for new file_to_replicate table 7 1 8 -- replication policy error storm when a device is known to be observably down: 2 9 trunk/server/lib/MogileFS/Config.pm
r323 r384 34 34 $reaper_jobs, 35 35 $monitor_jobs, 36 $checker_jobs, 36 37 $mog_root, 37 38 $min_free_space, … … 65 66 'default_mindevcount=i' => \$cmdline{default_mindevcount}, 66 67 'node_timeout=i' => \$cmdline{node_timeout}, 68 'no_schema_check' => \$cmdline{no_schema_check}, 67 69 ); 68 70 … … 117 119 $reaper_jobs = choose_value( 'reaper_jobs', 1 ); 118 120 $monitor_jobs = choose_value( 'monitor_jobs', 1 ); 121 $checker_jobs = choose_value( 'checker_jobs', 1 ); 119 122 $min_free_space = choose_value( 'min_free_space', 100 ); 120 123 $max_disk_age = choose_value( 'max_disk_age', 5 ); 121 $DEBUG = choose_value( 'debug', 0, 1 );124 $DEBUG = choose_value( 'debug', $ENV{DEBUG} || 0, 1 ); 122 125 $USE_HTTP = ! choose_value( 'no_http', 0, 1); 123 126 $default_mindevcount = choose_value( 'default_mindevcount', 2 ); 124 127 $node_timeout = choose_value( 'node_timeout', 2 ); 128 129 choose_value( 'no_schema_check', 0 ); 125 130 126 131 # now load plugins … … 130 135 } 131 136 132 ### FUNCTION: choose_value( $name, $default [, $boolean])133 sub choose_value ($$;$){134 my ( $name, $default , $boolean) = @_;137 ### FUNCTION: choose_value( $name, $default ) 138 sub choose_value { 139 my ( $name, $default ) = @_; 135 140 return set_config($name, $cmdline{$name}) if defined $cmdline{$name}; 136 141 return set_config($name, $cfgfile{$name}) if defined $cfgfile{$name}; trunk/server/lib/MogileFS/ProcManager.pm
r328 r384 50 50 my ($class, $job) = @_; 51 51 return { 52 checker => "Checker", 52 53 queryworker => "Query", 53 54 delete => "Delete", … … 398 399 return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client; 399 400 401 # FIXME: HOW IS THIS EVER CALLED? things with colons never go to HandleQueryWorkerResponse. 402 # see MogileFS::Connection::Worker 400 403 # out-of-band messages (not replies to requests) start with a colon: 401 404 if ($line =~ /^:state_change (\w+) (\d+) (\w+)/) { … … 461 464 push @$clref, Time::HiRes::gettimeofday(); 462 465 $Mappings{$worker->{fd}} = $clref; 466 $Stats{queries}++; 463 467 464 468 # increment our counter so we know what request counter this is going out … … 547 551 548 552 } elsif ($cmd eq ":still_alive") { 549 550 553 # a no-op 554 555 } elsif ($cmd eq ":monitor_just_ran") { 556 send_monitor_has_run($child); 557 558 } elsif ($cmd =~ /^:invalidate_meta (\w+)/) { 559 send_invalidate($1, $child); 551 560 552 561 } else { … … 628 637 #warn "STATE CHANGE: $what<$whatid> = $state\n"; 629 638 # TODO: can probably send this to all children now, not just certain types 630 for my $type (qw(queryworker replicate delete )) {639 for my $type (qw(queryworker replicate delete monitor checker)) { 631 640 MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":state_change $what $whatid $state", $child); 641 } 642 } 643 644 sub send_invalidate { 645 my ($what, $child) = @_; 646 # TODO: can probably send this to all children now, not just certain types 647 for my $type (qw(queryworker replicate delete monitor checker)) { 648 MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":invalidate_meta_once $what", $child); 649 } 650 } 651 652 sub send_monitor_has_run { 653 my $child = shift; 654 for my $type (qw(replicate checker)) { 655 MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child); 632 656 } 633 657 } trunk/server/lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
r285 r384 8 8 sub replicate_to { 9 9 my ($class, %args) = @_; 10 10 11 my $fid = delete $args{fid}; # fid scalar to copy 11 12 my $on_devs = delete $args{on_devs}; # arrayref of device objects … … 20 21 my $already_on = @$on_devs; 21 22 23 # FIXME: this is NOT true. make sure it's on 2+ hosts at least, if min > 1. 24 22 25 # replication good. 23 26 return 0 if $already_on >= $min; 24 27 25 28 # see which and how many unique hosts we're already on. 29 my %on_dev; 26 30 my %on_host; 27 31 foreach my $dev (@$on_devs) { 28 32 $on_host{$dev->{hostid}} = 1; 33 $on_dev{$dev->{devid}} = 1; 29 34 } 30 35 my $uniq_hosts_on = scalar keys %on_host; … … 38 43 } 39 44 40 my @good_devids = grep { ! $failed->{$_} }45 my @good_devids = grep { ! $failed->{$_} && ! $on_dev{$_} } 41 46 Mgd::find_deviceid( 42 47 random => 1, trunk/server/lib/MogileFS/Util.pm
r272 r384 12 12 while (1) { 13 13 my $start = Time::HiRes::time(); 14 my $explicit_sleep = undef; 14 15 15 16 # run the code in a loop, so "next" will get out of it. 16 17 foreach (1) { 17 $code->(); 18 $code->(sub { 19 $explicit_sleep = shift; 20 }); 18 21 } 19 22 20 23 my $took = Time::HiRes::time() - $start; 21 24 my $sleep_for = $delay - $took; 22 Time::HiRes::sleep($sleep_for) if $sleep_for > 0; 25 if (defined $explicit_sleep) { 26 Time::HiRes::sleep($explicit_sleep); 27 } else { 28 Time::HiRes::sleep($sleep_for) if $sleep_for > 0; 29 } 23 30 } 24 31 } trunk/server/lib/MogileFS/Worker.pm
r322 r384 4 4 'last_bcast_state', # "{device|host}-$devid" => [$time, {alive|dead}] 5 5 'readbuf', # unparsed data from parent 6 'monitor_has_run', # true once we've heard of the monitor job being alive 6 7 ); 7 8 … … 19 20 $self->{readbuf} = ''; 20 21 $self->{last_bcast_state} = {}; 22 $self->{monitor_has_run} = 0; 21 23 22 24 IO::Handle::blocking($psock, 0); … … 30 32 sub get_dbh { 31 33 return Mgd::get_dbh(); 34 } 35 36 sub monitor_has_run { 37 my $self = shift; 38 return $self->{monitor_has_run} ? 1 : 0; 32 39 } 33 40 … … 46 53 return 1 if $rv == $totallen; 47 54 die "Error writing: $!" if $!; 48 55 49 56 my $remain = $totallen - $rv; 50 57 my $offset = $rv; … … 82 89 my $rv = sysread($psock, $buf, 1024); 83 90 $self->{readbuf} .= $buf; 84 91 85 92 while ($self->{readbuf} =~ s/^(.+?)\r?\n//) { 86 93 my $line = $1; … … 148 155 } 149 156 157 sub invalidate_meta { 158 my ($self, $what) = @_; 159 return if $Mgd::INVALIDATE_NO_PROPOGATE; # anti recursion 160 $self->send_to_parent(":invalidate_meta $what"); 161 } 162 150 163 # tries to parse generic (not job-specific) commands sent from parent 151 164 # to child. returns 1 on success, or 0 if comman given isn't generic, … … 172 185 } 173 186 187 if ($$lineref =~ /^:invalidate_meta_once (\w+)/) { 188 local $Mgd::INVALIDATE_NO_PROPOGATE = 1; 189 eval("Mgd::invalidate_${1}_cache()"); 190 return 1; 191 } 192 193 if ($$lineref =~ /^:monitor_has_run/) { 194 $self->{monitor_has_run} = 1; 195 return 1; 196 } 197 174 198 # TODO: warn on unknown commands? 175 199 trunk/server/lib/MogileFS/Worker/Monitor.pm
r323 r384 1 1 package MogileFS::Worker::Monitor; 2 # deletes files3 2 4 3 use strict; … … 46 45 47 46 my $host = $Mgd::cache_host{$dev->{hostid}}; 48 my $port = $host->{http_get_port} || $host->{http_port}; 49 my $url = "http://$host->{hostip}:$port/dev$dev->{devid}/usage"; 47 my $port = $host->{http_port}; 48 my $get_port = $host->{http_get_port} || $port; 49 my $url = "http://$host->{hostip}:$get_port/dev$dev->{devid}/usage"; 50 50 51 51 # now try to get the data with a short timeout … … 61 61 if ($failed_after < 0.5) { 62 62 $self->broadcast_device_unreachable($dev->{devid}); 63 error("Port $ port not listening on otherwise-alive machine $host->{hostip}? Error was: " . $response->status_line);63 error("Port $get_port not listening on otherwise-alive machine $host->{hostip}? Error was: " . $response->status_line); 64 64 } else { 65 65 $failed_after = sprintf("%.02f", $failed_after); … … 70 70 next; 71 71 } 72 73 # at this point we can reach the host 74 $self->broadcast_host_reachable($dev->{hostid}); 72 75 73 76 my %stats; … … 107 110 EOREQUEST 108 111 109 # TODO: hosts aren't writable. they're "available"110 112 # TODO: re-check the file was written as put. 111 113 # TODO: put something unique in the file … … 115 117 my $resp = $ua->request($req); 116 118 if ($resp->is_success) { 117 $self->broadcast_host_reachable($dev->{hostid});118 119 $self->broadcast_device_writeable($dev->{devid}); 119 120 error("dev$dev->{devid}: used = $used, total = $total, writeable = 1") … … 121 122 } else { 122 123 # merely readable 123 $self->broadcast_host_reachable($dev->{hostid});124 124 $self->broadcast_device_readable($dev->{devid}); 125 125 error("dev$dev->{devid}: used = $used, total = $total, writeable = 0") … … 127 127 } 128 128 } 129 130 # announce to the parent that we've run 131 $self->send_to_parent(":monitor_just_ran"); 129 132 }); 130 133 trunk/server/lib/MogileFS/Worker/Query.pm
r325 r384 51 51 next; 52 52 } 53 53 54 54 my $newread; 55 55 my $rv = sysread($psock, $newread, 1024); … … 127 127 my $args = shift; 128 128 129 if ($args->{devices} || $args->{all}) { 130 Mgd::invalidate_device_cache(); 131 } 132 if ($args->{hosts} || $args->{all}) { 133 Mgd::invalidate_host_cache(); 134 } 129 Mgd::invalidate_device_cache() if $args->{devices} || $args->{all}; 130 Mgd::invalidate_host_cache() if $args->{hosts} || $args->{all}; 131 Mgd::invalidate_class_cache() if $args->{class} || $args->{all}; 132 Mgd::invalidate_domain_cache() if $args->{domain} || $args->{all}; 135 133 136 134 return $self->ok_line; … … 160 158 161 159 # figure out what classid this file is for 162 my $class = $args->{class} ;160 my $class = $args->{class} || ""; 163 161 my $classid = 0; 164 162 if (length($class)) { … … 173 171 my (@dests, @hosts); 174 172 my $devs = Mgd::get_device_summary(); 173 175 174 while (scalar(@dests) < ($multi ? 3 : 1)) { 176 175 my $devid = Mgd::find_deviceid( 177 random=> 1,178 weight_by_free => 1,179 not_on_hosts => \@hosts,180 );181 176 random => 1, 177 must_be_writeable => 1, 178 weight_by_free => 1, 179 not_on_hosts => \@hosts, 180 ); 182 181 last unless defined $devid; 183 182 … … 292 291 # if a temp file is closed without a provided-key, that means to 293 292 # delete it. 294 unless ( length($key)) {293 unless (defined $key && length($key)) { 295 294 # add to to-delete list 296 295 $dbh->do("REPLACE INTO file_to_delete SET fid=?", undef, $fid); … … 326 325 $fid, $dmid, $key, $size, $trow->{classid}); 327 326 return $self->err_line("db_error") unless $rv; 327 328 # mark it as needing replicating: 329 $dbh->do("INSERT IGNORE INTO file_to_replicate ". 330 "SET fid=?, fromdevid=?, nexttry=0", undef, $fid, $devid); 331 return $self->err_line("db_error") if $dbh->err; 328 332 329 333 $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fid); … … 549 553 return $self->err_line("unknown_hostid") unless $Mgd::cache_host{$hostid}; 550 554 } elsif (my $host = $args->{hostname}) { 551 while (my ($hid, $row) = each %Mgd::cache_host) { 552 if ($row->{hostname} eq $host) { 553 $hostid = $hid; 554 last; 555 } 556 } 555 $hostid = Mgd::host_id($host); 557 556 return $self->err_line("unknown_host") unless $hostid; 558 557 } … … 563 562 return $self->err_line("existing_devid"); 564 563 } 564 Mgd::invalidate_device_cache(); 565 565 return $self->ok_line; 566 566 } … … 588 588 589 589 # return the domain id we created 590 Mgd::invalidate_domain_cache(); 590 591 return $self->ok_line({ domain => $domain }); 591 592 } … … 620 621 621 622 # return the domain we nuked 623 Mgd::invalidate_domain_cache(); 622 624 return $self->ok_line({ domain => $domain }); 623 625 } … … 668 670 669 671 # return success 672 Mgd::invalidate_class_cache(); 670 673 return $self->ok_line({ class => $class, mindevcount => $mindevcount, domain => $domain }); 671 674 } … … 709 712 710 713 # return the class we nuked 714 Mgd::invalidate_class_cache(); 711 715 return $self->ok_line({ domain => $domain, class => $class }); 712 716 } … … 755 759 } else { 756 760 # get the max host id in use (FIXME: racy!) 757 $hid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') + 1;761 $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1; 758 762 759 763 # now insert the new host … … 943 947 944 948 # success, weight changed 949 Mgd::invalidate_device_cache(); 945 950 return $self->ok_line($ret); 946 951 } … … 979 984 980 985 # success, state changed 986 Mgd::invalidate_device_cache(); 981 987 return $self->ok_line($ret); 982 988 } … … 1026 1032 } 1027 1033 $ret->{"replicationcount"} = $count; 1034 1035 # now we want to do the "new" replication stats 1036 my $db_time = $dbh->selectrow_array('SELECT UNIX_TIMESTAMP()'); 1037 $stats = $dbh->selectall_arrayref('SELECT nexttry, COUNT(*) FROM file_to_replicate GROUP BY 1'); 1038 foreach my $stat (@$stats) { 1039 if ($stat->[0] < 1000) { 1040 # anything under 1000 is a specific state, so let's define those. here's the list 1041 # of short names to describe them. 1042 my $name = { 1043 0 => 'newfile', # new files that need to be replicated 1044 1 => 'redo', # files that need to go through replication again 1045 }->{$stat->[0]} || "unknown"; 1046 1047 # now put it in the output hashref. note that we do += because we might 1048 # have more than one group of unknowns. 1049 $ret->{"to_replicate_$name"} += $stat->[1]; 1050 1051 } elsif ($stat->[0] == MogileFS::Worker::Replicate::end_of_time()) { 1052 $ret->{"to_replicate_manually"} = $stat->[1]; 1053 1054 } elsif ($stat->[0] < $db_time) { 1055 $ret->{"to_replicate_overdue"} += $stat->[1]; 1056 1057 } else { 1058 $ret->{"to_replicate_deferred"} += $stat->[1]; 1059 } 1060 } 1028 1061 } 1029 1062 … … 1072 1105 } 1073 1106 1107 sub cmd_replicate_now { 1108 my MogileFS::Worker::Query $self = shift; 1109 1110 my $dbh = Mgd::get_dbh() 1111 or return $self->err_line('nodb'); 1112 my $rv = $dbh->do("UPDATE file_to_replicate SET nexttry = UNIX_TIMESTAMP() WHERE nexttry > UNIX_TIMESTAMP()"); 1113 1114 return $self->err_line('db', $dbh->errstr) if $dbh->err; 1115 return $self->ok_line({ count => int($rv) }); 1116 } 1117 1118 sub cmd_checker { 1119 my MogileFS::Worker::Query $self = shift; 1120 my $args = shift; 1121 1122 my $new_setting; 1123 if ($args->{disable}) { 1124 $new_setting = 'off'; 1125 } elsif ($args->{level}) { 1126 # they want to turn it on or change the level, so let's ensure they 1127 # specified a valid level 1128 if (MogileFS::Worker::Checker::is_valid_level($args->{level})) { 1129 $new_setting = $args->{level}; 1130 } else { 1131 return $self->err_line('invalid_checker_level'); 1132 } 1133 } 1134 1135 if (defined $new_setting) { 1136 Mgd::set_server_setting('fsck_enable', $new_setting); 1137 return $self->ok_line; 1138 } 1139 1140 $self->err_line('failure'); 1141 } 1142 1074 1143 sub ok_line { 1075 1144 my MogileFS::Worker::Query $self = shift; … … 1100 1169 'class_has_files' => "Class still has files, uanble to delete", 1101 1170 'class_not_found' => "Class not found", 1171 'db' => "Database error", 1102 1172 'domain_has_files' => "Domain still has files, uanble to delete", 1103 1173 'domain_exists' => "That domain already exists", … … 1110 1180 'host_not_found' => "Host not found", 1111 1181 'invalid_chars' => "Patterns must not contain backslashes (\\) or percent signs (%).", 1182 'invalid_checker_level' => "Checker level invalid. Please see documentation on this command.", 1112 1183 'invalid_mindevcount' => "The mindevcount must be at least 1", 1113 1184 'key_exists' => "Target key name already exists; can't overwrite.", trunk/server/lib/MogileFS/Worker/Replicate.pm
r312 r384 10 10 use List::Util (); 11 11 use MogileFS::Util qw(error every); 12 use MogileFS::Class; 12 13 use POSIX ":sys_wait_h"; # argument for waitpid 13 14 use POSIX; 15 16 # setup the value used in a 'nexttry' field to indicate that this item will never 17 # actually be tried again and require some sort of manual intervention. 18 use constant ENDOFTIME => 2147483647; 19 20 sub end_of_time { ENDOFTIME; } 14 21 15 22 sub new { … … 32 39 } 33 40 41 # { fid => lastcheck }; instructs us not to replicate this fid... we will clear 42 # out fids from this list that are expired 43 my %fidfailure; 44 45 # { fid => 1 }; used to keep track of fids we find in the unreachable_fids table 46 my %unreachable; 47 my $dbh; 48 34 49 sub work { 35 50 my $self = shift; 36 51 37 # { fid => lastcheck }; instructs us not to replicate this fid... we will clear 38 # out fids from this list that are expired 39 my %fidfailure; 40 41 # { fid => 1 }; used to keep track of fids we find in the unreachable_fids table 42 my %unreachable; 52 # give the monitor job 15 seconds to give us an update 53 my $warn_after = time() + 15; 43 54 44 55 every(2.0, sub { 45 56 $self->parent_ping; 46 57 58 # replication doesn't go well if the monitor job hasn't actively started 59 # marking things as being available 60 unless ($self->monitor_has_run) { 61 error("waiting for monitor job to complete a cycle before beginning replication") 62 if time() > $warn_after; 63 return; 64 } 65 47 66 $self->validate_dbh; 48 my$dbh = $self->get_dbh or return 0;67 $dbh = $self->get_dbh or return 0; 49 68 50 69 # update our unreachable fid list... we consider them good for 15 minutes … … 62 81 } 63 82 64 foreach_class(sub { 65 my ($dmid, $classid, $min, $policy_class) = @_; 66 67 error("Checking replication for dmid=$dmid, classid=$classid, min=$min") 83 # this finds stuff to replicate based on its record in the needs_replication table 84 $self->replicate_using_torepl_table; 85 86 # this finds stuff to replicate based on the devcounts. (old style) 87 $self->replicate_using_devcounts; 88 89 }); 90 } 91 92 sub replicate_using_torepl_table { 93 my $self = shift; 94 95 # find some fids to replicate, prioritize based on when they should be tried 96 my $LIMIT = 1000; 97 my $to_repl_map = $dbh->selectall_hashref(qq{ 98 SELECT fid, fromdevid, failcount, flags, nexttry 99 FROM file_to_replicate 100 WHERE nexttry <= UNIX_TIMESTAMP() 101 ORDER BY nexttry 102 LIMIT $LIMIT 103 }, "fid"); 104 if ($dbh->err) { 105 error("Database error selecting fids to replicate: " . $dbh->errstr); 106 return; 107 } 108 109 # get random list of hashref of things to do: 110 my $to_repl = [ List::Util::shuffle(values %$to_repl_map) ]; 111 return unless @$to_repl; 112 113 # sort our priority list in terms of 0s (immediate, only 1 copy), 1s (immediate replicate, 114 # but we already have 2 copies), and big numbers (unixtimestamps) of things that failed. 115 # but because sort is stable, these are random within their 0/1/big classes. 116 @$to_repl = sort { 117 ($a->{nexttry} < 1000 || $b->{nexttry} < 1000) ? ($a->{nexttry} <=> $b->{nexttry}) : 0 118 } @$to_repl; 119 120 foreach my $todo (@$to_repl) { 121 my $fid = $todo->{fid}; 122 123 my $errcode; 124 my ($status, $unlock) = replicate($dbh, $fid, 125 errref => \$errcode, 126 no_unlock => 1, # to make it return an $unlock subref 127 source_devid => $todo->{fromdevid}, 128 ); 129 if ($status) { 130 # $status is either 0 (failure, handled below), 1 (success, we actually 131 # replicated this file), or 2 (success, but someone else replicated it). 132 133 # when $staus == 2, this delete is unnecessary normally 134 # (somebody else presumably already deleted it if they 135 # also replicated it), but in the case of running with old 136 # replicators from previous versions, -or- simply if the 137 # other guy's delete failed, this cleans it up.... 138 $dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fid); 139 $unlock->() if $unlock; 140 next; 141 } 142 143 # ERROR CASES: 144 145 # README: please keep this up to date if you update the replicate() function so we ensure 146 # that this code always does the right thing 147 # 148 # -- HARMLESS -- 149 # failed_getting_lock => harmless. skip. somebody else probably doing. 150 # 151 # -- TEMPORARY; DO EXPONENTIAL BACKOFF -- 152 # source_down => only source available is observed down. 153 # policy_error_doing_failed => policy plugin fucked up. it's looping. 154 # policy_error_already_there => policy plugin fucked up. it's dumb. 155 # policy_no_suggestions => no copy was attempted. policy is just not happy. 156 # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions. 157 # 158 # -- FATAL; DON'T TRY AGAIN -- 159 # no_source => it simply exists nowhere. not that something's down, but file_on is empty. 160 # no_devices => no devices are configured. at all. why are we replicating something? 161 # how did something come into being since you can't delete devices? 162 163 # bail if we failed getting the lock, that means someone else probably 164 # already did it, so we should just move on 165 if ($errcode eq 'failed_getting_lock') { 166 $unlock->() if $unlock; 167 next; 168 } 169 170 # logic for setting the next try time appropriately 171 my $update_nexttry = sub { 172 my ($type, $delay) = @_; 173 if ($type eq 'end_of_time') { 174 # special; update to a time that won't happen again, as we've encountered a scenario 175 # in which case we're really hosed 176 $dbh->do("UPDATE file_to_replicate SET nexttry = " . ENDOFTIME . ", failcount = failcount + 1 WHERE fid = ?", 177 undef, $fid); 178 } else { 179 my $extra = $type eq 'offset' ? 'UNIX_TIMESTAMP() +' : ''; 180 $dbh->do("UPDATE file_to_replicate SET nexttry = $extra ?, failcount = failcount + 1 WHERE fid = ?", 181 undef, $delay+0, $fid); 182 } 183 error("Failed setting nexttry of fid $fid to $type $delay: " . $dbh->errstr) 184 if $dbh->err; 185 }; 186 187 # now let's handle any error we want to consider a total failure; do not 188 # retry at any point. push this file off to the end so someone has to come 189 # along and figure out what went wrong. 190 if ($errcode eq 'no_source' || $errcode eq 'no_devices') { 191 $update_nexttry->( end_of_time => 1 ); 192 $unlock->() if $unlock; 193 next; 194 } 195 196 # at this point, the rest of the errors require exponential backoff. define what this means 197 # as far as failcount -> delay to next try. 198 # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ... 199 my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 ); 200 $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) ); 201 $unlock->() if $unlock; 202 } 203 204 } 205 206 sub replicate_using_devcounts { 207 my $self = shift; 208 209 MogileFS::Class->foreach(sub { 210 my $mclass = shift; 211 my ($dmid, $classid, $min, $policy_class) = map { $mclass->$_ } qw(domainid classid mindevcount policy_class); 212 213 error("Checking replication for dmid=$dmid, classid=$classid, min=$min") 214 if $Mgd::DEBUG >= 1; 215 216 my $LIMIT = 1000; 217 218 # try going from devcount of 1 up to devcount of $min-1 219 $self->{fidtodo} = {}; 220 my $fixed = 0; 221 my $attempted = 0; 222 my $devcount = 1; 223 while ($fixed < $LIMIT && $devcount < $min) { 224 my $now = time(); 225 my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ". 226 "AND devcount = ? AND length IS NOT NULL ". 227 "LIMIT $LIMIT", undef, $dmid, $classid, $devcount); 228 die $dbh->errstr if $dbh->err; 229 $self->{fidtodo}{$_} = 1 foreach @$fids; 230 231 # increase devcount so we try to replicate the files at the next devcount 232 $devcount++; 233 234 # see if we have any files to replicate 235 my $count = $fids ? scalar @$fids : 0; 236 error(" found $count for dmid=$dmid/classid=$classid/min=$min") 68 237 if $Mgd::DEBUG >= 1; 69 70 my $LIMIT = 1000; 71 72 # try going from devcount of 1 up to devcount of $min-1 73 $self->{fidtodo} = {}; 74 my $fixed = 0; 75 my $attempted = 0; 76 my $devcount = 1; 77 while ($fixed < $LIMIT && $devcount < $min) { 78 my $now = time(); 79 my $fids = $dbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ". 80 "AND devcount = ? AND length IS NOT NULL ". 81 "LIMIT $LIMIT", undef, $dmid, $classid, $devcount); 82 die $dbh->errstr if $dbh->err; 83 $self->{fidtodo}{$_} = 1 foreach @$fids; 84 85 # increase devcount so we try to replicate the files at the next devcount 86 $devcount++; 87 88 # see if we have any files to replicate 89 my $count = $fids ? scalar @$fids : 0; 90 error(" found $count for dmid=$dmid/classid=$classid/min=$min") 91 if $Mgd::DEBUG >= 1; 92 next unless $count; 93 94 # randomize the list so multiple daemons/threads working on 95 # replicate at the same time don't all fight over the 96 # same fids to move 97 my @randfids = List::Util::shuffle(@$fids); 98 99 error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2; 100 foreach my $fid (@randfids) { 101 # now replicate this fid 102 $attempted++; 103 next unless $self->{fidtodo}{$fid}; 104 105 if ($fidfailure{$fid}) { 106 if ($fidfailure{$fid} < $now) { 107 delete $fidfailure{$fid}; 108 } else { 109 next; 110 } 111 } 112 113 $self->read_from_parent; 114 115 if (my $status = replicate($dbh, $fid, $min, $policy_class)) { 116 # $status is either 0 (failure, handled below), 1 (success, we actually 117 # replicated this file), or 2 (success, but someone else replicated it). 118 # so if it's 2, we just want to go to the next fid. this file is done. 119 next if $status == 2; 120 121 # if it was no longer reachable, mark it reachable 122 if (delete $unreachable{$fid}) { 123 $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid); 124 die $dbh->errstr if $dbh->err; 125 } 126 127 # housekeeping 128 $fixed++; 129 $self->send_to_parent("repl_i_did $fid"); 130 131 # status update 132 if ($fixed % 20 == 0) { 133 my $ratio = $fixed/$attempted*100; 134 error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio)) 135 if $fixed % 20 == 0; 136 } 238 next unless $count; 239 240 # randomize the list so multiple daemons/threads working on 241 # replicate at the same time don't all fight over the 242 # same fids to move 243 my @randfids = List::Util::shuffle(@$fids); 244 245 error("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2; 246 foreach my $fid (@randfids) { 247 # now replicate this fid 248 $attempted++; 249 next unless $self->{fidtodo}{$fid}; 250 251 if ($fidfailure{$fid}) { 252 if ($fidfailure{$fid} < $now) { 253 delete $fidfailure{$fid}; 137 254 } else { 138 # failed in replicate, don't retry for a minute 139 $fidfailure{$fid} = $now + 60; 255 next; 140 256 } 141 257 } 258 259 $self->read_from_parent; 260 261 if (my $status = replicate($dbh, $fid, class => $mclass)) { 262 # $status is either 0 (failure, handled below), 1 (success, we actually 263 # replicated this file), or 2 (success, but someone else replicated it). 264 # so if it's 2, we just want to go to the next fid. this file is done. 265 next if $status == 2; 266 267 # if it was no longer reachable, mark it reachable 268 if (delete $unreachable{$fid}) { 269 $dbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid); 270 die $dbh->errstr if $dbh->err; 271 } 272 273 # housekeeping 274 $fixed++; 275 $self->send_to_parent("repl_i_did $fid"); 276 277 # status update 278 if ($fixed % 20 == 0) { 279 my $ratio = $fixed/$attempted*100; 280 error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio)) 281 if $fixed % 20 == 0; 282 } 283 } else { 284 # failed in replicate, don't retry for a minute 285 $fidfailure{$fid} = $now + 60; 286 } 142 287 } 143 } );288 } 144 289 }); 145 290 } 146 291 147 # replicates $fid if its devcount is less than $min. 292 # replicates $fid if its devcount is less than $min. (eh, not quite) 293 # 294 # $policy_class is optional (perl classname representing replication policy). if present, used. if not, looked up based on $fid. 295 # 296 # README: if you update this sub to return a new error code, please update the 297 # appropriate callers to know how to deal with the errors returned. 148 298 sub replicate { 149 my ($dbh, $fid, $min, $policy_class) = @_; 150 299 my ($dbh, $fid, %opts) = @_; 300 my $errref = delete $opts{'errref'}; 301 my $mclass = delete $opts{'class'}; 302 my $no_unlock = delete $opts{'no_unlock'}; 303 my $sdevid = delete $opts{'source_devid'}; 304 die if %opts; 305 306 # bool: if source was explicitly requested by caller 307 my $fixed_source = $sdevid ? 1 : 0; 308 309 $mclass ||= MogileFS::Class->of_fid($fid); 310 311 my $policy_class = $mclass->policy_class; 151 312 eval "use $policy_class; 1;"; 152 313 if ($@) { … … 154 315 } 155 316 317 my $lock; # bool: whether we got the lock or not 156 318 my $lockname = "mgfs:fid:$fid:replicate"; 157 my $lock = $dbh->selectrow_array("SELECT GET_LOCK(?, 1)", undef, 158 $lockname); 159 return error("Unable to obtain lock $lockname") 160 unless $lock; 319 my $unlock = sub { 320 $dbh->selectrow_array("SELECT RELEASE_LOCK(?)", undef, $lockname) 321 if $lock; 322 }; 323 324 my $retunlock = sub { 325 my $rv = shift; 326
