root/branches/release-38/lib/MT/TheSchwartz.pm @ 2337

Revision 2337, 6.9 kB (checked in by bchoate, 19 months ago)

Cleanup for TheSchwartz logging. Added support for 'leak' switch to help trace object leaks.

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::TheSchwartz;
8
9use strict;
10use base qw( TheSchwartz );
11use MT::ObjectDriver::Driver::DBI;
12use List::Util qw( shuffle );
13
14my $instance;
15
16our $RANDOMIZE_JOBS = 0;
17our $OBJECT_REPORT = 0;
18
19sub instance {
20    $instance ||= MT::TheSchwartz->new();
21    return $instance;
22}
23
24sub debug {
25    my $class = shift;
26    $class->instance->SUPER::debug(@_);
27}
28
29sub insert {
30    my $class = shift;
31    $class->instance->SUPER::insert(@_);
32}
33
34sub default_logger {
35    my ($msg, $job) = @_;
36    # suppress TheSchwartz::Job's 'job completed'
37    return if $msg eq 'job completed';
38
39    $msg =~ s/\s+$//;
40    print STDERR "$msg\n";
41}
42
43sub new {
44    my $class = shift;
45    $class->mt_schwartz_init();
46    my (%param) = @_;
47    my $workers = delete $param{workers} if exists $param{workers};
48    $RANDOMIZE_JOBS = delete $param{randomize} if exists $param{randomize};
49
50    # Reports object usage inbetween jobs if Devel::Leak::Object is loaded
51    $OBJECT_REPORT = 1 if $Devel::Leak::Object::VERSION;
52
53    $param{verbose} = \&default_logger
54        if $param{verbose} && (ref $param{verbose} ne 'CODE');
55
56    my $client = $class->SUPER::new(%param);
57
58    if ($client) {
59        $instance = $client;
60        unless ( $workers ) {
61            $workers = [];
62
63            my $all_workers ||= MT->registry("task_workers") || {};
64
65            foreach my $id (keys %$all_workers) {
66                my $w = $all_workers->{$id};
67                my $c = $w->{class} or next;
68                push @$workers, $c;
69            }
70        }
71
72        if (@$workers) {
73            # Can we do this?
74            foreach my $c ( @$workers ) {
75                if (eval('require ' . $c)) {
76                    # Yes, we can do this.
77                    $client->can_do( $c );
78                } else {
79                    # No, we can't. Here's why...
80                    print STDERR "Failed to load worker class '$c': $@\n";
81                }
82            }
83        }
84    }
85
86    return $client;
87}
88
89our $initialized;
90
91sub mt_schwartz_init {
92    return if $initialized;
93
94    # Update the datasource for these, since MT adds an addition 'schwartz_'
95    # prefix for them.
96    require TheSchwartz::FuncMap;
97    require TheSchwartz::Job;
98    require TheSchwartz::Error;
99    require TheSchwartz::ExitStatus;
100    TheSchwartz::FuncMap->properties->{datasource}    = 'ts_funcmap';
101    TheSchwartz::Job->properties->{datasource}        = 'ts_job';
102    TheSchwartz::Error->properties->{datasource}      = 'ts_error';
103    TheSchwartz::ExitStatus->properties->{datasource} = 'ts_exitstatus';
104    return $initialized = 1;
105}
106
107sub driver_for {
108    my MT::TheSchwartz $client = shift;
109    return MT::Object->dbi_driver;
110}
111
112sub shuffled_databases {
113    my TheSchwartz $client = shift;
114    return '1';
115}
116
117sub hash_databases {
118    return 1;
119}
120
121sub mark_database_as_dead {
122    return 1;
123}
124
125sub is_database_dead {
126    return 0;
127}
128
129# Replacement for TheSchwartz::get_server_time
130# to simply return value from dbd->sql_for_unixtime
131# if it is a plain number (the driver has no function,
132# it's just returning time())
133sub get_server_time {
134    my TheSchwartz $client = shift;
135    my($driver) = @_;
136    my $unixtime_sql = $driver->dbd->sql_for_unixtime;
137    return $unixtime_sql if $unixtime_sql =~ m/^\d+$/;
138    return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
139}
140
141sub work_periodically {
142    my TheSchwartz $client = shift;
143    my ($delay) = @_;
144    $delay ||= 5;
145    my $last_task_run = 0;
146    my $did_work = 0;
147
148    # holds state of objects at start
149    my %obj_start;
150    if ($OBJECT_REPORT) {
151        %obj_start = %Devel::Leak::Object::OBJECT_COUNT;
152    }
153
154    while (1) {
155        my %obj_pre;
156        if ($OBJECT_REPORT) {
157            %obj_pre = %Devel::Leak::Object::OBJECT_COUNT;
158        }
159
160        if ($client->work_once) {
161            $did_work = 1;
162        }
163
164        if ($last_task_run + 60 * 5 < time) {
165            MT->run_tasks();
166            $did_work = 1;
167            $last_task_run = time;
168        }
169
170        if ($did_work) {
171            my $driver = MT::Object->driver;
172            $driver->clear_cache
173                if $driver->can('clear_cache');
174            MT->request->reset();
175            $did_work = 0;
176            if ($OBJECT_REPORT) {
177                leak_report(\%obj_start, \%obj_pre, \%Devel::Leak::Object::OBJECT_COUNT);
178            }
179        }
180
181        sleep $delay;
182    }
183}
184
185our %persistent;
186BEGIN {
187    %persistent = map { $_ => 1 } qw( MT::Task MT::Plugin MT::Component MT::ArchiveType MT::TaskMgr MT::WeblogPublisher MT::Serializer TheSchwartz::Job TheSchwartz::JobHandle );
188}
189sub leak_report {
190    my ($start, $pre, $post) = @_;
191    my $reported;
192    foreach my $class (sort keys %$post) {
193        # skip reporting classes that are persistent in nature
194        next if exists $persistent{$class};
195
196        my $post_count = $post->{$class};
197        next if ! $post_count;
198        my $pre_count = $pre->{$class} || 0;
199        my $start_count = $start->{$class} || 0;
200        next if $post_count == 1;  # ignores most singletons
201        if (($pre_count != $post_count) || ($post_count != $start_count)) {
202            print "Leak report (class, total, delta from last job(s), delta since process start):\n" unless $reported;
203            printf "%-40s %-10d %-10d %-10d\n", $class, $post_count, $post_count - $pre_count, $post_count - $start_count;
204            $reported = 1;
205        }
206    }
207}
208
209sub _grab_a_job {
210    my TheSchwartz $client = shift;
211    my $hashdsn = shift;
212    my $driver = $client->driver_for($hashdsn);
213
214    ## Got some jobs! Randomize them to avoid contention between workers.
215    my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
216
217  JOB:
218    while (my $job = shift @jobs) {
219        ## Convert the funcid to a funcname, based on this database's map.
220        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
221
222        ## Update the job's grabbed_until column so that
223        ## no one else takes it.
224        my $worker_class = $job->funcname;
225        my $old_grabbed_until = $job->grabbed_until;
226
227        my $server_time = $client->get_server_time($driver)
228            or die "expected a server time";
229
230        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
231
232        ## Update the job in the database, and end the transaction.
233        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
234            ## We lost the race to get this particular job--another worker must
235            ## have got it and already updated it. Move on to the next job.
236            $TheSchwartz::T_LOST_RACE->() if $TheSchwartz::T_LOST_RACE;
237            next JOB;
238        }
239
240        ## Now prepare the job, and return it.
241        my $handle = TheSchwartz::JobHandle->new({
242            dsn_hashed => $hashdsn,
243            jobid      => $job->jobid,
244        });
245        $handle->client($client);
246        $job->handle($handle);
247        return $job;
248    }
249
250    return undef;
251}
252
2531;
Note: See TracBrowser for help on using the browser.