root/trunk/lib/MT/ObjectDriver/Driver/DBI.pm @ 3082

Revision 3082, 18.1 kB (checked in by bchoate, 14 months ago)

Merging fireball branch changes to-date to trunk: svn merge -r2974:3081 http://code.sixapart.com/svn/movabletype/branches/fireball .

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::ObjectDriver::Driver::DBI;
8
9use strict;
10use base qw( Data::ObjectDriver::Driver::DBI );
11
12sub init {
13    my $driver = shift;
14    my (%param) = @_;
15    $param{prefix} ||= 'mt_';
16    $driver->SUPER::init(%param);
17    my $opts = $driver->connect_options || {};
18    $opts->{RaiseError} = 0;
19    $driver->connect_options($opts);
20    $driver;
21}
22
23sub configure {
24    my $driver = shift;
25    $driver->dbd->configure($driver, @_);
26}
27
28sub table_exists {
29    my $driver = shift;
30    my ($class) = @_;
31    return $driver->dbd->ddl_class->table_exists($class);
32}
33
34# Be mindful of SQLite when you modify the method.
35# SQLite has its own count method in its DBD.
36sub count {
37    my $driver = shift;
38    my($class, $terms, $args) = @_;
39
40    my $join = $args->{join};
41    my $select = 'COUNT(*)';
42    if ($join && $join->[3]->{unique}) {
43        my $col;
44        if ($join->[3]{unique} =~ m/\D/) {
45            $col = $args->{join}[3]{unique};
46        } else {
47            $col = $class->properties->{primary_key};
48        }
49        my $dbcol = $driver->dbd->db_column_name($class->datasource, $col);
50        $select = "COUNT(DISTINCT $dbcol)";
51    }
52
53    return $driver->_select_aggregate(
54        select   => $select,
55        class    => $class,
56        terms    => $terms,
57        args     => $args,
58        override => {
59                     order  => '',
60                     limit  => undef,
61                     offset => undef,
62                    },
63    );
64}
65
66sub exist {
67    my $driver = shift;
68    my($class, $terms, $args) = @_;
69
70    return $driver->_select_aggregate(
71        select   => '1',
72        class    => $class,
73        terms    => $terms,
74        args     => $args,
75        override => {
76                     order  => '',
77                     limit  => 1,
78                     offset => undef,
79                    },
80    );
81}
82
83sub remove_all {
84    my $driver = shift;
85    my ($class) = @_;
86    return $driver->direct_remove($class);
87}
88
89sub count_group_by {
90    my $driver = shift;
91    my ($class, $terms, $args) = @_;
92
93    $driver->_do_group_by('COUNT(*)', @_);
94}
95
96sub sum_group_by {
97    my $driver = shift;
98    my ($class, $terms, $args) = @_;
99
100    my $sum_column = delete $args->{sum};
101    return unless $sum_column;
102    $sum_column = $driver->_decorate_column_name($class, $sum_column);
103    $args->{sort} = "sum_$sum_column" unless exists $args->{sort};
104    $args->{direction} = 'descend' unless exists $args->{direction};
105    $driver->_do_group_by("SUM($sum_column) AS sum_$sum_column", @_);
106}
107
108sub avg_group_by {
109    my $driver = shift;
110    my ($class, $terms, $args) = @_;
111
112    my $avg_column = delete $args->{avg};
113    return unless $avg_column;
114    $avg_column = $driver->_decorate_column_name($class, $avg_column);
115    $args->{sort} = "avg_$avg_column" unless exists $args->{sort};
116    $args->{direction} = 'descend' unless exists $args->{direction};
117    $driver->_do_group_by("AVG($avg_column) AS avg_$avg_column", @_);
118}
119
120sub max_group_by {
121    my $driver = shift;
122    my ($class, $terms, $args) = @_;
123
124    my $max_column = delete $args->{max};
125    return unless $max_column;
126    $max_column = $driver->_decorate_column_name($class, $max_column);
127    $args->{sort} = "max_$max_column" unless exists $args->{sort};
128    $args->{direction} = 'descend' unless exists $args->{direction};
129    $driver->_do_group_by("MAX($max_column) AS max_$max_column", @_);
130}
131
132sub _do_group_by {
133    my $driver = shift;
134    my ($agg_func, $class, $terms, $args) = @_;
135    my $props = $class->properties;
136    $terms ||= {}; $args ||= {}; # declare these for pre_search to work
137    $class->call_trigger('pre_search', $terms, $args);
138    my $order = delete $args->{sort} || '';
139    my $direction = delete $args->{direction};
140    if ( $order =~ /\sdesc|asc/i ) {
141        my @new_order;
142        while ($order =~ /(?:\s*([\w\s\(\)]+?)\s(desc|asc))/ig) {
143            push @new_order, { column => $1, desc => $2 };
144        }
145        $order = \@new_order if @new_order;
146    }
147    my $limit = exists $args->{limit} ? delete $args->{limit} : undef;
148    my $offset = exists $args->{offset} ? delete $args->{offset} : undef;
149    my $stmt = $driver->prepare_statement($class, $terms, $args);
150
151    ## Ugly. Maybe we need a clear_select method in D::OD::SQL?
152    $stmt->select([]);
153    $stmt->select_map({});
154    $stmt->select_map_reverse({});
155
156    $stmt->add_select($agg_func);
157
158    ## This is the nastiest thing I've ever seen. The caller should really
159    ## just give the full column name, instead, rather than having to
160    ## loop over all of the columns to replace something like
161    ## EXTRACT(year FROM created_on) with EXTRACT(year FROM entry_created_on).
162    my $decorate = $stmt->field_decorator($class);
163
164    my @group = map { $decorate->($_) } @{ $args->{group} };
165    for my $term (@group) {
166        $stmt->add_select($term);
167    }
168    $stmt->group([ map { { column => $_ } } @group ]);
169
170    ## Set statement's ORDER clause if any.
171    if ($order) {
172        if (! ref($order)) {
173            $stmt->order( [ { column => $decorate->($order),
174                desc => ($direction || '') eq 'descend' ? 'DESC' : 'ASC'
175            } ] );
176        } else {
177            my @order;
178            foreach my $ord (@$order) {
179                push @order, {
180                    column => $decorate->($ord->{column}),
181                    desc => $ord->{desc},
182                };
183            }
184            $stmt->order(\@order);
185        }
186    }
187
188    my $sql = $stmt->as_sql;
189
190    my $dbh = $driver->r_handle;
191    $driver->start_query($sql, $stmt->bind);
192    my $sth = $dbh->prepare_cached($sql);
193    $sth->execute(@{ $stmt->bind });
194
195    my @bindvars;
196    for (@{ $args->{group} }) {
197        push @bindvars, \my($var);
198    }
199    $sth->bind_columns(undef, \my($count), @bindvars);
200
201    if ($offset) {
202        while ($offset--) {
203            unless ($sth->fetch) {
204                $driver->end_query($sth);
205                return;
206            }
207        }
208    }
209    my $i = 0;
210    my $finish = sub {
211        return unless $sth;
212        $sth->finish;
213        $driver->end_query($sth);
214        undef $sth;
215    };
216    my $iter = sub {
217        unless ($sth->fetch && defined $count && (!defined $limit || ($i < $limit))) {
218            $sth->finish;
219            $driver->end_query($sth);
220            return;
221        }
222        my @returnvals = map { $$_ } @bindvars;
223        $i++;
224        $class->call_trigger('post_group_by', \$count, \@returnvals)
225            unless $args->{no_triggers};
226        return($count, @returnvals);
227    };
228    return Data::ObjectDriver::Iterator->new($iter, $finish);
229}
230
231sub _select_aggregate {
232    my $driver = shift;
233    my %param = @_;
234
235    my($class, $orig_terms, $orig_args) = @param{qw( class terms args )};
236    my $overrides = $param{override};
237    my $select = $param{select};
238
239    ## Handle legacy load-by-id syntax.
240    if($orig_terms && !ref $orig_terms) {
241        $orig_terms = { id => $orig_terms };
242    }
243
244    ## Convert $terms and $args like we would for a search.
245    my $terms = {};
246    if (ref($orig_terms) eq 'HASH') {
247        $terms = { %$orig_terms };
248    } elsif (ref($orig_terms) eq 'ARRAY') {
249        $terms = [ @$orig_terms ];
250    }
251    my $args  = $orig_args  ? { %$orig_args  } : {};
252    $class->call_trigger('pre_search', $terms, $args);
253
254    my $stmt = $driver->prepare_statement($class, $terms, $args);
255    ## Remove any unnecessary clauses, because they will cause errors in
256    ## some drivers (and they're not necessary)
257    while(my ($clause, $value) = each %$overrides) {
258        $stmt->$clause($value);
259    }
260    $stmt->select([]);
261    $stmt->select_map({});
262    $stmt->select_map_reverse({});
263    $stmt->add_select($select => $select);
264    my $sql = $stmt->as_sql;
265    my $value = $driver->select_one($sql, $stmt->bind);
266    $class->call_trigger('post_select_aggregate', \$value)
267        unless $orig_args->{no_triggers};
268    return $value;
269}
270
271sub _decorate_column_names_in {
272    my $driver = shift;
273    my ($hash, $class) = @_;
274
275    my $dbd = $driver->dbd;
276    for my $col (keys %$hash) {
277        my $new_col = $dbd->db_column_name($class->datasource, $col);
278        $hash->{$new_col} = delete $hash->{$col};
279    }
280
281    return $hash;
282}
283
284sub _decorate_column_name {
285    my $driver = shift;
286    my ($class, $col) = @_;
287    return $driver->dbd->db_column_name($class->datasource, $col);
288}
289
290sub prepare_statement {
291    my $driver = shift;
292    my($class, $terms, $orig_args) = @_;
293    my $args = defined $orig_args ? { %$orig_args } : {};
294
295    my %stmt_args;
296
297    ## Statements don't know anything about table/column name decoration,
298    ## so for any set of column names we send the statement, we must pre-
299    ## decorate the column names.
300
301    for my $arg (qw( transform range range_incl not null not_null like binary count_distinct )) {
302        if(exists $args->{$arg}) {
303            my %stmt_data = %{ delete $args->{$arg} };
304            $driver->_decorate_column_names_in(\%stmt_data, $class);
305            $stmt_args{$arg} = \%stmt_data;
306        }
307    }
308
309    ## Tell the statement what's a date column.
310    if(my $date_columns = $class->columns_of_type('datetime')) {
311        my %date_columns_hash;
312        @date_columns_hash{@$date_columns} = (1) x scalar @$date_columns;
313        $driver->_decorate_column_names_in(\%date_columns_hash, $class);
314        $stmt_args{date_columns} = \%date_columns_hash;
315    }
316
317    ## Tell the statement what's a lob column.
318    if(my $lob_columns = $class->columns_of_type('text', 'blob')) {
319        my %lob_columns_hash;
320        @lob_columns_hash{@$lob_columns} = (1) x scalar @$lob_columns;
321        $driver->_decorate_column_names_in(\%lob_columns_hash, $class);
322        $stmt_args{lob_columns} = \%lob_columns_hash;
323    }
324
325    my $join = delete $args->{join};
326
327    ## Convert fetchonly args from legacy hashes to Data::ObjectDriver's
328    ## expected arrays.
329    ## TODO: handle this in MT::OD::SQL instead of converting a hash to an
330    ## array to a hash again?
331    if(exists $args->{fetchonly}) {
332        if ('HASH' eq ref $args->{fetchonly}) {
333            $args->{fetchonly} = [ keys %{ $args->{fetchonly} } ];
334        }
335    }
336
337    ## Make sure to include our ORDER BY field in the SELECT fields if
338    ## we're doing a SELECT DISTINCT (for postgres).
339    if($join && $join->[3]->{unique}) {
340        my $sort = $args->{sort};
341        if (my $fonly = $args->{fetchonly}) {
342            if (defined $sort) {
343                unless (grep { $_ eq $sort } @$fonly) {
344                    push @$fonly, $sort;
345                }
346            }
347            $args->{fetchonly} = $fonly;
348        }
349
350        my $j_sort = $join->[3]->{sort};
351        if (my $j_fonly = $join->[3]->{fetchonly}) {
352            if (defined $j_sort) {
353                unless (grep { $_ eq $j_sort } @$j_fonly) {
354                    push @$j_fonly, $j_sort;
355                }
356            }
357            $join->[3]->{fetchonly} = $j_fonly;
358        }
359    }
360
361    my $start_val = $args->{sort} ? delete $args->{start_val} : undef;
362
363    my $stmt = $driver->dbd->sql_class->new(%stmt_args);
364
365    ## START CORE D::OD::Driver::DBI prepare_statement
366    my $dbd = $driver->dbd;
367    my $tbl = $driver->table_for($class);
368
369    if ($tbl) {
370        my $cols = $class->column_names;
371        my %fetch = $args->{fetchonly} ?
372            (map { $_ => 1 } @{ $args->{fetchonly} }) : ();
373        my $skip = $stmt->select_map_reverse;
374        for my $col (@$cols) {
375            next if $skip->{$col};
376            if (keys %fetch) {
377                next unless $fetch{$col};
378            }
379            my $dbcol = $dbd->db_column_name($tbl, $col);
380            $stmt->add_select($dbcol => $col);
381        }
382
383        if ( my $alias = $orig_args->{alias} ) {
384            $stmt->from([ "$tbl $alias" ]);
385        }
386        else {
387            $stmt->from([ $tbl ]);
388        }
389
390        if (defined($terms)) {
391            my $mutator = $stmt->column_mutator;
392            $stmt->column_mutator(sub {
393                my ($col) = @_;
394                my $db_col = $dbd->db_column_name($tbl, $col);
395                if ( my $alias = $orig_args->{alias} ) {
396                    $db_col = "$alias.$db_col";
397                }
398                if ( $mutator && 'CODE' eq ref($mutator) ) {
399                    $db_col = $mutator->($db_col);
400                }
401                return $db_col;
402            });
403            if (ref $terms eq 'ARRAY') {
404                $stmt->add_complex_where($terms);
405            }
406            else {
407                for my $col (keys %$terms) {
408                    $stmt->add_where(join('.', $tbl, $col), $terms->{$col});
409                }
410            }
411            $stmt->column_mutator(undef);
412        }
413
414        ## Set statement's ORDER clause if any.
415        if ($args->{sort} || $args->{direction}) {
416            my $order = $args->{sort} || 'id';
417            if (! ref($order)) {
418                my $dir = $args->{direction} &&
419                          $args->{direction} eq 'descend' ? 'DESC' : 'ASC';
420                $stmt->order({
421                    column => $dbd->db_column_name($tbl, $order),
422                    desc   => $dir,
423                });
424            } else {
425                my @order;
426                foreach my $ord (@$order) {
427                    push @order, {
428                        column => $dbd->db_column_name($tbl, $ord->{column}),
429                        desc => $ord->{desc},
430                    };
431                }
432                $stmt->order(\@order);
433            }
434        }
435
436        if ( my $ft_arg = delete $args->{'freetext'} ) {
437            my @columns = map { $dbd->db_column_name($tbl, $_) } @{ $ft_arg->{'columns'} };
438            $stmt->add_freetext_where( \@columns, $ft_arg->{'search_string'} );
439        }
440    }
441    $stmt->limit($args->{limit}) if $args->{limit};
442    $stmt->offset($args->{offset}) if $args->{offset};
443
444    if (my $terms = $args->{having}) {
445        for my $col (keys %$terms) {
446            $stmt->add_having($col => $terms->{$col});
447        }
448    }
449    ## END
450
451    ## Keep the statement reference we're going to return with, in case
452    ## we have to subselect from it.
453    my $major_stmt = $stmt;
454
455    ## Implement `join` arg like MT::ObjectDriver, for compatibility.
456    if($join) {
457        my ($j_class, $j_col, $j_terms, $j_args) = @$join;
458        my $j_unique;
459        if($j_unique = delete $j_args->{unique}) {
460            $stmt->distinct(1);
461        }
462
463        ## Handle legacy load-by-ID in join.
464        if(defined $j_terms && !ref $j_terms) {
465            ## TODO: don't assume primary key
466            my $key = $j_class->properties->{primary_key};
467            $j_terms = { $key => $j_terms };
468        }
469
470        my $join_stmt = $driver->prepare_statement($j_class, $j_terms, $j_args);  # recursive
471
472        $j_args->{unique} = $j_unique if $j_unique;
473
474        for my $field (qw( from where bind )) {
475            push @{ $stmt->$field() }, @{ $join_stmt->$field() };
476        }
477        $stmt->from_stmt($join_stmt->from_stmt);
478        $stmt->limit($j_args->{limit}) if exists $j_args->{limit};
479        $stmt->offset($j_args->{offset}) if exists $j_args->{offset};
480
481        if($join_stmt->order) {
482            ## Preserve the sort order.
483            my @new_order;
484            for my $sql_stmt ($stmt, $join_stmt) {
485                if(my $order = $sql_stmt->order) {
486                    if('ARRAY' eq ref $order) {
487                        push @new_order, @$order;
488                    } else {
489                        push @new_order, $order;
490                    }
491                }
492            }
493            $stmt->order(\@new_order);
494
495            if ($stmt->distinct) {
496                $major_stmt = $driver->dbd->sql_class->distinct_stmt($stmt);
497            }
498        }
499
500        ## Join across the given column(s).
501        $j_col = [$j_col] unless ref $j_col;
502        my $tuple = $class->primary_key_tuple;
503        COLUMN: foreach my $i (0..$#$j_col) {
504            next unless defined $j_col->[$i];
505            my $t = $tuple->[$i];
506            my $c = $j_col->[$i];
507
508            my $where_col = $driver->_decorate_column_name($class, $t);
509            my $dec_j_col = $driver->_decorate_column_name($j_class, $c);
510            my $where_val = "= $dec_j_col";
511            $stmt->add_where($where_col, \$where_val);
512        }
513    }
514
515    if ($start_val) {
516        ## TODO: support complex primary keys
517        my $col = $args->{sort} || $class->primary_key;
518        if (ref $col eq 'ARRAY') {
519            if (ref $col->[0] eq 'HASH') {
520                # complex 'sort' array/hash structure
521                foreach (@$col) {
522                    $_->{column} = $driver->_decorate_column_name($class, $_->{column});
523                }
524            } else {
525                # primary key as array of column names
526                foreach (@$col) {
527                    $_ = $driver->_decorate_column_name($class, $_);
528                }
529            }
530        } else {
531            $col = $driver->_decorate_column_name($class, $col);
532        }
533        my $op = $args->{direction} eq 'descend' ? '<' : '>';
534        $stmt->add_where($col, { value => $start_val, op => $op });
535    }
536
537    ## Return with this reference, because we might have wrapped $stmt in
538    ## a subselect.
539    return $major_stmt;
540}
541
542sub sql {
543    my $driver = shift;
544    my ($sql) = @_;
545    my $dbh = $driver->rw_handle;
546    if (!ref $sql) {
547        $sql = [ $sql ];
548    }
549    foreach (@$sql) {
550        $dbh->do($_) or return $driver->last_error;
551    }
552    1;
553}   
554
5551;
556__END__
557
558=head1 NAME
559
560MT::ObjectDriver::Driver::DBI
561
562=head1 METHODS
563
564TODO
565
566=head1 Callbacks
567
568MT::ObjectDriver::Driver::DBI fires the following callbacks,
569or "triggers" when it loads data from the database.
570
571=over 4
572
573=item * post_select_aggregate
574
575    callback($class, \$value)
576
577Callback issued prior to returning the value that is retrieved
578as the result of select_one method.
579
580=item * post_group_by
581
582    callback($class, \$value, \@returnvals)
583
584Callback issued prior to returning the number and additional return
585values that are retrieved as the result of grouping query.  The value
586in the $value parameter is what was calculated from the database.
587For example, in count_group_by method, $value holds the count for each
588group, while in sum_group_by method, $value holds the sum for each group.
589@returnvals parameter holds the additional data that wiil be retured.
590
591=head1 AUTHOR & COPYRIGHT
592
593Please see L<MT/AUTHOR & COPYRIGHT>.
594
595=cut
Note: See TracBrowser for help on using the browser.