root/branches/release-38/lib/MT/TheSchwartz.pm @ 2336

Revision 2336, 5.5 kB (checked in by bchoate, 19 months ago)

Conditioned support for Devel::Leak::Object reporting.

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::TheSchwartz;
8
9use strict;
10use base qw( TheSchwartz );
11use MT::ObjectDriver::Driver::DBI;
12use List::Util qw( shuffle );
13
14my $instance;
15
16our $RANDOMIZE_JOBS = 0;
17our $OBJECT_REPORT = 0;
18
19sub instance {
20    $instance ||= MT::TheSchwartz->new();
21    return $instance;
22}
23
24sub debug {
25    my $class = shift;
26    $class->instance->SUPER::debug(@_);
27}
28
29sub insert {
30    my $class = shift;
31    $class->instance->SUPER::insert(@_);
32}
33
34sub new {
35    my $class = shift;
36    $class->mt_schwartz_init();
37    my (%param) = @_;
38    my $workers = delete $param{workers} if exists $param{workers};
39    $RANDOMIZE_JOBS = delete $param{randomize} if exists $param{randomize};
40
41    # Reports object usage inbetween jobs if Devel::Leak::Object is loaded
42    $OBJECT_REPORT = 1 if $Devel::Leak::Object::VERSION;
43
44    my $client = $class->SUPER::new(%param);
45
46    if ($client) {
47        $instance = $client;
48        unless ( $workers ) {
49            $workers = [];
50
51            my $all_workers ||= MT->registry("task_workers") || {};
52
53            foreach my $id (keys %$all_workers) {
54                my $w = $all_workers->{$id};
55                my $c = $w->{class} or next;
56                push @$workers, $c;
57            }
58        }
59
60        if (@$workers) {
61            # Can we do this?
62            foreach my $c ( @$workers ) {
63                if (eval('require ' . $c)) {
64                    # Yes, we can do this.
65                    $client->can_do( $c );
66                } else {
67                    # No, we can't. Here's why...
68                    print STDERR "Failed to load worker class '$c': $@\n";
69                }
70            }
71        }
72    }
73
74    return $client;
75}
76
77our $initialized;
78
79sub mt_schwartz_init {
80    return if $initialized;
81
82    # Update the datasource for these, since MT adds an addition 'schwartz_'
83    # prefix for them.
84    require TheSchwartz::FuncMap;
85    require TheSchwartz::Job;
86    require TheSchwartz::Error;
87    require TheSchwartz::ExitStatus;
88    TheSchwartz::FuncMap->properties->{datasource}    = 'ts_funcmap';
89    TheSchwartz::Job->properties->{datasource}        = 'ts_job';
90    TheSchwartz::Error->properties->{datasource}      = 'ts_error';
91    TheSchwartz::ExitStatus->properties->{datasource} = 'ts_exitstatus';
92    return $initialized = 1;
93}
94
95sub driver_for {
96    my MT::TheSchwartz $client = shift;
97    return MT::Object->dbi_driver;
98}
99
100sub shuffled_databases {
101    my TheSchwartz $client = shift;
102    return '1';
103}
104
105sub hash_databases {
106    return 1;
107}
108
109sub mark_database_as_dead {
110    return 1;
111}
112
113sub is_database_dead {
114    return 0;
115}
116
117# Replacement for TheSchwartz::get_server_time
118# to simply return value from dbd->sql_for_unixtime
119# if it is a plain number (the driver has no function,
120# it's just returning time())
121sub get_server_time {
122    my TheSchwartz $client = shift;
123    my($driver) = @_;
124    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
125    return $unixtime_sql if $unixtime_sql =~ m/^\d+$/;
126    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
127}
128
129sub work_periodically {
130    my TheSchwartz $client = shift;
131    my ($delay) = @_;
132    $delay ||= 5;
133    my $last_task_run = 0;
134    my $did_work = 0;
135
136    if ($OBJECT_REPORT) {
137        Devel::Leak::Object::status();
138        print "\n\n";
139    }
140
141    while (1) {
142        if ($client->work_once) {
143            $did_work = 1;
144        }
145
146        if ($last_task_run + 60 * 5 < time) {
147            MT->run_tasks();
148            $did_work = 1;
149            $last_task_run = time;
150        }
151
152        if ($did_work) {
153            my $driver = MT::Object->driver;
154            $driver->clear_cache
155                if $driver->can('clear_cache');
156            MT->request->reset();
157            $did_work = 0;
158            if ($OBJECT_REPORT) {
159                Devel::Leak::Object::status();
160                print "\n\n";
161            }
162        }
163
164        sleep $delay;
165    }
166}
167
168sub _grab_a_job {
169    my TheSchwartz $client = shift;
170    my $hashdsn = shift;
171    my $driver = $client->driver_for($hashdsn);
172
173    ## Got some jobs! Randomize them to avoid contention between workers.
174    my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
175
176  JOB:
177    while (my $job = shift @jobs) {
178        ## Convert the funcid to a funcname, based on this database's map.
179        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
180
181        ## Update the job's grabbed_until column so that
182        ## no one else takes it.
183        my $worker_class = $job->funcname;
184        my $old_grabbed_until = $job->grabbed_until;
185
186        my $server_time = $client->get_server_time($driver)
187            or die "expected a server time";
188
189        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
190
191        ## Update the job in the database, and end the transaction.
192        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
193            ## We lost the race to get this particular job--another worker must
194            ## have got it and already updated it. Move on to the next job.
195            $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE;
196            next JOB;
197        }
198
199        ## Now prepare the job, and return it.
200        my $handle = TheSchwartz::JobHandle->new({
201            dsn_hashed => $hashdsn,
202            jobid      => $job->jobid,
203        });
204        $handle->client($client);
205        $job->handle($handle);
206        return $job;
207    }
208
209    return undef;
210}
211
2121;
Note: See TracBrowser for help on using the browser.