Changeset 48

Show
Ignore:
Timestamp:
07/19/04 21:23:59 (4 years ago)
Author:
marksmith
Message:

Update MogileFS to support HTTP for all operations -- get, put, copy, and
delete. This is made to interact with a Perlbal setup using mogstored as
the frontend.

People wishing to use the old style MogileFS on a local filesystem or NFS
should turn USE_HTTP to 0 at the top of mogilefsd.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/api/perl/MogileFS.pm

    r45 r48  
    2323use strict; 
    2424use Carp; 
     25use IO::WrapTie; 
    2526use fields qw(root domain backend); 
    2627 
     
    5051sub new_file { 
    5152    my MogileFS $self = shift; 
    52     my ($key, $class) = @_; 
     53    my ($key, $class, $bytes) = @_; 
    5354 
    5455    my $res = $self->{backend}->do_request 
     
    6061 
    6162    # create a MogileFS::NewFile object, based off of IO::File 
    62     return MogileFS::NewFile->new( 
    63                                   mg    => $self, 
    64                                   fid   => $res->{fid}, 
    65                                   path  => $res->{path}, 
    66                                   devid => $res->{devid}, 
    67                                   class => $class, 
    68                                   key   => $key 
    69                                   ); 
     63    if ($res->{path} =~ m!^http://!) { 
     64        return IO::WrapTie::wraptie('MogileFS::NewHTTPFile',  
     65                                          mg    => $self, 
     66                                          fid   => $res->{fid}, 
     67                                          path  => $res->{path}, 
     68                                          devid => $res->{devid}, 
     69                                          class => $class, 
     70                                          key   => $key, 
     71                                          content_length => $bytes+0, 
     72                                          ); 
     73    } else { 
     74        return MogileFS::NewFile->new( 
     75                                      mg    => $self, 
     76                                      fid   => $res->{fid}, 
     77                                      path  => $res->{path}, 
     78                                      devid => $res->{devid}, 
     79                                      class => $class, 
     80                                      key   => $key 
     81                                      ); 
     82    } 
    7083} 
    7184 
     
    8093        }) or return undef; 
    8194 
    82     return map { "$self->{root}/" . $res->{"path$_"} } (1..$res->{paths}); 
     95    my @paths = map { $res->{"path$_"} } (1..$res->{paths}); 
     96    return @paths if scalar(@paths) > 0 && $paths[0] =~ m!^http://!; 
     97    return map { "$self->{root}/$_"} @paths; 
    8398} 
    8499 
     
    124139    # FIXME: add actual validation 
    125140    { 
    126         $self->{root} = $args{root} or 
    127             _fail("constructor requires parameter 'root'"); 
    128  
     141        # root is only needed for NFS based installations 
     142        unless (ref $args{hosts} && $args{hosts}->[0] =~ /:/) { 
     143            $self->{root} = $args{root} or 
     144                _fail("constructor requires parameter 'root' for non-HTTP setups"); 
     145        } 
     146 
     147        # get domain (required) 
    129148        $self->{domain} = $args{domain} or 
    130149            _fail("constructor requires parameter 'domain'"); 
     
    491510} 
    492511 
     512################################################################################ 
     513# MogileFS::HTTPFile object 
     514# NOTE: This is meant to be used within IO::WrapTie... 
     515# 
     516 
     517package MogileFS::NewHTTPFile; 
     518 
     519use fields ('host', 
     520            'sock', # IO::Socket; created only when we need it 
     521            'uri', 
     522            'data', # buffered data we have 
     523            'pos', # simulated file position 
     524            'length', # length of data field 
     525            'content_length', # declared length of data we will be receiving (not required) 
     526            'mg', 
     527            'fid', 
     528            'devid', 
     529            'class', 
     530            'key', 
     531            'path', # full URL to save data to 
     532            ); 
     533 
     534sub TIEHANDLE { 
     535    my MogileFS::NewHTTPFile $self = shift; 
     536    $self = fields::new($self) unless ref $self; 
     537 
     538    my %args = @_; 
     539    return undef unless $args{path} =~ m!http://(.+?)(/.+)$!; 
     540 
     541    $self->{host} = $1; 
     542    $self->{uri} = $2; 
     543    $self->{data} = ''; 
     544    $self->{length} = 0; 
     545    $self->{content_length} = $args{content_length} + 0; 
     546    $self->{pos} = 0; 
     547    $self->{$_} = $args{$_} foreach qw(mg fid devid class key path); 
     548     
     549    return $self; 
     550} 
     551*new = *TIEHANDLE; 
     552 
     553sub PRINT { 
     554    my MogileFS::NewHTTPFile $self = shift; 
     555 
     556    # get data to send to server 
     557    my $data = shift; 
     558    my $newlen = length $data; 
     559    $self->{pos} += $newlen; 
     560 
     561    # now make socket if we don't have one 
     562    if (!$self->{sock} && $self->{content_length}) { 
     563        $self->{sock} = IO::Socket::INET->new(PeerAddr => $self->{host}) 
     564            or die "Error: unable to open socket: $!\n"; 
     565        $self->{sock}->print("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{content_length}\r\n\r\n"); 
     566    } 
     567 
     568    # write some data to our socket 
     569    if ($self->{sock}) { 
     570        # store on data if we're under 1k 
     571        if ($self->{length} < 1024) { 
     572            if ($self->{length} + $newlen > 1024) { 
     573                $self->{length} = 1024; 
     574                $self->{data} .= substr($data, 0, 1024 - $self->{length}); 
     575            } else { 
     576                $self->{length} += $newlen; 
     577                $self->{data} .= $data; 
     578            } 
     579        } 
     580 
     581        # actually write 
     582        $self->{sock}->print($data); 
     583    } else { 
     584        # or not, just stick it on our queued data 
     585        $self->{data} .= $data; 
     586        $self->{length} += $newlen; 
     587    } 
     588} 
     589*print = *PRINT; 
     590 
     591# get/set functions 
     592sub _getset { 
     593    my MogileFS::NewHTTPFile $self = shift; 
     594    my $what = shift; 
     595     
     596    if (@_) { 
     597        # note: we're a TIEHANDLE interface, so we're not QUITE like a 
     598        # normal class... our parameters tend to come in via an arrayref 
     599        my $val = shift; 
     600        $val = shift(@$val) if ref $val eq 'ARRAY'; 
     601        return $self->{$what} = $val; 
     602    } else { 
     603        return $self->{$what}; 
     604    } 
     605} 
     606sub path  { _getset(shift, 'path');      } 
     607sub class { _getset(shift, 'class', @_); } 
     608sub key   { _getset(shift, 'key', @_);   } 
     609 
     610sub CLOSE { 
     611    my MogileFS::NewHTTPFile $self = shift; 
     612 
     613    # if we're closed and we have no sock... 
     614    unless ($self->{sock}) { 
     615        $self->{sock} = IO::Socket::INET->new(PeerAddr => $self->{host}) 
     616            or die "Error: unable to open socket: $!\n"; 
     617        $self->{sock}->print("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{length}\r\n\r\n"); 
     618        $self->{sock}->print($self->{data}); 
     619    } 
     620     
     621    # get response from put 
     622    if ($self->{sock}) { 
     623        my $line = $self->{sock}->getline; 
     624        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 
     625            # all 2xx responses are success 
     626            unless ($1 >= 200 && $1 <= 299) { 
     627                $@ = "HTTP response $1 from upload\n"; 
     628                return undef; 
     629            } 
     630        } else { 
     631            $@ = "Response line not understood: $line\n"; 
     632            return undef; 
     633        } 
     634        $self->{sock}->close; 
     635    } 
     636 
     637    my MogileFS $mg = $self->{mg}; 
     638    my $domain = $mg->{domain}; 
     639 
     640    my $fid   = $self->{fid}; 
     641    my $devid = $self->{devid}; 
     642    my $path  = $self->{path}; 
     643 
     644    my $key = shift || $self->{key}; 
     645 
     646    $mg->{backend}->do_request 
     647        ("create_close", { 
     648            fid    => $fid, 
     649            devid  => $devid, 
     650            domain => $domain, 
     651            size   => $self->{content_length} ? $self->{content_length} : $self->{length}, 
     652            key    => $key, 
     653            path   => $path, 
     654        }) or return undef; 
     655 
     656    return 1; 
     657} 
     658*close = *CLOSE; 
     659 
     660sub TELL { 
     661    # return our current pos 
     662    return $_[0]->{pos}; 
     663} 
     664*tell = *TELL; 
     665 
     666sub SEEK { 
     667    # simply set pos... 
     668    die "ERROR: Seek past end of file\n" if $_[1] > $_[0]->{length}; 
     669    $_[0]->{pos} = $_[1]; 
     670} 
     671*seek = *SEEK; 
     672 
     673sub EOF { 
     674    return ($_[0]->{pos} >= $_[0]->{length}) ? 1 : 0; 
     675} 
     676*eof = *EOF; 
     677 
     678sub BINMODE { 
     679    # no-op, we're always in binary mode 
     680} 
     681*binmode = *BINMODE; 
     682 
     683sub READ { 
     684    my MogileFS::NewHTTPFile $self = shift; 
     685    my $count = $_[1] + 0; 
     686     
     687    my $max = $self->{length} - $self->{pos}; 
     688    $max = $count if $count < $max; 
     689 
     690    $_[0] = substr($self->{data}, $self->{pos}, $max); 
     691    $self->{pos} += $max; 
     692 
     693    return $max; 
     694} 
     695*read = *READ; 
     696 
     697sub AUTOLOAD { 
     698    use vars qw($AUTOLOAD); 
     699    warn "Error: $AUTOLOAD not implemented.\n"; 
     700} 
     701 
    4937021; 
  • trunk/devnotes/sql.txt

    r22 r48  
    2727      classname     VARCHAR(50), 
    2828      UNIQUE      (dmid,classname), 
    29       mindevcount   TINYINT UNSIGNED NOT NULL, 
     29      mindevcount   TINYINT UNSIGNED NOT NULL 
    3030); 
    3131 
     
    127127 
    128128   status     ENUM('alive','dead','down'), 
     129   http_port  MEDIUMINT UNSIGNED DEFAULT 7500, 
    129130 
    130131   hostname   VARCHAR(40), 
  • trunk/server/mogilefsd

    r43 r48  
    4040$DEFAULT_MOG_ROOT = "/mnt/mogilefs"; 
    4141$DEBUG = 0; 
     42 
     43use constant USE_HTTP => 1; 
    4244 
    4345my ( 
     
    320322 
    321323                my $path = make_path($devid, $fid); 
    322                 my $rv = unlink "$Mgd::MOG_ROOT/$path"; 
     324                my $rv = 0; 
     325                if (my $urlref = Mgd::is_url($path)) { 
     326                    # hit up the server and delete it 
     327                    my $sock = IO::Socket::INET->new(PeerAddr => $urlref->[0], 
     328                                                     PeerPort => $urlref->[1], 
     329                                                     Timeout => 2); 
     330                    unless ($sock) { 
     331                        # timeout or something, mark this device as down for now and move on 
     332                        $dev_down{$devid} = 1; 
     333                        next; 
     334                    } 
     335                     
     336                    # send delete request 
     337                    print "Sending delete for $path\n" if $Mgd::DEBUG >= 2; 
     338                    print $sock "DELETE $urlref->[2] HTTP/1.0\r\n\r\n"; 
     339                    my $response = <$sock>; 
     340                    if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 
     341                        if (($1 >= 200 && $1 <= 299) || $1 == 404) { 
     342                            # effectively means all went well 
     343                            $rv = 1; 
     344                        } else { 
     345                            # remote file system error?  mark node as down 
     346                            warn "Error: unlink failure: $path: $1\n"; 
     347                            $dev_down{$devid} = 1; 
     348                            next; 
     349                        } 
     350                    } else { 
     351                        warn "Error: unknown response line: $response\n"; 
     352                    } 
     353                } else { 
     354                    # do normal unlink 
     355                    $rv = unlink "$Mgd::MOG_ROOT/$path"; 
     356                } 
    323357 
    324358                # device is timing out.  take note of it and 
     
    346380 
    347381    } 
     382} 
     383 
     384# copies a file from one Perlbal to another utilizing HTTP 
     385sub http_copy { 
     386    my ($sdevid, $ddevid, $fid) = @_; 
     387 
     388    # get some information we'll need 
     389    my $devs = Mgd::get_device_summary(); 
     390    my ($sdev, $ddev) = ($devs->{$sdevid}, $devs->{$ddevid}); 
     391    unless (ref $sdev && ref $ddev) { 
     392        warn "Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid\n"; 
     393        return 0; 
     394    } 
     395    my ($spath, $dpath) = (Mgd::make_http_path($sdevid, $fid), 
     396                           Mgd::make_http_path($ddevid, $fid)); 
     397    my ($shost, $sport) = (Mgd::hostid_ip($sdev->{hostid}), Mgd::hostid_http_port($sdev->{hostid})); 
     398    my ($dhost, $dport) = (Mgd::hostid_ip($ddev->{hostid}), Mgd::hostid_http_port($ddev->{hostid})); 
     399    unless (defined $spath && defined $dpath && defined $shost && defined $dhost && $sport && $dport) { 
     400        # show detailed information to find out what's not configured right 
     401        warn "Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid\n"; 
     402        warn "       http://$shost:$sport$spath -> http://$dhost:$dport$dpath\n"; 
     403        return 0; 
     404    } 
     405    print "http://$shost:$sport$spath -> http://$dhost:$dport$dpath\n" if $Mgd::DEBUG >= 2; 
     406 
     407    # setup our pipe error handler, in case we get closed on 
     408    my $pipe_closed = 0; 
     409    local $SIG{PIPE} = sub { $pipe_closed = 1; }; 
     410 
     411    # okay, now get the file 
     412    my $sock = IO::Socket::INET->new(PeerAddr => $shost, PeerPort => $sport, Timeout => 2) 
     413        or return 0; 
     414    print $sock "GET $spath HTTP/1.0\r\n\r\n"; 
     415    return 0 if $pipe_closed; 
     416 
     417    # we just want a content length 
     418    my $clen; 
     419    while (defined (my $line = <$sock>)) { 
     420        $line =~ s/[\s\r\n]+$//; 
     421        last unless length $line; 
     422        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 
     423            # make sure we get a good response 
     424            unless ($1 >= 200 && $1 <= 299) { 
     425                warn "Error: Resource http://$shost:$sport$spath failed: HTTP $1\n"; 
     426                return 0; 
     427            } 
     428        } 
     429        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i; 
     430        $clen = $1; 
     431    } 
     432    return 0 unless $clen; 
     433 
     434    # open target for put 
     435    my $dsock = IO::Socket::INET->new(PeerAddr => $dhost, PeerPort => $dport, Timeout => 2) or return 0; 
     436    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n") or return 0; 
     437    return 0 if $pipe_closed; 
     438 
     439    # now read data and print while we're reading 
     440    my ($data, $read) = ('', 0); 
     441    while (!$pipe_closed && (my $bytes = read($sock, $data, $clen - $read))) { 
     442        $read += $bytes; 
     443        last unless $dsock->write($data); 
     444    } 
     445 
     446    # now read in the response line (should be first line) 
     447    my $line = <$dsock>; 
     448    if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { 
     449        return 1 if $1 >= 200 && $1 <= 299; 
     450        warn "Error: got a 404 in put: device not on host?: http://$dhost:$dport$dpath" if $1 == 404; 
     451    } else { 
     452        warn "Error: HTTP response line not recognized: $line"; 
     453    } 
     454    return 0; 
    348455} 
    349456 
     
    408515        $sdevid ||= @exist_devid[int(rand(scalar @exist_devid))]; 
    409516 
    410         my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid); 
    411         my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid); 
    412         my $rv = File::Copy::copy($src_path, $dst_path); 
     517        my $rv = undef; 
     518        if (Mgd::USE_HTTP) { 
     519            $rv = http_copy($sdevid, $ddevid, $fid); 
     520        } else { 
     521            my $dst_path = $MOG_ROOT . "/" . make_path($ddevid, $fid); 
     522            my $src_path = $MOG_ROOT . "/" . make_path($sdevid, $fid); 
     523            $rv = File::Copy::copy($src_path, $dst_path); 
     524        } 
    413525 
    414526        return $retunlock->(0) unless $rv; 
     
    536648    %cache_host = (); 
    537649    my $dbh = get_dbh(); 
    538     my $sth = $dbh->prepare("SELECT hostid, status, hostname, hostip, remoteroot FROM host"); 
     650    my $sth = $dbh->prepare("SELECT hostid, status, hostname, hostip, http_port, remoteroot FROM host"); 
    539651    $sth->execute; 
    540652    $cache_host{$_->{hostid}} = $_ while $_ = $sth->fetchrow_hashref; 
     
    549661} 
    550662 
     663# get size of file, return 0 on error 
     664sub get_file_size { 
     665    my $path = shift; 
     666 
     667    # quick case -- just a file on disk 
     668    unless ($path =~ m!^http://(.+?)(/.+)$!) { 
     669        return -s "$Mgd::MOG_ROOT/$path" 
     670    } 
     671 
     672    # URL; use a HEAD request to get the size of the file 
     673    my $sock = IO::Socket::INET->new(PeerAddr => $1, Timeout => 2) 
     674        or return 0; 
     675    print $sock "HEAD $2 HTTP/1.0\r\n\r\n"; 
     676    while (defined (my $line = <$sock>)) { 
     677        if ($line =~ /^Content-length: (\d+)/i) { 
     678            return $1+0; 
     679        } 
     680    } 
     681 
     682    # no content length found? 
     683    return 0; 
     684} 
     685 
    551686sub domain_id { 
    552687    my $domain = shift; 
     
    567702} 
    568703 
     704sub hostid_ip { 
     705    my $hostid = shift; 
     706    check_host_cache(); 
     707    my $h = $cache_host{$hostid}; 
     708    return $h ? $h->{hostip} : undef; 
     709} 
     710 
     711sub hostid_http_port { 
     712    my $hostid = shift; 
     713    check_host_cache(); 
     714    my $h = $cache_host{$hostid}; 
     715    return $h ? $h->{http_port} : undef; 
     716} 
     717 
     718sub make_http_path { 
     719    my ($devid, $fid) = @_; 
     720 
     721    my $dsum = get_device_summary(); 
     722    my $dinfo = $dsum->{$devid}; 
     723    return undef unless $dinfo; 
     724    my $hostname = hostid_name($dinfo->{hostid}); 
     725 
     726    my $nfid = sprintf '%010d', $fid; 
     727    my ( $b, $mmm, $ttt, $hto ) = ( $nfid =~ m{(\d)(\d{3})(\d{3})(\d{3})} ); 
     728 
     729    return "/dev$devid/$b/$mmm/$ttt/$nfid.fid"; 
     730} 
     731 
     732sub make_full_url { 
     733    my ($devid, $fid) = @_; 
     734 
     735    # get some information we'll need 
     736    my $devs = Mgd::get_device_summary(); 
     737    my $dev = $devs->{$devid} or return undef; 
     738    my $path = Mgd::make_http_path($devid, $fid) or return undef; 
     739    my $host = Mgd::hostid_ip($dev->{hostid}) or return undef; 
     740    my $port = Mgd::hostid_http_port($dev->{hostid}) or return undef; 
     741    return "http://$host:$port$path"; 
     742} 
     743 
     744# if given an HTTP URL, break it down into [ host, port, URI ], else 
     745# returns undef 
     746sub is_url { 
     747    my $path = shift; 
     748    if ($path =~ m!^http://(.+?)(?::(\d+))?(/.+)$!) { 
     749        return [ $1, $2 || 80, $3 ]; 
     750    } 
     751    return undef; 
     752} 
     753 
    569754sub make_path { 
     755    # jump out if we should be using HTTP stuff 
     756    return Mgd::make_full_url(@_) if Mgd::USE_HTTP; 
     757     
    570758    my ($devid, $fid) = @_; 
    571759 
     
    8191007    } 
    8201008 
    821     my $size = -s "$Mgd::MOG_ROOT/$path"; 
     1009    # get size of file and verify that it matches what we were given, if anything 
     1010    my $size = Mgd::get_file_size($path); 
     1011    return $self->err_line("size_mismatch") 
     1012        if $args->{size} && ($args->{size} != $size); 
    8221013 
    8231014    # TODO: check for EIO? 
     
    9151106 
    9161107    my $key = $args->{key}; 
     1108 
    9171109    return $self->err_line("no_key") unless length($key); 
    9181110 
     
    9231115    my $dbh = Mgd::get_dbh or 
    9241116        return $self->err_line("nodb"); 
    925  
     1117     
    9261118    my $filerow = Mgd::key_filerow($dbh, $dmid, $key); 
    9271119    return $self->err_line("unknown_key") unless $filerow; 
     
    9411133        next unless $dev && $dev->{status} eq "alive"; 
    9421134        my $path = Mgd::make_path($devid, $fid); 
    943         next unless $ret->{paths} || (-s "$Mgd::MOG_ROOT/$path" == $filerow->{length}); 
     1135        next unless $ret->{paths} || (Mgd::get_file_size($path) == $filerow->{length}); 
    9441136        my $n = ++$ret->{paths}; 
    9451137        $ret->{"path$n"} = $path; 
    9461138        last if $n == 2;   # one verified, one likely seems enough for now.  time will tell. 
    9471139    } 
     1140 
    9481141    return $self->ok_line($ret); 
    9491142} 
  • trunk/server/mogstored

    r47 r48  
    2525 
    2626my $path = "/var/mogdata"; 
    27 my $listen = "0.0.0.0:8090"; 
     27my $listen = "0.0.0.0:7500"; 
    2828 
    2929my $conf = "