root/branches/boomer/extlib/TheSchwartz.pm @ 1104

Revision 1104, 29.0 kB (checked in by hachi, 2 years ago)

Merging release-24 to boomer branch: svn merge -r67670:69246 http://svn.sixapart.com/repos/eng/movabletype/branches/release-24 .

  • Property svn:keywords set to Id Revision
Line 
1# $Id$
2
3package TheSchwartz;
4use strict;
5use fields qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard );
6
7our $VERSION = "1.06";
8
9use Carp qw( croak );
10use Data::ObjectDriver::Errors;
11use Data::ObjectDriver::Driver::DBI;
12use Digest::MD5 qw( md5_hex );
13use List::Util qw( shuffle );
14use TheSchwartz::FuncMap;
15use TheSchwartz::Job;
16use TheSchwartz::JobHandle;
17
## Seconds to wait before retrying a database that was marked dead; used by
## new() when the caller does not pass retry_seconds.
use constant RETRY_DEFAULT => 30;

## Driver error codes that do NOT cause a database to be marked as dead.
use constant OK_ERRORS => {
    map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT,
};

# test harness hooks
our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
our $T_LOST_RACE;

## Number of jobs to fetch per database in one search; also the default
## limit used by list_jobs.
our $FIND_JOB_BATCH_SIZE = 50;
27
## Constructor. Recognised named options: databases (arrayref),
## retry_seconds, verbose, scoreboard and driver_cache_expiration.
## Croaks if databases is present but not an arrayref, or if any
## unrecognised option is passed.
sub new {
    my TheSchwartz $client = shift;
    my %opt = @_;
    $client = fields::new($client) unless ref $client;

    if (exists $opt{databases} && ref $opt{databases} ne 'ARRAY') {
        croak "databases must be an arrayref if specified";
    }
    my $db_list = delete $opt{databases};

    $client->{retry_seconds} = delete $opt{retry_seconds} || RETRY_DEFAULT;
    $client->set_verbose(delete $opt{verbose});
    $client->set_scoreboard(delete $opt{scoreboard});
    $client->{driver_cache_expiration} = delete $opt{driver_cache_expiration} || 0;

    ## Everything we understand has been deleted by now; leftovers are errors.
    my @leftover = keys %opt;
    croak "unknown options ", join(', ', @leftover) if @leftover;

    $client->{dead_dsns}     = {};
    $client->{retry_at}      = {};
    $client->{funcmap_cache} = {};
    $client->hash_databases($db_list);
    $client->reset_abilities;

    return $client;
}
51
## Forwards its arguments ($msg and an optional $job) to the configured
## verbose callback. Does nothing when no callback is set.
sub debug {
    my TheSchwartz $client = shift;
    my $logger = $client->{verbose} or return;
    $logger->(@_);
}
57
## Registers each database hashref in $list under the MD5 hex digest of
## its "dsn|user|pass" string (missing parts count as empty strings).
sub hash_databases {
    my TheSchwartz $client = shift;
    my ($db_list) = @_;
    foreach my $db (@$db_list) {
        my @parts = map { $db->{$_} || '' } qw( dsn user pass );
        my $key   = md5_hex(join '|', @parts);
        $client->{databases}{$key} = $db;
    }
}
66
## Returns a Data::ObjectDriver DBI driver for the database registered
## under $hashdsn. When driver_cache_expiration is non-zero, a driver
## created less than that many seconds ago is reused; otherwise a new
## driver is built on every call.
sub driver_for {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;
    my $now = time;
    my $ttl = $client->{driver_cache_expiration};

    if ($ttl) {
        my $created = $client->{cached_drivers}{$hashdsn}{create_ts};
        if ($created && $created + $ttl > $now) {
            return $client->{cached_drivers}{$hashdsn}{driver};
        }
    }

    my $db = $client->{databases}{$hashdsn};
    my @prefix_opt = $db->{prefix} ? (prefix => $db->{prefix}) : ();
    my $driver = Data::ObjectDriver::Driver::DBI->new(
        dsn      => $db->{dsn},
        username => $db->{user},
        password => $db->{pass},
        @prefix_opt,
    );

    if ($ttl) {
        $client->{cached_drivers}{$hashdsn}{driver}    = $driver;
        $client->{cached_drivers}{$hashdsn}{create_ts} = $now;
    }
    return $driver;
}
90
## Flags the database as dead and records the time (now + retry_seconds)
## after which is_database_dead() will let it be tried again.
sub mark_database_as_dead {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;
    my $retry_time = time() + $client->{retry_seconds};
    $client->{dead_dsns}{$hashdsn} = 1;
    $client->{retry_at}{$hashdsn}  = $retry_time;
}
97
## Returns 1 while the database is marked dead and its retry time has not
## yet passed, 0 otherwise. Once the retry time has passed, the dead mark
## is cleared so the database gets another chance.
sub is_database_dead {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;

    ## Never marked dead (or already revived).
    return 0 unless $client->{dead_dsns}{$hashdsn};

    ## Still inside the back-off window.
    return 1 unless $client->{retry_at}{$hashdsn} < time();

    ## Back-off is over: forget the failure and let the caller retry.
    delete $client->{dead_dsns}{$hashdsn};
    delete $client->{retry_at}{$hashdsn};
    return 0;
}
114
## Looks up a single job from a handle string.
##
## The arguments are passed straight to handle_from_string(). Returns the
## TheSchwartz::Job with its handle and funcname filled in, or nothing if
## the driver's lookup finds no such job.
sub lookup_job {
    my TheSchwartz $client = shift;
    my $handle  = $client->handle_from_string(@_);
    my $hashdsn = $handle->dsn_hashed;
    my $driver  = $client->driver_for($hashdsn);

    my $job = $driver->lookup('TheSchwartz::Job' => $handle->jobid)
        or return;

    $job->handle($handle);
    ## funcids are per-database, so translate using this database's map.
    $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
    return $job;
}
128
## Lists jobs matching the criteria in the hashref $arg, without locking or
## grabbing them.
##
## Keys read from $arg:
##   funcname      - required; a function name or an arrayref of names
##   run_after     - optional; matches jobs with run_after <= this value
##   grabbed_until - optional; matches jobs with grabbed_until <= this value
##   coalesce      - optional; value compared against the coalesce column
##   coalesce_op   - optional; comparison operator for coalesce (default '=')
##   want_handle   - optional; attach a JobHandle to each job (default true)
##   limit         - optional; per-database row limit
##                   (default $FIND_JOB_BATCH_SIZE)
##
## Dies with "No funcname" when funcname is missing. Returns the list of
## jobs collected from every database that is not currently marked dead.
## The caller's hashref is never modified.
sub list_jobs {
    my TheSchwartz $client = shift;
    my $arg = shift;
    die "No funcname" unless exists $arg->{funcname};

    my @options;
    push @options, run_after     => { op => '<=', value => $arg->{run_after} }     if exists $arg->{run_after};
    push @options, grabbed_until => { op => '<=', value => $arg->{grabbed_until} } if exists $arg->{grabbed_until};

    if ($arg->{coalesce}) {
        my $coalesce_op = $arg->{coalesce_op} || '=';
        push @options, coalesce => { op => $coalesce_op, value => $arg->{coalesce} };
    }

    ## Defaults are kept in locals so the caller's hashref stays untouched.
    my $want_handle = defined $arg->{want_handle} ? $arg->{want_handle} : 1;
    my $limit       = $arg->{limit} || $FIND_JOB_BATCH_SIZE;

    my @jobs;
    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it
        next if $client->is_database_dead($hashdsn);
        my $driver = $client->driver_for($hashdsn);

        ## funcids differ per database, so resolve the name(s) for each one.
        my $funcid;
        if (ref($arg->{funcname})) {
            $funcid = [map { $client->funcname_to_id($driver, $hashdsn, $_) } @{$arg->{funcname}}];
        } else {
            $funcid = $client->funcname_to_id($driver, $hashdsn, $arg->{funcname});
        }

        my @found = $driver->search('TheSchwartz::Job' => {
                funcid => $funcid,
                @options
            }, { limit => $limit });

        if ($want_handle) {
            for my $job (@found) {
                my $handle = TheSchwartz::JobHandle->new({
                    dsn_hashed => $hashdsn,
                    client     => $client,
                    jobid      => $job->jobid
                    });
                $job->handle($handle);
            }
        }
        push @jobs, @found;
    }
    return @jobs;
}
179
## Finds and grabs a job for $funcname whose coalesce value starts with
## $coval, by running a LIKE match against "$coval%".
sub find_job_with_coalescing_prefix {
    my TheSchwartz $client = shift;
    my ($funcname, $prefix) = @_;
    $prefix .= "%";
    return $client->_find_job_with_coalescing('LIKE', $funcname, $prefix);
}
186
## Finds and grabs a job whose coalesce value equals the given one.
## Arguments ($funcname, $coval) are forwarded unchanged.
sub find_job_with_coalescing_value {
    my TheSchwartz $client = shift;
    my @criteria = @_;
    return $client->_find_job_with_coalescing('=', @criteria);
}
191
## Shared implementation behind find_job_with_coalescing_value() and
## find_job_with_coalescing_prefix().
##
## Arguments: $op (comparison operator applied to the coalesce column, e.g.
## '=' or 'LIKE'), $funcname, and $coval (the value to compare against).
##
## Walks the databases in random order, skipping any currently marked dead,
## and returns the first job that _grab_a_job() manages to claim. Returns
## nothing (undef in scalar context) when no database yields a job.
sub _find_job_with_coalescing {
    my TheSchwartz $client = shift;
    my ($op, $funcname, $coval) = @_;

    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it
        next if $client->is_database_dead($hashdsn);

        my $driver = $client->driver_for($hashdsn);
        my $unixtime = $driver->dbd->sql_for_unixtime;

        my @jobs;
        eval {
            ## Search for jobs in this database where:
            ## 1. the job's function is $funcname;
            ## 2. the job is scheduled to be run (run_after is in the past);
            ## 3. no one else is working on the job (grabbed_until is in
            ##    the past);
            ## 4. the coalesce column matches $coval under $op.
            my $funcid = $client->funcname_to_id($driver, $hashdsn, $funcname);

            @jobs = $driver->search('TheSchwartz::Job' => {
                    funcid        => $funcid,
                    run_after     => \ "<= $unixtime",
                    grabbed_until => \ "<= $unixtime",
                    coalesce      => { op => $op, value => $coval },
                }, { limit => $FIND_JOB_BATCH_SIZE });
        };
        if ($@) {
            ## Any error not listed in OK_ERRORS takes this database out of
            ## rotation until its retry time.
            unless (OK_ERRORS->{ $driver->last_error || 0 }) {
                $client->mark_database_as_dead($hashdsn);
            }
        }

        my $job = $client->_grab_a_job($hashdsn, @jobs);
        return $job if $job;
    }

    ## Falling off the end of a for loop leaves a sub's return value
    ## unspecified, so say "no job found" explicitly.
    return;
}
229
## Finds and grabs one runnable job for any of the given worker classes.
##
## $worker_classes is an optional arrayref of function names; it defaults to
## the client's current abilities. Databases are tried in random order,
## skipping any currently marked dead. Returns the first job that
## _grab_a_job() manages to claim, or nothing (undef in scalar context) when
## no database yields a job.
sub find_job_for_workers {
    my TheSchwartz $client = shift;
    my($worker_classes) = @_;
    $worker_classes ||= $client->{current_abilities};

    for my $hashdsn ($client->shuffled_databases) {
        ## If the database is dead, skip it.
        next if $client->is_database_dead($hashdsn);

        my $driver = $client->driver_for($hashdsn);
        my $unixtime = $driver->dbd->sql_for_unixtime;

        my @jobs;
        eval {
            ## Search for jobs in this database where:
            ## 1. funcname is in the list of worker classes being asked for;
            ## 2. the job is scheduled to be run (run_after is in the past);
            ## 3. no one else is working on the job (grabbed_until is in
            ##    the past).
            my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
                      @$worker_classes;

            @jobs = $driver->search('TheSchwartz::Job' => {
                    funcid        => \@ids,
                    run_after     => \ "<= $unixtime",
                    grabbed_until => \ "<= $unixtime",
                }, { limit => $FIND_JOB_BATCH_SIZE });
        };
        if ($@) {
            ## Any error not listed in OK_ERRORS takes this database out of
            ## rotation until its retry time.
            unless (OK_ERRORS->{ $driver->last_error || 0 }) {
                $client->mark_database_as_dead($hashdsn);
            }
        }

        # for test harness race condition testing
        $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;

        my $job = $client->_grab_a_job($hashdsn, @jobs);
        return $job if $job;
    }

    ## Falling off the end of a for loop leaves a sub's return value
    ## unspecified, so say "no job found" explicitly.
    return;
}
271
## Asks the database behind $driver for its current time, by selecting the
## driver's unixtime SQL expression over the read/write handle.
sub get_server_time {
    my TheSchwartz $client = shift;
    my ($driver) = @_;
    my $query = 'SELECT ' . $driver->dbd->sql_for_unixtime;
    return $driver->rw_handle->selectrow_array($query);
}
278
## Tries to claim one of the candidate jobs found in database $hashdsn.
##
## Arguments: $hashdsn, followed by the candidate job objects. Candidates
## are tried in random order; a job is claimed by updating its
## grabbed_until column, conditional on the value we originally read.
## Returns the claimed job (with funcname and handle set), or undef if
## every candidate was lost to another worker. Dies if the server time
## cannot be obtained.
sub _grab_a_job {
    my TheSchwartz $client = shift;
    my $hashdsn = shift;
    my $driver  = $client->driver_for($hashdsn);

    ## Randomize the candidates to avoid contention between workers.
    my @candidates = shuffle(@_);

  JOB:
    while (my $job = shift @candidates) {
        ## Translate the funcid using this database's own map.
        my $name = $client->funcid_to_name($driver, $hashdsn, $job->funcid);
        $job->funcname($name);

        my $worker_class  = $job->funcname;
        my $previous_grab = $job->grabbed_until;

        my $server_time = $client->get_server_time($driver)
            or die "expected a server time";

        ## Reserve the job for as long as the worker class asks (min. 1).
        my $grab_seconds = $worker_class->grab_for || 1;
        $job->grabbed_until($server_time + $grab_seconds);

        ## The update only matches if grabbed_until is still what we read,
        ## so zero rows means another worker got there first.
        my $rows = $driver->update($job, { grabbed_until => $previous_grab });
        if ($rows < 1) {
            $T_LOST_RACE->() if $T_LOST_RACE;
            next JOB;
        }

        ## We own the job now; attach a handle and hand it back.
        my $handle = TheSchwartz::JobHandle->new({
            dsn_hashed => $hashdsn,
            jobid      => $job->jobid,
        });
        $handle->client($client);
        $job->handle($handle);
        return $job;
    }

    return undef;
}
322
323
## Returns the hashed-DSN keys of all registered databases in random order.
sub shuffled_databases {
    my TheSchwartz $client = shift;
    my @keys = keys %{ $client->{databases} };
    my @randomized = shuffle(@keys);
    return @randomized;
}
329
## Inserts $job into the database behind $driver / $hashdsn.
##
## Returns a TheSchwartz::JobHandle (also attached to the job) on success,
## or undef on failure. A failure whose driver error is not in OK_ERRORS
## marks the database as dead.
sub insert_job_to_driver {
    my $client = shift;
    my ($job, $driver, $hashdsn) = @_;

    eval {
        ## funcids are per-database, so resolve the name against this
        ## database. This might fail if the database is dead.
        my $funcid = $client->funcname_to_id($driver, $hashdsn, $job->funcname);
        $job->funcid($funcid);

        ## Client clock rather than server time: sub-optimal because of
        ## clock skew, but better than a NULL value, and it avoids a
        ## roundtrip. TODO: use server time without the extra roundtrip.
        $job->insert_time(time());

        ## The insert itself might fail as well.
        $driver->insert($job);
    };

    if ($@) {
        $client->mark_database_as_dead($hashdsn)
            unless OK_ERRORS->{ $driver->last_error || 0 };
        return undef;
    }

    ## No job id means the insert did not actually take.
    return undef unless $job->jobid;

    my $handle = TheSchwartz::JobHandle->new({
            dsn_hashed => $hashdsn,
            client     => $client,
            jobid      => $job->jobid
        });
    $job->handle($handle);
    return $handle;
}
365
## Inserts all of @jobs into a single database, inside one transaction.
##
## Databases are tried in random order, skipping any currently marked dead.
## If any insert or the commit fails, the handles collected so far are
## discarded and the next database is tried. Returns the list of job
## handles in list context, or their count otherwise; both are empty/zero
## when no database accepted the whole batch.
sub insert_jobs {
    my TheSchwartz $client = shift;
    my @batch = @_;

    my @inserted;
  DATABASE:
    foreach my $dsn_key ($client->shuffled_databases) {
        next DATABASE if $client->is_database_dead($dsn_key);

        my $driver = $client->driver_for($dsn_key);
        $driver->begin_work;

        foreach my $job (@batch) {
            my $handle = $client->insert_job_to_driver($job, $driver, $dsn_key);
            unless ($handle) {
                ## One failure spoils the batch: undo and move on.
                $driver->rollback;
                @inserted = ();
                next DATABASE;
            }
            push @inserted, $handle;
        }

        ## A successful commit ends the search; otherwise start over on
        ## the next database with a clean slate.
        my $committed = eval { $driver->commit };
        last DATABASE if $committed;
        @inserted = ();
    }

    return @inserted if wantarray;
    return scalar @inserted;
}
397
## Inserts a single job into one of the client's databases.
##
## Accepts either a TheSchwartz::Job object, or a function name followed by
## its argument (turned into a job via new_from_array). Croaks when a second
## job object is passed. Returns the job handle from the first database
## that accepts the job, or undef if none does.
sub insert {
    my TheSchwartz $client = shift;
    my $job = shift;

    croak "Can't insert multiple jobs with method 'insert'\n"
        if ref($_[0]) eq "TheSchwartz::Job";

    $job = TheSchwartz::Job->new_from_array($job, $_[0])
        unless ref($job) eq 'TheSchwartz::Job';

    ## Walk the live databases in random order until one takes the job.
    foreach my $dsn_key ($client->shuffled_databases) {
        next if $client->is_database_dead($dsn_key);

        my $driver = $client->driver_for($dsn_key);
        my $handle = $client->insert_job_to_driver($job, $driver, $dsn_key);
        return $handle if $handle;
    }

    ## No database accepted the job.
    return undef;
}
425
## Builds a TheSchwartz::JobHandle from its string form (arguments are
## forwarded to new_from_string) and ties it to this client.
sub handle_from_string {
    my TheSchwartz $client = shift;
    my @spec = @_;
    my $handle = TheSchwartz::JobHandle->new_from_string(@spec);
    $handle->client($client);
    return $handle;
}
432
## Registers $class as an ability, in both the full list and the list
## currently in use.
sub can_do {
    my TheSchwartz $client = shift;
    my $ability = shift;
    push @{ $client->{all_abilities} },     $ability;
    push @{ $client->{current_abilities} }, $ability;
}
439
## Forgets every ability: both the full and the current list become empty.
sub reset_abilities {
    my TheSchwartz $client = shift;
    my (@none_all, @none_current);
    $client->{all_abilities}     = \@none_all;
    $client->{current_abilities} = \@none_current;
}
445
## Resets the current abilities to a fresh copy of the full list.
sub restore_full_abilities {
    my $client = shift;
    my @everything = @{ $client->{all_abilities} };
    $client->{current_abilities} = \@everything;
}
450
## Drops $class from the current abilities. If that leaves nothing to do,
## the full set of abilities is restored.
sub temporarily_remove_ability {
    my $client = shift;
    my ($class) = @_;
    my @remaining = grep { $_ ne $class } @{ $client->{current_abilities} };
    $client->{current_abilities} = \@remaining;
    $client->restore_full_abilities unless @remaining;
}
461
## Runs the job identified by the handle string $hstr. Returns 0 when the
## job cannot be found, otherwise whatever work_once() returns.
sub work_on {
    my TheSchwartz $client = shift;
    my ($handle_string) = @_;
    my $job = $client->lookup_job($handle_string);
    return 0 unless $job;
    return $client->work_once($job);
}
469
## Works on jobs forever. Whenever work_once() reports that it did nothing,
## sleeps for $delay seconds (default 5) before looking again.
sub work {
    my TheSchwartz $client = shift;
    my $delay = shift || 5;
    for (;;) {
        next if $client->work_once;
        sleep $delay;
    }
}
478
## Keeps calling work_once() until it reports that it did nothing.
sub work_until_done {
    my TheSchwartz $client = shift;
    1 while $client->work_once;
}
485
## Works on at most one job. An optional job may be passed in; otherwise
## one is looked up. Returns 1 if a job was worked on, and nothing if no
## job could be found.
sub work_once {
    my TheSchwartz $client = shift;
    my ($job) = @_;

    ## Look for a job with our current set of abilities, which may be a
    ## subset of the full set so that work is spread evenly across
    ## job types.
    $job ||= $client->find_job_for_workers;

    ## Nothing found with the reduced set: restore the full set of
    ## abilities and look once more.
    my $reduced = @{ $client->{current_abilities} } < @{ $client->{all_abilities} };
    if (!$job && $reduced) {
        $client->restore_full_abilities;
        $job = $client->find_job_for_workers;
    }

    ## Still nothing: report it and give up.
    unless ($job) {
        $client->debug("TheSchwartz::work_once found no jobs");
        return;
    }

    my $class = $job->funcname;
    $job->debug("TheSchwartz::work_once got job of class '$class'");

    ## Now that we found a job for this funcname, remove it from our list
    ## of current abilities, so the next time we look for a job we'll find
    ## one for a different funcname. This prevents starvation of high
    ## funcid values because of the way MySQL's indexes work.
    $client->temporarily_remove_ability($class);

    $class->work_safely($job);

    ## Tell work_until_done (which calls this method) to keep looking.
    return 1;
}
526
## Maps a funcid to its function name using the cached function map of
## database $hashdsn. Arguments: ($driver, $hashdsn, $funcid); $driver is
## accepted but not used here.
sub funcid_to_name {
    my TheSchwartz $client = shift;
    my (undef, $hashdsn, $funcid) = @_;
    my $id2name = $client->_funcmap_cache($hashdsn)->{funcid2name};
    return $id2name->{$funcid};
}
533
## Maps a function name to its funcid in database $hashdsn. Names missing
## from the cached map are obtained via FuncMap->create_or_find and added
## to the cache in both directions.
sub funcname_to_id {
    my TheSchwartz $client = shift;
    my ($driver, $hashdsn, $funcname) = @_;
    my $cache = $client->_funcmap_cache($hashdsn);

    if (!exists $cache->{funcname2id}{$funcname}) {
        my $map = TheSchwartz::FuncMap->create_or_find($driver, $funcname);
        my ($name, $id) = ($map->funcname, $map->funcid);
        $cache->{funcname2id}{$name} = $id;
        $cache->{funcid2name}{$id}   = $name;
    }

    return $cache->{funcname2id}{$funcname};
}
545
## Returns the function-map cache for database $hashdsn: a hashref holding
## the lookup tables funcname2id and funcid2name. The first call for a
## database loads every TheSchwartz::FuncMap row from it.
sub _funcmap_cache {
    my TheSchwartz $client = shift;
    my ($hashdsn) = @_;

    return $client->{funcmap_cache}{$hashdsn}
        if exists $client->{funcmap_cache}{$hashdsn};

    my (%name2id, %id2name);
    my $driver = $client->driver_for($hashdsn);
    foreach my $map ($driver->search('TheSchwartz::FuncMap')) {
        $name2id{ $map->funcname } = $map->funcid;
        $id2name{ $map->funcid }   = $map->funcname;
    }

    $client->{funcmap_cache}{$hashdsn} = {
        funcname2id => \%name2id,
        funcid2name => \%id2name,
    };
    return $client->{funcmap_cache}{$hashdsn};
}
561
562# accessors
563
## Accessor: the current debug logger (a coderef), or a false value when
## logging is off.
sub verbose {
    my TheSchwartz $client = shift;
    my $logger = $client->{verbose};
    return $logger;
}
568
## Sets the debug logger. A coderef is stored as-is; any other true value
## installs a logger that prints the message to STDERR with trailing
## whitespace replaced by a single newline; a false value disables logging.
sub set_verbose {
    my TheSchwartz $client = shift;
    my ($logger) = @_;

    my $needs_default = $logger && ref($logger) ne "CODE";
    if ($needs_default) {
        $logger = sub {
            (my $line = shift) =~ s/\s+$//;
            print STDERR "$line\n";
        };
    }

    $client->{verbose} = $logger;
}
581
## Accessor: path of this process's scoreboard file, or undef when the
## scoreboard is not configured.
sub scoreboard {
    my TheSchwartz $client = shift;
    my $path = $client->{scoreboard};
    return $path;
}
587
## Configures where this process writes its scoreboard file.
##
## $dir is the parent directory. The values '1' and 'on' mean "pick one for
## me": the first of /var/run and /dev/shm that exists is used (falling back
## to /dev/shm if neither does). A false $dir leaves the scoreboard
## disabled. The file ends up at "$dir/theschwartz/scoreboard.$$".
##
## Dies if the "theschwartz" subdirectory is missing and cannot be created.
sub set_scoreboard {
    my TheSchwartz $client = shift;
    my ($dir) = @_;

    return unless $dir;

    # They want the scoreboard but don't care where it goes
    if (($dir eq '1') or ($dir eq 'on')) {
        # Find someplace in tmpfs to save this
        foreach my $d (qw(/var/run /dev/shm)) {
            $dir = $d;
            last if -e $dir;
        }
    }

    $dir .= '/theschwartz';
    unless (-e $dir) {
        unless (mkdir($dir, 0755)) {
            # Another worker may have created the directory between our
            # existence check and the mkdir; that is not an error. Capture
            # $! first because the -d test below can overwrite it.
            my $mkdir_error = $!;
            die "Can't create scoreboard directory '$dir': $mkdir_error"
                unless -d $dir;
        }
    }

    $client->{scoreboard} = $dir."/scoreboard.$$";
}
610
## Writes a fresh scoreboard file describing the current job: pid, funcname,
## start time (grabbed_until minus the worker class's grab_for, minimum 1)
## and the serialized job arguments.
##
## Does nothing when the scoreboard is not configured or there is no
## current job. If the file cannot be opened, the failure is reported via
## the job's debug method and nothing is written. Always returns nothing.
sub start_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    # Don't do anything if (for some reason) we don't have a current job
    my $job = $client->current_job;
    return unless $job;

    my $class = $job->funcname;

    # A lexical handle keeps us from trampling a global SB handle that
    # other code might be using.
    my $fh;
    unless (open($fh, '>', $scoreboard)) {
        $job->debug("Could not write scoreboard '$scoreboard': $!");
        return;
    }

    # funcname may be unset; don't try to call a method on undef.
    my $grab_for = ($class ? $class->grab_for : 0) || 1;

    print $fh join("\n", ("pid=$$",
                          'funcname='.($class||''),
                          'started='.($job->grabbed_until-$grab_for),
                          'arg='._serialize_args($job->arg),
                         )
                  ), "\n";
    close($fh);

    return;
}
636
# Quick and dirty serializer.  Don't use Data::Dumper because we don't need to
# recurse indefinitely and we want to truncate the output produced.
#
# Returns:
#   - a non-reference argument unchanged;
#   - for a hashref, "key=value" pairs joined by commas, in sorted key order
#     so the output is stable;
#   - for an arrayref, the elements joined by commas;
#   - the empty string for any other kind of reference.
# Each value is truncated to 200 characters, and undefined values become
# empty strings. Values such as 0 are preserved.
sub _serialize_args {
    my ($args) = @_;

    return $args unless ref $args;

    # Test definedness, not truth, so that a value like 0 is not erased.
    my $clip = sub {
        my $value = shift;
        return substr(defined $value ? $value : '', 0, 200);
    };

    if (ref $args eq 'HASH') {
        return join ',',
               map { $_ . '=' . $clip->($args->{$_}) }
               sort keys %$args;
    }
    if (ref $args eq 'ARRAY') {
        return join ',', map { $clip->($_) } @$args;
    }

    # Unsupported reference type: nothing useful to show.
    return '';
}
656
## Appends a "done=<epoch seconds>" line to the scoreboard file.
##
## Does nothing when the scoreboard is not configured. If the file cannot
## be opened, the failure is reported through the current job's debug
## method (or the client's, when there is no current job) and nothing is
## written. Always returns nothing.
sub end_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    my $job = $client->current_job;

    # A lexical handle keeps us from trampling a global SB handle that
    # other code might be using.
    my $fh;
    unless (open($fh, '>>', $scoreboard)) {
        my $msg = "Could not append scoreboard '$scoreboard': $!";
        # There may be no current job to report through.
        if ($job) {
            $job->debug($msg);
        } else {
            $client->debug($msg);
        }
        return;
    }

    print $fh "done=".time."\n";
    close($fh);

    return;
}
673
## Deletes this process's scoreboard file. Does nothing when the scoreboard
## is not configured.
sub clean_scoreboard {
    my TheSchwartz $client = shift;
    my $path = $client->scoreboard or return;
    unlink($path);
}
683
# Accessor for the job currently being worked, so that if something dies,
# work_safely knows which job to mark as dead.
sub current_job {
    my TheSchwartz $client = shift;
    my $job = $client->{current_job};
    return $job;
}
689
## Records $job as the job currently being worked.
sub set_current_job {
    my TheSchwartz $client = shift;
    my ($job) = @_;
    $client->{current_job} = $job;
}
694
## Destructor: removes this process's scoreboard file, if one is configured.
sub DESTROY {
    # Destructors can run at awkward moments (e.g. while an exception is
    # propagating), and unlink may set $!. Keep the caller's error state
    # intact.
    local ($@, $!, $?);

    foreach my $arg (@_) {
        # Call 'clean_scoreboard' on TheSchwartz objects
        if (ref($arg) and $arg->isa('TheSchwartz')) {
            $arg->clean_scoreboard;
        }
    }
}
703
7041;
705
706__END__
707
708=head1 NAME
709
710TheSchwartz - reliable job queue
711
712=head1 SYNOPSIS
713
714    # MyApp.pm
715    package MyApp;
716
717    sub work_asynchronously {
718        my %args = @_;
719
720        my $client = TheSchwartz->new( databases => $DATABASE_INFO );
721        $client->insert('MyWorker', \%args);
722    }
723
724
725    # myworker.pl
726    package MyWorker;
727    use base qw( TheSchwartz::Worker );
728
729    sub work {
730        my $class = shift;
731        my TheSchwartz::Job $job = shift;
732
733        print "Workin' hard or hardly workin'? Hyuk!!\n";
734
735        $job->completed();
736    }
737
738    package main;
739
740    my $client = TheSchwartz->new( databases => $DATABASE_INFO );
741    $client->can_do('MyWorker');
742    $client->work();
743
744
745=head1 DESCRIPTION
746
747TheSchwartz is a reliable job queue system. Your application can put jobs into
748the system, and your worker processes can pull jobs from the queue atomically
749to perform. Failed jobs can be left in the queue to retry later.
750
751I<Abilities> specify what jobs a worker process can perform. Abilities are the
752names of C<TheSchwartz::Worker> subclasses, as in the synopsis: the C<MyWorker>
753class name is used to specify that the worker script can perform the job. When
754using the C<TheSchwartz> client's C<work> functions, the class-ability duality
755is used to automatically dispatch to the proper class to do the actual work.
756
757TheSchwartz clients will also prefer to do jobs for unused abilities before
758reusing a particular ability, to avoid exhausting the supply of one kind of job
759while jobs of other types stack up.
760
761Some jobs with high setup times can be performed more efficiently if a group of
762related jobs are performed together. TheSchwartz offers a facility to
763I<coalesce> jobs into groups, which a properly constructed worker can find and
764perform at once. For example, if your worker were delivering email, you might
765store the domain name from the recipient's address as the coalescing value. The
766worker that grabs that job could then batch deliver all the mail for that
767domain once it connects to that domain's mail server.
768
769=head1 USAGE
770
771=head2 C<TheSchwartz-E<gt>new( %args )>
772
773Optional members of C<%args> are:
774
775=over 4
776
777=item * C<databases>
778
779An arrayref of database information. TheSchwartz workers can use multiple
780databases, such that if any of them are unavailable, the worker will search for
781appropriate jobs in the other databases automatically.
782
783Each member of the C<databases> value should be a hashref containing:
784
785=over 4
786
787=item * C<dsn>
788
789The database DSN for this database.
790
791=item * C<user>
792
793The username to use when connecting to this database.
794
795=item * C<pass>
796
797The password to use when connecting to this database.
798
799=back
800
801=item * C<verbose>
802
803A value indicating whether to log debug messages. If C<verbose> is a coderef,
804it is called to log debug messages. If C<verbose> is not a coderef but is some
805other true value, debug messages will be sent to C<STDERR>. Otherwise, debug
806messages will not be logged.
807
808=item * C<driver_cache_expiration>
809
810Optional value to control how long database connections are cached for in seconds.
811By default, connections are not cached. To re-use the same database connection for
812five minutes, pass driver_cache_expiration => 300 to the constructor. Improves job
813throughput in cases where the work to process a job is small compared to the database
814connection set-up and tear-down time.
815
816=item * C<retry_seconds>
817
818The number of seconds after which to try reconnecting to apparently dead
819databases. If not given, TheSchwartz will retry connecting to databases after
82030 seconds.
821
822=back
823
=head2 C<$client-E<gt>list_jobs( \%args )>

Returns a list of C<TheSchwartz::Job> objects matching the given arguments,
which are passed as a single hashref. C<funcname> is required; the other members are optional:
828
829=over 4
830
831=item * C<funcname>
832
833the name of the function or a reference to an array of functions
834
835=item * C<run_after>
836
837the value you want to check <= against on the run_after column
838
839=item * C<grabbed_until>
840
841the value you want to check <= against on the grabbed_until column
842
843=item * C<coalesce_op>
844
the operator used to compare against the coalesce field. Defaults to '=';
use 'LIKE' if you want to do a pattern search.
847
848=item * C<coalesce>
849
the coalesce value to search for. If you set C<coalesce_op> to 'LIKE' you can
use '%' here. Remember that patterns anchored at the beginning of the string
(such as 'foo%') are much faster, since they can use a btree index lookup.
853
854=item * C<want_handle>
855
856if you want all your jobs to be set up using a handle.  defaults to true.
857this option might be removed, as you should always have this on a Job object.
858
859=back
860
It is important to remember that this function doesn't lock anything; it just
returns the matching jobs, up to (number of databases * FIND_JOB_BATCH_SIZE) of them.
863
864=head2 C<$client-E<gt>lookup_job( $handle_id )>
865
866Returns a C<TheSchwartz::Job> corresponding to the given handle ID.
867
868=head2 C<$client-E<gt>set_verbose( $verbose )>
869
870Sets the current logging function to C<$verbose> if it's a coderef. If not a
871coderef, enables debug logging to C<STDERR> if C<$verbose> is true; otherwise,
872disables logging.
873
874=head1 POSTING JOBS
875
876The methods of TheSchwartz clients used by applications posting jobs to the
877queue are:
878
879=head2 C<$client-E<gt>insert( $job )>
880
881Adds the given C<TheSchwartz::Job> to one of the client's job databases.
882
883=head2 C<$client-E<gt>insert( $funcname, $arg )>
884
885Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
886
887=head2 C<$client-E<gt>insert_jobs( @jobs )>
888
889Adds the given C<TheSchwartz::Job> objects to one of the client's job
890databases. All the given jobs are recorded in I<one> job database.
891
892=head1 WORKING
893
894The methods of TheSchwartz clients for use in worker processes are:
895
896=head2 C<$client-E<gt>can_do( $ability )>
897
898Adds C<$ability> to the list of abilities C<$client> is capable of performing.
899Subsequent calls to that client's C<work> methods will find jobs requiring the
900given ability.
901
902=head2 C<$client-E<gt>work_once()>
903
904Find and perform one job C<$client> can do.
905
906=head2 C<$client-E<gt>work_until_done()>
907
908Find and perform jobs C<$client> can do until no more such jobs are found in
909any of the client's job databases.
910
911=head2 C<$client-E<gt>work( [$delay] )>
912
913Find and perform any jobs C<$client> can do, forever. When no job is available,
914the working process will sleep for C<$delay> seconds (or 5, if not specified)
915before looking again.
916
917=head2 C<$client-E<gt>work_on($handle)>
918
919Given a job handle (a scalar string) I<$handle>, runs the job, then returns.
920
921=head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
922
923Returns a C<TheSchwartz::Job> for a random job that the client can do. If
924specified, the job returned matches one of the abilities in the arrayref
925C<$abilities>, rather than C<$client>'s abilities.
926
927=head2 C<$client-E<gt>find_job_with_coalescing_value( $ability, $coval )>
928
929Returns a C<TheSchwartz::Job> for a random job for a worker capable of
930C<$ability> and with a coalescing value of C<$coval>.
931
932=head2 C<$client-E<gt>find_job_with_coalescing_prefix( $ability, $coval )>
933
934Returns a C<TheSchwartz::Job> for a random job for a worker capable of
935C<$ability> and with a coalescing value beginning with C<$coval>.
936
937Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
938find matching jobs, with all the attendant performance implications for your
939job databases.
940
941=head2 C<$client-E<gt>get_server_time( $driver )>
942
943Given an open driver I<$driver> to a database, gets the current server time from the database.
944
945=head1 COPYRIGHT, LICENSE & WARRANTY
946
947This software is Copyright 2007, Six Apart Ltd, cpan@sixapart.com. All
948rights reserved.
949
950TheSchwartz is free software; you may redistribute it and/or modify it
951under the same terms as Perl itself.
952
TheSchwartz comes with no warranty of any kind.
954
955=cut
956
Note: See TracBrowser for help on using the browser.