root/branches/release-38/lib/MT/TheSchwartz.pm @ 2340

Revision 2340, 7.1 kB (checked in by bchoate, 19 months ago)

Issue reporting through debug method.

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::TheSchwartz;
8
9use strict;
10use base qw( TheSchwartz );
11use MT::ObjectDriver::Driver::DBI;
12use List::Util qw( shuffle );
13
14my $instance;
15
16our $RANDOMIZE_JOBS = 0;
17our $OBJECT_REPORT = 0;
18
19sub instance {
20    $instance ||= MT::TheSchwartz->new();
21    return $instance;
22}
23
24sub debug {
25    my $class = shift;
26    $class->instance->SUPER::debug(@_);
27}
28
29sub insert {
30    my $class = shift;
31    $class->instance->SUPER::insert(@_);
32}
33
34sub default_logger {
35    my ($msg, $job) = @_;
36    # suppress TheSchwartz::Job's 'job completed'
37    return if $msg eq 'job completed';
38
39    $msg =~ s/\s+$//;
40    print STDERR "$msg\n";
41}
42
43sub new {
44    my $class = shift;
45    $class->mt_schwartz_init();
46    my (%param) = @_;
47    my $workers = delete $param{workers} if exists $param{workers};
48    $RANDOMIZE_JOBS = delete $param{randomize} if exists $param{randomize};
49
50    # Reports object usage inbetween jobs if Devel::Leak::Object is loaded
51    $OBJECT_REPORT = 1 if $Devel::Leak::Object::VERSION;
52
53    $param{verbose} = \&default_logger
54        if $param{verbose} && (ref $param{verbose} ne 'CODE');
55
56    my $client = $class->SUPER::new(%param);
57
58    if ($client) {
59        $instance = $client;
60        unless ( $workers ) {
61            $workers = [];
62
63            my $all_workers ||= MT->registry("task_workers") || {};
64
65            foreach my $id (keys %$all_workers) {
66                my $w = $all_workers->{$id};
67                my $c = $w->{class} or next;
68                push @$workers, $c;
69            }
70        }
71
72        if (@$workers) {
73            # Can we do this?
74            foreach my $c ( @$workers ) {
75                if (eval('require ' . $c)) {
76                    # Yes, we can do this.
77                    $client->can_do( $c );
78                } else {
79                    # No, we can't. Here's why...
80                    print STDERR "Failed to load worker class '$c': $@\n";
81                }
82            }
83        }
84    }
85
86    return $client;
87}
88
89our $initialized;
90
91sub mt_schwartz_init {
92    return if $initialized;
93
94    # Update the datasource for these, since MT adds an addition 'schwartz_'
95    # prefix for them.
96    require TheSchwartz::FuncMap;
97    require TheSchwartz::Job;
98    require TheSchwartz::Error;
99    require TheSchwartz::ExitStatus;
100    TheSchwartz::FuncMap->properties->{datasource}    = 'ts_funcmap';
101    TheSchwartz::Job->properties->{datasource}        = 'ts_job';
102    TheSchwartz::Error->properties->{datasource}      = 'ts_error';
103    TheSchwartz::ExitStatus->properties->{datasource} = 'ts_exitstatus';
104    return $initialized = 1;
105}
106
107sub driver_for {
108    my MT::TheSchwartz $client = shift;
109    return MT::Object->dbi_driver;
110}
111
112sub shuffled_databases {
113    my TheSchwartz $client = shift;
114    return '1';
115}
116
117sub hash_databases {
118    return 1;
119}
120
121sub mark_database_as_dead {
122    return 1;
123}
124
125sub is_database_dead {
126    return 0;
127}
128
129# Replacement for TheSchwartz::get_server_time
130# to simply return value from dbd->sql_for_unixtime
131# if it is a plain number (the driver has no function,
132# it's just returning time())
133sub get_server_time {
134    my TheSchwartz $client = shift;
135    my($driver) = @_;
136    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
137    return $unixtime_sql if $unixtime_sql =~ m/^\d+$/;
138    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
139}
140
141sub work_periodically {
142    my TheSchwartz $client = shift;
143    my ($delay) = @_;
144    $delay ||= 5;
145    my $last_task_run = 0;
146    my $did_work = 0;
147
148    # holds state of objects at start
149    my %obj_start;
150    if ($OBJECT_REPORT) {
151        %obj_start = %Devel::Leak::Object::OBJECT_COUNT;
152    }
153
154    while (1) {
155        my %obj_pre;
156        if ($OBJECT_REPORT) {
157            %obj_pre = %Devel::Leak::Object::OBJECT_COUNT;
158        }
159
160        if ($client->work_once) {
161            $did_work = 1;
162        }
163
164        if ($last_task_run + 60 * 5 < time) {
165            MT->run_tasks();
166            $did_work = 1;
167            $last_task_run = time;
168        }
169
170        if ($did_work) {
171            my $driver = MT::Object->driver;
172            $driver->clear_cache
173                if $driver->can('clear_cache');
174            MT->request->reset();
175            $did_work = 0;
176            if ($OBJECT_REPORT) {
177                my $report = leak_report(\%obj_start, \%obj_pre, \%Devel::Leak::Object::OBJECT_COUNT);
178                $client->debug($report) if $report ne '';
179            }
180        }
181
182        sleep $delay;
183    }
184}
185
186our %persistent;
187BEGIN {
188    %persistent = map { $_ => 1 } qw( MT::Callback MT::Task MT::Plugin MT::Component MT::ArchiveType MT::TaskMgr MT::WeblogPublisher MT::Serializer TheSchwartz::Job TheSchwartz::JobHandle );
189}
190sub leak_report {
191    my ($start, $pre, $post) = @_;
192    my $reported;
193    my $report = '';
194    foreach my $class (sort keys %$post) {
195        # skip reporting classes that are persistent in nature
196        next if exists $persistent{$class};
197
198        my $post_count = $post->{$class};
199        next if ! $post_count;
200        my $pre_count = $pre->{$class} || 0;
201        my $start_count = $start->{$class} || 0;
202        next if $post_count == 1;  # ignores most singletons
203        if (($pre_count != $post_count) || ($post_count != $start_count)) {
204            $report .= "Leak report (class, total, delta from last job(s), delta since process start):\n" unless $reported;
205            $report .= sprintf("%-40s %-10d %-10d %-10d\n", $class, $post_count, $post_count - $pre_count, $post_count - $start_count);
206            $reported = 1;
207        }
208    }
209    return $report;
210}
211
212sub _grab_a_job {
213    my TheSchwartz $client = shift;
214    my $hashdsn = shift;
215    my $driver = $client->driver_for($hashdsn);
216
217    ## Got some jobs! Randomize them to avoid contention between workers.
218    my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
219
220  JOB:
221    while (my $job = shift @jobs) {
222        ## Convert the funcid to a funcname, based on this database's map.
223        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
224
225        ## Update the job's grabbed_until column so that
226        ## no one else takes it.
227        my $worker_class = $job->funcname;
228        my $old_grabbed_until = $job->grabbed_until;
229
230        my $server_time = $client->get_server_time($driver)
231            or die "expected a server time";
232
233        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
234
235        ## Update the job in the database, and end the transaction.
236        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
237            ## We lost the race to get this particular job--another worker must
238            ## have got it and already updated it. Move on to the next job.
239            $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE;
240            next JOB;
241        }
242
243        ## Now prepare the job, and return it.
244        my $handle = TheSchwartz::JobHandle->new({
245            dsn_hashed => $hashdsn,
246            jobid      => $job->jobid,
247        });
248        $handle->client($client);
249        $job->handle($handle);
250        return $job;
251    }
252
253    return undef;
254}
255
2561;
Note: See TracBrowser for help on using the browser.