Changeset 79
- Timestamp:
- 07/06/06 22:24:37 (2 years ago)
- Files:
-
- trunk/lib/TheSchwartz.pm (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/lib/TheSchwartz.pm
r77 r79 23 23 # test harness hook 24 24 our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; 25 26 ## Number of jobs to fetch at a time in find_job_for_workers. 27 our $FIND_JOB_BATCH_SIZE = 50; 25 28 26 29 sub new { … … 120 123 my $driver = $client->driver_for($hashdsn); 121 124 122 my $job;125 my @jobs; 123 126 eval { 124 ## Start a transaction, because if we find a job we'll need to125 ## update it.126 $driver->begin_work;127 128 127 ## Search for jobs in this database where: 129 128 ## 1. funcname is in the list of abilities this $client supports; … … 134 133 @$worker_classes; 135 134 my $now = time; 136 ($job)= $driver->search('TheSchwartz::Job' => {135 @jobs = $driver->search('TheSchwartz::Job' => { 137 136 funcid => \@ids, 138 137 run_after => { op => '<=', value => $now }, 139 138 grabbed_until => { op => '<=', value => $now }, 140 }, { limit => 1});139 }, { limit => $FIND_JOB_BATCH_SIZE }); 141 140 }; 142 141 if ($@) { … … 149 148 $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; 150 149 151 if ($job) { 152 ## Got a job! 153 150 ## If we didn't find any jobs, move on to the next database. 151 next unless @jobs; 152 153 ## Got some jobs! Randomize them to avoid contention between workers. 154 @jobs = shuffle(@jobs); 155 156 JOB: 157 while (my $job = shift @jobs) { 154 158 ## Convert the funcid to a funcname, based on this database's map. 155 159 $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) ); … … 158 162 ## no one else takes it. 159 163 my $worker_class = $job->funcname; 160 $job->grabbed_until(time + $worker_class->grab_for); 164 my $old_grabbed_until = $job->grabbed_until; 165 $job->grabbed_until(time + ($worker_class->grab_for || 1)); 161 166 162 167 ## Update the job in the database, and end the transaction. 163 $driver->update($job); 164 $driver->commit; 168 if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) { 169 ## We lost the race to get this particular job--another worker must 170 ## have got it and already updated it. Move on to the next job. 171 next JOB; 172 } 165 173 166 174 ## Now prepare the job, and return it. … … 173 181 return $job; 174 182 } 175 176 ## If we didn't find a job, we need to commit to end the177 ## transaction in this database.178 $driver->commit;179 183 } 180 184 }
