root/trunk/t/insert-and-do.t

Revision 164, 4.5 kB (checked in by miyagawa, 10 months ago)

PostgreSQL patch from clkao

Line 
1# -*-perl-*-
2
3use strict;
4use warnings;
5
6require 't/lib/db-common.pl';
7
8use TheSchwartz;
9use Test::More tests => 26*3;
10
# Main test body.  run_tests() and test_client()/teardown_dbs() come from
# t/lib/db-common.pl.  The body emits exactly 26 assertions; the plan above
# is 26*3, so presumably run_tests invokes this sub once per database
# backend -- confirm against db-common.pl before changing the count.
run_tests(26, sub {
    my $client = test_client(dbs => ['ts1']);

    # insert a job
    {
        my $handle = $client->insert("Worker::Addition", { numbers => [1, 2] });
        isa_ok $handle, 'TheSchwartz::JobHandle', "inserted job";
    }

    # let's do some work.  the tedious way, specifying which class should grab a job
    {
        my $job = Worker::Addition->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job';
        my $args = $job->arg;
        is(ref $args, "HASH", "arg was thawed into a hashref");  # thawed it for us
        is_deeply($args, { numbers => [1, 2] }, "got our args back");

        # insert a dummy job to test that the next grab ignores it
        ok($client->insert("dummy", [1,2,3]), "inserted dummy job");

        # verify no more jobs can be grabbed of this type, even though
        # we haven't done the first one
        my $job2 = Worker::Addition->grab_job($client);
        ok(!$job2, "no addition jobs to be grabbed");

        my $rv = eval { Worker::Addition->work($job); };
        # Surface an unexpected exception instead of discarding it; diag()
        # does not count as a test, so the plan is unaffected.
        diag("Worker::Addition->work died: $@") if $@;
        # ....
    }

    # inserting and getting job w/ regular scalar arg
    foreach my $scalar ("short_arg",
                        "long arg more than 11 bytes long",
                        "\x05scalar that begins with the 5 byte",
                        )
    {
        my $handle = $client->insert("Worker::Addition", $scalar);
        isa_ok $handle, 'TheSchwartz::JobHandle', "inserted job";

        my $job = Worker::Addition->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job';
        my $args = $job->arg;
        ok(!ref $args, "not a reference");  # not a reference
        is($args, $scalar, "got correct scalar arg back");
    }

    # insert some more jobs
    {
        ok($client->insert("Worker::MergeInternalDict", { foo => 'bar' }),
           "inserted MergeInternalDict job (foo)");
        ok($client->insert("Worker::MergeInternalDict", { bar => 'baz' }),
           "inserted MergeInternalDict job (bar)");
        ok($client->insert("Worker::MergeInternalDict", { baz => 'foo' }),
           "inserted MergeInternalDict job (baz)");
    }

    # work the easier way
    {
        Worker::MergeInternalDict->reset;

        # single arg form:  say we can do this job name, which is also its package
        $client->can_do("Worker::MergeInternalDict");

        # blocks until all databases are empty
        $client->work_until_done;

        is_deeply(Worker::MergeInternalDict->dict,
                  {
                      foo => "bar",
                      bar => "baz",
                      baz => "foo",
                  }, "all jobs got completed");
    }

    # errors
    {
        $client->reset_abilities;           # now it, as a worker, can't do anything
        $client->can_do("Worker::Division");   # now it can only do one thing

        my $handle = $client->insert("Worker::Division", { n => 5, d => 0 });
        ok($handle, "inserted division job");

        my $job = Worker::Division->grab_job($client);
        isa_ok $job, 'TheSchwartz::Job';

        # wrapper around 'work' implemented in the base class which runs work in
        # eval and notes a failure (with backoff) if job died.
        Worker::Division->work_safely($job);

        is($handle->failures, 1, "job has failed once");
        like(join('', $handle->failure_log), qr/Illegal division by zero/, "noted that we divided by zero");
    }

    teardown_dbs('ts1');
});
97
98############################################################################
package Worker::Addition;
use base 'TheSchwartz::Worker';

# Placeholder worker body: it only unpacks its arguments.  The test calls
# it as a class method with the grabbed job.
sub work {
    my ($class, $job) = @_;

    # ....
}

# tell framework to set 'grabbed_until' to time() + 30.  because if
# we can't add some numbers in 30 seconds, our process probably
# failed and work should be reassigned.
sub grab_for { return 30 }
112
113############################################################################
package Worker::MergeInternalDict;
use base 'TheSchwartz::Worker';

# Accumulator that every processed job merges its argument hash into.
my %internal_dict;

# Empty the accumulator.
sub reset {
    %internal_dict = ();
}

# Return a reference to the live accumulator hash (not a copy).
sub dict {
    return \%internal_dict;
}

# Merge the job's argument hash into the accumulator, overwriting any keys
# that are already present, then mark the job completed.
sub work {
    my ($class, $job) = @_;
    my $incoming = $job->arg;
    @internal_dict{ keys %$incoming } = values %$incoming;
    $job->completed;
}

sub grab_for { return 10 }
130
131############################################################################
package Worker::Division;
use base 'TheSchwartz::Worker';

# Divide n by d from the job's argument hash.  The quotient itself is
# unused: the division exists so that d == 0 makes the worker die.  When
# it survives, the job gets exit status 1 and is marked completed.
sub work {
    my ($class, $job) = @_;
    my $args = $job->arg;

    my ($numerator, $denominator) = ($args->{n}, $args->{d});
    my $quotient = $numerator / $denominator;  # dies on d == 0

    $job->set_exit_status(1);
    $job->completed;
}

# keep exit status for 20 seconds after on_complete
sub keep_exit_status_for { return 20 }

sub grab_for { return 10 }

sub max_retries { return 1 }

# Delay doubles with each failure: 2 ** (number of failures).
sub retry_delay {
    my ($class, $fails) = @_;
    return 2 ** $fails;
}
152
Note: See TracBrowser for help on using the browser.