BaseAdaptor.pm 19.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor

=head1 DESCRIPTION

    The base class for all other Object- or NakedTable- adaptors.
    Performs the low-level SQL needed to retrieve and store data in tables.

=head1 EXTERNAL DEPENDENCIES

    DBI 1.6

=head1 LICENSE

18
    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
nwillhoft's avatar
nwillhoft committed
19
    Copyright [2016-2021] EMBL-European Bioinformatics Institute
20
21
22
23
24
25
26
27
28
29
30
31

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

32
    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates
33
34
35
36

=cut


37
38
39
package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;

use strict;
40
use warnings;
41
no strict 'refs';   # needed to allow AUTOLOAD create new methods
42
use DBI 1.6;        # the 1.6 functionality is important for detecting autoincrement fields and other magic.
43

44
use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
45

46
47
48
49
50
51
52
53
54
55
56

sub default_table_name {
    die "Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class";
}


sub default_insertion_method {
    return 'INSERT_IGNORE';
}


57
58
59
60
sub default_overflow_limit {
    return {
        # 'overflow_column1_name' => column1_size,
        # 'overflow_column2_name' => column2_size,
61
62
63
64
65
66
67
68
69
        # ...
    };
}

sub default_input_column_mapping {
    return {
        # 'original_column1' => "original_column1*10 AS c1_times_ten",
        # 'original_column2' => "original_column2+1 AS c2_plus_one",
        # ...
70
71
72
    };
}

73
74
75
76
sub do_not_update_columns {
    return [];
}

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# ---------------------------------------------------------------------------

sub new {
    my ( $class, $dbobj ) = @_;

    my $self = bless {}, $class;

    if ( !defined $dbobj || !ref $dbobj ) {
        throw("Don't have a db [$dbobj] for new adaptor");
    }

    if ( ref($dbobj) =~ /DBConnection$/ ) {
        $self->dbc($dbobj);
    } elsif( UNIVERSAL::can($dbobj, 'dbc') ) {
        $self->dbc( $dbobj->dbc );
        $self->db( $dbobj );
    } else {
        throw("I was given [$dbobj] for a new adaptor");
    }

    return $self;
}


sub db {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_db} = shift @_;
    }
    return $self->{_db};
}


sub dbc {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_dbc} = shift @_;
    }
    return $self->{_dbc};
}


sub prepare {
    my ( $self, $sql ) = @_;

    # Uncomment next line to cancel caching on the SQL side.
    # Needed for timing comparisons etc.
    #$sql =~ s/SELECT/SELECT SQL_NO_CACHE/i;

    return $self->dbc->prepare($sql);
}

131
132
133
134
135
136
137
138
139
140
141

sub overflow_limit {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_overflow_limit} = shift @_;
    }
    return $self->{_overflow_limit} || $self->default_overflow_limit();
}


142
143
144
145
146
147
148
149
150
151
sub input_column_mapping {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_input_column_mapping} = shift @_;
    }
    return $self->{_input_column_mapping} || $self->default_input_column_mapping();
}


152
153
154
155
156
sub table_name {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_table_name} = shift @_;
157
        $self->_table_info_loader();
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
    }
    return $self->{_table_name} || $self->default_table_name();
}


sub insertion_method {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_insertion_method} = shift @_;
    }
    return $self->{_insertion_method} || $self->default_insertion_method();
}


sub column_set {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_column_set} = shift @_;
    } elsif( !defined( $self->{_column_set} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_column_set};
}


sub primary_key {        # not necessarily auto-incrementing
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_primary_key} = shift @_;
    } elsif( !defined( $self->{_primary_key} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_primary_key};
}


sub updatable_column_list {    # it's just a cashed view, you cannot set it directly
    my $self = shift @_;

    unless($self->{_updatable_column_list}) {
201
202
203
204
        my %primary_key_set     = map { $_ => 1 } @{$self->primary_key};
        my %non_updatable_set   = map { $_ => 1 } @{$self->do_not_update_columns};
        my $column_set          = $self->column_set();
        $self->{_updatable_column_list} = [ grep { not ($primary_key_set{$_} || $non_updatable_set{$_}) } keys %$column_set ];
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
    }
    return $self->{_updatable_column_list};
}


sub autoinc_id {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_autoinc_id} = shift @_;
    } elsif( !defined( $self->{_autoinc_id} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_autoinc_id};
}


sub _table_info_loader {
    my $self = shift @_;

    my $dbc         = $self->dbc();
226
    my $dbh         = $dbc->db_handle();
227
    my $driver      = $dbc->driver();
228
    my $dbname      = $dbc->dbname();
229
230
231
232
    my $table_name  = $self->table_name();

    my %column_set  = ();
    my $autoinc_id  = '';
233
234
235
236
237
    my @primary_key = $dbh->primary_key(undef, undef, $table_name);

    my $sth = $dbh->column_info(undef, undef, $table_name, '%');
    $sth->execute();
    while (my $row = $sth->fetchrow_hashref()) {
238
239
240
        my ( $column_name, $column_type ) = @$row{'COLUMN_NAME', 'TYPE_NAME'};

        # warn "ColumnInfo [$table_name/$column_name] = $column_type\n";
241

242
        $column_set{$column_name}  = $column_type;
243

244
        if( ($column_name eq $table_name.'_id')
245
         or ($table_name eq 'analysis_base' and $column_name eq 'analysis_id') ) {    # a special case (historical)
246
            $autoinc_id = $column_name;
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
        }
    }
    $sth->finish;

    $self->column_set(  \%column_set );
    $self->primary_key( \@primary_key );
    $self->autoinc_id(   $autoinc_id );
}


sub count_all {
    my ($self, $constraint) = @_;

    my $table_name      = $self->table_name();

    my $sql = "SELECT COUNT(*) FROM $table_name";

264
265
266
    if($constraint) {
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
        $sql .= (($constraint=~/\bJOIN\b/i) ? ' ' : ' WHERE ') . $constraint;
267
268
    }

269
    # warn "SQL: $sql\n";
270
271
272

    my $sth = $self->prepare($sql);
    $sth->execute;  
273
    my ($count) = $sth->fetchrow_array();
274
275
276
277
278
279
280
    $sth->finish;  

    return $count;
}


sub fetch_all {
281
282
    my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_;
    
283
284
    my $table_name              = $self->table_name();
    my $input_column_mapping    = $self->input_column_mapping();
285

286
    my $sql = 'SELECT ' . join(', ', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name";
287
288

    if($constraint) { 
289
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
290
        $sql .= (($constraint=~/\bJOIN\b/i or $constraint=~/^LIMIT|ORDER|GROUP/) ? ' ' : ' WHERE ') . $constraint;
291
292
    }

293
    # warn "SQL: $sql\n";
294
295
296
297

    my $sth = $self->prepare($sql);
    $sth->execute;  

298
299
300
    my @overflow_columns = keys %{ $self->overflow_limit() };
    my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor();

301
    my $result_struct;  # will be autovivified to the correct data structure
302
303

    while(my $hashref = $sth->fetchrow_hashref) {
304
305
306
307
308
309
310

        foreach my $overflow_key (@overflow_columns) {
            if($hashref->{$overflow_key} =~ /^_ext(?:\w+)_data_id (\d+)$/) {
                $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1);
            }
        }

311
        my $pptr = \$result_struct;
312
313
314
315
        if($key_list) {
            foreach my $syll (@$key_list) {
                $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
            }
316
317
318
319
320
321
        }
        my $object = $value_column
            ? $hashref->{$value_column}
            : $self->objectify($hashref);
        if($one_per_key) {
            $$pptr = $object;
322
        } else {
323
            push @$$pptr, $object;
324
325
326
327
        }
    }
    $sth->finish;  

328
    unless(defined($result_struct)) {
329
        if($key_list and scalar(@$key_list)) {
330
331
332
333
334
335
336
            $result_struct = {};
        } elsif(!$one_per_key) {
            $result_struct = [];
        }
    }

    return $result_struct;  # either listref or hashref is returned, depending on the call parameters
337
338
339
340
}


sub primary_key_constraint {
341
342
    my $self        = shift @_;
    my $sliceref    = shift @_;
343
344
345
346

    my $primary_key  = $self->primary_key();  # Attention: the order of primary_key columns of your call should match the order in the table definition!

    if(@$primary_key) {
347
        return join (' AND ', map { $primary_key->[$_]."='".$sliceref->[$_]."'" } (0..scalar(@$primary_key)-1));
348
349
350
351
352
353
354
355
356
357
    } else {
        my $table_name = $self->table_name();
        die "Table '$table_name' doesn't have a primary_key";
    }
}


sub fetch_by_dbID {
    my $self = shift @_;    # the rest in @_ should be primary_key column values

358
    return $self->fetch_all( $self->primary_key_constraint( \@_ ), 1 );
359
360
361
}


362
363
364
sub remove_all {    # remove entries by a constraint
    my $self        = shift @_;
    my $constraint  = shift @_ || 1;
365

366
    my $table_name  = $self->table_name();
367

368
    my $sql = "DELETE FROM $table_name WHERE $constraint";
369
370
371
372
373
374
    my $sth = $self->prepare($sql);
    $sth->execute();
    $sth->finish();
}


375
376
377
378
379
380
381
382
383
384
sub remove {    # remove the object by primary_key
    my $self        = shift @_;
    my $object      = shift @_;

    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );

    return $self->remove_all( $primary_key_constraint );
}


385
386
387
388
389
390
391
392
393
394
395
396
397
sub update {    # update (some or all) non_primary columns from the primary
    my $self    = shift @_;
    my $object  = shift @_;    # the rest in @_ should be the column names to be updated

    my $table_name              = $self->table_name();
    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );
    my $columns_to_update       = scalar(@_) ? \@_ : $self->updatable_column_list();
    my $values_to_update        = $self->slicer( $object, $columns_to_update );

    unless(@$columns_to_update) {
        die "There are no dependent columns to update, as everything seems to belong to the primary key";
    }

398
    my $sql = "UPDATE $table_name SET ".join(', ', map { "$_=?" } @$columns_to_update)." WHERE $primary_key_constraint";
399
    # warn "SQL: $sql\n";
400
    my $sth = $self->prepare($sql);
401
    # warn "VALUES_TO_UPDATE: ".join(', ', map { "'$_'" } @$values_to_update)."\n";
402
403
    $sth->execute( @$values_to_update);

404
405
406
    $sth->finish();
}

407

408
sub store_or_update_one {
409
    my ($self, $object, $filter_columns) = @_;
410

411
    #use Data::Dumper;
412
    if(UNIVERSAL::can($object, 'adaptor') and $object->adaptor and $object->adaptor==$self) {  # looks like it has been previously stored
413
        if( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) {
414
            $self->update( $object );
415
416
417
            #warn "store_or_update_one: updated [".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n";
        } else {
            #warn "store_or_update_one: non-updatable [".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n";
418
        }
419
    } elsif( my $present = $self->check_object_present_in_db_by_content( $object, $filter_columns ) ) {
420
        $self->mark_stored($object, $present);
421
422
423
424
425
        #warn "store_or_update_one: found [".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."] in db by content of (".join(', ', @$filter_columns).")\n";
        if( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) {
            #warn "store_or_update_one: updating the columns (".join(', ', @{ $self->updatable_column_list() }).")\n";
            $self->update( $object );
        }
426
427
    } else {
        $self->store( $object );
428
        #warn "store_or_update_one: stored [".(UNIVERSAL::can($object, 'toString') ? $object->toString : Dumper($object))."]\n";
429
430
431
    }
}

432

433
434
sub check_object_present_in_db_by_content {    # return autoinc_id/undef if the table has autoinc_id or just 1/undef if not
    my ( $self, $object, $filter_columns ) = @_;
435
436
437
438
439

    my $table_name  = $self->table_name();
    my $column_set  = $self->column_set();
    my $autoinc_id  = $self->autoinc_id();

440
441
442
443
444
445
446
447
448
    if($filter_columns) {
            # make sure all fields exist in the database as columns:
        $filter_columns = [ map { $column_set->{$_} ? $_ : $_.'_id' } @$filter_columns ];
    } else {
            # we look for identical contents, so must skip the autoinc_id columns when fetching:
        $filter_columns = [ grep { $_ ne $autoinc_id } keys %$column_set ];
    }
    my %filter_hash;
    @filter_hash{ @$filter_columns } = @{ $self->slicer( $object, $filter_columns ) };
449

450
451
    my @constraints = ();
    my @values = ();
452
    while(my ($column, $value) = each %filter_hash ) {
453
454
455
456
457
458
459
        if( defined($value) ) {
            push @constraints, "$column = ?";
            push @values, $value;
        } else {
            push @constraints, "$column IS NULL";
        }
    }
460

461
    my $sql = 'SELECT '.($autoinc_id or 1)." FROM $table_name WHERE ".  join(' AND ', @constraints);
462
#warn "check_object_present_in_db_by_content: sql= $sql WITH VALUES (".join(', ', @values).")\n";
463
464
    my $sth = $self->prepare( $sql );
    $sth->execute( @values );
465

466
    my ($return_value) = $sth->fetchrow_array();
467
468
469
470
471
472
473
    $sth->finish;

    return $return_value;
}


sub store {
474
    my ($self, $object_or_list) = @_;
475
476
477
478
479
480

    my $objects = (ref($object_or_list) eq 'ARRAY')     # ensure we get an array of objects to store
        ? $object_or_list
        : [ $object_or_list ];
    return unless(scalar(@$objects));

481
482
    my $table_name              = $self->table_name();
    my $autoinc_id              = $self->autoinc_id();
483
    my $all_storable_columns    = [ grep { $_ ne $autoinc_id } keys %{ $self->column_set() } ];
484
485
486
    my $driver                  = $self->dbc->driver();
    my $insertion_method        = $self->insertion_method;  # INSERT, INSERT_IGNORE or REPLACE
    $insertion_method           =~ s/_/ /g;
487
488
    if($driver eq 'sqlite') {
        $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
489
490
    } elsif($driver eq 'pgsql') {   # FIXME! temporary hack
        $insertion_method = 'INSERT';
491
    }
492

493
    my %hashed_sth = ();  # do not prepare statements until there is a real need
494

495
496
    my $stored_this_time        = 0;

497
    foreach my $object (@$objects) {
498
            my ($columns_being_stored, $column_key) = $self->keys_to_columns($object);
499
            # warn "COLUMN_KEY='$column_key'\n";
500
501

            my $this_sth;
502

503
504
505
506
                # only prepare (once!) if we get here:
            unless($this_sth = $hashed_sth{$column_key}) {
                    # By using question marks we can insert true NULLs by setting corresponding values to undefs:
                my $sql = "$insertion_method INTO $table_name (".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')';
507
                # warn "STORE: $sql\n";
508
509
510
                $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or die "Could not prepare statement: $sql";
            }

511
            # warn "STORED_COLUMNS: ".stringify($columns_being_stored)."\n";
512
            my $values_being_stored = $self->slicer( $object, $columns_being_stored );
513
            # warn "STORED_VALUES: ".stringify($values_being_stored)."\n";
514

515
            my $return_code = $this_sth->execute( @$values_being_stored )
516
                    # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
517
                or die "Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', @$values_being_stored).')';
Leo Gordon's avatar
Leo Gordon committed
518
            if($return_code > 0) {     # <--- for the same reason we have to be explicitly numeric here
519
520
                my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
                $self->mark_stored($object, $liid );
521
                ++$stored_this_time;
522
523
524
            }
    }

525
526
527
    foreach my $sth (values %hashed_sth) {
        $sth->finish();
    }
528

529
    return ($object_or_list, $stored_this_time);
530
531
532
533
534
535
536
537
}


sub DESTROY { }   # to simplify AUTOLOAD

sub AUTOLOAD {
    our $AUTOLOAD;

538
539
540
541
542
    if($AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/) {
        my $all             = $1;
        my $filter_string   = $2;
        my $key_string      = $3;
        my $value_column    = $4;
543
544
545
546

        my ($self) = @_;
        my $column_set = $self->column_set();

547
            # NB: this filtering happens BEFORE any possible overflow via analysis_data, so will not be done on overflow_columns
548
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
549
550
551
552
553
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
554
        my $key_components = $key_string && [ split(/_AND_/i, $key_string) ];
555
        foreach my $column_name ( @$key_components ) {
556
557
558
559
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
560
561
562
        if($value_column && !$column_set->{$value_column}) {
            die "unknown column '$value_column'";
        }
563

564
#        warn "Setting up '$AUTOLOAD' method\n";
565
566
567
568
569
570
571
572
573
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->fetch_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
                !$all,
                $key_components,
                $value_column
            );
        };
574
        goto &$AUTOLOAD;    # restart the new method
575

576
    } elsif($AUTOLOAD =~ /::count_all_by_(\w+)$/) {
577
        my $filter_string = $1;
578
579
580
581

        my ($self) = @_;
        my $column_set = $self->column_set();

582
583
584
585
586
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
587
        }
588

589
#        warn "Setting up '$AUTOLOAD' method\n";
590
591
592
593
594
595
596
597
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->count_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
            );
        };
        goto &$AUTOLOAD;    # restart the new method

598
599
600
601
602
603
604
    } elsif($AUTOLOAD =~ /::remove_all_by_(\w+)$/) {
        my $filter_name = $1;

        my ($self) = @_;
        my $column_set = $self->column_set();

        if($column_set->{$filter_name}) {
605
#            warn "Setting up '$AUTOLOAD' method\n";
606
607
608
609
610
            *$AUTOLOAD = sub { my ($self, $filter_value) = @_; return $self->remove_all("$filter_name='$filter_value'"); };
            goto &$AUTOLOAD;    # restart the new method
        } else {
            die "unknown column '$filter_name'";
        }
611
    } elsif($AUTOLOAD =~ /::update_(\w+)$/) {
612
        my @columns_to_update = split(/_AND_/i, $1);
613
#        warn "Setting up '$AUTOLOAD' method\n";
614
615
616
        *$AUTOLOAD = sub { my ($self, $object) = @_; return $self->update($object, @columns_to_update); };
        goto &$AUTOLOAD;    # restart the new method
    } else {
617
        warn "sub '$AUTOLOAD' not implemented";
618
619
620
621
622
    }
}

1;