Skip to content
Snippets Groups Projects
Commit 273ad63c authored by Yogmatee Roochun's avatar Yogmatee Roochun
Browse files

Updated FTP download cron job - bulk upload

parent c5c80615
No related branches found
No related tags found
3 merge requests!380Pipeline changes,!379Build step added for the master branch,!378k8s release
......@@ -5,8 +5,8 @@ const fs = require('fs')
const ftpUser = require('../utils/ftpUser')
const { errorDevEmail } = require('../email')
const { execSync } = require('child_process')
const ftpBulkUploader = config.get('ftp_bulk_upload')
const FtpAccount = require('../xpub-model/entities/ftpAccount/data-access')
const async = require('async')
if (!process.env.ENABLE_CRONJOB_FROMFTPBULKUPLOAD) {
logger.info(
......@@ -16,55 +16,72 @@ if (!process.env.ENABLE_CRONJOB_FROMFTPBULKUPLOAD) {
}
const BULK_UPLOAD_FILE_EXT = new RegExp(/\S+\.tar\.gz$/i)
const ftp = new Client()
// create local directories
const rootPathLocal = `${process.env.HOME}/${config.get('ftp_directory')}`
const cmd = `mkdir -p ${rootPathLocal}`
execSync(cmd)
;(async () => {
logger.info('Bulk Upload: downloading files from FTP')
const ftpBulkUploaders = await FtpAccount.selectBulkUploaders()
ftpBulkUploaders.forEach(async user => {
queue.push(user)
})
})()
const queue = async.queue(async user => {
logger.info(
`Bulk Upload: Initialising job for file download from FTP folder: ${user.username}`,
)
const ftp = new Client()
// create local directories
const rootPathLocal = `${process.env.HOME}/${config.get('ftp_directory')}/${
user.username
}`
const cmd = `mkdir -p ${rootPathLocal}`
execSync(cmd)
const beforeUpdate = Date.now()
try {
const val = await checkNewFtpPackage()
const val = await checkNewFtpPackage(ftp, user, rootPathLocal)
logger.info(
`Bulk Upload: ${val} file(s) downloaded to local FTP directory ${rootPathLocal}`,
`Bulk Upload: ${val} file(s) downloaded to local FTP directory ${rootPathLocal} in ${Date.now() -
beforeUpdate} ms`,
)
} catch (err) {
errorDevEmail('receiving from EBI FTP for Bulk Upload', err)
errorDevEmail('Bulk Upload Cron Job: ', err)
logger.error(err)
}
close(ftp)
logger.info(
`Bulk Upload: downloading files from FTP was finished in ${Date.now() -
beforeUpdate} ms`,
)
return true
})
queue.drain(() => {
logger.info('Cron job completed.')
process.exit(0)
})()
})
queue.error((err, task) => {
logger.error(`Error in task ${task}: ${err}`)
})
function close(ftp) {
ftp.end()
ftp = null
logger.info('connection terminated')
logger.info('Ftp connection terminated')
}
process
.on('uncaughtException', (err, origin) => {
logger.error('Uncaught Exception thrown: ', err)
logger.error('Exception thrown at: ', origin)
close(ftp)
process.exit(1)
})
.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection: ', reason)
logger.error('Rejected at promise: ', promise)
close(ftp)
process.exit(1)
})
async function checkNewFtpPackage() {
const rootPathFTP = `${config.get('bulk-upload-ftp')['receive-folder']}`
async function checkNewFtpPackage(ftp, ftpBulkUploader, rootPathLocal) {
const rootPathFTP = '/'
const { host } = config.get('bulk-upload-ftp')
const { username: user, password } = await ftpUser.getFTPAccount(
ftpBulkUploader.username,
......@@ -84,7 +101,7 @@ async function checkNewFtpPackage() {
if (!targzFiles.length) {
resolve(0) // No files to download to local FTP space
}
downloadFtpFiles(targzFiles, ftp, rootPathFTP, user).then(() =>
downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal).then(() =>
resolve(targzFiles.length),
)
})
......@@ -99,7 +116,7 @@ async function checkNewFtpPackage() {
})
}
function downloadFtpFiles(targzFiles, ftp, rootPathFTP, ftpUsername) {
function downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal) {
let counter = 0
return Promise.all(
targzFiles.map(
......@@ -126,7 +143,7 @@ function downloadFtpFiles(targzFiles, ftp, rootPathFTP, ftpUsername) {
const writeStream = stream.pipe(fs.createWriteStream(path))
writeStream.on('finish', () => {
logger.info(
`${ftpUsername}${remoteFilePath}: FTP file has been successfully downloaded to local directory ${path}`,
`Bulk Upload: FTP file has been successfully downloaded to local directory ${path}`,
)
resolve()
})
......
......@@ -27,6 +27,16 @@ class FtpAccount extends EpmcBaseModel {
logger.error(e)
}
}
static async selectAll() {
const result = await FtpAccount.query()
return result && result.length > 0 ? result[0] : null
}
static async selectBulkUploaders() {
const result = await FtpAccount.query().whereNot('name', 'beta_plus_tagger')
return result && result.length > 0 ? result : null
}
}
module.exports = FtpAccount
const lodash = require('lodash')
const FtpAccount = require('./data-access')
const empty = {}
const FtpAccountManager = {
findByFtpUsername: FtpAccount.findByFtpUsername,
new: () => lodash.cloneDeep(empty),
save: async ftpAccount => {
const ftpAccToSave = new FtpAccount(ftpAccount)
return ftpAccToSave.save()
},
findAll: FtpAccount.selectAll,
findByType: FtpAccount.selectBulkUploaders,
modelName: 'FtpAccount',
model: FtpAccount,
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment