root/branches/release-39/lib/MT/ObjectDriver/SQL.pm @ 2495

Revision 2495, 11.0 kB (checked in by bchoate, 18 months ago)

Fix using binds for 'not in ...' clauses. BugId:80003

  • Property svn:keywords set to Id Revision
Line 
1# Movable Type (r) Open Source (C) 2001-2008 Six Apart, Ltd.
2# This program is distributed under the terms of the
3# GNU General Public License, version 2.
4#
5# $Id$
6
7package MT::ObjectDriver::SQL;
8
9#--------------------------------------#
10# Dependencies
11
12use strict;
13use warnings;
14
15use base qw( Data::ObjectDriver::SQL );
16
17#--------------------------------------#
18# Class Accessors
19
20my @ACCESSORS = qw( transform range range_incl lob_columns date_columns not null not_null like distinct from_stmt binary );
21__PACKAGE__->mk_accessors(@ACCESSORS);
22
23#--------------------------------------#
24# Class Methods
25
26sub new {
27    my $class = shift;
28    my %param = @_;
29
30    my %data;
31    @data{@ACCESSORS} = delete @param{@ACCESSORS};
32
33    my $stmt = $class->SUPER::new(%param);
34
35    for my $field (@ACCESSORS) {
36        next if $field eq 'distinct';
37        next if $field eq 'from_stmt';
38        $stmt->$field(defined $data{$field} ? { %{ $data{$field} } } : {});
39    }
40    $stmt->distinct($data{distinct} || 0);
41    if(defined $data{from_stmt}) {
42        $stmt->from_stmt($data{from_stmt});
43    }
44
45    $stmt;
46}
47
48sub ts2db {
49    return unless $_[0];
50    if($_[0] =~ m{ \A \d{4} - }xms) {
51        return $_[0];
52    }
53    my $ret = sprintf '%04d-%02d-%02d %02d:%02d:%02d', unpack 'A4A2A2A2A2A2', $_[0];
54    return $ret;
55}
56
57sub distinct_stmt {
58    my $class = shift;
59    my ($stmt) = @_;
60    $stmt;
61}
62
63# This method will be used in Postgres and MSSQLServer
64sub _subselect_distinct {
65    my $class = shift;
66    my ($stmt) = @_;
67    ## If we're doing a SELECT DISTINCT, postgres would have us include
68    ## the order field, which means the DISTINCT isn't what we want--so
69    ## let's do a subselect.
70    my $subselect = $class->new;
71    $subselect->from_stmt($stmt);
72    $subselect->select([ @{ $stmt->select } ]);
73    #for my $col (@{ $subselect->select }) {
74    #    $col = $driver->dbd->fix_subselect_column($col); ## FIXME
75    #}
76    $subselect->select_map({ %{ $stmt->select_map } });
77    for my $col (keys %{ $subselect->select_map }) {
78        my $new_col = $col;
79        #$new_col = $driver->dbd->fix_subselect_column($new_col); ## FIXME
80        $subselect->select_map->{$new_col} = delete $subselect->select_map->{$col};
81    }
82    $subselect->bind      ([ @{ $stmt->bind } ]);
83    $subselect->distinct  (1);
84
85    $stmt->distinct(0);
86    $subselect;
87}
88
89
90#--------------------------------------#
91# Instance Methods
92
93sub as_sql {
94    my $stmt = shift;
95    my $sql = '';
96
97    my $old_sel;
98    if (@{ $stmt->select }) {
99        $old_sel = $stmt->select;
100
101        $sql = 'SELECT ';
102        if($stmt->distinct) {
103            $sql .= 'DISTINCT ';
104        }
105        $sql .= join(', ', @{ $stmt->select }) . "\n";
106        $stmt->select([]);
107    }
108
109    if ($stmt->from_stmt) {
110        $sql .= 'FROM ('
111            . $stmt->from_stmt->as_sql(@_)
112            . ") t\n";  # t is the subquery alias
113    } else {
114        $sql .= $stmt->SUPER::as_sql(@_);
115
116        ## Check if we generated an unbounded query for mt_session, since we're seeing those in production.
117        ## TODO: remove this. Or generalize it into query auditing.
118        ## my @from_tbls = @{ $stmt->from };
119        ## if (1 == scalar @from_tbls && $from_tbls[0] eq 'mt_session') {
120        ##     if (!$stmt->where || !@{ $stmt->where } || $sql !~ m{ where }xmsi) {
121        ##         MT->log({
122        ##             message => Carp::longmess("Generated unbounded query on mt_session [$sql]"),
123        ##             level => MT::Log::DEBUG()
124        ##         });
125        ##     }
126        ## }
127    }
128
129    $stmt->select($old_sel) if $old_sel;
130    return $sql;
131}
132
133sub _mk_term {
134    my $stmt = shift;
135    my ($col, $val) = @_;
136
137    $col =~ s/ \A [\w\.]+? \. //x;
138
139    ## Any last-minute property -> field name manipulation
140    if (my $m = $stmt->column_mutator) {
141        $col = $m->($col);
142    }
143
144    if (ref $val eq 'HASH') {
145        if (!exists $val->{op}) {
146            # hash-style value, containing hints on operation
147            if (exists $val->{like}) {
148                $val = { op => 'LIKE', value => $val->{like} };
149            }
150            if (exists $val->{not_like}) {
151                $val = { op => 'NOT LIKE', value => $val->{not_like} };
152            }
153            elsif (exists $val->{not_null}) {
154                $val = \'is not null';
155            }
156            elsif (exists $val->{not}) {
157                my $v = $val->{not};
158                if ('ARRAY' eq ref($v)) {
159                    if(my $transformed_column = $stmt->transform->{$col}) {
160                        $col = $transformed_column;
161                    }
162                    my $term = $col . ' NOT IN (' . join (',', ('?') x scalar @$v ) . ')';
163                    return ($term, $v, $col);
164                } elsif (ref $v) {
165                    die "Unsupported value in 'not' column";
166                } else {
167                    $val = { value => $v,
168                             op    => '!=' };
169                }
170            }
171            elsif (exists $val->{between}) {
172                my $low = @{$val->{between}}[0];
173                my $high = @{$val->{between}}[1];
174                if($stmt->date_columns->{$col}) {
175                    $low = ts2db($low);
176                    $high = ts2db($high);
177                }
178                $val = [ '-and', { op => '>=', value => $low },
179                    { op => '<=', value => $high } ];
180            }
181            elsif (exists $val->{'>='}) {
182                $val = { op => '>=', value => $val->{'>='} };
183            }
184            elsif (exists $val->{'>'}) {
185                $val = { op => '>', value => $val->{'>'} };
186            }
187            elsif (exists $val->{'<='}) {
188                $val = { op => '<=', value => $val->{'<='} };
189            }
190            elsif (exists $val->{'<'}) {
191                $val = { op => '<', value => $val->{'<'} };
192            }
193            elsif (exists $val->{'!='}) {
194                $val = { op => '!=', value => $val->{'!='} };
195            }
196        }
197
198        ## Translate dates from app to database format.
199        if(($stmt->date_columns->{$col}) && (ref($val) eq 'HASH')) {
200            my $v = $val->{value};
201            if (ref($v) eq 'ARRAY') {
202                $v->[$_] = ts2db($v->[$_]) for @$v;
203            }
204            else {
205                $val->{value} = ts2db($v);
206            }
207        }
208    }
209    else {
210        ## Rearrange the value into an inclusive range.
211        my $range_incl = $stmt->range_incl;
212        my $range      = $stmt->range;
213
214        ## We may recurse, so let us empty range inclusions in our scope.
215        local $range_incl->{$col} = $range_incl->{$col};
216        local $range->{$col}      = $range->{$col};
217        if ($range_incl->{$col} || $range->{$col}) {
218            my ($lt, $gt) = $range_incl->{$col} ? ('<=', '>=') : ('<', '>');
219            my @vals;
220
221            my ($first_val, $last_val) = @$val;
222            if ($stmt->date_columns->{$col}) {
223                $first_val = ts2db($first_val) if defined $first_val;
224                $last_val = ts2db($last_val) if defined $last_val;
225            }
226
227            ## Ignore first value if it's undef (right-bounded range, eg [undef, 20050101000000] )
228            if (defined $first_val) {
229                push @vals, { op => $gt, value => $first_val };
230            }
231            ## Ignore last value if it's defined (left-bounded range, eg [20050101000000] )
232            if (defined $last_val) {
233                push @vals, { op => $lt, value => $last_val  };
234            }
235            if (2 == scalar @vals) {
236                $val = [ '-and', @vals ];
237            }
238            else {
239                ($val) = @vals;
240            }
241
242            ## Because the new value is an arrayref, we're about to get
243            ## called recursively with each of those hashrefs inside it.
244            ## So ignore that we're using an inclusive range within this
245            ## call's scope.
246            undef ($range_incl->{$col} ? $range_incl->{$col} : $range->{$col});
247        }
248
249        ## Translate dates from app to database format.
250        if ($stmt->date_columns->{$col}) {
251            if (ref($val) eq 'HASH') {
252                my $v = $val->{value};
253                if (ref($v) eq 'ARRAY') {
254                    $v->[$_] = ts2db($v->[$_]) for @$v;
255                }
256                else {
257                    $val->{value} = ts2db($v);
258                }
259            } elsif (!ref($val)) {
260                $val = ts2db($val);
261            }
262        }
263
264        if ($stmt->not->{$col}) {
265            if ('ARRAY' eq ref($val)) {
266                if(my $transformed_column = $stmt->transform->{$col}) {
267                    $col = $transformed_column;
268                }
269                my $term = $col . ' NOT IN (' . join (',', ('?') x scalar @$val ) . ')';
270                return ($term, $val, $col);
271            }
272            elsif (ref $val) {
273                die "Unsupported value in 'not' column";
274            }
275            else {
276                $val = { value => $val,
277                         op    => '!=', };
278            }
279        }
280
281        if ($stmt->null->{$col}) {
282            $val = \'is null';
283        }
284
285        if ($stmt->not_null->{$col}) {
286            $val = \'is not null';
287        }
288
289        if ($stmt->like->{$col}) {
290            if (ref($val) eq 'HASH') {
291                $val->{op} = 'LIKE';
292            } elsif (!ref($val)) {
293                $val = { op    => 'LIKE',
294                         value => $val,   };
295            }
296        }
297    }
298
299    ## Transformation modifies the column name, so it should be last.
300    if(my $transformed_column = $stmt->transform->{$col}) {
301        $col = $transformed_column;
302    }
303
304    ## Prevent D::OD from re-mutating, since we've done it here
305    local $stmt->{column_mutator} = undef;
306
307    $stmt->SUPER::_mk_term($col, $val);
308}
309
310sub make_subselect {
311    my $stmt = shift;
312    my $class = ref $stmt;
313
314    my $subselect = $class->new();
315    for my $field (qw( bind distinct )) {
316        $subselect->$field($stmt->$field());
317    }
318
319    my @new_selects = map { s{ \A \w+\. }{}xms } @{ $stmt->select };
320    $subselect->select(\@new_selects);
321
322    my %new_select_map;
323    my $sel_map = $stmt->select_map;
324    for my $select_field (keys %$sel_map) {
325        my $new_select_field = $select_field;
326        $new_select_field =~ s{ \A \w+\. }{}xms;
327        $new_select_map{$new_select_field} = $sel_map->{$select_field};
328    }
329
330    $subselect->from_stmt($stmt);
331    return $subselect;
332}
333
334sub field_decorator {
335    my $stmt = shift;
336    my ($class) = @_;
337    return sub {
338        my($term) = @_;
339        my $field_prefix = $class->datasource;
340        for my $col (@{ $class->column_names }) {
341            $term =~ s/\b$col\b/${field_prefix}_$col/g;
342        }
343        return $term;
344    };
345}
346
347sub as_limit {
348    my $stmt = shift;
349    my $n = $stmt->limit;
350    # Support offset without limit
351    my $o = $stmt->offset || 0;
352    $n = 2147483647 if !$n && $o;
353    return '' unless $n;
354    die "Non-numerics in limit/offset clause ($n, $o)" if ($n =~ /\D/) || ($o =~ /\D/);
355    return sprintf "LIMIT %d%s\n", $n,
356           ($o ? " OFFSET " . int($o) : "");
357}
358
359sub add_freetext_where { 0 }
360
3611;
362__END__
363
364=head1 NAME
365
366MT::ObjectDriver::SQL
367
368=head1 METHODS
369
370TODO
371
372=head1 AUTHOR & COPYRIGHT
373
374Please see L<MT/AUTHOR & COPYRIGHT>.
375
376=cut
Note: See TracBrowser for help on using the browser.