Changeset 48
- Timestamp:
- 07/19/04 21:23:59 (4 years ago)
- Files:
-
- trunk/api/perl/MogileFS.pm (modified) (6 diffs)
- trunk/devnotes/sql.txt (modified) (2 diffs)
- trunk/server/mogilefsd (modified) (11 diffs)
- trunk/server/mogstored (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/api/perl/MogileFS.pm
r45 r48 23 23 use strict; 24 24 use Carp; 25 use IO::WrapTie; 25 26 use fields qw(root domain backend); 26 27 … … 50 51 sub new_file { 51 52 my MogileFS $self = shift; 52 my ($key, $class ) = @_;53 my ($key, $class, $bytes) = @_; 53 54 54 55 my $res = $self->{backend}->do_request … … 60 61 61 62 # create a MogileFS::NewFile object, based off of IO::File 62 return MogileFS::NewFile->new( 63 mg => $self, 64 fid => $res->{fid}, 65 path => $res->{path}, 66 devid => $res->{devid}, 67 class => $class, 68 key => $key 69 ); 63 if ($res->{path} =~ m!^http://!) { 64 return IO::WrapTie::wraptie('MogileFS::NewHTTPFile', 65 mg => $self, 66 fid => $res->{fid}, 67 path => $res->{path}, 68 devid => $res->{devid}, 69 class => $class, 70 key => $key, 71 content_length => $bytes+0, 72 ); 73 } else { 74 return MogileFS::NewFile->new( 75 mg => $self, 76 fid => $res->{fid}, 77 path => $res->{path}, 78 devid => $res->{devid}, 79 class => $class, 80 key => $key 81 ); 82 } 70 83 } 71 84 … … 80 93 }) or return undef; 81 94 82 return map { "$self->{root}/" . $res->{"path$_"} } (1..$res->{paths}); 95 my @paths = map { $res->{"path$_"} } (1..$res->{paths}); 96 return @paths if scalar(@paths) > 0 && $paths[0] =~ m!^http://!; 97 return map { "$self->{root}/$_"} @paths; 83 98 } 84 99 … … 124 139 # FIXME: add actual validation 125 140 { 126 $self->{root} = $args{root} or 127 _fail("constructor requires parameter 'root'"); 128 141 # root is only needed for NFS based installations 142 unless (ref $args{hosts} && $args{hosts}->[0] =~ /:/) { 143 $self->{root} = $args{root} or 144 _fail("constructor requires parameter 'root' for non-HTTP setups"); 145 } 146 147 # get domain (required) 129 148 $self->{domain} = $args{domain} or 130 149 _fail("constructor requires parameter 'domain'"); … … 491 510 } 492 511 512 ################################################################################ 513 # MogileFS::HTTPFile object 514 # NOTE: This is meant to be used within IO::WrapTie... 515 # 516 517 package MogileFS::NewHTTPFile; 518 519 use fields ('host', 520 'sock', # IO::Socket; created only when we need it 521 'uri', 522 'data', # buffered data we have 523 'pos', # simulated file position 524 'length', # length of data field 525 'content_length', # declared length of data we will be receiving (not required) 526 'mg', 527 'fid', 528 'devid', 529 'class', 530 'key', 531 'path', # full URL to save data to 532 ); 533 534 sub TIEHANDLE { 535 my MogileFS::NewHTTPFile $self = shift; 536 $self = fields::new($self) unless ref $self; 537 538 my %args = @_; 539 return undef unless $args{path} =~ m!http://(.+?)(/.+)$!; 540 541 $self->{host} = $1; 542 $self->{uri} = $2; 543 $self->{data} = ''; 544 $self->{length} = 0; 545 $self->{content_length} = $args{content_length} + 0; 546 $self->{pos} = 0; 547 $self->{$_} = $args{$_} foreach qw(mg fid devid class key path); 548 549 return $self; 550 } 551 *new = *TIEHANDLE; 552 553 sub PRINT { 554 my MogileFS::NewHTTPFile $self = shift; 555 556 # get data to send to server 557 my $data = shift; 558 my $newlen = length $data; 559 $self->{pos} += $newlen; 560 561 # now make socket if we don't have one 562 if (!$self->{sock} && $self->{content_length}) { 563 $self->{sock} = IO::Socket::INET->new(PeerAddr => $self->{host}) 564 or die "Error: unable to open socket: $!\n"; 565 $self->{sock}->print("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{content_length}\r\n\r\n"); 566 } 567 568 # write some data to our socket 569 if ($self->{sock}) { 570 # store on data if we're under 1k 571 if ($self->{length} < 1024) { 572 if ($self->{length} + $newlen > 1024) { 573 $self->{length} = 1024; 574 $self->{data} .= substr($data, 0, 1024 - $self->{length}); 575 } else { 576 $self->{length} += $newlen; 577 $self->{data} .= $data; 578 } 579 } 580 581 # actually write 582 $self->{sock}->print($data); 583 } else { 584 # or not, just stick it on our queued data 585 $self->{data} .= $data; 586 $self->{length} += $newlen; 587 } 588 } 589 *print = *PRINT; 590 591 # get/set functions 592 sub _getset { 593 my MogileFS::NewHTTPFile $self = shift; 594 my $what = shift; 595 596 if (@_) { 597 # note: we're a TIEHANDLE interface, so we're not QUITE like a 598 # normal class... our parameters tend to come in via an arrayref 599 my $val = shift; 600 $val = shift(@$val) if ref $val eq 'ARRAY'; 601 return $self->{$what} = $val; 602 } else { 603 return $self->{$what}; 604 } 605 } 606 sub path { _getset(shift, 'path'); } 607 sub class { _getset(shift, 'class', @_); } 608 sub key { _getset(shift, 'key', @_); } 609 610 sub CLOSE { 611 my MogileFS::NewHTTPFile $self = shift; 612 613 # if we're closed and we have no sock... 614 unless ($self->{sock}) { 615 $self->{sock} = IO::Socket::INET->new(PeerAddr => $self->{host}) 616 or die "Error: unable to open socket: $!\n"; 617 $self->{sock}->print("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{length}\r\n\r\n"); 618 $self->{sock}->print($self->{data}); 619 } 620 621 # get response from put 622 if ($self->{sock}) { 623 my $line = $self->{sock}->getline; 624 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 625 # all 2xx responses are success 626 unless ($1 >= 200 && $1 <= 299) { 627 $@ = "HTTP response $1 from upload\n"; 628 return undef; 629 } 630 } else { 631 $@ = "Response line not understood: $line\n"; 632 return undef; 633 } 634 $self->{sock}->close; 635 } 636 637 my MogileFS $mg = $self->{mg}; 638 my $domain = $mg->{domain}; 639 640 my $fid = $self->{fid}; 641 my $devid = $self->{devid}; 642 my $path = $self->{path}; 643 644 my $key = shift || $self->{key}; 645 646 $mg->{backend}->do_request 647 ("create_close", { 648 fid => $fid, 649 devid => $devid, 650 domain => $domain, 651 size => $self->{content_length} ? $self->{content_length} : $self->{length}, 652 key => $key, 653 path => $path, 654 }) or return undef; 655 656 return 1; 657 } 658 *close = *CLOSE; 659 660 sub TELL { 661 # return our current pos 662 return $_[0]->{pos}; 663 } 664 *tell = *TELL; 665 666 sub SEEK { 667 # simply set pos... 668 die "ERROR: Seek past end of file\n" if $_[1] > $_[0]->{length}; 669 $_[0]->{pos} = $_[1]; 670 } 671 *seek = *SEEK; 672 673 sub EOF { 674 return ($_[0]->{pos} >= $_[0]->{length}) ? 1 : 0; 675 } 676 *eof = *EOF; 677 678 sub BINMODE { 679 # no-op, we're always in binary mode 680 } 681 *binmode = *BINMODE; 682 683 sub READ { 684 my MogileFS::NewHTTPFile $self = shift; 685 my $count = $_[1] + 0; 686 687 my $max = $self->{length} - $self->{pos}; 688 $max = $count if $count < $max; 689 690 $_[0] = substr($self->{data}, $self->{pos}, $max); 691 $self->{pos} += $max; 692 693 return $max; 694 } 695 *read = *READ; 696 697 sub AUTOLOAD { 698 use vars qw($AUTOLOAD); 699 warn "Error: $AUTOLOAD not implemented.\n"; 700 } 701 493 702 1; trunk/devnotes/sql.txt
r22 r48 27 27 classname VARCHAR(50), 28 28 UNIQUE (dmid,classname), 29 mindevcount TINYINT UNSIGNED NOT NULL ,29 mindevcount TINYINT UNSIGNED NOT NULL 30 30 ); 31 31 … … 127 127 128 128 status ENUM('alive','dead','down'), 129 http_port MEDIUMINT UNSIGNED DEFAULT 7500, 129 130 130 131 hostname VARCHAR(40), trunk/server/mogilefsd
r43 r48 40 40 $DEFAULT_MOG_ROOT = "/mnt/mogilefs"; 41 41 $DEBUG = 0; 42 43 use constant USE_HTTP => 1; 42 44 43 45 my ( … … 320 322 321 323 my $path = make_path($devid, $fid); 322 my $rv = unlink "$Mgd::MOG_ROOT/$path"; 324 my $rv = 0; 325 if (my $urlref = Mgd::is_url($path)) { 326 # hit up the server and delete it 327 my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0], 328 PeerPort => $urlref->[1], 329 Timeout => 2); 330 unless ($sock) { 331 # timeout or something, mark this device as down for now and move on 332 $dev_down{$devid} = 1; 333 next; 334 } 335 336 # send delete request 337 print "Sending delete for $path\n" if $Mgd::DEBUG >= 2; 338 print $sock "DELETE $urlref->[2] HTTP/1.0\r\n\r\n"; 339 my $response = <$sock>; 340 if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 341 if (($1 >= 200 && $1 <= 299) || $1 == 404) { 342 # effectively means all went well 343 $rv = 1; 344 } else { 345 # remote file system error? mark node as down 346 warn "Error: unlink failure: $path: $1\n"; 347 $dev_down{$devid} = 1; 348 next; 349 } 350 } else { 351 warn "Error: unknown response line: $response\n"; 352 } 353 } else { 354 # do normal unlink 355 $rv = unlink "$Mgd::MOG_ROOT/$path"; 356 } 323 357 324 358 # device is timing out. take note of it and … … 346 380 347 381 } 382 } 383 384 # copies a file from one Perlbal to another utilizing HTTP 385 sub http_copy { 386 my ($sdevid, $ddevid, $fid) = @_; 387 388 # get some information we'll need 389 my $devs = Mgd::get_device_summary(); 390 my ($sdev, $ddev) = ($devs->{$sdevid}, $devs->{$ddevid}); 391 unless (ref $sdev && ref $ddev) { 392 warn "Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid\n"; 393 return 0; 394 } 395 my ($spath, $dpath) = (Mgd::make_http_path($sdevid, $fid), 396 Mgd::make_http_path($ddevid, $fid)); 397 my ($shost, $sport) = (Mgd::hostid_ip($sdev->{hostid}), Mgd::hostid_http_port($sdev->{hostid})); 398 my ($dhost, $dport) = (Mgd::hostid_ip($ddev->{hostid}), Mgd::hostid_http_port($ddev->{hostid})); 399 unless (defined $spath && defined $dpath && defined $shost && defined $dhost && $sport && $dport) { 400 # show detailed information to find out what's not configured right 401 warn "Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid\n"; 402 warn " http://$shost:$sport$spath -> http://$dhost:$dport$dpath\n"; 403 return 0; 404 } 405 print "http://$shost:$sport$spath -> http://$dhost:$dport$dpath\n" if $Mgd::DEBUG >= 2; 406 407 # setup our pipe error handler, in case we get closed on 408 my $pipe_closed = 0; 409 local $SIG{PIPE} = sub { $pipe_closed = 1; }; 410 411 # okay, now get the file 412 my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2) 413 or return 0; 414 print $sock "GET $spath HTTP/1.0\r\n\r\n"; 415 return 0 if $pipe_closed; 416 417 # we just want a content length 418 my $clen; 419 while (defined (my $line = <$sock>)) { 420 $line =~ s/[\s\r\n]+$//; 421 last unless length $line; 422 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 423 # make sure we get a good response 424 unless ($1 >= 200 && $1 <= 299) { 425 warn "Error: Resource http://$shost:$sport$spath failed: HTTP $1\n"; 426 return 0; 427 } 428 } 429 next unless $line =~ /^Content-length:\s*(\d+)\s*$/i; 430 $clen = $1; 431 } 432 return 0 unless $clen; 433 434 # open target for put 435 my $dsock = IO::Socket::INET->new(PeerAddr => $dhost, PeerPort => $dport, Timeout => 2) or return 0; 436 $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n") or return 0; 437 return 0 if $pipe_closed; 438 439 # now read data and print while we're reading 440 my ($data, $read) = ('', 0); 441 while (!$pipe_closed && (my $bytes = read($sock, $data, $clen - $read))) { 442 $read += $bytes; 443 last unless $dsock->write($data); 444 } 445 446 # now read in the response line (should be first line) 447 my $line = <$dsock>; 448 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 449 return 1 if $1 >= 200 && $1 <= 299; 450 warn "Error: got a 404 in put: device not on host?: http://$dhost:$dport$dpath" if $1 == 404; 451 } else { 452 warn "Error: HTTP response line not recognized: $line"; 453 } 454 return 0; 348 455 } 349 456 … … 408 515 $sdevid ||= @exist_devid[int(rand(scalar @exist_devid))]; 409 516 410 my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid); 411 my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid); 412 my $rv = File::Copy::copy($src_path, $dst_path); 517 my $rv = undef; 518 if (Mgd::USE_HTTP) { 519 $rv = http_copy($sdevid, $ddevid, $fid); 520 } else { 521 my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid); 522 my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid); 523 $rv = File::Copy::copy($src_path, $dst_path); 524 } 413 525 414 526 return $retunlock->(0) unless $rv; … … 536 648 %cache_host = (); 537 649 my $dbh = get_dbh(); 538 my $sth = $dbh->prepare("SELECT hostid, status, hostname, hostip, remoteroot FROM host");650 my $sth = $dbh->prepare("SELECT hostid, status, hostname, hostip, http_port, remoteroot FROM host"); 539 651 $sth->execute; 540 652 $cache_host{$_->{hostid}} = $_ while $_ = $sth->fetchrow_hashref; … … 549 661 } 550 662 663 # get size of file, return 0 on error 664 sub get_file_size { 665 my $path = shift; 666 667 # quick case -- just a file on disk 668 unless ($path =~ m!^http://(.+?)(/.+)$!) { 669 return -s "$Mgd::MOG_ROOT/$path" 670 } 671 672 # URL; use a HEAD request to get the size of the file 673 my $sock = IO::Socket::INET->new(PeerAddr => $1, Timeout => 2) 674 or return 0; 675 print $sock "HEAD $2 HTTP/1.0\r\n\r\n"; 676 while (defined (my $line = <$sock>)) { 677 if ($line =~ /^Content-length: (\d+)/i) { 678 return $1+0; 679 } 680 } 681 682 # no content length found? 683 return 0; 684 } 685 551 686 sub domain_id { 552 687 my $domain = shift; … … 567 702 } 568 703 704 sub hostid_ip { 705 my $hostid = shift; 706 check_host_cache(); 707 my $h = $cache_host{$hostid}; 708 return $h ? $h->{hostip} : undef; 709 } 710 711 sub hostid_http_port { 712 my $hostid = shift; 713 check_host_cache(); 714 my $h = $cache_host{$hostid}; 715 return $h ? $h->{http_port} : undef; 716 } 717 718 sub make_http_path { 719 my ($devid, $fid) = @_; 720 721 my $dsum = get_device_summary(); 722 my $dinfo = $dsum->{$devid}; 723 return undef unless $dinfo; 724 my $hostname = hostid_name($dinfo->{hostid}); 725 726 my $nfid = sprintf '%010d', $fid; 727 my ( $b, $mmm, $ttt, $hto ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} ); 728 729 return "/dev$devid/$b/$mmm/$ttt/$nfid.fid"; 730 } 731 732 sub make_full_url { 733 my ($devid, $fid) = @_; 734 735 # get some information we'll need 736 my $devs = Mgd::get_device_summary(); 737 my $dev = $devs->{$devid} or return undef; 738 my $path = Mgd::make_http_path($devid, $fid) or return undef; 739 my $host = Mgd::hostid_ip($dev->{hostid}) or return undef; 740 my $port = Mgd::hostid_http_port($dev->{hostid}) or return undef; 741 return "http://$host:$port$path"; 742 } 743 744 # if given an HTTP URL, break it down into [ host, port, URI ], else 745 # returns undef 746 sub is_url { 747 my $path = shift; 748 if ($path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!) { 749 return [ $1, $2 || 80, $3 ]; 750 } 751 return undef; 752 } 753 569 754 sub make_path { 755 # jump out if we should be using HTTP stuff 756 return Mgd::make_full_url(@_) if Mgd::USE_HTTP; 757 570 758 my ($devid, $fid) = @_; 571 759 … … 819 1007 } 820 1008 821 my $size = -s "$Mgd::MOG_ROOT/$path"; 1009 # get size of file and verify that it matches what we were given, if anything 1010 my $size = Mgd::get_file_size($path); 1011 return $self->err_line("size_mismatch") 1012 if $args->{size} && ($args->{size} != $size); 822 1013 823 1014 # TODO: check for EIO? … … 915 1106 916 1107 my $key = $args->{key}; 1108 917 1109 return $self->err_line("no_key") unless length($key); 918 1110 … … 923 1115 my $dbh = Mgd::get_dbh or 924 1116 return $self->err_line("nodb"); 925 1117 926 1118 my $filerow = Mgd::key_filerow($dbh, $dmid, $key); 927 1119 return $self->err_line("unknown_key") unless $filerow; … … 941 1133 next unless $dev && $dev->{status} eq "alive"; 942 1134 my $path = Mgd::make_path($devid, $fid); 943 next unless $ret->{paths} || ( -s "$Mgd::MOG_ROOT/$path"== $filerow->{length});1135 next unless $ret->{paths} || (Mgd::get_file_size($path) == $filerow->{length}); 944 1136 my $n = ++$ret->{paths}; 945 1137 $ret->{"path$n"} = $path; 946 1138 last if $n == 2; # one verified, one likely seems enough for now. time will tell. 947 1139 } 1140 948 1141 return $self->ok_line($ret); 949 1142 } trunk/server/mogstored
r47 r48 25 25 26 26 my $path = "/var/mogdata"; 27 my $listen = "0.0.0.0: 8090";27 my $listen = "0.0.0.0:7500"; 28 28 29 29 my $conf = "
