root/branches/release-38/lib/MT/TheSchwartz.pm @ 2315

Revision 2315, 5.1 kB (checked in by bchoate, 19 months ago)

Use DBI driver (non-caching) for TheSchwartz itself. Optimized daemon loop so it clears caches only when work was done.

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::TheSchwartz;
8
9use strict;
10use base qw( TheSchwartz );
11use MT::ObjectDriver::Driver::DBI;
12use List::Util qw( shuffle );
13
14my $instance;
15
16our $RANDOMIZE_JOBS = 0;
17
18sub instance {
19    $instance ||= MT::TheSchwartz->new();
20    return $instance;
21}
22
23sub debug {
24    my $class = shift;
25    $class->instance->SUPER::debug(@_);
26}
27
28sub insert {
29    my $class = shift;
30    $class->instance->SUPER::insert(@_);
31}
32
33sub new {
34    my $class = shift;
35    $class->mt_schwartz_init();
36    my (%param) = @_;
37    my $workers = delete $param{workers} if exists $param{workers};
38    $RANDOMIZE_JOBS = delete $param{randomize} if exists $param{randomize};
39
40    my $client = $class->SUPER::new(%param);
41
42    if ($client) {
43        $instance = $client;
44        unless ( $workers ) {
45            $workers = [];
46
47            my $all_workers ||= MT->registry("task_workers") || {};
48
49            foreach my $id (keys %$all_workers) {
50                my $w = $all_workers->{$id};
51                my $c = $w->{class} or next;
52                push @$workers, $c;
53            }
54        }
55
56        if (@$workers) {
57            # Can we do this?
58            foreach my $c ( @$workers ) {
59                if (eval('require ' . $c)) {
60                    # Yes, we can do this.
61                    $client->can_do( $c );
62                } else {
63                    # No, we can't. Here's why...
64                    print STDERR "Failed to load worker class '$c': $@\n";
65                }
66            }
67        }
68    }
69
70    return $client;
71}
72
73our $initialized;
74
75sub mt_schwartz_init {
76    return if $initialized;
77
78    # Update the datasource for these, since MT adds an addition 'schwartz_'
79    # prefix for them.
80    require TheSchwartz::FuncMap;
81    require TheSchwartz::Job;
82    require TheSchwartz::Error;
83    require TheSchwartz::ExitStatus;
84    TheSchwartz::FuncMap->properties->{datasource}    = 'ts_funcmap';
85    TheSchwartz::Job->properties->{datasource}        = 'ts_job';
86    TheSchwartz::Error->properties->{datasource}      = 'ts_error';
87    TheSchwartz::ExitStatus->properties->{datasource} = 'ts_exitstatus';
88    return $initialized = 1;
89}
90
91sub driver_for {
92    my MT::TheSchwartz $client = shift;
93    return MT::Object->dbi_driver;
94}
95
96sub shuffled_databases {
97    my TheSchwartz $client = shift;
98    return '1';
99}
100
101sub hash_databases {
102    return 1;
103}
104
105sub mark_database_as_dead {
106    return 1;
107}
108
109sub is_database_dead {
110    return 0;
111}
112
113# Replacement for TheSchwartz::get_server_time
114# to simply return value from dbd->sql_for_unixtime
115# if it is a plain number (the driver has no function,
116# it's just returning time())
117sub get_server_time {
118    my TheSchwartz $client = shift;
119    my($driver) = @_;
120    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
121    return $unixtime_sql if $unixtime_sql =~ m/^\d+$/;
122    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
123}
124
125sub work_periodically {
126    my TheSchwartz $client = shift;
127    my ($delay) = @_;
128    $delay ||= 5;
129    my $last_task_run = 0;
130    my $did_work = 0;
131    while (1) {
132        if ($client->work_once) {
133            $did_work = 1;
134        } else {
135            if ($did_work) {
136                my $driver = MT::Object->driver;
137                $driver->clear_cache
138                    if $driver->can('clear_cache');
139                MT->request->reset();
140                $did_work = 0;
141            }
142
143            if ($last_task_run + 60 * 5 < time) {
144                MT->run_tasks();
145                $last_task_run = time;
146            }
147        }
148        sleep $delay;
149    }
150}
151
152sub _grab_a_job {
153    my TheSchwartz $client = shift;
154    my $hashdsn = shift;
155    my $driver = $client->driver_for($hashdsn);
156
157    ## Got some jobs! Randomize them to avoid contention between workers.
158    my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
159
160  JOB:
161    while (my $job = shift @jobs) {
162        ## Convert the funcid to a funcname, based on this database's map.
163        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
164
165        ## Update the job's grabbed_until column so that
166        ## no one else takes it.
167        my $worker_class = $job->funcname;
168        my $old_grabbed_until = $job->grabbed_until;
169
170        my $server_time = $client->get_server_time($driver)
171            or die "expected a server time";
172
173        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
174
175        ## Update the job in the database, and end the transaction.
176        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
177            ## We lost the race to get this particular job--another worker must
178            ## have got it and already updated it. Move on to the next job.
179            $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE;
180            next JOB;
181        }
182
183        ## Now prepare the job, and return it.
184        my $handle = TheSchwartz::JobHandle->new({
185            dsn_hashed => $hashdsn,
186            jobid      => $job->jobid,
187        });
188        $handle->client($client);
189        $job->handle($handle);
190        return $job;
191    }
192
193    return undef;
194}
195
1961;
Note: See TracBrowser for help on using the browser.