root/branches/release-30/lib/MT/ObjectDriver/Driver/DBI.pm @ 1410

Revision 1410, 14.9 kB (checked in by bchoate, 21 months ago)

Changes to nextprev method to leverage database order by clause to select adjacent object. BugId:68994

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::ObjectDriver::Driver::DBI;
8
9use strict;
10use base qw( Data::ObjectDriver::Driver::DBI );
11
12sub init {
13    my $driver = shift;
14    my (%param) = @_;
15    $param{prefix} ||= 'mt_';
16    $driver->SUPER::init(%param);
17    my $opts = $driver->connect_options || {};
18    $opts->{RaiseError} = 0;
19    $driver->connect_options($opts);
20    $driver;
21}
22
23sub configure {
24    my $driver = shift;
25    $driver->dbd->configure($driver, @_);
26}
27
28sub table_exists {
29    my $driver = shift;
30    my ($class) = @_;
31    return $driver->dbd->ddl_class->table_exists($class);
32}
33
34# Be mindful of SQLite when you modify the method.
35# SQLite has its own count method in its DBD.
36sub count {
37    my $driver = shift;
38    my($class, $terms, $args) = @_;
39
40    my $join = $args->{join};
41    my $select = 'COUNT(*)';
42    if ($join && $join->[3]->{unique}) {
43        my $col;
44        if ($join->[3]{unique} =~ m/\D/) {
45            $col = $args->{join}[3]{unique};
46        } else {
47            $col = $class->properties->{primary_key};
48        }
49        my $dbcol = $driver->dbd->db_column_name($class->datasource, $col);
50        $select = "COUNT(DISTINCT $dbcol)";
51    }
52
53    return $driver->_select_aggregate(
54        select   => $select,
55        class    => $class,
56        terms    => $terms,
57        args     => $args,
58        override => {
59                     order  => '',
60                     limit  => undef,
61                     offset => undef,
62                    },
63    );
64}
65
66sub remove_all {
67    my $driver = shift;
68    my ($class) = @_;
69    return $driver->direct_remove($class);
70}
71
72sub count_group_by {
73    my $driver = shift;
74    my ($class, $terms, $args) = @_;
75
76    $driver->_do_group_by('COUNT(*)', @_);
77}
78
79sub sum_group_by {
80    my $driver = shift;
81    my ($class, $terms, $args) = @_;
82
83    my $sum_column = delete $args->{sum};
84    return unless $sum_column;
85    $sum_column = $driver->_decorate_column_name($class, $sum_column);
86    $args->{sort} = "sum_$sum_column" unless exists $args->{sort};
87    $args->{direction} = 'descend' unless exists $args->{direction};
88    $driver->_do_group_by("SUM($sum_column) AS sum_$sum_column", @_);
89}
90
91sub avg_group_by {
92    my $driver = shift;
93    my ($class, $terms, $args) = @_;
94
95    my $avg_column = delete $args->{avg};
96    return unless $avg_column;
97    $avg_column = $driver->_decorate_column_name($class, $avg_column);
98    $args->{sort} = "avg_$avg_column" unless exists $args->{sort};
99    $args->{direction} = 'descend' unless exists $args->{direction};
100    $driver->_do_group_by("AVG($avg_column) AS avg_$avg_column", @_);
101}
102
103sub _do_group_by {
104    my $driver = shift;
105    my ($agg_func, $class, $terms, $args) = @_;
106    my $props = $class->properties;
107    if ($props->{class_type}) {
108        my $class_col = $props->{class_column};
109        unless ($terms->{$class_col}) {
110            $terms->{$class_col} = $class->class_type;
111        }
112    }
113    if ($args->{no_class}) {
114        delete $terms->{$props->{class_column}};
115        delete $args->{no_class};
116    }
117    my $order = delete $args->{sort};
118    my $direction = delete $args->{direction};
119    my $limit = exists $args->{limit} ? delete $args->{limit} : undef;
120    my $offset = exists $args->{offset} ? delete $args->{offset} : undef;
121    my $stmt = $driver->prepare_statement($class, $terms, $args);
122
123    ## Ugly. Maybe we need a clear_select method in D::OD::SQL?
124    $stmt->select([]);
125    $stmt->select_map({});
126    $stmt->select_map_reverse({});
127
128    $stmt->add_select($agg_func);
129
130    ## This is the nastiest thing I've ever seen. The caller should really
131    ## just give the full column name, instead, rather than having to
132    ## loop over all of the columns to replace something like
133    ## EXTRACT(year FROM created_on) with EXTRACT(year FROM entry_created_on).
134    my $decorate = $stmt->field_decorator($class);
135
136    my @group = map { $decorate->($_) } @{ $args->{group} };
137    for my $term (@group) {
138        $stmt->add_select($term);
139    }
140    $stmt->group([ map { { column => $_ } } @group ]);
141
142    ## Ugly.
143    my $sql = $stmt->as_sql;
144    if ($order) {
145        $sql .= "\nORDER BY " . $decorate->($order);
146        if ($direction) {
147            $sql .= $direction eq 'descend' ? ' DESC' : ' ASC';
148        }
149    }
150
151    my $dbh = $driver->r_handle;
152    $driver->start_query($sql, $stmt->bind);
153    my $sth = $dbh->prepare_cached($sql) or die $sql;
154    $sth->execute(@{ $stmt->bind }) or die $sql;
155
156    my @bindvars;
157    for (@{ $args->{group} }) {
158        push @bindvars, \my($var);
159    }
160    $sth->bind_columns(undef, \my($count), @bindvars);
161
162    if ($offset) {
163        while ($offset--) {
164            unless ($sth->fetch) {
165                $driver->end_query($sth);
166                return;
167            }
168        }
169    }
170    my $i = 0;
171    return sub {
172        unless ($sth->fetch && defined $count && (!defined $limit || ($i < $limit))) {
173            $sth->finish;
174            $driver->end_query($sth);
175            return;
176        }
177        my @returnvals = map { $$_ } @bindvars;
178        $i++;
179        return($count, @returnvals);
180    }
181}
182
183sub _select_aggregate {
184    my $driver = shift;
185    my %param = @_;
186
187    my($class, $orig_terms, $orig_args) = @param{qw( class terms args )};
188    my $overrides = $param{override};
189    my $select = $param{select};
190
191    ## Handle legacy load-by-id syntax.
192    if($orig_terms && !ref $orig_terms) {
193        $orig_terms = { id => $orig_terms };
194    }
195
196    ## Convert $terms and $args like we would for a search.
197    my $terms;
198    if (ref($orig_terms) eq 'HASH') {
199        $terms = { %$orig_terms };
200    } elsif (ref($orig_terms) eq 'ARRAY') {
201        $terms = [ @$orig_terms ];
202    }
203    my $args  = $orig_args  ? { %$orig_args  } : undef;
204    $class->call_trigger('pre_search', $terms, $args);
205
206    my $stmt = $driver->prepare_statement($class, $terms, $args);
207    ## Remove any unnecessary clauses, because they will cause errors in
208    ## some drivers (and they're not necessary)
209    while(my ($clause, $value) = each %$overrides) {
210        $stmt->$clause($value);
211    }
212    $stmt->select([]);
213    my $sql = "SELECT $select\n" . $stmt->as_sql;
214    $driver->select_one($sql, $stmt->bind);
215}
216
217sub _decorate_column_names_in {
218    my $driver = shift;
219    my ($hash, $class) = @_;
220
221    my $dbd = $driver->dbd;
222    for my $col (keys %$hash) {
223        my $new_col = $dbd->db_column_name($class->datasource, $col);
224        $hash->{$new_col} = delete $hash->{$col};
225    }
226
227    return $hash;
228}
229
230sub _decorate_column_name {
231    my $driver = shift;
232    my ($class, $col) = @_;
233    return $driver->dbd->db_column_name($class->datasource, $col);
234}
235
236sub prepare_statement {
237    my $driver = shift;
238    my($class, $terms, $orig_args) = @_;
239    my $args = defined $orig_args ? { %$orig_args } : {};
240
241    my %stmt_args;
242
243    ## Statements don't know anything about table/column name decoration,
244    ## so for any set of column names we send the statement, we must pre-
245    ## decorate the column names.
246
247    for my $arg (qw( transform range range_incl not null not_null like binary count_distinct )) {
248        if(exists $args->{$arg}) {
249            my %stmt_data = %{ delete $args->{$arg} };
250            $driver->_decorate_column_names_in(\%stmt_data, $class);
251            $stmt_args{$arg} = \%stmt_data;
252        }
253    }
254
255    ## Tell the statement what's a date column.
256    if(my $date_columns = $class->columns_of_type('datetime')) {
257        my %date_columns_hash;
258        @date_columns_hash{@$date_columns} = (1) x scalar @$date_columns;
259        $driver->_decorate_column_names_in(\%date_columns_hash, $class);
260        $stmt_args{date_columns} = \%date_columns_hash;
261    }
262
263    ## Tell the statement what's a lob column.
264    if(my $lob_columns = $class->columns_of_type('text', 'blob')) {
265        my %lob_columns_hash;
266        @lob_columns_hash{@$lob_columns} = (1) x scalar @$lob_columns;
267        $driver->_decorate_column_names_in(\%lob_columns_hash, $class);
268        $stmt_args{lob_columns} = \%lob_columns_hash;
269    }
270
271    my $join = delete $args->{join};
272
273    ## Convert fetchonly args from legacy hashes to Data::ObjectDriver's
274    ## expected arrays.
275    ## TODO: handle this in MT::OD::SQL instead of converting a hash to an
276    ## array to a hash again?
277    if(exists $args->{fetchonly}) {
278        if ('HASH' eq ref $args->{fetchonly}) {
279            $args->{fetchonly} = [ keys %{ $args->{fetchonly} } ];
280        }
281    }
282
283    ## Make sure to include our ORDER BY field in the SELECT fields if
284    ## we're doing a SELECT DISTINCT (for postgres).
285    if($join && $join->[3]->{unique}) {
286        my $sort = $args->{sort};
287        if (my $fonly = $args->{fetchonly}) {
288            if (defined $sort) {
289                unless (grep { $_ eq $sort } @$fonly) {
290                    push @$fonly, $sort;
291                }
292            }
293            $args->{fetchonly} = $fonly;
294        }
295
296        my $j_sort = $join->[3]->{sort};
297        if (my $j_fonly = $join->[3]->{fetchonly}) {
298            if (defined $j_sort) {
299                unless (grep { $_ eq $j_sort } @$j_fonly) {
300                    push @$j_fonly, $j_sort;
301                }
302            }
303            $join->[3]->{fetchonly} = $j_fonly;
304        }
305    }
306
307    my $start_val = $args->{sort} ? delete $args->{start_val} : undef;
308
309    my $stmt = $driver->dbd->sql_class->new(%stmt_args);
310
311    ## START CORE D::OD::Driver::DBI prepare_statement
312    my $dbd = $driver->dbd;
313    my $tbl = $driver->table_for($class);
314
315    if ($tbl) {
316        my $cols = $class->column_names;
317        my %fetch = $args->{fetchonly} ?
318            (map { $_ => 1 } @{ $args->{fetchonly} }) : ();
319        my $skip = $stmt->select_map_reverse;
320        for my $col (@$cols) {
321            next if $skip->{$col};
322            if (keys %fetch) {
323                next unless $fetch{$col};
324            }
325            my $dbcol = $dbd->db_column_name($tbl, $col);
326            $stmt->add_select($dbcol => $col);
327        }
328
329        $stmt->from([ $tbl ]);
330
331        if (defined($terms)) {
332            $stmt->column_mutator(sub {
333                my ($col) = @_;
334                return $dbd->db_column_name($tbl, $col);
335            });
336            if (ref $terms eq 'ARRAY') {
337                $stmt->add_complex_where($terms);
338            }
339            else {
340                for my $col (keys %$terms) {
341                    $stmt->add_where(join('.', $tbl, $col), $terms->{$col});
342                }
343            }
344            $stmt->column_mutator(undef);
345        }
346
347        ## Set statement's ORDER clause if any.
348        if ($args->{sort} || $args->{direction}) {
349            my $order = $args->{sort} || 'id';
350            if (! ref($order)) {
351                my $dir = $args->{direction} &&
352                          $args->{direction} eq 'descend' ? 'DESC' : 'ASC';
353                $stmt->order({
354                    column => $dbd->db_column_name($tbl, $order),
355                    desc   => $dir,
356                });
357            } else {
358                my @order;
359                foreach my $ord (@$order) {
360                    push @order, {
361                        column => $dbd->db_column_name($tbl, $ord->{column}),
362                        desc => $ord->{desc},
363                    };
364                }
365                $stmt->order(\@order);
366            }
367        }
368    }
369    $stmt->limit($args->{limit}) if $args->{limit};
370    $stmt->offset($args->{offset}) if $args->{offset};
371
372    if (my $terms = $args->{having}) {
373        for my $col (keys %$terms) {
374            $stmt->add_having($col => $terms->{$col});
375        }
376    }
377    ## END
378
379    ## Keep the statement reference we're going to return with, in case
380    ## we have to subselect from it.
381    my $major_stmt = $stmt;
382
383    ## Implement `join` arg like MT::ObjectDriver, for compatibility.
384    if($join) {
385        my ($j_class, $j_col, $j_terms, $j_args) = @$join;
386        my $j_unique;
387        if($j_unique = delete $j_args->{unique}) {
388            $stmt->distinct(1);
389        }
390
391        ## Handle legacy load-by-ID in join.
392        if(defined $j_terms && !ref $j_terms) {
393            ## TODO: don't assume primary key
394            my $key = $j_class->properties->{primary_key};
395            $j_terms = { $key => $j_terms };
396        }
397
398        my $join_stmt = $driver->prepare_statement($j_class, $j_terms, $j_args);  # recursive
399
400        $j_args->{unique} = $j_unique if $j_unique;
401
402        for my $field (qw( from where bind )) {
403            push @{ $stmt->$field() }, @{ $join_stmt->$field() };
404        }
405        $stmt->from_stmt($join_stmt->from_stmt);
406        $stmt->limit($j_args->{limit}) if exists $j_args->{limit};
407        $stmt->offset($j_args->{offset}) if exists $j_args->{offset};
408
409        if($join_stmt->order) {
410            ## Preserve the sort order.
411            my @new_order;
412            for my $sql_stmt ($stmt, $join_stmt) {
413                if(my $order = $sql_stmt->order) {
414                    if('ARRAY' eq ref $order) {
415                        push @new_order, @$order;
416                    } else {
417                        push @new_order, $order;
418                    }
419                }
420            }
421            $stmt->order(\@new_order);
422
423            if ($stmt->distinct) {
424                $major_stmt = $driver->dbd->sql_class->distinct_stmt($stmt);
425            }
426        }
427
428        ## Join across the given column(s).
429        $j_col = [$j_col] unless ref $j_col;
430        my $tuple = $class->primary_key_tuple;
431        COLUMN: foreach my $i (0..$#$j_col) {
432            next unless defined $j_col->[$i];
433            my $t = $tuple->[$i];
434            my $c = $j_col->[$i];
435
436            my $where_col = $driver->_decorate_column_name($class, $t);
437            my $dec_j_col = $driver->_decorate_column_name($j_class, $c);
438            my $where_val = "= $dec_j_col";
439            $stmt->add_where($where_col, \$where_val);
440        }
441    }
442
443    if ($start_val) {
444        ## TODO: support complex primary keys
445        my $col = $args->{sort} || $class->primary_key;
446        if (ref $col eq 'ARRAY') {
447            if (ref $col->[0] eq 'HASH') {
448                # complex 'sort' array/hash structure
449                foreach (@$col) {
450                    $_->{column} = $driver->_decorate_column_name($class, $_->{column});
451                }
452            } else {
453                # primary key as array of column names
454                foreach (@$col) {
455                    $_ = $driver->_decorate_column_name($class, $_);
456                }
457            }
458        } else {
459            $col = $driver->_decorate_column_name($class, $col);
460        }
461        my $op = $args->{direction} eq 'descend' ? '<' : '>';
462        $stmt->add_where($col, { value => $start_val, op => $op });
463    }
464
465    ## Return with this reference, because we might have wrapped $stmt in
466    ## a subselect.
467    return $major_stmt;
468}
469
470sub sql {
471    my $driver = shift;
472    my ($sql) = @_;
473    my $dbh = $driver->rw_handle;
474    if (!ref $sql) {
475        $sql = [ $sql ];
476    }
477    foreach (@$sql) {
478        $dbh->do($_) or return $driver->error($dbh->errstr);
479    }
480    1;
481}   
482
4831;
484__END__
485
486=head1 NAME
487
488MT::ObjectDriver::Driver::DBI
489
490=head1 METHODS
491
492TODO
493
494=head1 AUTHOR & COPYRIGHT
495
496Please see L<MT/AUTHOR & COPYRIGHT>.
497
498=cut
Note: See TracBrowser for help on using the browser.