Commit f48fbeb0 authored by Leo Gordon
Browse files

Switching from DBI to DBConnection; data_dbc() as the main focus point;...

Switching from DBI to DBConnection; data_dbc() as the main focus point; standaloneJob.pl examples of basic building blocks
parent 41f97b37
# You may distribute this module under the same terms as perl itself #
=pod
......@@ -86,11 +85,12 @@ package Bio::EnsEMBL::Hive::Process;
use strict;
use warnings;
use DBI;
use Bio::EnsEMBL::Hive::Utils ('url2dbconn_hash');
use Bio::EnsEMBL::Registry;
use Bio::EnsEMBL::DBSQL::DBConnection;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception qw(throw);
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Utils ('url2dbconn_hash');
#use Bio::EnsEMBL::Hive::AnalysisJob;
use base ('Bio::EnsEMBL::Utils::Exception'); # provide these methods for deriving classes
......@@ -219,15 +219,17 @@ sub DESTROY {
=cut
sub queen {
my $self = shift;
$self->{'_queen'} = shift if(@_);
return $self->{'_queen'};
my $self = shift;
$self->{'_queen'} = shift if(@_);
return $self->{'_queen'};
}
sub worker {
my $self = shift;
$self->{'_worker'} = shift if(@_);
return $self->{'_worker'};
my $self = shift;
$self->{'_worker'} = shift if(@_);
return $self->{'_worker'};
}
=head2 db
......@@ -240,11 +242,12 @@ sub worker {
=cut
sub db {
my $self = shift;
return undef unless($self->queen);
return $self->queen->db;
my $self = shift;
return $self->queen && $self->queen->db;
}
=head2 dbc
Title : dbc
......@@ -255,62 +258,66 @@ sub db {
=cut
sub dbc {
my $self = shift;
return undef unless($self->queen);
return $self->queen->dbc;
my $self = shift;
return $self->queen && $self->queen->dbc;
}
=head2 dbh
=head2 data_dbc
Title : dbh
Usage : my $dbh = $self->dbh;
Function: returns DBI handle to a database (the "current" one by default, but can be set up otherwise)
Returns : DBI handle
Title : data_dbc
Usage : my $data_dbc = $self->data_dbc;
Function: returns a Bio::EnsEMBL::DBSQL::DBConnection object (the "current" one by default, but can be set up otherwise)
Returns : Bio::EnsEMBL::DBSQL::DBConnection
=cut
sub dbh {
sub data_dbc {
my $self = shift;
if(@_ or !$self->{'_dbh'}) {
$self->{'_dbh'} = $self->go_figure_dbh( shift @_ || $self->param('db_conn') || $self->dbc );
if(@_ or !$self->{'_data_dbc'}) {
$self->{'_data_dbc'} = $self->go_figure_dbc( shift @_ || $self->param('db_conn') || $self->dbc );
}
return $self->{'_dbh'};
return $self->{'_data_dbc'};
}
sub go_figure_dbh {
sub go_figure_dbc {
my ($self, $foo) = @_;
if(UNIVERSAL::isa($foo, 'DBI::db')) { # it is already a DBI handle, just return it:
if(UNIVERSAL::isa($foo, 'Bio::EnsEMBL::DBSQL::DBConnection')) { # already a DBConnection, return it:
return $foo;
} elsif(UNIVERSAL::isa($foo, 'Bio::EnsEMBL::DBSQL::DBConnection')) { # an EnsEMBL DBConnection
return $foo->db_handle;
} elsif(UNIVERSAL::can($foo, 'dbc') and UNIVERSAL::isa($foo->dbc, 'Bio::EnsEMBL::DBSQL::DBConnection')) {
} elsif(UNIVERSAL::isa($foo, 'Bio::EnsEMBL::Hive::DBSQL::DBAdaptor')) { # a Hive adaptor
return $foo->dbc;
return $foo->dbc->db_handle;
} elsif(UNIVERSAL::can($foo, 'db') and UNIVERSAL::can($foo->db, 'dbc') and UNIVERSAL::isa($foo->db->dbc, 'Bio::EnsEMBL::DBSQL::DBConnection')) { # another data adaptor or Runnable:
} elsif(my $db_conn = (ref($foo) eq 'HASH') ? $foo : url2dbconn_hash( $foo ) ) { # either a hash or a URL
return $foo->db->dbc;
$db_conn->{-driver} ||= 'mysql';
} elsif(my $db_conn = (ref($foo) eq 'HASH') ? $foo : url2dbconn_hash( $foo ) ) { # either a hash or a URL that translates into a hash
return ($db_conn->{-driver} eq 'sqlite'
? DBI->connect("DBI:SQLite:$db_conn->{-dbname}", '', '', { RaiseError => 1 })
: DBI->connect("DBI:$db_conn->{-driver}:host=$db_conn->{-host}:port=$db_conn->{-port}:database=$db_conn->{-dbname}", $db_conn->{-user}, $db_conn->{-pass}, { RaiseError => 1 })
) or die "Couldn't connect to database: " . DBI->errstr;
return Bio::EnsEMBL::DBSQL::DBConnection->new( %$db_conn );
} else {
die "Sorry, could not figure out how to make a DBI handle out of $foo";
unless(ref($foo)) { # maybe it is simply a registry key?
my $dba;
eval {
$dba = Bio::EnsEMBL::Registry->get_DBAdaptor($foo, 'hive'); # We should not assume it is necessarily a Hive database. It would be sufficient just to get a DBConnection from it
};
if(UNIVERSAL::can($dba, 'dbc')) {
return $dba->dbc;
}
}
die "Sorry, could not figure out how to make a DBConnection object out of '$foo'";
}
}
=head2 analysis
Title : analysis
......
......@@ -7,13 +7,10 @@ Bio::EnsEMBL::Hive::RunnableDB::JobFactory
=head1 SYNOPSIS
This is a RunnableDB module that implements Bio::EnsEMBL::Hive::Process interface
and is run by Workers during the execution of eHive pipelines.
It is not generally supposed to be instantiated and used outside of this framework.
Please refer to Bio::EnsEMBL::Hive::Process documentation to understand the basics of the RunnableDB interface.
Please refer to Bio::EnsEMBL::Hive::PipeConfig::* pipeline configuration files to understand how to configure pipelines.
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \
--inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
--input_id "{'meta_key'=>'module_name','meta_value'=>'#_0#'}" \
--flow_into "{ 2 => ['mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2912/lg4_compara_families_64/meta']}"
=head1 DESCRIPTION
......@@ -209,7 +206,7 @@ sub _get_rows_from_query {
warn qq{inputquery = "$inputquery"\n};
}
my @rows = ();
my $sth = $self->dbh()->prepare($inputquery);
my $sth = $self->data_dbc()->prepare($inputquery);
$sth->execute();
my @column_names_from_data = @{$sth->{NAME}}; # tear it off the original reference to gain some freedom
......
......@@ -7,13 +7,9 @@ Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer
=head1 SYNOPSIS
This is a RunnableDB module that implements Bio::EnsEMBL::Hive::Process interface
and is run by Workers during the execution of eHive pipelines.
It is not generally supposed to be instantiated and used outside of this framework.
Please refer to Bio::EnsEMBL::Hive::Process documentation to understand the basics of the RunnableDB interface.
Please refer to Bio::EnsEMBL::Hive::PipeConfig::* pipeline configuration files to understand how to configure pipelines.
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer --table meta_foo \
--src_db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2913/lg4_compara_homology_merged_64 \
--dest_db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2912/lg4_compara_families_64
=head1 DESCRIPTION
......@@ -31,7 +27,6 @@ Also, 'where' parameter allows to select subset of rows to be copied/merged over
package Bio::EnsEMBL::Hive::RunnableDB::MySQLTransfer;
use strict;
use DBI;
use base ('Bio::EnsEMBL::Hive::Process');
......@@ -55,8 +50,6 @@ use base ('Bio::EnsEMBL::Hive::Process');
sub fetch_input {
my $self = shift;
my ($src_dbh, $dest_dbh);
my $src_db_conn = $self->param('src_db_conn');
my $dest_db_conn = $self->param('dest_db_conn');
......@@ -69,26 +62,11 @@ sub fetch_input {
$table = $self->param('table', $self->param_substitute($table) );
# Use connection parameters to source database if supplied, otherwise use the current database as default:
#
my ($src_dbh, $src_mysql_conn) = $src_db_conn
? ( (DBI->connect("DBI:mysql:$src_db_conn->{-dbname}:$src_db_conn->{-host}:$src_db_conn->{-port}", $src_db_conn->{-user}, $src_db_conn->{-pass}, { RaiseError => 1 })
|| die "Couldn't connect to database: " . DBI->errstr) ,
$self->mysql_conn_from_hash($src_db_conn) )
: ($self->db->dbc->db_handle, $self->mysql_conn_from_this_dbc );
# Use connection parameters to destination database if supplied, otherwise use the current database as default:
#
my ($dest_dbh, $dest_mysql_conn) = $dest_db_conn
? ( (DBI->connect("DBI:mysql:$dest_db_conn->{-dbname}:$dest_db_conn->{-host}:$dest_db_conn->{-port}", $dest_db_conn->{-user}, $dest_db_conn->{-pass}, { RaiseError => 1 })
|| die "Couldn't connect to database: " . DBI->errstr) ,
$self->mysql_conn_from_hash($dest_db_conn) )
: ($self->db->dbc->db_handle, $self->mysql_conn_from_this_dbc );
$self->param('src_dbh', $src_dbh);
$self->param('dest_dbh', $dest_dbh);
$self->param('src_mysql_conn', $src_mysql_conn);
$self->param('dest_mysql_conn', $dest_mysql_conn);
my $src_dbc = $src_db_conn ? $self->go_figure_dbc( $src_db_conn ) : $self->db->dbc;
my $dest_dbc = $dest_db_conn ? $self->go_figure_dbc( $dest_db_conn ) : $self->db->dbc;
$self->param('src_dbc', $src_dbc);
$self->param('dest_dbc', $dest_dbc);
my $mode = $self->param('mode') || 'overwrite';
$self->param('mode', $self->param('mode'));
......@@ -98,10 +76,10 @@ sub fetch_input {
$where = $self->param( 'where', $self->param_substitute($where) );
}
$self->param('src_before', $self->get_row_count($src_dbh, $table, $where) );
$self->param('src_before', $self->get_row_count($src_dbc, $table, $where) );
if($mode ne 'overwrite') {
$self->param('dest_before_all', $self->get_row_count($dest_dbh, $table) );
$self->param('dest_before_all', $self->get_row_count($dest_dbc, $table) );
}
}
......@@ -115,22 +93,23 @@ sub fetch_input {
sub run {
my $self = shift;
my $filter_cmd = $self->param('filter_cmd');
my $src_mysql_conn = $self->param('src_mysql_conn');
my $dest_mysql_conn = $self->param('dest_mysql_conn');
my $src_dbc = $self->param('src_dbc');
my $dest_dbc = $self->param('dest_dbc');
my $mode = $self->param('mode') || 'overwrite';
my $table = $self->param('table');
my $where = $self->param('where');
my $filter_cmd = $self->param('filter_cmd');
my $mode = $self->param('mode') || 'overwrite';
my $table = $self->param('table');
my $where = $self->param('where');
my $cmd = 'mysqldump '
. { 'overwrite' => '', 'topup' => '--no-create-info ', 'insertignore' => '--no-create-info --insert-ignore ' }->{$mode}
. "$src_mysql_conn $table "
. $self->mysql_conn_from_dbc($src_dbc)
. " $table "
. (defined($where) ? "--where '$where' " : '')
. '| '
. ($filter_cmd ? "$filter_cmd | " : '')
. "mysql $dest_mysql_conn";
. 'mysql '
. $self->mysql_conn_from_dbc($dest_dbc);
if(my $return_value = system($cmd)) { # NB: unfortunately, this code won't catch many errors because of the pipe
$return_value >>= 8;
......@@ -148,16 +127,16 @@ sub run {
sub write_output {
my $self = shift;
my $mode = $self->param('mode');
my $table = $self->param('table');
my $where = $self->param('where');
my $dest_dbc = $self->param('dest_dbc');
my $dest_dbh = $self->param('dest_dbh');
my $mode = $self->param('mode');
my $table = $self->param('table');
my $where = $self->param('where');
my $src_before = $self->param('src_before');
my $src_before = $self->param('src_before');
if($mode eq 'overwrite') {
my $dest_after = $self->get_row_count($dest_dbh, $table, $where);
my $dest_after = $self->get_row_count($dest_dbc, $table, $where);
if($src_before == $dest_after) {
$self->warning("Successfully copied $src_before '$table' rows");
......@@ -166,7 +145,7 @@ sub write_output {
}
} else {
my $dest_row_increase = $self->get_row_count($dest_dbh, $table) - $self->param('dest_before_all');
my $dest_row_increase = $self->get_row_count($dest_dbc, $table) - $self->param('dest_before_all');
if($mode eq 'topup') {
if($src_before == $dest_row_increase) {
......@@ -183,11 +162,11 @@ sub write_output {
########################### private subroutines ####################################
sub get_row_count {
my ($self, $dbh, $table, $where) = @_;
my ($self, $dbc, $table, $where) = @_;
my $sql = "SELECT count(*) FROM $table" . (defined($where) ? " WHERE $where" : '');
my $sth = $dbh->prepare($sql);
my $sth = $dbc->prepare($sql);
$sth->execute();
my ($row_count) = $sth->fetchrow_array();
$sth->finish;
......@@ -195,16 +174,8 @@ sub get_row_count {
return $row_count;
}
sub mysql_conn_from_hash {
my ($self, $db_conn) = @_;
return "--host=$db_conn->{-host} --port=$db_conn->{-port} --user='$db_conn->{-user}' --pass='$db_conn->{-pass}' $db_conn->{-dbname}";
}
sub mysql_conn_from_this_dbc {
my ($self) = @_;
my $dbc = $self->db->dbc();
sub mysql_conn_from_dbc {
my ($self, $dbc) = @_;
return '--host='.$dbc->host.' --port='.$dbc->port." --user='".$dbc->username."' --pass='".$dbc->password."' ".$dbc->dbname;
}
......
......@@ -7,18 +7,17 @@ Bio::EnsEMBL::Hive::RunnableDB::SqlCmd
=head1 SYNOPSIS
This is a RunnableDB module that implements Bio::EnsEMBL::Hive::Process interface
and is run by Workers during the execution of eHive pipelines.
It is not generally supposed to be instantiated and used outside of this framework.
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::SqlCmd --db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2912/lg4_compara_families_64 \
--sql "INSERT INTO meta(meta_key,meta_value) VALUES ('Hello', 'world')"
Please refer to Bio::EnsEMBL::Hive::Process documentation to understand the basics of the RunnableDB interface.
Please refer to Bio::EnsEMBL::Hive::PipeConfig::* pipeline configuration files to understand how to configure pipelines.
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::SqlCmd --db_conn mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2913/lg4_compara_homology_merged_64 \
--sql "[ 'CREATE TABLE meta_foo LIKE meta', 'INSERT INTO meta_foo SELECT * FROM meta' ]"
=head1 DESCRIPTION
This RunnableDB module acts as a wrapper for a (My)SQL command
run against either the current hive database (default) or against one specified by connection parameters.
This RunnableDB module acts as a wrapper for an SQL command
run against either the current hive database (default) or against one specified by 'db_conn' parameter
(--db_conn becomes obligatory in standalone mode, because there is no hive_db).
If you behave you may also use parameter substitution.
The SQL command(s) can be given using two different syntaxes:
......@@ -98,7 +97,7 @@ sub run {
my $self = shift;
my $sqls = $self->param('sqls');
my $dbh = $self->dbh();
my $data_dbc = $self->data_dbc();
my %output_id;
......@@ -112,10 +111,10 @@ sub run {
warn qq{sql = "$sql"\n};
}
$dbh->do( $sql ) or die "Could not run '$sql': ".$dbh->errstr;
$data_dbc->do( $sql ) or die "Could not run '$sql': ".$data_dbc->db_handle->errstr;
my $insert_id_name = '_insert_id_'.$counter++;
my $insert_id_value = $dbh->last_insert_id(undef, undef, undef, undef);
my $insert_id_value = $data_dbc->db_handle->last_insert_id(undef, undef, undef, undef);
$output_id{$insert_id_name} = $insert_id_value;
$self->param($insert_id_name, $insert_id_value); # for templates
......
......@@ -7,13 +7,7 @@ Bio::EnsEMBL::Hive::RunnableDB::SystemCmd
=head1 SYNOPSIS
This is a RunnableDB module that implements Bio::EnsEMBL::Hive::Process interface
and is run by Workers during the execution of eHive pipelines.
It is not generally supposed to be instantiated and used outside of this framework.
Please refer to Bio::EnsEMBL::Hive::Process documentation to understand the basics of the RunnableDB interface.
Please refer to Bio::EnsEMBL::Hive::PipeConfig::* pipeline configuration files to understand how to configure pipelines.
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::SystemCmd --cmd 'ls -1 ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB/*.pm >building_blocks.list'
=head1 DESCRIPTION
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment