| 1 | # -*-perl-*- |
|---|
| 2 | |
|---|
| 3 | use strict; |
|---|
| 4 | use warnings; |
|---|
| 5 | |
|---|
| 6 | require 't/lib/db-common.pl'; |
|---|
| 7 | |
|---|
| 8 | use TheSchwartz; |
|---|
| 9 | use Test::More tests => 26*3; |
|---|
| 10 | |
|---|
# Exercise the client/worker API end to end against a single database
# ('ts1'): inserting jobs, grabbing them by worker class, round-tripping
# hash and scalar arguments, draining a queue with work_until_done, and
# recording a failure when a worker dies.
#
# The first argument to run_tests is the number of assertions this callback
# makes (26); it has to stay in sync with the plan declared at the top of
# the file.  NOTE(review): presumably run_tests invokes the callback once
# per database backend, which is where the "26*3" in the plan comes from --
# confirm against t/lib/db-common.pl.
run_tests(26, sub {
    my $client = test_client(dbs => ['ts1']);

    # insert a job
    {
        my $handle = $client->insert("Worker::Addition", { numbers => [1, 2] });
        isa_ok $handle, 'TheSchwartz::JobHandle', "inserted job";
    }

    # let's do some work.  the tedious way, specifying which class should grab a job
    {
        my $job = Worker::Addition->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job', "grabbed job";
        my $args = $job->arg;
        is(ref $args, "HASH", "hash arg was thawed for us");
        is_deeply($args, { numbers => [1, 2] }, "got our args back");

        # insert a dummy job to test that the next grab ignores it
        ok($client->insert("dummy", [1,2,3]), "inserted dummy job");

        # verify no more jobs can be grabbed of this type, even though
        # we haven't done the first one
        my $job2 = Worker::Addition->grab_job($client);
        ok(!$job2, "no addition jobs to be grabbed");

        # Call the worker directly.  The eval keeps a die inside work() from
        # aborting the whole test run; the result is deliberately ignored.
        eval { Worker::Addition->work($job); };
        # ....
    }

    # inserting and getting job w/ regular scalar arg
    foreach my $scalar ("short_arg",
                        "long arg more than 11 bytes long",
                        "\x05scalar that begins with the 5 byte",
                        )
    {
        my $handle = $client->insert("Worker::Addition", $scalar);
        isa_ok $handle, 'TheSchwartz::JobHandle', "inserted job";

        my $job = Worker::Addition->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job', "grabbed job";
        my $args = $job->arg;
        ok(!ref $args, "not a reference");
        is($args, $scalar, "got correct scalar arg back");
    }

    # insert some more jobs
    {
        ok($client->insert("Worker::MergeInternalDict", { foo => 'bar' }),
           "inserted merge job foo => bar");
        ok($client->insert("Worker::MergeInternalDict", { bar => 'baz' }),
           "inserted merge job bar => baz");
        ok($client->insert("Worker::MergeInternalDict", { baz => 'foo' }),
           "inserted merge job baz => foo");
    }

    # work the easier way
    {
        Worker::MergeInternalDict->reset;
        $client->can_do("Worker::MergeInternalDict");  # single arg form:  say we can do this job name, which is also its package
        $client->work_until_done;                      # blocks until all databases are empty
        is_deeply(Worker::MergeInternalDict->dict,
                  {
                      foo => "bar",
                      bar => "baz",
                      baz => "foo",
                  }, "all jobs got completed");
    }

    # errors
    {
        $client->reset_abilities;             # now it, as a worker, can't do anything
        $client->can_do("Worker::Division");  # now it can only do one thing

        my $handle = $client->insert("Worker::Division", { n => 5, d => 0 });
        ok($handle, "inserted division job");

        my $job = Worker::Division->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job', "grabbed job";

        # wrapper around 'work' implemented in the base class which runs work in
        # eval and notes a failure (with backoff) if job died.
        Worker::Division->work_safely($job);

        is($handle->failures, 1, "job has failed once");
        like(join('', $handle->failure_log), qr/Illegal division by zero/, "noted that we divided by zero");
    }

    teardown_dbs('ts1');
});
|---|
| 97 | |
|---|
| 98 | ############################################################################ |
|---|
package Worker::Addition;
use base 'TheSchwartz::Worker';

# Handler for "Worker::Addition" jobs, called as a class method with the
# grabbed job.  The body is elided in this test file.
sub work {
    my ($class, $job) = @_;

    # ....
}

# Tell the framework to set 'grabbed_until' to time() + 30: if we can't
# add some numbers within 30 seconds, our process has probably failed and
# the work should be handed to someone else.
sub grab_for {
    return 30;
}
|---|
| 112 | |
|---|
| 113 | ############################################################################ |
|---|
package Worker::MergeInternalDict;
use base 'TheSchwartz::Worker';

# Running union of the argument hashes of every job worked so far.  It is a
# file-scoped lexical, so all calls into this package share it.
my %internal_dict;

# Forget everything merged so far.
sub reset {
    %internal_dict = ();
}

# Hand back a reference to the live dictionary (not a copy).
sub dict {
    return \%internal_dict;
}

# Merge the job's argument hash into the dictionary -- a key that is already
# present gets the new value -- and then mark the job as completed.
sub work {
    my ($class, $job) = @_;
    my $new_pairs = $job->arg;

    # keys() and values() walk the same hash in the same order, so the
    # slice assignment pairs each key with its own value.
    @internal_dict{ keys %$new_pairs } = values %$new_pairs;

    return $job->completed;
}

# Value handed to the framework as the grab duration for this worker.
sub grab_for {
    return 10;
}
|---|
| 130 | |
|---|
| 131 | ############################################################################ |
|---|
package Worker::Division;
use base 'TheSchwartz::Worker';

# Divide n by d from the job's argument hash.  Only the side effect matters:
# a zero divisor makes Perl die here, before the job is marked completed.
sub work {
    my ($class, $job) = @_;
    my $operands = $job->arg;

    # The quotient is thrown away; assigning it just gives the division a
    # place to go so that d == 0 can blow up.
    my $quotient = $operands->{n} / $operands->{d};

    $job->set_exit_status(1);
    return $job->completed;
}

# keep exit status for 20 seconds after on_complete
sub keep_exit_status_for {
    return 20;
}

sub grab_for {
    return 10;
}

sub max_retries {
    return 1;
}

# Delay before the next retry: 2 raised to the number of failures so far.
# NOTE(review): presumably interpreted by the framework as seconds -- confirm.
sub retry_delay {
    my ($class, $fails) = @_;
    return 2 ** $fails;
}
|---|
| 152 | |
|---|