=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor

=head1 DESCRIPTION

    The base class for all other Object- or NakedTable- adaptors.
    Performs the low-level SQL needed to retrieve and store data in tables.

=head1 EXTERNAL DEPENDENCIES

    DBI 1.6

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut


package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;

use strict;
no strict 'refs';   # needed to allow AUTOLOAD create new methods
use DBI 1.6;        # the 1.6 functionality is important for detecting autoincrement fields and other magic.

use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');


# Fallback consulted by table_name() when no table name has been set on the adaptor.
# This base implementation has no name to offer, so it always dies with an explanation.
sub default_table_name {
    my $complaint = "Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class";
    die $complaint;
}


# Fallback consulted by insertion_method() when no insertion method has been set.
# Returns the method name as a plain string.
sub default_insertion_method {
    my $method_name = 'INSERT_IGNORE';
    return $method_name;
}


# Fallback consulted by overflow_limit() when no limits have been set.
# Returns a hashref keyed by overflow column name with the column size as value;
# this base implementation declares no overflow columns, so the hash is empty.
sub default_overflow_limit {
    return {
        # 'overflow_column1_name' => column1_size,
        # 'overflow_column2_name' => column2_size,
        # ...
    };
}

# Fallback consulted by input_column_mapping() when no mapping has been set.
# Returns a hashref keyed by column name; fetch_all() selects the mapped SQL expression
# in place of the plain "table.column" for every column that has an entry.
# This base implementation maps nothing, so the hash is empty.
sub default_input_column_mapping {
    return {
        # 'original_column1' => "original_column1*10 AS c1_times_ten",
        # 'original_column2' => "original_column2+1 AS c2_plus_one",
        # ...
    };
}


# Combined getter/setter for the overflow limits of this adaptor.
#   With an argument:    stores it and returns it (if it is a true value).
#   Without an argument: returns the stored value, falling back to default_overflow_limit().
sub overflow_limit {
    my ($self, @new_value) = @_;

    $self->{_overflow_limit} = $new_value[0] if @new_value;    # setter

    return $self->{_overflow_limit} || $self->default_overflow_limit();
}


# Combined getter/setter for the column-to-SQL-expression mapping used when fetching.
#   With an argument:    stores it and returns it (if it is a true value).
#   Without an argument: returns the stored value, falling back to default_input_column_mapping().
sub input_column_mapping {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_input_column_mapping} = shift @_;
    }
    return $self->{_input_column_mapping} || $self->default_input_column_mapping();
}


# Combined getter/setter for the name of the table this adaptor works on.
#   With an argument:    stores the name and immediately reloads the table metadata
#                        (column set, primary key, autoincrement column) via _table_info_loader().
#   Without an argument: returns the stored name, falling back to default_table_name().
sub table_name {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_table_name} = shift @_;
        $self->_table_info_loader();    # the cached metadata belongs to the previous table, so refresh it
    }
    return $self->{_table_name} || $self->default_table_name();
}


# Combined getter/setter for the insertion method name used when storing.
#   With an argument:    stores it and returns it (if it is a true value).
#   Without an argument: returns the stored value, falling back to default_insertion_method().
sub insertion_method {
    my ($self, @new_value) = @_;

    if(scalar @new_value) {    # setter
        $self->{_insertion_method} = $new_value[0];
    }

    return $self->{_insertion_method} || $self->default_insertion_method();
}


# Combined getter/setter for the set of column names of the table
# (_table_info_loader() stores it as a hashref: column name => 1).
#   With an argument:    stores it.
#   Without an argument: loads the table metadata first if nothing is cached yet.
# Returns whatever is stored afterwards.
sub column_set {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_column_set} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_column_set};    # lazy-load on first read
    }

    return $self->{_column_set};
}


# Combined getter/setter for the list of primary key columns (an arrayref of column names).
# The primary key is not necessarily an auto-incrementing column.
#   With an argument:    stores it.
#   Without an argument: loads the table metadata first if nothing is cached yet.
# Returns whatever is stored afterwards.
sub primary_key {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_primary_key} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_primary_key};    # lazy-load on first read
    }

    return $self->{_primary_key};
}


# Returns an arrayref of all columns that do not belong to the primary key.
# It is just a cached view derived from column_set() and primary_key(); you cannot set it directly.
sub updatable_column_list {
    my ($self) = @_;

    if( !$self->{_updatable_column_list} ) {    # compute once, then reuse
        my %is_key_column  = map { ($_ => 1) } @{ $self->primary_key() };
        my $all_columns    = $self->column_set();
        my @non_key_columns = grep { !$is_key_column{$_} } keys %$all_columns;

        $self->{_updatable_column_list} = \@non_key_columns;
    }

    return $self->{_updatable_column_list};
}


# Combined getter/setter for the name of the auto-incrementing column.
# _table_info_loader() stores an empty string here when it finds no such column.
#   With an argument:    stores it.
#   Without an argument: loads the table metadata first if nothing is cached yet.
# Returns whatever is stored afterwards.
sub autoinc_id {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_autoinc_id} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_autoinc_id};    # lazy-load on first read
    }

    return $self->{_autoinc_id};
}


# Reads the metadata of the adaptor's table through DBI and caches it on the adaptor:
#   column_set()  <- hashref { column_name => 1 } of all columns
#   primary_key() <- arrayref of primary key column names, as reported by $dbh->primary_key()
#   autoinc_id()  <- name of the auto-incrementing column, or '' if none was recognised
# Dies if the driver does not return any column information handle.
sub _table_info_loader {
    my $self = shift @_;

    my $dbh         = $self->dbc()->db_handle();
    my $table_name  = $self->table_name();

    my %column_set  = ();
    my $autoinc_id  = '';
    my @primary_key = $dbh->primary_key(undef, undef, $table_name);

    my $sth = $dbh->column_info(undef, undef, $table_name, '%')
        or die "Could not retrieve column information for table '$table_name'";
    $sth->execute();
    while (my $row = $sth->fetchrow_hashref()) {
        my ($column_name, $is_ai) = @$row{'COLUMN_NAME', 'mysql_is_auto_increment'};

        $column_set{$column_name}  = 1;

            # careful! The 'mysql_is_auto_increment' attribute is only supported by DBD::mysql and will not work
            # with other drivers, hence the two name-based fallbacks:
        if( $is_ai
         or ($column_name eq $table_name.'_id')
         or ($table_name eq 'analysis_base' and $column_name eq 'analysis_id') ) {    # a special case (historical)
            $autoinc_id = $column_name;
        }
    }
    $sth->finish;

    $self->column_set(  \%column_set );
    $self->primary_key( \@primary_key );
    $self->autoinc_id(   $autoinc_id );
}


# Counts the rows of the adaptor's table, optionally restricted by a constraint.
#   $constraint - optional SQL fragment appended to "SELECT COUNT(*) FROM <table>";
#                 it is prefixed with WHERE unless it contains a JOIN of any kind.
# Returns the first column of the first row returned by the query.
# NOTE(review): the constraint is interpolated into the SQL as is, so callers must not build it from untrusted data.
sub count_all {
    my ($self, $constraint) = @_;

    my $table_name      = $self->table_name();

    my $sql = "SELECT COUNT(*) FROM $table_name";

    if($constraint) {
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
        $sql .= (($constraint=~/\bJOIN\b/i) ? ' ' : ' WHERE ') . $constraint;
    }

    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute;
    my ($count) = $sth->fetchrow_array();
    $sth->finish;

    return $count;
}


# Fetches rows of the adaptor's table and returns them in a structure shaped by the arguments.
#   $constraint   - optional SQL fragment appended after "FROM <table>". WHERE is put in front of it
#                   unless it contains a JOIN or itself starts with WHERE, LIMIT, ORDER or GROUP.
#   $one_per_key  - if true, each slot holds a single item (later rows overwrite earlier ones);
#                   otherwise each slot holds an arrayref of items.
#   $key_list     - optional arrayref of column names; the result becomes a hash nested by the values
#                   of these columns, one level per column.
#   $value_column - optional column name; if given, items are the values of that column,
#                   otherwise they are whatever $self->objectify($row_hashref) returns.
# Returns a hashref (non-empty $key_list), an arrayref (no keys, many per key), a single item
# (no keys, one per key) or undef (no keys, one per key, nothing found).
# NOTE(review): the constraint is interpolated into the SQL as is, so callers must not build it from untrusted data.
sub fetch_all {
    my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_;

    my $table_name              = $self->table_name();
    my $input_column_mapping    = $self->input_column_mapping();

    my $sql = 'SELECT ' . join(', ', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name";

    if($constraint) {
            # Do not put WHERE in front if $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc)
            # or if it begins with a clause keyword. The keyword alternatives must be grouped: an ungrouped
            # /^LIMIT|ORDER|GROUP/ anchors only LIMIT and drops WHERE whenever ORDER or GROUP appears anywhere.
        my $needs_where = not ($constraint=~/\bJOIN\b/i or $constraint=~/^\s*(?:WHERE|LIMIT|ORDER|GROUP)\b/i);
        $sql .= ($needs_where ? ' WHERE ' : ' ') . $constraint;
    }

    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute;

    my @overflow_columns = keys %{ $self->overflow_limit() };
        # the overflow adaptor is only needed (and only fetched) if there are overflow columns:
    my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor();

    my $result_struct;  # will be autovivified to the correct data structure

    while(my $hashref = $sth->fetchrow_hashref) {

            # values that were too long were stored elsewhere and replaced by a reference; resolve those:
        foreach my $overflow_key (@overflow_columns) {
            if(defined($hashref->{$overflow_key}) and $hashref->{$overflow_key} =~ /^_ext(?:\w+)_data_id (\d+)$/) {
                $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1);
            }
        }

        my $pptr = \$result_struct;
        if($key_list) {
            foreach my $syll (@$key_list) {
                $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
            }
        }
        my $object = $value_column
            ? $hashref->{$value_column}
            : $self->objectify($hashref);
        if($one_per_key) {
            $$pptr = $object;
        } else {
            push @$$pptr, $object;
        }
    }
    $sth->finish;

        # nothing was fetched, so nothing was autovivified: return an empty structure of the expected type
    unless(defined($result_struct)) {
        if($key_list and scalar(@$key_list)) {
            $result_struct = {};
        } elsif(!$one_per_key) {
            $result_struct = [];
        }
    }

    return $result_struct;  # either listref or hashref is returned, depending on the call parameters
}


# Builds the SQL condition that selects a row by its primary key.
#   $sliceref - arrayref of primary key values.
#               Attention: the order of the values should match the order of primary key columns in the table definition!
# Returns a string of the form "col1='val1' AND col2='val2'".
# Dies if the table has no primary key.
# NOTE(review): the values are interpolated between single quotes without any escaping, so they
# must not contain quotes or come from untrusted input.
sub primary_key_constraint {
    my $self        = shift @_;
    my $sliceref    = shift @_;

    my $primary_key  = $self->primary_key();

    unless(@$primary_key) {
        my $table_name = $self->table_name();
        die "Table '$table_name' doesn't have a primary_key";
    }

    return join (' AND ', map { $primary_key->[$_]."='".$sliceref->[$_]."'" } (0..scalar(@$primary_key)-1));
}


# Fetches the single item identified by its primary key.
# Arguments after $self are the primary key values, in the order expected by primary_key_constraint().
# Returns whatever fetch_all() returns in its one-per-key mode for that constraint.
sub fetch_by_dbID {
    my $self = shift @_;    # the rest in @_ should be primary_key column values

    return $self->fetch_all( $self->primary_key_constraint( \@_ ), 1 );
}


# Removes entries from the adaptor's table by a constraint.
#   $constraint - optional SQL condition; if it is missing or false, "1" is used instead,
#                 which deletes every row of the table.
# Returns the value of the final $sth->finish() call.
# NOTE(review): the constraint is interpolated into the SQL as is, so callers must not build it from untrusted data.
sub remove_all {
    my $self        = shift @_;
    my $constraint  = shift @_ || 1;

    my $table_name  = $self->table_name();

    my $sql = "DELETE FROM $table_name WHERE $constraint";
    my $sth = $self->prepare($sql);
    $sth->execute();
    $sth->finish();
}


# Removes the row that corresponds to the given object, identified by its primary key.
#   $object - the object whose primary key values are extracted with $self->slicer().
# Returns whatever remove_all() returns.
sub remove {
    my $self        = shift @_;
    my $object      = shift @_;

    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );

    return $self->remove_all( $primary_key_constraint );
}


# Updates (some or all) non-primary-key columns of the row that corresponds to the given object.
#   $object - the object to take both the primary key values and the new column values from.
#   @_      - optional names of the columns to update; defaults to updatable_column_list().
# The new values are bound through placeholders; the row is selected by primary_key_constraint().
# Dies if there is nothing to update. Returns the value of the final $sth->finish() call.
sub update {
    my $self    = shift @_;
    my $object  = shift @_;    # the rest in @_ should be the column names to be updated

    my $table_name              = $self->table_name();
    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );
    my $columns_to_update       = scalar(@_) ? \@_ : $self->updatable_column_list();
    my $values_to_update        = $self->slicer( $object, $columns_to_update );

    unless(@$columns_to_update) {
        die "There are no dependent columns to update, as everything seems to belong to the primary key";
    }

    my $sql = "UPDATE $table_name SET ".join(', ', map { "$_=?" } @$columns_to_update)." WHERE $primary_key_constraint";
    # warn "SQL: $sql\n";
    my $sth = $self->prepare($sql);
    # warn "VALUES_TO_UPDATE: ".join(', ', map { "'$_'" } @$values_to_update)."\n";
    $sth->execute( @$values_to_update);

    $sth->finish();
}


# Checks whether a row with the same contents as the given object is already in the table.
# All columns except the autoincrement one are compared; undefined values are matched with IS NULL,
# defined ones are bound through placeholders.
#   $object - the object whose column values are extracted with $self->slicer().
# Returns the autoinc_id of the first matching row if the table has an autoinc_id column, or 1 if it has not;
# returns undef if no matching row was found.
sub check_object_present_in_db {    # return autoinc_id/undef if the table has autoinc_id or just 1/undef if not
    my ( $self, $object ) = @_;

    my $table_name  = $self->table_name();
    my $column_set  = $self->column_set();
    my $autoinc_id  = $self->autoinc_id();

        # we look for identical contents, so must skip the autoinc_id columns when fetching:
    my $non_autoinc_columns = [ grep { $_ ne $autoinc_id } keys %$column_set ];
    my $non_autoinc_values  = $self->slicer( $object, $non_autoinc_columns );

    my @constraints = ();
    my @values = ();
    foreach my $idx (0..scalar(@$non_autoinc_columns)-1) {
        my $column = $non_autoinc_columns->[$idx];
        my $value  = $non_autoinc_values->[$idx];
        if( defined($value) ) {
            push @constraints, "$column = ?";
            push @values, $value;
        } else {
            push @constraints, "$column IS NULL";   # "column = NULL" would never match, so NULLs need their own syntax
        }
    }

    my $sql = 'SELECT '.($autoinc_id or 1)." FROM $table_name WHERE ".  join(' AND ', @constraints);
    my $sth = $self->prepare( $sql );
    $sth->execute( @values );

    my ($return_value) = $sth->fetchrow_array();
    $sth->finish;

    return $return_value;
}


# Stores one object or a list of objects in the adaptor's table.
#   $object_or_list             - a single object or an arrayref of objects.
#   $check_presence_in_db_first - if true, each object is first looked up with check_object_present_in_db()
#                                 and is only marked as stored (not inserted) if an identical row exists.
# Plain hashes are stored using only the columns reported by $self->keys_to_columns();
# anything else is stored using all columns except the autoincrement one.
# Each successfully inserted object is passed to $self->mark_stored() together with the last insert id
# (if the table has an autoincrement column).
# Returns the list ($object_or_list, $number_of_rows_actually_inserted), or nothing if the list was empty.
# Dies if a statement cannot be prepared or executed.
sub store {
    my ($self, $object_or_list, $check_presence_in_db_first) = @_;

    my $objects = (ref($object_or_list) eq 'ARRAY')     # ensure we get an array of objects to store
        ? $object_or_list
        : [ $object_or_list ];
    return unless(scalar(@$objects));

    my $table_name              = $self->table_name();
    my $autoinc_id              = $self->autoinc_id();
    my $all_storable_columns    = [ grep { $_ ne $autoinc_id } keys %{ $self->column_set() } ];
    my $driver                  = $self->dbc->driver();
    my $insertion_method        = $self->insertion_method;  # INSERT, INSERT_IGNORE or REPLACE
    $insertion_method           =~ s/_/ /g;
    if($driver eq 'sqlite') {
        $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
    } elsif($driver eq 'pgsql') {   # FIXME! temporary hack
        $insertion_method = 'INSERT';
    }

    my %hashed_sth = ();  # do not prepare statements until there is a real need

    my $stored_this_time        = 0;

    foreach my $object (@$objects) {
        if($check_presence_in_db_first and my $present = $self->check_object_present_in_db($object)) {
            $self->mark_stored($object, $present);
        } else {
            my ($columns_being_stored, $column_key) = (ref($object) eq 'HASH') ? $self->keys_to_columns($object) : ($all_storable_columns, '*all*');
            # warn "COLUMN_KEY='$column_key'\n";

            my $this_sth;

                # only prepare (once!) if we get here:
            unless($this_sth = $hashed_sth{$column_key}) {
                    # By using question marks we can insert true NULLs by setting corresponding values to undefs:
                my $sql = "$insertion_method INTO $table_name (".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')';
                # warn "STORE: $sql\n";
                $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or die "Could not prepare statement: $sql";
            }

            # warn "STORED_COLUMNS: ".join(', ', map { "`$_`" } @$columns_being_stored)."\n";
            my $values_being_stored = $self->slicer( $object, $columns_being_stored );
            # warn "STORED_VALUES: ".join(', ', map { "'$_'" } @$values_being_stored)."\n";

            my $return_code = $this_sth->execute( @$values_being_stored )
                    # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
                or die "Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', @$values_being_stored).')';
            if($return_code > 0) {     # <--- for the same reason we have to be explicitly numeric here
                my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
                $self->mark_stored($object, $liid );
                ++$stored_this_time;
            }
        }
    }

    foreach my $sth (values %hashed_sth) {
        $sth->finish();
    }

    return ($object_or_list, $stored_this_time);
}


# Deliberately empty destructor: defining it keeps DESTROY calls away from AUTOLOAD.
sub DESTROY {
}

# Creates frequently needed accessor methods on first use, installs them under the requested name
# and then restarts the call through the newly installed method. Recognised method name patterns:
#
#   fetch[_all][_by_<col>[_AND_<col>...]][_HASHED_FROM_<col>[_AND_<col>...]][_TO_<col>]
#       calls fetch_all() with an equality constraint on the "by" columns (values are the call arguments),
#       one item per key unless "_all" is present, hashed by the "HASHED_FROM" columns,
#       returning only the "TO" column if one is given.
#   count_all_by_<col>[_AND_<col>...]
#       calls count_all() with an equality constraint on the given columns.
#   remove_all_by_<col>
#       calls remove_all() with an equality constraint on the given column.
#   update_<col>[_AND_<col>...]
#       calls update() for the given columns only.
#
# Column names of the first three patterns are checked against column_set(); an unknown name dies with "unknown column".
# Any other method name only produces a warning.
# NOTE(review): the generated constraints interpolate the argument values between single quotes without
# escaping, so the generated methods must not be fed values containing quotes or untrusted input.
sub AUTOLOAD {
    our $AUTOLOAD;

    if($AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/) {
        my $all             = $1;
        my $filter_string   = $2;
        my $key_string      = $3;
        my $value_column    = $4;

        my ($self) = @_;
        my $column_set = $self->column_set();

        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
        my $key_components = $key_string && [ split(/_AND_/i, $key_string) ];
        foreach my $column_name ( @$key_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
        if($value_column && !$column_set->{$value_column}) {
            die "unknown column '$value_column'";
        }

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->fetch_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
                !$all,
                $key_components,
                $value_column
            );
        };
        goto &$AUTOLOAD;    # restart the new method

    } elsif($AUTOLOAD =~ /::count_all_by_(\w+)$/) {
        my $filter_string = $1;

        my ($self) = @_;
        my $column_set = $self->column_set();

        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->count_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
            );
        };
        goto &$AUTOLOAD;    # restart the new method

    } elsif($AUTOLOAD =~ /::remove_all_by_(\w+)$/) {
        my $filter_name = $1;

        my ($self) = @_;
        my $column_set = $self->column_set();

        if($column_set->{$filter_name}) {
#            warn "Setting up '$AUTOLOAD' method\n";
            *$AUTOLOAD = sub { my ($self, $filter_value) = @_; return $self->remove_all("$filter_name='$filter_value'"); };
            goto &$AUTOLOAD;    # restart the new method
        } else {
            die "unknown column '$filter_name'";
        }
    } elsif($AUTOLOAD =~ /::update_(\w+)$/) {
        my @columns_to_update = split(/_AND_/i, $1);
#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub { my ($self, $object) = @_; return $self->update($object, @columns_to_update); };
        goto &$AUTOLOAD;    # restart the new method
    } else {
        warn "sub '$AUTOLOAD' not implemented";
    }
}

1;