Changeset 79

Show
Ignore:
Timestamp:
07/06/06 22:24:37 (2 years ago)
Author:
btrott
Message:

Fixed grab-race.t by reorganizing the way we select jobs. Now we select a batch of
jobs, shuffle them, then try to update grabbed_until until we actually successfully
updated a job. This fixes/reduces the contention issue between workers.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/lib/TheSchwartz.pm

    r77 r79  
    2323# test harness hook 
    2424our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; 
     25 
     26## Number of jobs to fetch at a time in find_job_for_workers. 
     27our $FIND_JOB_BATCH_SIZE = 50; 
    2528 
    2629sub new { 
     
    120123        my $driver = $client->driver_for($hashdsn); 
    121124 
    122         my $job
     125        my @jobs
    123126        eval { 
    124             ## Start a transaction, because if we find a job we'll need to 
    125             ## update it. 
    126             $driver->begin_work; 
    127  
    128127            ## Search for jobs in this database where: 
    129128            ## 1. funcname is in the list of abilities this $client supports; 
     
    134133                      @$worker_classes; 
    135134            my $now = time; 
    136             ($job) = $driver->search('TheSchwartz::Job' => { 
     135            @jobs = $driver->search('TheSchwartz::Job' => { 
    137136                    funcid        => \@ids, 
    138137                    run_after     => { op => '<=', value => $now }, 
    139138                    grabbed_until => { op => '<=', value => $now }, 
    140                 }, { limit => 1 }); 
     139                }, { limit => $FIND_JOB_BATCH_SIZE }); 
    141140        }; 
    142141        if ($@) { 
     
    149148        $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; 
    150149 
    151         if ($job) { 
    152             ## Got a job! 
    153  
     150        ## If we didn't find any jobs, move on to the next database. 
     151        next unless @jobs; 
     152 
     153        ## Got some jobs! Randomize them to avoid contention between workers. 
     154        @jobs = shuffle(@jobs); 
     155 
     156        JOB: 
     157        while (my $job = shift @jobs) { 
    154158            ## Convert the funcid to a funcname, based on this database's map. 
    155159            $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) ); 
     
    158162            ## no one else takes it. 
    159163            my $worker_class = $job->funcname; 
    160             $job->grabbed_until(time + $worker_class->grab_for); 
     164            my $old_grabbed_until = $job->grabbed_until; 
     165            $job->grabbed_until(time + ($worker_class->grab_for || 1)); 
    161166 
    162167            ## Update the job in the database, and end the transaction. 
    163             $driver->update($job); 
    164             $driver->commit; 
     168            if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) { 
     169                ## We lost the race to get this particular job--another worker must 
     170                ## have got it and already updated it. Move on to the next job. 
     171                next JOB; 
     172            } 
    165173 
    166174            ## Now prepare the job, and return it. 
     
    173181            return $job; 
    174182        } 
    175  
    176         ## If we didn't find a job, we need to commit to end the 
    177         ## transaction in this database. 
    178         $driver->commit; 
    179183    } 
    180184}