BaseAdaptor.pm 16.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor

=head1 DESCRIPTION

    The base class for all other Object- or NakedTable- adaptors.
    Performs the low-level SQL needed to retrieve and store data in tables.

=head1 EXTERNAL DEPENDENCIES

    DBI 1.6

=head1 LICENSE

18
    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=cut


36 37 38 39
package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;

use strict;
no strict 'refs';   # needed to allow AUTOLOAD create new methods
40
use DBI 1.6;        # the 1.6 functionality is important for detecting autoincrement fields and other magic.
41 42 43 44 45 46 47 48 49 50 51 52 53 54

use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');


sub default_table_name {
    die "Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class";
}


sub default_insertion_method {
    return 'INSERT_IGNORE';
}


55 56 57 58
sub default_overflow_limit {
    return {
        # 'overflow_column1_name' => column1_size,
        # 'overflow_column2_name' => column2_size,
59 60 61 62 63 64 65 66 67
        # ...
    };
}

sub default_input_column_mapping {
    return {
        # 'original_column1' => "original_column1*10 AS c1_times_ten",
        # 'original_column2' => "original_column2+1 AS c2_plus_one",
        # ...
68 69 70 71 72 73 74 75 76 77 78 79 80 81
    };
}


sub overflow_limit {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_overflow_limit} = shift @_;
    }
    return $self->{_overflow_limit} || $self->default_overflow_limit();
}


82 83 84 85 86 87 88 89 90 91
sub input_column_mapping {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_input_column_mapping} = shift @_;
    }
    return $self->{_input_column_mapping} || $self->default_input_column_mapping();
}


92 93 94 95 96
sub table_name {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_table_name} = shift @_;
97
        $self->_table_info_loader();
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
    }
    return $self->{_table_name} || $self->default_table_name();
}


sub insertion_method {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_insertion_method} = shift @_;
    }
    return $self->{_insertion_method} || $self->default_insertion_method();
}


sub column_set {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_column_set} = shift @_;
    } elsif( !defined( $self->{_column_set} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_column_set};
}


sub primary_key {        # not necessarily auto-incrementing
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_primary_key} = shift @_;
    } elsif( !defined( $self->{_primary_key} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_primary_key};
}


sub updatable_column_list {    # it's just a cashed view, you cannot set it directly
    my $self = shift @_;

    unless($self->{_updatable_column_list}) {
        my %primary_key_set = map { $_ => 1 } @{$self->primary_key()};
        my $column_set      = $self->column_set();
        $self->{_updatable_column_list} = [ grep { not $primary_key_set{$_} } keys %$column_set ];
    }
    return $self->{_updatable_column_list};
}


sub autoinc_id {
    my $self = shift @_;

    if(@_) {    # setter
        $self->{_autoinc_id} = shift @_;
    } elsif( !defined( $self->{_autoinc_id} ) ) {
        $self->_table_info_loader();
    }
    return $self->{_autoinc_id};
}


sub _table_info_loader {
    my $self = shift @_;

    my $dbc         = $self->dbc();
165
    my $dbh         = $dbc->db_handle();
166
    my $driver      = $dbc->driver();
167
    my $dbname      = $dbc->dbname();
168 169 170
    my $table_name  = $self->table_name();

    my %column_set  = ();
171
    my %name2type   = ();
172
    my $autoinc_id  = '';
173 174 175 176 177 178
    my @primary_key = $dbh->primary_key(undef, undef, $table_name);

    my $sth = $dbh->column_info(undef, undef, $table_name, '%');
    $sth->execute();
    while (my $row = $sth->fetchrow_hashref()) {
        my ($position, $name, $type, $is_ai) = @$row{'ORDINAL_POSITION','COLUMN_NAME', 'TYPE_NAME', 'mysql_is_auto_increment'};
179

180 181 182 183
        $column_set{$name}  = 1;
        $name2type{$name}   = $type;
        if($is_ai) {
            $autoinc_id = $name;
184 185 186 187
        }
    }
    $sth->finish;

188 189
    if( ($driver ne 'mysql')
     and scalar(@primary_key)==1 and (uc($name2type{$primary_key[0]}) eq 'INTEGER') ) {
190 191 192
        $autoinc_id = $primary_key[0];
    }

193 194 195 196 197 198 199 200 201 202 203 204 205
    $self->column_set(  \%column_set );
    $self->primary_key( \@primary_key );
    $self->autoinc_id(   $autoinc_id );
}


sub count_all {
    my ($self, $constraint) = @_;

    my $table_name      = $self->table_name();

    my $sql = "SELECT COUNT(*) FROM $table_name";

206 207 208
    if($constraint) {
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
        $sql .= (($constraint=~/\bJOIN\b/i) ? ' ' : ' WHERE ') . $constraint;
209 210
    }

211
    # warn "SQL: $sql\n";
212 213 214

    my $sth = $self->prepare($sql);
    $sth->execute;  
215
    my ($count) = $sth->fetchrow_array();
216 217 218 219 220 221 222
    $sth->finish;  

    return $count;
}


sub fetch_all {
223 224
    my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_;
    
225 226
    my $table_name              = $self->table_name();
    my $input_column_mapping    = $self->input_column_mapping();
227

228
    my $sql = 'SELECT ' . join(', ', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name";
229 230

    if($constraint) { 
231
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
232
        $sql .= (($constraint=~/\bJOIN\b/i or $constraint=~/^LIMIT|ORDER|GROUP/) ? ' ' : ' WHERE ') . $constraint;
233 234
    }

235
    # warn "SQL: $sql\n";
236 237 238 239

    my $sth = $self->prepare($sql);
    $sth->execute;  

240 241 242
    my @overflow_columns = keys %{ $self->overflow_limit() };
    my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor();

243
    my $result_struct;  # will be autovivified to the correct data structure
244 245

    while(my $hashref = $sth->fetchrow_hashref) {
246 247 248 249 250 251 252

        foreach my $overflow_key (@overflow_columns) {
            if($hashref->{$overflow_key} =~ /^_ext(?:\w+)_data_id (\d+)$/) {
                $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1);
            }
        }

253
        my $pptr = \$result_struct;
254 255 256 257
        if($key_list) {
            foreach my $syll (@$key_list) {
                $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
            }
258 259 260 261 262 263
        }
        my $object = $value_column
            ? $hashref->{$value_column}
            : $self->objectify($hashref);
        if($one_per_key) {
            $$pptr = $object;
264
        } else {
265
            push @$$pptr, $object;
266 267 268 269
        }
    }
    $sth->finish;  

270
    unless(defined($result_struct)) {
271
        if($key_list and scalar(@$key_list)) {
272 273 274 275 276 277 278
            $result_struct = {};
        } elsif(!$one_per_key) {
            $result_struct = [];
        }
    }

    return $result_struct;  # either listref or hashref is returned, depending on the call parameters
279 280 281 282
}


sub primary_key_constraint {
283 284
    my $self        = shift @_;
    my $sliceref    = shift @_;
285 286 287 288

    my $primary_key  = $self->primary_key();  # Attention: the order of primary_key columns of your call should match the order in the table definition!

    if(@$primary_key) {
289
        return join (' AND ', map { $primary_key->[$_]."='".$sliceref->[$_]."'" } (0..scalar(@$primary_key)-1));
290 291 292 293 294 295 296 297 298 299
    } else {
        my $table_name = $self->table_name();
        die "Table '$table_name' doesn't have a primary_key";
    }
}


sub fetch_by_dbID {
    my $self = shift @_;    # the rest in @_ should be primary_key column values

300
    return $self->fetch_all( $self->primary_key_constraint( \@_ ), 1 );
301 302 303
}


304 305 306
sub remove_all {    # remove entries by a constraint
    my $self        = shift @_;
    my $constraint  = shift @_ || 1;
307

308
    my $table_name  = $self->table_name();
309

310
    my $sql = "DELETE FROM $table_name WHERE $constraint";
311 312 313 314 315 316
    my $sth = $self->prepare($sql);
    $sth->execute();
    $sth->finish();
}


317 318 319 320 321 322 323 324 325 326
sub remove {    # remove the object by primary_key
    my $self        = shift @_;
    my $object      = shift @_;

    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );

    return $self->remove_all( $primary_key_constraint );
}


327 328 329 330 331 332 333 334 335 336 337 338 339
sub update {    # update (some or all) non_primary columns from the primary
    my $self    = shift @_;
    my $object  = shift @_;    # the rest in @_ should be the column names to be updated

    my $table_name              = $self->table_name();
    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );
    my $columns_to_update       = scalar(@_) ? \@_ : $self->updatable_column_list();
    my $values_to_update        = $self->slicer( $object, $columns_to_update );

    unless(@$columns_to_update) {
        die "There are no dependent columns to update, as everything seems to belong to the primary key";
    }

340 341
    my $sql = "UPDATE $table_name SET ".join(', ', map { "$_=?" } @$columns_to_update)." WHERE $primary_key_constraint";
    # print "SQL: $sql\n";
342
    my $sth = $self->prepare($sql);
343 344 345
    # print "VALUES_TO_UPDATE: ".join(', ', map { "'$_'" } @$values_to_update)."\n";
    $sth->execute( @$values_to_update);

346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
    $sth->finish();
}


sub check_object_present_in_db {    # return autoinc_id/undef if the table has autoinc_id or just 1/undef if not
    my ( $self, $object ) = @_;

    my $table_name  = $self->table_name();
    my $column_set  = $self->column_set();
    my $autoinc_id  = $self->autoinc_id();

    my $non_autoinc_columns = [ grep { $_ ne $autoinc_id } keys %$column_set ];
    my $non_autoinc_values  = $self->slicer( $object, $non_autoinc_columns );

    my $sql = 'SELECT '.($autoinc_id or 1)." FROM $table_name WHERE ".
            # we look for identical contents, so must skip the autoinc_id columns when fetching:
        join(' AND ', map { my $v=$non_autoinc_values->[$_]; "$non_autoinc_columns->[$_] ".(defined($v) ? "='$v'" : 'IS NULL') } (0..@$non_autoinc_columns-1) );

    my $sth = $self->prepare($sql);
    $sth->execute();

367
    my ($return_value) = $sth->fetchrow_array();
368 369 370 371 372 373 374 375 376 377 378 379 380 381
    $sth->finish;

    return $return_value;
}


sub store {
    my ($self, $object_or_list, $check_presence_in_db_first) = @_;

    my $objects = (ref($object_or_list) eq 'ARRAY')     # ensure we get an array of objects to store
        ? $object_or_list
        : [ $object_or_list ];
    return unless(scalar(@$objects));

382 383
    my $table_name              = $self->table_name();
    my $autoinc_id              = $self->autoinc_id();
384
    my $all_storable_columns    = [ grep { $_ ne $autoinc_id } keys %{ $self->column_set() } ];
385 386 387
    my $driver                  = $self->dbc->driver();
    my $insertion_method        = $self->insertion_method;  # INSERT, INSERT_IGNORE or REPLACE
    $insertion_method           =~ s/_/ /g;
388 389
    if($driver eq 'sqlite') {
        $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
390 391
    } elsif($driver eq 'pgsql') {   # FIXME! temporary hack
        $insertion_method = 'INSERT';
392
    }
393

394
    my %hashed_sth = ();  # do not prepare statements until there is a real need
395

396 397
    my $stored_this_time        = 0;

398 399
    foreach my $object (@$objects) {
        if($check_presence_in_db_first and my $present = $self->check_object_present_in_db($object)) {
400
            $self->mark_stored($object, $present);
401
        } else {
402
            my ($columns_being_stored, $column_key) = (ref($object) eq 'HASH') ? $self->keys_to_columns($object) : ($all_storable_columns, '*all*');
403
            # print "COLUMN_KEY='$column_key'\n";
404 405

            my $this_sth;
406

407 408 409 410 411 412 413 414 415 416 417
                # only prepare (once!) if we get here:
            unless($this_sth = $hashed_sth{$column_key}) {
                    # By using question marks we can insert true NULLs by setting corresponding values to undefs:
                my $sql = "$insertion_method INTO $table_name (".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')';
                # print "STORE: $sql\n";
                $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or die "Could not prepare statement: $sql";
            }

            # print "STORED_COLUMNS: ".join(', ', map { "`$_`" } @$columns_being_stored)."\n";
            my $values_being_stored = $self->slicer( $object, $columns_being_stored );
            # print "STORED_VALUES: ".join(', ', map { "'$_'" } @$values_being_stored)."\n";
418

419
            my $return_code = $this_sth->execute( @$values_being_stored )
420
                    # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
421
                or die "Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', @$values_being_stored).')';
Leo Gordon's avatar
Leo Gordon committed
422
            if($return_code > 0) {     # <--- for the same reason we have to be explicitly numeric here
423 424
                my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
                $self->mark_stored($object, $liid );
425
                ++$stored_this_time;
426 427 428 429
            }
        }
    }

430 431 432
    foreach my $sth (values %hashed_sth) {
        $sth->finish();
    }
433

434
    return ($object_or_list, $stored_this_time);
435 436 437 438 439 440 441 442
}


sub DESTROY { }   # to simplify AUTOLOAD

sub AUTOLOAD {
    our $AUTOLOAD;

443 444 445 446 447
    if($AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/) {
        my $all             = $1;
        my $filter_string   = $2;
        my $key_string      = $3;
        my $value_column    = $4;
448 449 450 451

        my ($self) = @_;
        my $column_set = $self->column_set();

452
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
453 454 455 456 457
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
458
        my $key_components = $key_string && [ split(/_AND_/i, $key_string) ];
459
        foreach my $column_name ( @$key_components ) {
460 461 462 463
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
        }
464 465 466
        if($value_column && !$column_set->{$value_column}) {
            die "unknown column '$value_column'";
        }
467

468
#        print "Setting up '$AUTOLOAD' method\n";
469 470 471 472 473 474 475 476 477
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->fetch_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
                !$all,
                $key_components,
                $value_column
            );
        };
478
        goto &$AUTOLOAD;    # restart the new method
479

480
    } elsif($AUTOLOAD =~ /::count_all_by_(\w+)$/) {
481
        my $filter_string = $1;
482 483 484 485

        my ($self) = @_;
        my $column_set = $self->column_set();

486 487 488 489 490
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        foreach my $column_name ( @$filter_components ) {
            unless($column_set->{$column_name}) {
                die "unknown column '$column_name'";
            }
491
        }
492 493 494 495 496 497 498 499 500 501

#        print "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->count_all(
                join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
            );
        };
        goto &$AUTOLOAD;    # restart the new method

502 503 504 505 506 507 508 509 510 511 512 513 514
    } elsif($AUTOLOAD =~ /::remove_all_by_(\w+)$/) {
        my $filter_name = $1;

        my ($self) = @_;
        my $column_set = $self->column_set();

        if($column_set->{$filter_name}) {
#            print "Setting up '$AUTOLOAD' method\n";
            *$AUTOLOAD = sub { my ($self, $filter_value) = @_; return $self->remove_all("$filter_name='$filter_value'"); };
            goto &$AUTOLOAD;    # restart the new method
        } else {
            die "unknown column '$filter_name'";
        }
515
    } elsif($AUTOLOAD =~ /::update_(\w+)$/) {
516
        my @columns_to_update = split(/_AND_/i, $1);
517
#        print "Setting up '$AUTOLOAD' method\n";
518 519 520 521 522 523 524 525 526
        *$AUTOLOAD = sub { my ($self, $object) = @_; return $self->update($object, @columns_to_update); };
        goto &$AUTOLOAD;    # restart the new method
    } else {
        print "sub '$AUTOLOAD' not implemented";
    }
}

1;