root/branches/release-38/lib/MT/TheSchwartz.pm @ 2272

Revision 2272, 5.5 kB (checked in by bchoate, 19 months ago)

Added 'randomly' flag to control whether jobs are shuffled when selecting work to do. Default does not randomize jobs within batch selected.

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::TheSchwartz;
8
9use strict;
10use base qw( TheSchwartz );
11use MT::ObjectDriver::Driver::DBI;
12use List::Util qw( shuffle );
13
14my $instance;
15
16our $RANDOMIZE_JOBS = 0;
17
18sub instance {
19    $instance ||= MT::TheSchwartz->new();
20    return $instance;
21}
22
23sub debug {
24    my $class = shift;
25    $class->instance->SUPER::debug(@_);
26}
27
28sub insert {
29    my $class = shift;
30    $class->instance->SUPER::insert(@_);
31}
32
33sub new {
34    my $class = shift;
35    $class->mt_schwartz_init();
36    my (%param) = @_;
37    my $workers = delete $param{workers} if exists $param{workers};
38    $RANDOMIZE_JOBS = delete $param{randomize} if exists $param{randomize};
39
40    my $client = $class->SUPER::new(%param);
41
42    if ($client) {
43        $instance = $client;
44        unless ( $workers ) {
45            $workers = [];
46
47            my $all_workers ||= MT->registry("task_workers") || {};
48
49            foreach my $id (keys %$all_workers) {
50                my $w = $all_workers->{$id};
51                my $c = $w->{class} or next;
52                push @$workers, $c;
53            }
54        }
55
56        if (@$workers) {
57            # Can we do this?
58            foreach my $c ( @$workers ) {
59                if (eval('require ' . $c)) {
60                    # Yes, we can do this.
61                    $client->can_do( $c );
62                } else {
63                    # No, we can't. Here's why...
64                    print STDERR "Failed to load worker class '$c': $@\n";
65                }
66            }
67        }
68    }
69
70    return $client;
71}
72
73our $initialized;
74
75sub mt_schwartz_init {
76    return if $initialized;
77
78    # Update the datasource for these, since MT adds an addition 'schwartz_'
79    # prefix for them.
80    require TheSchwartz::FuncMap;
81    require TheSchwartz::Job;
82    require TheSchwartz::Error;
83    require TheSchwartz::ExitStatus;
84    TheSchwartz::FuncMap->properties->{datasource}    = 'ts_funcmap';
85    TheSchwartz::Job->properties->{datasource}        = 'ts_job';
86    TheSchwartz::Error->properties->{datasource}      = 'ts_error';
87    TheSchwartz::ExitStatus->properties->{datasource} = 'ts_exitstatus';
88
89    my $job_set_exit_status = \&TheSchwartz::Job::set_exit_status;
90    my $job_add_failure = \&TheSchwartz::Job::add_failure;
91
92    my $driver = MT::Object->driver;
93    no warnings 'redefine';
94    *TheSchwartz::Job::set_exit_status = sub {
95        $driver->Disabled(1);
96        my $res = $job_set_exit_status->(@_);
97        $driver->Disabled(0);
98        return $res;
99    };
100    *TheSchwartz::Job::add_failure = sub {
101        $driver->Disabled(1);
102        my $res = $job_add_failure->(@_);
103        $driver->Disabled(0);
104        return $res;
105    };
106
107    return $initialized = 1;
108}
109
110sub driver_for {
111    my MT::TheSchwartz $client = shift;
112    return MT::Object->driver;
113}
114
115sub shuffled_databases {
116    my TheSchwartz $client = shift;
117    return '1';
118}
119
120sub hash_databases {
121    return 1;
122}
123
124sub mark_database_as_dead {
125    return 1;
126}
127
128sub is_database_dead {
129    return 0;
130}
131
132# Replacement for TheSchwartz::get_server_time
133# to simply return value from dbd->sql_for_unixtime
134# if it is a plain number (the driver has no function,
135# it's just returning time())
136sub get_server_time {
137    my TheSchwartz $client = shift;
138    my($driver) = @_;
139    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
140    return $unixtime_sql if $unixtime_sql =~ m/^\d+$/;
141    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
142}
143
144sub work_periodically {
145    my TheSchwartz $client = shift;
146    my ($delay) = @_;
147    $delay ||= 5;
148    my $last_task_run = 0;
149    while (1) {
150        unless ($client->work_once) {
151            my $driver = $client->driver_for();
152            $driver->clear_cache
153                if $driver->can('clear_cache');
154            MT->request->reset();
155            sleep $delay;
156
157            if ($last_task_run + 60 * 5 < time) {
158                MT->run_tasks();
159                $last_task_run = time;
160            }
161        }
162    }
163}
164
165sub _grab_a_job {
166    my TheSchwartz $client = shift;
167    my $hashdsn = shift;
168    my $driver = $client->driver_for($hashdsn);
169
170    ## Got some jobs! Randomize them to avoid contention between workers.
171    my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
172
173  JOB:
174    while (my $job = shift @jobs) {
175        ## Convert the funcid to a funcname, based on this database's map.
176        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
177
178        ## Update the job's grabbed_until column so that
179        ## no one else takes it.
180        my $worker_class = $job->funcname;
181        my $old_grabbed_until = $job->grabbed_until;
182
183        my $server_time = $client->get_server_time($driver)
184            or die "expected a server time";
185
186        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
187
188        ## Update the job in the database, and end the transaction.
189        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
190            ## We lost the race to get this particular job--another worker must
191            ## have got it and already updated it. Move on to the next job.
192            $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE;
193            next JOB;
194        }
195
196        ## Now prepare the job, and return it.
197        my $handle = TheSchwartz::JobHandle->new({
198            dsn_hashed => $hashdsn,
199            jobid      => $job->jobid,
200        });
201        $handle->client($client);
202        $job->handle($handle);
203        return $job;
204    }
205
206    return undef;
207}
208
2091;
Note: See TracBrowser for help on using the browser.