root/branches/release-30/extlib/TheSchwartz.pm @ 1406

Revision 1406, 30.1 kB (checked in by bchoate, 21 months ago)

Updated with prioritization support from TheSchwartz 1.07

  • Property svn:keywords set to Id Revision
Line 
1# $Id$
2
3package TheSchwartz;
4use strict;
5use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize );
6
7our $VERSION = "1.07";
8
9use Carp qw( croak );
10use Data::ObjectDriver::Errors;
11use Data::ObjectDriver::Driver::DBI;
12use Digest::MD5 qw( md5_hex );
13use List::Util qw( shuffle );
14use TheSchwartz::FuncMap;
15use TheSchwartz::Job;
16use TheSchwartz::JobHandle;
17
## Default number of seconds to wait before retrying a database that has
## been marked as dead (overridable via new(retry_seconds => ...)).
use constant RETRY_DEFAULT => 30;

## Driver error codes that do not indicate an unavailable database. Any
## error outside this set causes the database to be marked as dead.
use constant OK_ERRORS => {
    map { ( $_ => 1 ) } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT
};

# Test harness hooks: optional coderefs invoked at race-sensitive points.
our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
our $T_LOST_RACE;

## Number of jobs to fetch at a time in find_job_for_workers.
our $FIND_JOB_BATCH_SIZE = 50;
27
sub new {
    my TheSchwartz $client = shift;
    my %args = @_;

    ## Build a fresh restricted-hash object only when invoked on the class;
    ## when invoked on an instance, that instance is (re)initialized.
    $client = fields::new($client) unless ref $client;

    if (exists $args{databases} && ref($args{databases}) ne 'ARRAY') {
        croak "databases must be an arrayref if specified";
    }
    my $databases = delete $args{databases};

    ## Consume every recognised option; whatever is left over is an error.
    my $retry_seconds = delete $args{retry_seconds};
    $client->{retry_seconds} = $retry_seconds || RETRY_DEFAULT;
    $client->set_prioritize( delete $args{prioritize} );
    $client->set_verbose(    delete $args{verbose} );
    $client->set_scoreboard( delete $args{scoreboard} );
    my $cache_ttl = delete $args{driver_cache_expiration};
    $client->{driver_cache_expiration} = $cache_ttl || 0;

    if (my @unknown = keys %args) {
        croak "unknown options ", join(', ', @unknown);
    }

    ## Per-client state: database registry, abilities, dead-DB bookkeeping
    ## and the per-database funcmap cache.
    $client->hash_databases($databases);
    $client->reset_abilities;
    $client->{dead_dsns}     = {};
    $client->{retry_at}      = {};
    $client->{funcmap_cache} = {};

    return $client;
}
52
sub debug {
    my TheSchwartz $client = shift;
    ## Forwards ($msg, $job) to the configured logger; $job is optional.
    ## Does nothing when no logger is configured.
    my $logger = $client->{verbose} or return;
    return $logger->(@_);
}
58
sub hash_databases {
    my TheSchwartz $client = shift;
    my ($list) = @_;
    ## Register each database under the MD5 of "dsn|user|pass".
    for my $info (@$list) {
        my @parts  = map { $info->{$_} || '' } qw( dsn user pass );
        my $digest = md5_hex( join('|', @parts) );
        $client->{databases}{$digest} = $info;
    }
}
67
sub driver_for {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;
    my $now = time;
    my $ttl = $client->{driver_cache_expiration};

    ## With caching enabled, reuse a driver created less than $ttl seconds ago.
    if ($ttl) {
        my $created = $client->{cached_drivers}{$hashdsn}{create_ts};
        if ($created && $created + $ttl > $now) {
            return $client->{cached_drivers}{$hashdsn}{driver};
        }
    }

    ## Otherwise build a new driver from the registered connection details.
    my $db = $client->{databases}{$hashdsn};
    my $driver = Data::ObjectDriver::Driver::DBI->new(
        dsn      => $db->{dsn},
        username => $db->{user},
        password => $db->{pass},
        ( $db->{prefix} ? ( prefix => $db->{prefix} ) : () ),
    );

    if ($ttl) {
        $client->{cached_drivers}{$hashdsn}{driver}    = $driver;
        $client->{cached_drivers}{$hashdsn}{create_ts} = $now;
    }
    return $driver;
}
91
sub mark_database_as_dead {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;
    ## Take the database out of rotation until retry_seconds have elapsed.
    my $retry_at = time + $client->{retry_seconds};
    $client->{dead_dsns}{$hashdsn} = 1;
    $client->{retry_at}{$hashdsn}  = $retry_at;
}
98
sub is_database_dead {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;

    ## Never marked dead: it's alive.
    return 0 unless $client->{dead_dsns}{$hashdsn};

    ## Still inside the back-off window: treat it as dead.
    return 1 if $client->{retry_at}{$hashdsn} >= time;

    ## Back-off expired: clear the mark so the database gets tried again.
    delete $client->{dead_dsns}{$hashdsn};
    delete $client->{retry_at}{$hashdsn};
    return 0;
}
115
sub lookup_job {
    my TheSchwartz $client = shift;
    ## Accepts a handle string; returns the matching TheSchwartz::Job with
    ## its handle and funcname filled in, or an empty return if not found.
    my $handle  = $client->handle_from_string(@_);
    my $hashdsn = $handle->dsn_hashed;
    my $driver  = $client->driver_for($hashdsn);

    my $job = $driver->lookup('TheSchwartz::Job' => $handle->jobid)
        or return;

    $job->handle($handle);
    $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
    return $job;
}
129
sub list_jobs {
    my TheSchwartz $client = shift;
    my $arg = shift;   # hashref of search arguments; funcname is required

    croak "No funcname" unless exists $arg->{funcname};

    ## Resolve defaults into locals instead of writing them back into the
    ## caller's hashref (want_handle and coalesce_op used to be filled in
    ## in place, silently modifying the caller's data).
    my $want_handle = defined $arg->{want_handle} ? $arg->{want_handle} : 1;
    my $limit       = $arg->{limit} || $FIND_JOB_BATCH_SIZE;

    ## Optional column filters.
    my @options;
    push @options, run_after => { op => '<=', value => $arg->{run_after} }
        if exists $arg->{run_after};
    push @options, grabbed_until => { op => '<=', value => $arg->{grabbed_until} }
        if exists $arg->{grabbed_until};
    if ($arg->{coalesce}) {
        push @options, coalesce => {
            op    => $arg->{coalesce_op} || '=',
            value => $arg->{coalesce},
        };
    }

    ## Highest priority first when prioritization is enabled.
    my @order = $client->prioritize
        ? ( sort => 'priority', direction => 'descend' )
        : ();

    my @jobs;
    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it.
        next if $client->is_database_dead($hashdsn);

        my $driver = $client->driver_for($hashdsn);

        ## funcname may be a single name or an arrayref of names; translate
        ## into this database's funcid(s).
        my $funcid = ref($arg->{funcname})
            ? [ map { $client->funcname_to_id($driver, $hashdsn, $_) }
                    @{ $arg->{funcname} } ]
            : $client->funcname_to_id($driver, $hashdsn, $arg->{funcname});

        my @found = $driver->search('TheSchwartz::Job' => {
            funcid => $funcid,
            @options,
        }, { limit => $limit, @order });

        if ($want_handle) {
            for my $job (@found) {
                $job->handle( TheSchwartz::JobHandle->new({
                    dsn_hashed => $hashdsn,
                    client     => $client,
                    jobid      => $job->jobid,
                }) );
            }
        }
        push @jobs, @found;
    }
    return @jobs;
}
187
sub find_job_with_coalescing_prefix {
    my TheSchwartz $client = shift;
    my ($funcname, $coval) = @_;
    ## Prefix match via LIKE 'value%'. NOTE(review): '%' and '_' inside
    ## $coval are not escaped, so they also act as LIKE wildcards.
    return $client->_find_job_with_coalescing('LIKE', $funcname, $coval . '%');
}
194
sub find_job_with_coalescing_value {
    my TheSchwartz $client = shift;
    my ($funcname, $coval) = @_;
    ## Exact match on the coalesce column.
    return $client->_find_job_with_coalescing('=', $funcname, $coval);
}
199
sub _find_job_with_coalescing {
    my TheSchwartz $client = shift;
    my ($op, $funcname, $coval) = @_;   # $op is the SQL comparison ('=' or 'LIKE')

    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it
        next if $client->is_database_dead($hashdsn);

        my $driver   = $client->driver_for($hashdsn);
        my $unixtime = $driver->dbd->sql_for_unixtime;

        my @jobs;
        eval {
            ## Search for jobs in this database where:
            ## 1. funcname matches the requested ability;
            ## 2. the job is scheduled to be run (run_after is in the past);
            ## 3. no one else is working on the job (grabbed_until is in
            ##    the past);
            ## 4. the coalesce column matches $coval under $op.
            my $funcid = $client->funcname_to_id($driver, $hashdsn, $funcname);

            @jobs = $driver->search('TheSchwartz::Job' => {
                    funcid        => $funcid,
                    run_after     => \ "<= $unixtime",
                    grabbed_until => \ "<= $unixtime",
                    coalesce      => { op => $op, value => $coval },
                }, { limit => $FIND_JOB_BATCH_SIZE,
                    ( $client->prioritize ? ( sort => 'priority',
                        direction => 'descend' ) : () )
                }
            );
        };
        if ($@) {
            ## Errors listed in OK_ERRORS don't mean the database is down;
            ## anything else takes it out of rotation for a while.
            unless (OK_ERRORS->{ $driver->last_error || 0 }) {
                $client->mark_database_as_dead($hashdsn);
            }
        }

        next unless @jobs;

        my $job = $client->_grab_a_job($hashdsn, @jobs);
        return $job if $job;
    }

    ## No live database had a job we could grab. Return explicitly rather
    ## than falling off the end of the loop (whose value is unspecified).
    return;
}
241
sub find_job_for_workers {
    my TheSchwartz $client = shift;
    my ($worker_classes) = @_;   # optional arrayref of abilities
    $worker_classes ||= $client->{current_abilities};

    ## Without any abilities no job can match. Return early instead of
    ## issuing a search with an empty funcid list, whose SQL may be invalid
    ## and would then get every database marked as dead.
    return unless $worker_classes && @$worker_classes;

    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it.
        next if $client->is_database_dead($hashdsn);

        my $driver   = $client->driver_for($hashdsn);
        my $unixtime = $driver->dbd->sql_for_unixtime;

        my @jobs;
        eval {
            ## Search for jobs in this database where:
            ## 1. funcname is in the list of abilities this $client supports;
            ## 2. the job is scheduled to be run (run_after is in the past);
            ## 3. no one else is working on the job (grabbed_until is in
            ##    the past).
            my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
                      @$worker_classes;

            @jobs = $driver->search('TheSchwartz::Job' => {
                    funcid        => \@ids,
                    run_after     => \ "<= $unixtime",
                    grabbed_until => \ "<= $unixtime",
                }, { limit => $FIND_JOB_BATCH_SIZE,
                    ( $client->prioritize ? ( sort => 'priority',
                    direction => 'descend' ) : () )
                }
            );
        };
        if ($@) {
            ## Errors listed in OK_ERRORS don't mean the database is down;
            ## anything else takes it out of rotation for a while.
            unless (OK_ERRORS->{ $driver->last_error || 0 }) {
                $client->mark_database_as_dead($hashdsn);
            }
        }

        # for test harness race condition testing
        $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;

        my $job = $client->_grab_a_job($hashdsn, @jobs);
        return $job if $job;
    }

    ## No live database had a job we could grab.
    return;
}
287
sub get_server_time {
    my TheSchwartz $client = shift;
    my ($driver) = @_;
    ## Ask the database for its own notion of "now" (dialect-specific SQL).
    my $sql = 'SELECT ' . $driver->dbd->sql_for_unixtime;
    return $driver->rw_handle->selectrow_array($sql);
}
294
sub _grab_a_job {
    my TheSchwartz $client = shift;
    my $hashdsn = shift;   # remaining @_ are candidate TheSchwartz::Job objects
    my $driver = $client->driver_for($hashdsn);

    ## Got some jobs! Randomize them to avoid contention between workers.
    ## When prioritization is on, re-sort the shuffled list by priority
    ## (highest first) so that a lower-priority job from the batch is never
    ## tried ahead of a higher-priority one; jobs of equal priority remain
    ## in random order. A missing priority is treated as 0.
    my @jobs = shuffle(@_);
    if ($client->prioritize) {
        @jobs = sort { ($b->priority || 0) <=> ($a->priority || 0) } @jobs;
    }

  JOB:
    while (my $job = shift @jobs) {
        ## Convert the funcid to a funcname, based on this database's map.
        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );

        ## Update the job's grabbed_until column so that
        ## no one else takes it.
        my $worker_class = $job->funcname;
        my $old_grabbed_until = $job->grabbed_until;

        my $server_time = $client->get_server_time($driver)
            or die "expected a server time";

        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));

        ## Optimistic lock: the update only succeeds if grabbed_until still
        ## holds the value we read above.
        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
            ## We lost the race to get this particular job--another worker must
            ## have got it and already updated it. Move on to the next job.
            $T_LOST_RACE->() if $T_LOST_RACE;
            next JOB;
        }

        ## Now prepare the job, and return it.
        my $handle = TheSchwartz::JobHandle->new({
            dsn_hashed => $hashdsn,
            jobid      => $job->jobid,
        });
        $handle->client($client);
        $job->handle($handle);
        return $job;
    }

    return undef;
}
338
339
sub shuffled_databases {
    my TheSchwartz $client = shift;
    ## Hashed DSNs in random order, spreading load across databases.
    return shuffle( keys %{ $client->{databases} } );
}
345
sub insert_job_to_driver {
    my $client = shift;
    my ($job, $driver, $hashdsn) = @_;

    ## Everything here may fail if the database is down, so trap errors.
    eval {
        ## funcids are per-database, hence the lookup keyed by $hashdsn.
        $job->funcid( $client->funcname_to_id($driver, $hashdsn, $job->funcname) );

        ## Client clock, not server clock: subject to skew, but better than
        ## NULL. Nothing in TheSchwartz itself reads insert_time.
        ## TODO: use server time without an extra round trip.
        $job->insert_time(time);

        $driver->insert($job);
    };

    if ($@) {
        $client->mark_database_as_dead($hashdsn)
            unless OK_ERRORS->{ $driver->last_error || 0 };
        return undef;
    }
    return undef unless $job->jobid;

    ## Inserted: attach a handle to the job and hand it back.
    my $handle = TheSchwartz::JobHandle->new({
        dsn_hashed => $hashdsn,
        client     => $client,
        jobid      => $job->jobid,
    });
    $job->handle($handle);
    return $handle;
}
381
sub insert_jobs {
    my TheSchwartz $client = shift;
    my @jobs = @_;

    ## Try each registered database in random order. All jobs go into ONE
    ## database inside a single transaction; on any failure, roll back and
    ## try the next database. Returns the handles (list context) or their
    ## count (scalar context).
    my @handles;
  DATABASE:
    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it.
        next DATABASE if $client->is_database_dead($hashdsn);

        my $driver = $client->driver_for($hashdsn);

        ## Starting a transaction may need a live connection. Treat a failure
        ## here like any other dead-database error and fail over, instead of
        ## letting the exception escape to the caller.
        unless (eval { $driver->begin_work; 1 }) {
            $client->mark_database_as_dead($hashdsn);
            next DATABASE;
        }

        for my $job (@jobs) {
            my $handle = $client->insert_job_to_driver($job, $driver, $hashdsn);
            unless ($handle) {
                ## The insert failure may itself be a dropped connection, so
                ## a failing rollback must not abort the fail-over either.
                eval { $driver->rollback };
                @handles = ();
                next DATABASE;
            }
            push @handles, $handle;
        }

        last DATABASE if eval { $driver->commit };
        @handles = ();
    }

    return wantarray ? @handles : scalar @handles;
}
413
sub insert {
    my TheSchwartz $client = shift;
    my $job = shift;   # a TheSchwartz::Job, or a funcname followed by its arg

    croak "Can't insert multiple jobs with method 'insert'\n"
        if ref($_[0]) eq "TheSchwartz::Job";

    $job = TheSchwartz::Job->new_from_array($job, $_[0])
        unless ref($job) eq 'TheSchwartz::Job';

    ## First live database that accepts the job wins.
    for my $hashdsn ($client->shuffled_databases) {
        next if $client->is_database_dead($hashdsn);
        my $driver = $client->driver_for($hashdsn);
        my $handle = $client->insert_job_to_driver($job, $driver, $hashdsn);
        return $handle if $handle;
    }

    ## No database took the job.
    return undef;
}
441
sub handle_from_string {
    my TheSchwartz $client = shift;
    ## Parse the handle string, then bind the resulting handle to this client.
    my $parsed = TheSchwartz::JobHandle->new_from_string(@_);
    $parsed->client($client);
    return $parsed;
}
448
sub can_do {
    my TheSchwartz $client = shift;
    my ($ability) = @_;
    ## Record the ability both permanently and in the active rotation.
    push @{ $client->{all_abilities} },     $ability;
    push @{ $client->{current_abilities} }, $ability;
}
455
sub reset_abilities {
    my TheSchwartz $client = shift;
    ## Forget every registered ability.
    $client->{all_abilities}     = [];
    $client->{current_abilities} = [];
}
461
sub restore_full_abilities {
    my $client = shift;
    ## Put every registered ability back into rotation (as a fresh copy).
    my @everything = @{ $client->{all_abilities} };
    $client->{current_abilities} = \@everything;
}
466
sub temporarily_remove_ability {
    my $client = shift;
    my ($class) = @_;
    my @remaining = grep { $_ ne $class } @{ $client->{current_abilities} };
    $client->{current_abilities} = \@remaining;
    ## Never leave the client with nothing to look for.
    if (!@remaining) {
        $client->restore_full_abilities;
    }
}
477
sub work_on {
    my TheSchwartz $client = shift;
    my ($handle_string) = @_;
    ## Run the single job named by the handle string; 0 if it can't be found.
    my $job = $client->lookup_job($handle_string);
    return 0 unless $job;
    return $client->work_once($job);
}
485
sub work {
    my TheSchwartz $client = shift;
    my ($delay) = @_;
    $delay ||= 5;

    ## Loop forever, napping for $delay seconds after every idle pass.
    while (1) {
        $client->work_once or sleep $delay;
    }
}
494
sub work_until_done {
    my TheSchwartz $client = shift;
    ## Keep working until a pass reports that no job was found.
    while ($client->work_once) { }
}
501
## Returns true if it did something, false if no jobs were found
sub work_once {
    my TheSchwartz $client = shift;
    my $job = shift;  # optional: a specific job to work on

    unless ($job) {
        ## Search with the current (possibly reduced) set of abilities; if
        ## that comes up empty and some abilities were parked, restore the
        ## full set and search once more.
        $job = $client->find_job_for_workers;
        if (!$job && @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
            $client->restore_full_abilities;
            $job = $client->find_job_for_workers;
        }
    }

    if (!$job) {
        $client->debug("TheSchwartz::work_once found no jobs");
        return;
    }

    my $class    = $job->funcname;
    my $priority = $job->priority ? ", priority " . $job->priority : "";
    $job->debug("TheSchwartz::work_once got job of class '$class'$priority");

    ## Park this funcname so the next search favours other job types; this
    ## avoids starving high funcids given how MySQL's indexes are scanned.
    $client->temporarily_remove_ability($class);

    $class->work_safely($job);

    ## Tell work_until_done to keep looking.
    return 1;
}
543
sub funcid_to_name {
    my TheSchwartz $client = shift;
    my (undef, $hashdsn, $funcid) = @_;   # the driver argument is unused here
    ## Cache-only lookup: undef if the funcid isn't in this database's map.
    return $client->_funcmap_cache($hashdsn)->{funcid2name}{$funcid};
}
550
sub funcname_to_id {
    my TheSchwartz $client = shift;
    my ($driver, $hashdsn, $funcname) = @_;
    my $cache = $client->_funcmap_cache($hashdsn);

    if (!exists $cache->{funcname2id}{$funcname}) {
        ## Cache miss: create (or fetch) the funcmap row, then record it in
        ## both directions.
        my $map  = TheSchwartz::FuncMap->create_or_find($driver, $funcname);
        my $name = $map->funcname;
        my $id   = $map->funcid;
        $cache->{funcname2id}{$name} = $id;
        $cache->{funcid2name}{$id}   = $name;
    }
    return $cache->{funcname2id}{$funcname};
}
562
sub _funcmap_cache {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;

    return $client->{funcmap_cache}{$hashdsn}
        if exists $client->{funcmap_cache}{$hashdsn};

    ## First use of this database: load its whole funcmap table once.
    my $driver = $client->driver_for($hashdsn);
    my (%name2id, %id2name);
    for my $map ($driver->search('TheSchwartz::FuncMap')) {
        $name2id{ $map->funcname } = $map->funcid;
        $id2name{ $map->funcid }   = $map->funcname;
    }

    return $client->{funcmap_cache}{$hashdsn} = {
        funcname2id => \%name2id,
        funcid2name => \%id2name,
    };
}
578
# accessors

sub verbose {
    my TheSchwartz $client = shift;
    ## The logging coderef installed by set_verbose (false when logging is off).
    my $logger = $client->{verbose};
    return $logger;
}
585
sub set_verbose {
    my TheSchwartz $client = shift;
    my $logger = shift;   # a coderef, or any other true value to log to STDERR

    ## A true non-coderef selects the default logger, which trims trailing
    ## whitespace and prints one line to STDERR.
    my $wants_default = $logger && ref($logger) ne 'CODE';
    $client->{verbose} = $wants_default
        ? sub {
              my ($msg) = @_;
              $msg =~ s/\s+$//;
              print STDERR "$msg\n";
          }
        : $logger;
}
598
sub scoreboard {
    my TheSchwartz $client = shift;
    ## Path of this process's scoreboard file, or undef when none is set.
    my $path = $client->{scoreboard};
    return $path;
}
604
sub set_scoreboard {
    my TheSchwartz $client = shift;
    my ($dir) = @_;   # base directory, or '1'/'on' to pick one automatically

    return unless $dir;

    # They want the scoreboard but don't care where it goes: use the first
    # candidate that exists (falling back to the last one).
    if (($dir eq '1') or ($dir eq 'on')) {
        foreach my $candidate (qw(/var/run /dev/shm)) {
            $dir = $candidate;
            last if -e $dir;
        }
    }

    $dir .= '/theschwartz';

    ## Several workers may start at once and race to create the directory.
    ## Losing that race (mkdir fails but the directory now exists) is fine.
    ## Check for a directory (-d) rather than mere existence (-e) so a stray
    ## plain file at this path is reported instead of silently accepted.
    unless (-d $dir) {
        unless (mkdir($dir, 0755)) {
            my $err = $!;   # keep mkdir's errno; the -d test below may reset $!
            die "Can't create scoreboard directory '$dir': $err" unless -d $dir;
        }
    }

    $client->{scoreboard} = $dir."/scoreboard.$$";
}
627
sub start_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    # Don't do anything if (for some reason) we don't have a current job
    my $job = $client->current_job;
    return unless $job;

    my $class = $job->funcname;

    ## Best effort: if the file can't be opened, log it and stop, rather
    ## than printing to (and closing) a handle that was never opened.
    open(my $sb, '>', $scoreboard) or do {
        $job->debug("Could not write scoreboard '$scoreboard': $!");
        return;
    };
    print {$sb} join("\n", ("pid=$$",
                            'funcname='.($class||''),
                            'started='.($job->grabbed_until-($class->grab_for||1)),
                            'arg='._serialize_args($job->arg),
                           )
                    ), "\n";
    close($sb);

    return;
}
653
# Quick and dirty serializer.  Don't use Data::Dumper because we don't need to
# recurse indefinitely and we want to truncate the output produced.
#
# Hash refs become "key=value,..." (keys sorted so the output is stable) and
# array refs become "elem,...", with each value truncated to 200 characters
# and undefined values rendered as ''. Any other reference yields ''; plain
# scalars are returned unchanged.
sub _serialize_args {
    my ($args) = @_;

    ## Render one value. Test definedness rather than truth so that 0 is
    ## kept as "0" instead of collapsing to ''.
    my $fmt = sub { defined $_[0] ? substr($_[0], 0, 200) : '' };

    return $args unless ref $args;

    if (ref $args eq 'HASH') {
        return join ',', map { $_ . '=' . $fmt->($args->{$_}) } sort keys %$args;
    }
    if (ref $args eq 'ARRAY') {
        return join ',', map { $fmt->($_) } @$args;
    }
    return '';
}
673
sub end_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    my $job = $client->current_job;

    ## Best effort: on failure, log and stop. There may be no current job
    ## at this point, so fall back to the client's logger instead of calling
    ## a method on undef.
    open(my $sb, '>>', $scoreboard) or do {
        my $msg = "Could not append scoreboard '$scoreboard': $!";
        if ($job) { $job->debug($msg) } else { $client->debug($msg) }
        return;
    };
    print {$sb} "done=" . time . "\n";
    close($sb);

    return;
}
690
sub clean_scoreboard {
    my TheSchwartz $client = shift;
    ## Remove this process's scoreboard file, if one is configured.
    my $scoreboard = $client->scoreboard or return;
    unlink $scoreboard;
}
700
sub prioritize {
    my TheSchwartz $client = shift;
    ## True when job searches should be ordered by the 'priority' column.
    my $flag = $client->{prioritize};
    return $flag;
}
705
sub set_prioritize {
    my TheSchwartz $client = shift;
    my ($flag) = @_;
    $client->{prioritize} = $flag;
}
710
# The job currently being worked, so that if something dies, work_safely
# knows which job to mark as dead.
sub current_job {
    my TheSchwartz $client = shift;
    return $client->{current_job};
}
716
sub set_current_job {
    my TheSchwartz $client = shift;
    my ($job) = @_;
    $client->{current_job} = $job;
}
721
sub DESTROY {
    ## Destructors run at unpredictable moments. Localize the global error
    ## variables so that unlink() inside clean_scoreboard cannot clobber a
    ## $! / $@ / $? the surrounding code is still inspecting.
    local ($@, $!, $^E, $?);

    foreach my $obj (@_) {
        # Call 'clean_scoreboard' on TheSchwartz objects
        if (ref($obj) and $obj->isa('TheSchwartz')) {
            $obj->clean_scoreboard;
        }
    }
}
730
7311;
732
733__END__
734
735=head1 NAME
736
737TheSchwartz - reliable job queue
738
739=head1 SYNOPSIS
740
741    # MyApp.pm
742    package MyApp;
743
744    sub work_asynchronously {
745        my %args = @_;
746
747        my $client = TheSchwartz->new( databases => $DATABASE_INFO );
748        $client->insert('MyWorker', \%args);
749    }
750
751
752    # myworker.pl
753    package MyWorker;
754    use base qw( TheSchwartz::Worker );
755
756    sub work {
757        my $class = shift;
758        my TheSchwartz::Job $job = shift;
759
760        print "Workin' hard or hardly workin'? Hyuk!!\n";
761
762        $job->completed();
763    }
764
765    package main;
766
767    my $client = TheSchwartz->new( databases => $DATABASE_INFO );
768    $client->can_do('MyWorker');
769    $client->work();
770
771
772=head1 DESCRIPTION
773
774TheSchwartz is a reliable job queue system. Your application can put jobs into
775the system, and your worker processes can pull jobs from the queue atomically
776to perform. Failed jobs can be left in the queue to retry later.
777
778I<Abilities> specify what jobs a worker process can perform. Abilities are the
779names of C<TheSchwartz::Worker> subclasses, as in the synopsis: the C<MyWorker>
780class name is used to specify that the worker script can perform the job. When
781using the C<TheSchwartz> client's C<work> functions, the class-ability duality
782is used to automatically dispatch to the proper class to do the actual work.
783
784TheSchwartz clients will also prefer to do jobs for unused abilities before
785reusing a particular ability, to avoid exhausting the supply of one kind of job
786while jobs of other types stack up.
787
788Some jobs with high setup times can be performed more efficiently if a group of
789related jobs are performed together. TheSchwartz offers a facility to
790I<coalesce> jobs into groups, which a properly constructed worker can find and
791perform at once. For example, if your worker were delivering email, you might
792store the domain name from the recipient's address as the coalescing value. The
793worker that grabs that job could then batch deliver all the mail for that
794domain once it connects to that domain's mail server.
795
796=head1 USAGE
797
798=head2 C<TheSchwartz-E<gt>new( %args )>
799
800Optional members of C<%args> are:
801
802=over 4
803
804=item * C<databases>
805
806An arrayref of database information. TheSchwartz workers can use multiple
807databases, such that if any of them are unavailable, the worker will search for
808appropriate jobs in the other databases automatically.
809
810Each member of the C<databases> value should be a hashref containing:
811
812=over 4
813
814=item * C<dsn>
815
816The database DSN for this database.
817
818=item * C<user>
819
820The username to use when connecting to this database.
821
822=item * C<pass>
823
824The password to use when connecting to this database.
825
826=back
827
828=item * C<verbose>
829
830A value indicating whether to log debug messages. If C<verbose> is a coderef,
831it is called to log debug messages. If C<verbose> is not a coderef but is some
832other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
833messages will not be logged.
834
835=item * C<prioritize>
836
837A value indicating whether to utilize the job 'priority' field when selecting
838jobs to be processed. If unspecified, jobs will always be executed in a
839randomized order.
840
841=item * C<driver_cache_expiration>
842
843Optional value to control how long database connections are cached for in seconds.
844By default, connections are not cached. To re-use the same database connection for
845five minutes, pass driver_cache_expiration => 300 to the constructor. Improves job
846throughput in cases where the work to process a job is small compared to the database
847connection set-up and tear-down time.
848
849=item * C<retry_seconds>
850
851The number of seconds after which to try reconnecting to apparently dead
852databases. If not given, TheSchwartz will retry connecting to databases after
85330 seconds.
854
855=back
856
857=head2 C<$client-E<gt>list_jobs( %args )>
858
859Returns a list of C<TheSchwartz::Job> objects matching the given arguments. The
860required members of C<%args> are:
861
862=over 4
863
864=item * C<funcname>
865
866the name of the function or a reference to an array of functions
867
868=item * C<run_after>
869
870the value you want to check <= against on the run_after column
871
872=item * C<grabbed_until>
873
874the value you want to check <= against on the grabbed_until column
875
876=item * C<coalesce_op>
877
878defaults to '=', set it to whatever you want to compare the coalesce field too
879if you want to search, you can use 'LIKE'
880
881=item * C<coalesce>
882
883coalesce value to search for, if you set op to 'LIKE' you can use '%' here,
884do remember that '%' searches anchored at the beginning of the string are
885much faster since it is can do a btree index lookup
886
887=item * C<want_handle>
888
889if you want all your jobs to be set up using a handle.  defaults to true.
890this option might be removed, as you should always have this on a Job object.
891
892=back
893
894It is important to remember that this function doesnt lock anything, it just
895returns as many jobs as there is up to amount of databases * FIND_JOB_BATCH_SIZE
896
897=head2 C<$client-E<gt>lookup_job( $handle_id )>
898
899Returns a C<TheSchwartz::Job> corresponding to the given handle ID.
900
901=head2 C<$client-E<gt>set_verbose( $verbose )>
902
903Sets the current logging function to C<$verbose> if it's a coderef. If not a
904coderef, enables debug logging to C<STDERR> if C<$verbose> is true; otherwise,
905disables logging.
906
907=head1 POSTING JOBS
908
909The methods of TheSchwartz clients used by applications posting jobs to the
910queue are:
911
912=head2 C<$client-E<gt>insert( $job )>
913
914Adds the given C<TheSchwartz::Job> to one of the client's job databases.
915
916=head2 C<$client-E<gt>insert( $funcname, $arg )>
917
918Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
919
920=head2 C<$client-E<gt>insert_jobs( @jobs )>
921
922Adds the given C<TheSchwartz::Job> objects to one of the client's job
923databases. All the given jobs are recorded in I<one> job database.
924
925=head1 WORKING
926
927The methods of TheSchwartz clients for use in worker processes are:
928
929=head2 C<$client-E<gt>can_do( $ability )>
930
931Adds C<$ability> to the list of abilities C<$client> is capable of performing.
932Subsequent calls to that client's C<work> methods will find jobs requiring the
933given ability.
934
935=head2 C<$client-E<gt>work_once()>
936
937Find and perform one job C<$client> can do.
938
939=head2 C<$client-E<gt>work_until_done()>
940
941Find and perform jobs C<$client> can do until no more such jobs are found in
942any of the client's job databases.
943
944=head2 C<$client-E<gt>work( [$delay] )>
945
946Find and perform any jobs C<$client> can do, forever. When no job is available,
947the working process will sleep for C<$delay> seconds (or 5, if not specified)
948before looking again.
949
950=head2 C<$client-E<gt>work_on($handle)>
951
952Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
953
954=head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
955
956Returns a C<TheSchwartz::Job> for a random job that the client can do. If
957specified, the job returned matches one of the abilities in the arrayref
958C<$abilities>, rather than C<$client>'s abilities.
959
960=head2 C<$client-E<gt>find_job_with_coalescing_value( $ability, $coval )>
961
962Returns a C<TheSchwartz::Job> for a random job for a worker capable of
963C<$ability> and with a coalescing value of C<$coval>.
964
965=head2 C<$client-E<gt>find_job_with_coalescing_prefix( $ability, $coval )>
966
967Returns a C<TheSchwartz::Job> for a random job for a worker capable of
968C<$ability> and with a coalescing value beginning with C<$coval>.
969
970Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
971find matching jobs, with all the attendant performance implications for your
972job databases.
973
974=head2 C<$client-E<gt>get_server_time( $driver )>
975
976Given an open driver I<$driver> to a database, gets the current server time from the database.
977
978=head1 COPYRIGHT, LICENSE & WARRANTY
979
980This software is Copyright 2007, Six Apart Ltd, cpan@sixapart.com. All
981rights reserved.
982
983TheSchwartz is free software; you may redistribute it and/or modify it
984under the same terms as Perl itself.
985
986TheScwhartz comes with no warranty of any kind.
987
988=cut
989
Note: See TracBrowser for help on using the browser.