| | 165 | sub _grab_a_job { |
| | 166 | my TheSchwartz $client = shift; |
| | 167 | my $hashdsn = shift; |
| | 168 | my $driver = $client->driver_for($hashdsn); |
| | 169 | |
| | 170 | ## Got some jobs! Randomize them to avoid contention between workers. |
| | 171 | my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_; |
| | 172 | |
| | 173 | JOB: |
| | 174 | while (my $job = shift @jobs) { |
| | 175 | ## Convert the funcid to a funcname, based on this database's map. |
| | 176 | $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) ); |
| | 177 | |
| | 178 | ## Update the job's grabbed_until column so that |
| | 179 | ## no one else takes it. |
| | 180 | my $worker_class = $job->funcname; |
| | 181 | my $old_grabbed_until = $job->grabbed_until; |
| | 182 | |
| | 183 | my $server_time = $client->get_server_time($driver) |
| | 184 | or die "expected a server time"; |
| | 185 | |
| | 186 | $job->grabbed_until($server_time + ($worker_class->grab_for || 1)); |
| | 187 | |
| | 188 | ## Update the job in the database, and end the transaction. |
| | 189 | if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) { |
| | 190 | ## We lost the race to get this particular job--another worker must |
| | 191 | ## have got it and already updated it. Move on to the next job. |
| | 192 | $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE; |
| | 193 | next JOB; |
| | 194 | } |
| | 195 | |
| | 196 | ## Now prepare the job, and return it. |
| | 197 | my $handle = TheSchwartz::JobHandle->new({ |
| | 198 | dsn_hashed => $hashdsn, |
| | 199 | jobid => $job->jobid, |
| | 200 | }); |
| | 201 | $handle->client($client); |
| | 202 | $job->handle($handle); |
| | 203 | return $job; |
| | 204 | } |
| | 205 | |
| | 206 | return undef; |
| | 207 | } |
| | 208 | |