=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor

=head1 DESCRIPTION

    The base class for all other Object- or NakedTable- adaptors.
    Performs the low-level SQL needed to retrieve and store data in tables.

=head1 EXTERNAL DEPENDENCIES

    DBI 1.6

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=cut


37 38 39
package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;

use strict;
40
use warnings;
41
no strict 'refs';   # needed to allow AUTOLOAD create new methods
42
use DBI 1.6;        # the 1.6 functionality is important for detecting autoincrement fields and other magic.
43

44
use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
45

46 47

# Fallback used by table_name() when no name has been set on the adaptor.
# The base class has no table of its own, so this implementation only raises an error;
# concrete adaptors are expected to override it (or to call table_name() as a setter).
sub default_table_name {
    my $complaint = "Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class";

    throw($complaint);
}


# Fallback used by insertion_method() when no method has been set on the adaptor.
# Returns the plain 'INSERT' verb.
sub default_insertion_method {
    my $verb = 'INSERT';

    return $verb;
}


57 58 59 60
# Fallback used by overflow_limit() when no limits have been set on the adaptor.
# Returns a fresh, empty hashref: the base class declares no overflow columns.
# The keys are column names (fetch_all() treats them as columns that may hold overflow references);
# the values are presumably the column sizes, judging by the example below.
sub default_overflow_limit {
    my %limit_of_column = (
        # 'overflow_column1_name' => column1_size,
        # 'overflow_column2_name' => column2_size,
        # ...
    );

    return \%limit_of_column;
}

# Fallback used by input_column_mapping() when no mapping has been set on the adaptor.
# Returns a fresh, empty hashref: by default every column is selected as "<table>.<column>".
# A mapping entry replaces that with an arbitrary SQL expression in the SELECT list of fetch_all().
sub default_input_column_mapping {
    my %select_expression_of = (
        # 'original_column1' => "original_column1*10 AS c1_times_ten",
        # 'original_column2' => "original_column2+1 AS c2_plus_one",
        # ...
    );

    return \%select_expression_of;
}

73 74 75 76
# Names of non-primary-key columns that updatable_column_list() must leave out.
# Returns a fresh, empty listref: the base class protects nothing beyond the primary key.
sub do_not_update_columns {
    my @protected_columns = ();

    return \@protected_columns;
}

77 78 79
# ---------------------------------------------------------------------------

# Constructor.
#
#   $class   : the adaptor class to instantiate
#   $dbobj   : either a DBConnection object (class name ends in 'DBConnection'),
#              or any object that can('dbc') - in which case it is also remembered via db()
#   %options : optional (setter_name => value) pairs; each defined value whose name
#              matches a method of the adaptor is passed to that method, everything else is ignored
#
# Throws if $dbobj is missing, is not a reference, or is of neither supported kind.
sub new {
    my ($class, $dbobj, %options) = @_;

    my $self = bless {}, $class;

    unless( defined($dbobj) and ref($dbobj) ) {
            # do not interpolate an undefined value: it would only add an 'uninitialized' warning to the real error
        throw("Don't have a db [".($dbobj // 'undef')."] for new adaptor");
    }

    if( ref($dbobj) =~ /DBConnection$/ ) {
        $self->dbc($dbobj);
    } elsif( UNIVERSAL::can($dbobj, 'dbc') ) {     # function form on purpose: $dbobj may be an unblessed reference
        $self->dbc( $dbobj->dbc );
        $self->db( $dbobj );
    } else {
        throw("I was given [$dbobj] for a new adaptor");
    }

    foreach my $option_name (keys %options) {
        my $option_value = $options{ $option_name };

        if( defined($option_value) and $self->can($option_name) ) {
            $self->$option_name( $option_value );
        }
    }

    return $self;
}


# Combined getter/setter for the parent database object (stored under '_db').
# Any extra argument, even undef, is treated as the new value.
sub db {
    my ($self, @new_value) = @_;

    $self->{_db} = $new_value[0] if @new_value;    # setter

    return $self->{_db};
}


# Combined getter/setter for the database connection (stored under '_dbc').
# Any extra argument, even undef, is treated as the new value.
sub dbc {
    my ($self, @new_value) = @_;

    $self->{_dbc} = $new_value[0] if @new_value;   # setter

    return $self->{_dbc};
}


# Prepares an SQL statement on this adaptor's connection and returns whatever
# the connection's prepare() returns.
sub prepare {
    my $self = shift @_;
    my $sql  = shift @_;

    # To switch off caching on the SQL side (e.g. for timing comparisons), uncomment:
    #$sql =~ s/SELECT/SELECT SQL_NO_CACHE/i;

    my $dbc = $self->dbc;

    return $dbc->prepare($sql);
}

142 143 144 145 146 147 148 149 150 151 152

# Combined getter/setter for the overflow limits (stored under '_overflow_limit').
# When nothing true has been stored, the getter falls back to default_overflow_limit().
sub overflow_limit {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_overflow_limit} = $new_value[0];
    }

    my $explicit = $self->{_overflow_limit};

    return $explicit ? $explicit : $self->default_overflow_limit();
}


153 154 155 156 157 158 159 160 161 162
# Combined getter/setter for the input column mapping (stored under '_input_column_mapping').
# When nothing true has been stored, the getter falls back to default_input_column_mapping().
sub input_column_mapping {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_input_column_mapping} = $new_value[0];
    }

    my $explicit = $self->{_input_column_mapping};

    return $explicit ? $explicit : $self->default_input_column_mapping();
}


163 164 165 166 167
# Combined getter/setter for the table name (stored under '_table_name').
# Setting a name immediately reloads the table metadata via _table_info_loader().
# When nothing true has been stored, the getter falls back to default_table_name().
sub table_name {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_table_name} = $new_value[0];
        $self->_table_info_loader();
    }

    my $explicit = $self->{_table_name};

    return $explicit ? $explicit : $self->default_table_name();
}


# Combined getter/setter for the insertion method (stored under '_insertion_method').
# When nothing true has been stored, the getter falls back to default_insertion_method().
sub insertion_method {
    my ($self, @new_value) = @_;

    if(@new_value) {    # setter
        $self->{_insertion_method} = $new_value[0];
    }

    my $explicit = $self->{_insertion_method};

    return $explicit ? $explicit : $self->default_insertion_method();
}


# Combined getter/setter for the column set (stored under '_column_set'):
# a hashref that _table_info_loader() fills with (column name => column type).
# A getter call made while nothing is stored yet triggers _table_info_loader() first.
sub column_set {
    my ($self, @new_value) = @_;

    if(@new_value) {                                # setter
        $self->{_column_set} = $new_value[0];
    } elsif( not defined $self->{_column_set} ) {   # lazy-loading getter
        $self->_table_info_loader();
    }

    return $self->{_column_set};
}


# Combined getter/setter for the list of primary key columns (stored under '_primary_key').
# The key is not necessarily auto-incrementing.
# A getter call made while nothing is stored yet triggers _table_info_loader() first.
sub primary_key {
    my ($self, @new_value) = @_;

    if(@new_value) {                                # setter
        $self->{_primary_key} = $new_value[0];
    } elsif( not defined $self->{_primary_key} ) {  # lazy-loading getter
        $self->_table_info_loader();
    }

    return $self->{_primary_key};
}


# Read-only, cached view: the columns that update() may change by default, i.e. every column
# that is neither part of the primary key nor listed by do_not_update_columns().
# The list is computed on the first call and kept under '_updatable_column_list'; it cannot be set directly.
sub updatable_column_list {
    my ($self) = @_;

    if( !$self->{_updatable_column_list} ) {
        my %is_excluded = map { $_ => 1 } @{$self->primary_key}, @{$self->do_not_update_columns};
        my $column_set  = $self->column_set();

        my @updatable = grep { !$is_excluded{$_} } keys %$column_set;

        $self->{_updatable_column_list} = \@updatable;
    }

    return $self->{_updatable_column_list};
}


# Combined getter/setter for the name of the auto-increment column (stored under '_autoinc_id').
# _table_info_loader() stores an empty string when the table has no such column.
# A getter call made while nothing is stored yet triggers _table_info_loader() first.
sub autoinc_id {
    my ($self, @new_value) = @_;

    if(@new_value) {                                # setter
        $self->{_autoinc_id} = $new_value[0];
    } elsif( not defined $self->{_autoinc_id} ) {   # lazy-loading getter
        $self->_table_info_loader();
    }

    return $self->{_autoinc_id};
}


# Reads the metadata of the adaptor's table from the database and stores it on the adaptor:
#   column_set()  <- hashref of (column name => type name)
#   primary_key() <- listref of primary key column names, as reported by the connection
#   autoinc_id()  <- name of the auto-increment column, or '' if none was recognised
#
# The auto-increment column is recognised purely by naming convention: a column called
# "<table_name>_id" (plus the historical special case analysis_base.analysis_id).
sub _table_info_loader {
    my $self = shift @_;

    my $dbc         = $self->dbc();
    my $table_name  = $self->table_name();

    my %column_set  = ();
    my $autoinc_id  = '';
    my @primary_key = $dbc->primary_key(undef, undef, $table_name);

    my $sth = $dbc->column_info(undef, undef, $table_name, '%');
    $sth->execute();
    while (my $row = $sth->fetchrow_hashref()) {
        my ( $column_name, $column_type ) = @$row{'COLUMN_NAME', 'TYPE_NAME'};

        # warn "ColumnInfo [$table_name/$column_name] = $column_type\n";

        $column_set{$column_name}  = $column_type;

        if( ($column_name eq $table_name.'_id')
         or ($table_name eq 'analysis_base' and $column_name eq 'analysis_id') ) {    # a special case (historical)
            $autoinc_id = $column_name;
        }
    }
    $sth->finish;

    $self->column_set(  \%column_set );
    $self->primary_key( \@primary_key );
    $self->autoinc_id(   $autoinc_id );
}


# Counts the rows of the adaptor's table, optionally filtered and/or grouped.
#
#   $constraint  : optional SQL; prefixed with WHERE unless it contains a JOIN
#   $key_list    : optional listref of columns to group by
#   @bind_values : values for any placeholders in $constraint
#
# Returns a plain number when there is no grouping (0 if nothing matched),
# or a hashref nested one level per grouping column, with the counts at the leaves
# ({} if nothing matched).
sub count_all {
    my ($self, $constraint, $key_list, @bind_values) = @_;

    my $table_name      = $self->table_name();
    my $driver          = $self->dbc->driver();
    my $count_col_name  = $driver eq 'pgsql' ? 'count' : 'COUNT(*)';

        # an empty key list means "no grouping", exactly like a missing one
        # (otherwise a dangling 'GROUP BY' with no columns would be generated):
    my @group_columns   = $key_list ? @$key_list : ();

    my $sql = 'SELECT '.join(', ', @group_columns, 'COUNT(*)')." FROM $table_name";

    if($constraint) {
            # in case $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc) do not put WHERE in front:
        $sql .= (($constraint=~/\bJOIN\b/i) ? ' ' : ' WHERE ') . $constraint;
    }

    if(@group_columns) {
        $sql .= " GROUP BY ".join(', ', @group_columns);
    }
    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute(@bind_values);

    my $result_struct;  # will be autovivified to the correct data structure

    while(my $hashref = $sth->fetchrow_hashref) {

        my $pptr = \$result_struct;
        foreach my $syll (@group_columns) {
            $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
        }
        $$pptr = $hashref->{$count_col_name};
    }
    $sth->finish;

    unless(defined($result_struct)) {
        $result_struct = @group_columns ? {} : 0;
    }

    return $result_struct;
}


# The generic fetcher behind all fetch_* methods.
#
#   $constraint   : optional SQL appended to the SELECT. It is prefixed with WHERE unless it
#                   contains a JOIN or itself starts with LIMIT / ORDER BY / GROUP BY.
#   $one_per_key  : true  => each key (or the whole query) maps to a single value, later rows overwrite earlier ones;
#                   false => values are collected into listrefs
#   $key_list     : optional listref of columns; the result becomes a hashref nested one level per column
#   $value_column : optional projection: a column name (plain values), a listref of column names (sub-hashes),
#                   or nothing (whole rows passed through objectify())
#
# Columns named by overflow_limit() whose value looks like "_ext..._data_id <N>" are replaced by the
# data fetched from the AnalysisDataAdaptor.
#
# Returns a hashref (with $key_list), a listref (without $key_list, !$one_per_key),
# or a single value / undef (without $key_list, $one_per_key).
sub fetch_all {
    my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_;

    my $table_name              = $self->table_name();
    my $input_column_mapping    = $self->input_column_mapping();

    my $sql = 'SELECT ' . join(', ', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name";

    if($constraint) {
            # Do not put WHERE in front if $constraint contains any kind of JOIN (regular, LEFT, RIGHT, etc)
            # or if it consists only of trailing clauses. The second test is anchored as a whole:
            # a bare /^LIMIT|ORDER|GROUP/ would match ORDER or GROUP anywhere in the constraint.
        my $is_bare_clause = ($constraint =~ /\bJOIN\b/i)
                          || ($constraint =~ /^\s*(?:LIMIT|ORDER\s+BY|GROUP\s+BY)\b/i);

        $sql .= ($is_bare_clause ? ' ' : ' WHERE ') . $constraint;
    }

    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute;

    my @overflow_columns = keys %{ $self->overflow_limit() };
    my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor();

    my $result_struct;  # will be autovivified to the correct data structure

    while(my $hashref = $sth->fetchrow_hashref) {

        foreach my $overflow_key (@overflow_columns) {
            my $stored_value = $hashref->{$overflow_key};

                # NULLs cannot be overflow references (and must not be fed to the regex):
            if(defined($stored_value) and $stored_value =~ /^_ext(?:\w+)_data_id (\d+)$/) {
                $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1);
            }
        }

        my $pptr = \$result_struct;
        if($key_list) {
            foreach my $syll (@$key_list) {
                $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
            }
        }
        my $object = $value_column
            ? ( (ref($value_column) eq 'ARRAY')
                    ? { map { ($_ => $hashref->{$_}) } @$value_column } # project to a subhash
                    : $hashref->{$value_column}                         # project to just one field
              )
            : $self->objectify($hashref);                               # keep the whole object

        if($one_per_key) {
            $$pptr = $object;                                           # just return the one value (either the key_list is unique or override)
        } else {
            push @$$pptr, $object;                                      # return a list of values that potentially share the same key_list
        }
    }
    $sth->finish;

    unless(defined($result_struct)) {
        if($key_list and scalar(@$key_list)) {
            $result_struct = {};
        } elsif(!$one_per_key) {
            $result_struct = [];
        }
    }

    return $result_struct;  # either listref or hashref is returned, depending on the call parameters
}


# Builds the SQL condition that selects one row by its primary key.
#
#   $sliceref : listref of primary key values.
#               Attention: their order must match the order of the primary key columns in the table definition!
#
# Returns a string like "col1='v1' AND col2='v2'". The values are turned into SQL literals by the
# database handle's quote(), so embedded quote characters cannot break (or subvert) the statement;
# an undefined value yields "col IS NULL".
# Throws if the table has no primary key.
sub primary_key_constraint {
    my ($self, $sliceref) = @_;

    my $primary_key = $self->primary_key();

    unless(@$primary_key) {
        my $table_name = $self->table_name();
        throw("Table '$table_name' doesn't have a primary_key");
    }

    my $dbh = $self->dbc->db_handle;

    my @conditions = map {
        my ($column, $value) = ($primary_key->[$_], $sliceref->[$_]);

        defined($value) ? $column.'='.$dbh->quote($value) : "$column IS NULL";
    } (0..$#$primary_key);

    return join(' AND ', @conditions);
}


# Fetches the single object identified by the given primary key value(s).
# All arguments after the invocant are primary key column values, in table-definition order.
sub fetch_by_dbID {
    my ($self, @primary_key_values) = @_;

    my $constraint = $self->primary_key_constraint( \@primary_key_values );

    return $self->fetch_all( $constraint, 1 );     # 1 = one object per key, i.e. not a list
}


400 401 402
# Removes entries by a constraint.
#
#   $constraint : optional SQL condition (without the WHERE keyword).
#                 When it is missing or false, ALL rows of the table are deleted.
#
# Returns whatever the statement handle's finish() returns.
sub remove_all {
    my ($self, $constraint) = @_;

    my $table_name  = $self->table_name();

        # An unconditional delete is spelt without a WHERE clause rather than as 'WHERE 1':
        # the bare integer is not a boolean expression, which PostgreSQL rejects.
    my $sql = "DELETE FROM $table_name";
    $sql   .= " WHERE $constraint" if $constraint;

    my $sth = $self->prepare($sql);
    $sth->execute();

    return $sth->finish();
}


413 414 415 416
# Removes the row that corresponds to the given object, located by its primary key.
# Objects that have an adaptor() method but are not attached to this very adaptor are
# considered "not stored here" and are silently skipped.
sub remove {
    my ($self, $object) = @_;

    my $knows_its_adaptor = UNIVERSAL::can($object, 'adaptor');

        # the object hasn't actually been stored yet / in this database
    if( $knows_its_adaptor and not ($object->adaptor and $object->adaptor == $self) ) {
        return;
    }

    my $key_values              = $self->slicer($object, $self->primary_key());
    my $primary_key_constraint  = $self->primary_key_constraint( $key_values );

    return $self->remove_all( $primary_key_constraint );
}


426 427 428 429 430 431 432 433 434 435
# Updates non-primary-key columns of the row identified by the object's primary key.
#
#   $object       : the object to take the values from
#   @column_names : optional names of the columns to update; defaults to updatable_column_list()
#
# A 'when_*' column whose value is the string 'CURRENT_TIMESTAMP' is set by the SQL keyword itself;
# every other value is passed as a bound parameter.
# Throws if there is nothing to update.
sub update {
    my ($self, $object, @column_names) = @_;

    my $table_name              = $self->table_name();
    my $primary_key_constraint  = $self->primary_key_constraint( $self->slicer($object, $self->primary_key()) );
    my $columns_to_update       = @column_names ? \@column_names : $self->updatable_column_list();
    my $values_to_update        = $self->slicer( $object, $columns_to_update );

    if(!@$columns_to_update) {
        throw("There are no dependent columns to update, as everything seems to belong to the primary key");
    }

    my @assignments = ();
    my @bind_values = ();
    foreach my $idx (0..$#$columns_to_update) {
        my $column_name = $columns_to_update->[$idx];
        my $value       = $values_to_update->[$idx];

        my $set_by_keyword = ($column_name =~ /^when_/ and defined($value) and $value eq 'CURRENT_TIMESTAMP');

        if($set_by_keyword) {
            push @assignments, $column_name.'=CURRENT_TIMESTAMP';
        } else {
            push @assignments, $column_name.'=?';
            push @bind_values, $value;
        }
    }

    my $sql = "UPDATE $table_name SET ".join(', ', @assignments)." WHERE $primary_key_constraint";
    # warn "SQL: $sql\n";
    my $sth = $self->prepare($sql);
    # warn "VALUES_TO_UPDATE: ".join(', ', map { "'$_'" } @bind_values)."\n";
    $sth->execute( @bind_values );

    return $sth->finish();
}

461

462
# Makes sure the object is present in the table and, where possible, up to date:
#   * attached to this adaptor already                   => update it
#   * found in the table by content of $filter_columns   => mark it as stored, then update it
#   * otherwise                                          => store it
# An update is only attempted when the table has both a primary key and updatable columns.
sub store_or_update_one {
    my ($self, $object, $filter_columns) = @_;

    my $stored_here_before = UNIVERSAL::can($object, 'adaptor') && $object->adaptor && $object->adaptor==$self;

    unless($stored_here_before) {
        my $present = $self->check_object_present_in_db_by_content( $object, $filter_columns );

        unless($present) {     # genuinely new: a plain store is all that is needed
            return $self->store( $object );
        }

        $self->mark_stored($object, $present);
    }

    my $is_updatable = @{ $self->primary_key() } && @{ $self->updatable_column_list() };

    return $self->update( $object ) if $is_updatable;

    return;
}

486

487 488
# Looks for a row whose content matches the object.
#
#   $object         : the object to look for
#   $filter_columns : optional listref of fields to compare; a field that is not a column is
#                     looked up as "<field>_id" instead. By default all columns except the
#                     auto-increment one are compared.
#
# Returns the auto-increment id of a matching row if the table has such a column, or 1 if it has not;
# returns undef when no row matches.
sub check_object_present_in_db_by_content {
    my ( $self, $object, $filter_columns ) = @_;

    my $table_name  = $self->table_name();
    my $column_set  = $self->column_set();
    my $autoinc_id  = $self->autoinc_id();

    my @compared_columns = $filter_columns
        ? map  { $column_set->{$_} ? $_ : $_.'_id' } @$filter_columns   # make sure all fields exist in the database as columns
        : grep { $_ ne $autoinc_id } keys %$column_set;                 # identical contents, so skip the autoinc_id column
    $filter_columns = \@compared_columns;

    my %filter_hash;
    @filter_hash{ @$filter_columns } = @{ $self->slicer( $object, $filter_columns ) };

    my @where_terms = ();
    my @bind_values = ();
    foreach my $column (keys %filter_hash) {
        my $value = $filter_hash{$column};

        if( !defined($value) ) {
            push @where_terms, "$column IS NULL";
            next;
        }
        push @where_terms, "$column = ?";
        push @bind_values, $value;
    }

    my $sql = 'SELECT '.($autoinc_id || 1)." FROM $table_name WHERE ".  join(' AND ', @where_terms);
    my $sth = $self->prepare( $sql );
    $sth->execute( @bind_values );

    my $return_value = ($sth->fetchrow_array())[0];
#warn "check_object_present_in_db_by_content: sql= $sql WITH VALUES (".join(', ', @bind_values).") ---> return_value=".($return_value//'undef')."\n";
    $sth->finish;

    return $return_value;
}


527 528 529 530 531 532 533 534 535
# Hook called by store() to run the prepared INSERT for one object.
# This implementation simply executes the statement with the given values and returns
# the (scalar) result of execute(); $object is not used here.
sub class_specific_execute {
    my $self   = shift @_;
    my $object = shift @_;
    my $sth    = shift @_;
    my $values = shift @_;

    return scalar( $sth->execute( @$values ) );
}


536
# Stores one object or a list of objects in the adaptor's table.
#
#   $object_or_list : a single object, or a listref of objects
#
# The SQL verb comes from insertion_method() (INSERT, INSERT_IGNORE or REPLACE), adapted to the driver.
# Statements are prepared lazily, once per distinct set of columns as reported by keys_to_columns().
# Every object that actually produced a row is passed to mark_stored() together with its new
# auto-increment id (if the table has one).
#
# Returns the list ($object_or_list, $number_of_objects_stored_this_time),
# or ([], 0) when given an empty list.
# Throws if a statement cannot be prepared or executed.
sub store {
    my ($self, $object_or_list) = @_;

    my $objects = (ref($object_or_list) eq 'ARRAY')     # ensure we get an array of objects to store
        ? $object_or_list
        : [ $object_or_list ];
    return ([], 0) unless(scalar(@$objects));

    my $table_name              = $self->table_name();
    my $autoinc_id              = $self->autoinc_id();
    my $driver                  = $self->dbc->driver();
    my $insertion_method        = $self->insertion_method;  # INSERT, INSERT_IGNORE or REPLACE
    $insertion_method           =~ s/_/ /g;
    if($driver eq 'sqlite') {
        $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
    } elsif($driver eq 'pgsql') {
        # Rules have been created to mimic the behaviour INSERT IGNORE / REPLACE
        # Here we can do fall-back to a standard INSERT
        $insertion_method = 'INSERT';
    }

    my %hashed_sth = ();  # do not prepare statements until there is a real need

    my $stored_this_time        = 0;

    foreach my $object (@$objects) {
        my ($columns_being_stored, $column_key) = $self->keys_to_columns($object);
        # warn "COLUMN_KEY='$column_key'\n";

        my $this_sth;

            # only prepare (once!) if we get here:
        unless($this_sth = $hashed_sth{$column_key}) {
                # By using question marks we can insert true NULLs by setting corresponding values to undefs:
            my $sql = "$insertion_method INTO $table_name (".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')';
            # warn "STORE: $sql\n";
            $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or throw("Could not prepare statement: $sql");
        }

        # warn "STORED_COLUMNS: ".stringify($columns_being_stored)."\n";
        my $values_being_stored = $self->slicer( $object, $columns_being_stored );
        # warn "STORED_VALUES: ".stringify($values_being_stored)."\n";

        my $return_code = $self->class_specific_execute($object, $this_sth, $values_being_stored )
                # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
                # (undefs are spelt out as NULL, so that the error report itself does not generate warnings)
            or throw("Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', map { $_ // 'NULL' } @$values_being_stored).')');

        if($return_code > 0) {     # <--- for the same reason we have to be explicitly numeric here
            # FIXME: does this work if the "MySQL server has gone away" ?
            my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
            $self->mark_stored($object, $liid );
            ++$stored_this_time;
        }
    }

    foreach my $sth (values %hashed_sth) {
        $sth->finish();
    }

    return ($object_or_list, $stored_this_time);
}


600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618
# Turns the column part of an autoloaded method name into an SQL condition.
#
#   $filter_string : column names joined by '_AND_' (e.g. 'analysis_id_AND_status'), or a false value
#   $filter_values : listref of values, in the same order as the columns
#   $column_set    : hashref of known columns, used to validate the names
#
# Returns a string like "col1='v1' AND col2 IS NULL". The values are turned into SQL literals by the
# database handle's quote(), so embedded quote characters cannot break (or subvert) the statement;
# undefined (or missing) values yield IS NULL.
# When $filter_string is false it is returned unchanged. The result is always exactly one scalar,
# because callers use it as a positional argument.
# Throws on a column name that is not in $column_set.
sub _multi_column_filter {
    my ($self, $filter_string, $filter_values, $column_set) = @_;

    return $filter_string unless $filter_string;

        # NB: this filtering happens BEFORE any possible overflow via analysis_data, so will not be done on overflow_columns
    my @filter_components = split(/_AND_/i, $filter_string);

    foreach my $column_name ( @filter_components ) {
        unless($column_set->{$column_name}) {
            throw("unknown column '$column_name'");
        }
    }

    my $dbh = $self->dbc->db_handle;

    my @conditions = map {
        my ($column, $value) = ($filter_components[$_], $filter_values->[$_]);

        defined($value) ? $column.'='.$dbh->quote($value) : $column.' IS NULL';
    } 0..$#filter_components;

    my $filter_sql = join(' AND ', @conditions);

    return $filter_sql;
}


619 620 621 622 623
# An explicit do-nothing destructor, to simplify AUTOLOAD:
# without it, object destruction would be routed through AUTOLOAD.
sub DESTROY {
    return;
}

# Creates missing methods on demand from their names, installs them under the requested name
# and immediately restarts the call with the new method. Recognised name patterns:
#
#   fetch[_all][_by_<columns>][_HASHED_FROM_<columns>][_TO_<column>]   => a wrapper around fetch_all()
#   count_all[_by_<columns>][_HASHED_FROM_<columns>]                   => a wrapper around count_all()
#   remove_all_by_<columns>                                            => a wrapper around remove_all()
#   update_<columns>                                                   => a wrapper around update()
#
# where <columns> is one or more column names joined by '_AND_'.
# Key and value columns are validated against column_set() when the method is created (filter columns -
# on each call, by _multi_column_filter()); an unknown column makes it throw.
# Any other method name only produces a warning.
sub AUTOLOAD {
    our $AUTOLOAD;

        # Splits an '_AND_'-joined name into a listref of column names and makes sure each of them is known.
        # A false name is handed back unchanged.
    my $split_known_columns = sub {
        my ($joined_names, $known_columns) = @_;

        my $names = $joined_names && [ split(/_AND_/i, $joined_names) ];
        if($names) {
            foreach my $column_name ( @$names ) {
                throw("unknown column '$column_name'") unless($known_columns->{$column_name});
            }
        }
        return $names;
    };

    if( my ($all, $filter_string, $key_string, $value_column)
            = $AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/ ) {

        my ($self) = @_;
        my $column_set = $self->column_set();

        my $key_components = $split_known_columns->($key_string, $column_set);

        if($value_column && !$column_set->{$value_column}) {
            throw("unknown column '$value_column'");
        }

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->fetch_all(
                $self->_multi_column_filter($filter_string, \@_, $column_set),
                !$all,
                $key_components,
                $value_column
            );
        };
        goto &$AUTOLOAD;    # restart the new method
    }

    if( my ($filter_string, $key_string)
            = $AUTOLOAD =~ /::count_all(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?$/ ) {

        my ($self) = @_;
        my $column_set = $self->column_set();

        my $key_components = $split_known_columns->($key_string, $column_set);

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->count_all(
                $self->_multi_column_filter($filter_string, \@_, $column_set),
                $key_components,
            );
        };
        goto &$AUTOLOAD;    # restart the new method
    }

    if( my ($filter_string) = $AUTOLOAD =~ /::remove_all_by_(\w+)$/ ) {

        my ($self) = @_;
        my $column_set = $self->column_set();

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->remove_all(
                $self->_multi_column_filter($filter_string, \@_, $column_set),
            );
        };
        goto &$AUTOLOAD;    # restart the new method
    }

    if( my ($joined_columns) = $AUTOLOAD =~ /::update_(\w+)$/ ) {
        my @columns_to_update = split(/_AND_/i, $joined_columns);

#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub {
            my ($self, $object) = @_;
            return $self->update($object, @columns_to_update);
        };
        goto &$AUTOLOAD;    # restart the new method
    }

    warn "sub '$AUTOLOAD' not implemented";
}

1;