From 802d3815c0a0809f84e9652c934cc47dceab04cf Mon Sep 17 00:00:00 2001
From: Audrey Hamelers <hamelers@ebi.ac.uk>
Date: Fri, 30 Jul 2021 11:14:20 +0000
Subject: [PATCH] Recursive bulk download

---
 server/ftp-integration/api.js                |  35 ++-
 server/ftp-integration/fromFTP-BulkUpload.js | 108 ++++++--
 server/ftp-integration/recursiveDownload.js  | 246 +++++++++++++++++++
 server/utils/ebi-ftp.js                      |  21 +-
 4 files changed, 367 insertions(+), 43 deletions(-)
 create mode 100644 server/ftp-integration/recursiveDownload.js

diff --git a/server/ftp-integration/api.js b/server/ftp-integration/api.js
index 493e45ad6..bf0fe5d11 100644
--- a/server/ftp-integration/api.js
+++ b/server/ftp-integration/api.js
@@ -172,6 +172,7 @@ watcher.on('change', (path, stats) => {
 })
 
 const runProcess = async (packagePath, submitter) => {
+  const RootDirReg = new RegExp(`^${rootPath}${submitter.meta.ftpUserName}`)
   let packageName
   let tmpPath
   let manuscriptId
@@ -215,7 +216,9 @@ const runProcess = async (packagePath, submitter) => {
       tmpPath,
       manuscriptId,
     )
-    await updateFTP(updatedFilepath, submitter)
+
+    const ftpDirPath = path.dirname(suffixedFilePath.replace(RootDirReg, ''))
+    await updateFTP(updatedFilepath, ftpDirPath, submitter)
   } catch (error) {
     if (submitter) {
       bulkUploaderEmail(submitter.email, error, packageName)
@@ -231,21 +234,27 @@ const runProcess = async (packagePath, submitter) => {
       manuscriptId,
       true,
     )
-    await updateFTP(updatedFilepath, submitter)
+    const ftpDirPath = path.dirname(suffixedFilePath.replace(RootDirReg, ''))
+    await updateFTP(updatedFilepath, ftpDirPath, submitter)
     return true
   }
 }
 
-const validateCompressedFile = (source, submitter) =>
+const validateCompressedFile = (filePath, submitter) =>
   new Promise((resolve, reject) => {
-    exec(`gunzip -t ${source}`, async err => {
+    exec(`gunzip -t ${filePath}`, async err => {
       if (err) {
         logger.error(
-          `FTP bulk upload file: ${source} is corrupted and cannot be processed.`,
+          `FTP bulk upload file: ${filePath} is corrupted and cannot be processed.`,
         )
-        const updatedFilepath = await tidyUp(source, '', '', true)
-        logger.info(`File ${source} has been moved to ${updatedFilepath}`)
-        await updateFTP(updatedFilepath, submitter)
+        const updatedFilepath = await tidyUp(filePath, '', '', true)
+        logger.info(`File ${filePath} has been moved to ${updatedFilepath}`)
+
+        const RootDirReg = new RegExp(
+          `^${rootPath}${submitter.meta.ftpUserName}`,
+        )
+        const ftpDirPath = path.dirname(filePath.replace(RootDirReg, ''))
+        await updateFTP(updatedFilepath, ftpDirPath, submitter)
         reject(err)
       }
       resolve(true)
@@ -261,7 +270,12 @@ const extractFiles = (source, dest, submitter) =>
         logger.error(`Error extracting FTP bulk upload file: ${source}`)
         const updatedFilepath = await tidyUp(source, dest, '', true)
         logger.info(`File ${source} has been moved to ${updatedFilepath}`)
-        await updateFTP(updatedFilepath, submitter)
+
+        const RootDirReg = new RegExp(
+          `^${rootPath}${submitter.meta.ftpUserName}`,
+        )
+        const ftpDirPath = path.dirname(source.replace(RootDirReg, ''))
+        await updateFTP(updatedFilepath, ftpDirPath, submitter)
         reject(
           new Error(`Error extracting FTP bulk upload file: ${source}\n${err}`),
         )
@@ -875,7 +889,7 @@ const validateEmail = email => {
   return re.test(email)
 }
 
-const updateFTP = async (updatedFilepath, submitter) => {
+const updateFTP = async (updatedFilepath, ftpDirPath, submitter) => {
   try {
     const [datedFolder, processedFilename] = await fileUtils.getDirAndFilename(
       updatedFilepath,
@@ -886,6 +900,7 @@ const updateFTP = async (updatedFilepath, submitter) => {
       submitter ? submitter.meta.ftpUserName : null,
       datedFolder,
       processedFilename,
+      ftpDirPath,
     )
   } catch (err) {
     logger.error('Error with updating FTP', err)
diff --git a/server/ftp-integration/fromFTP-BulkUpload.js b/server/ftp-integration/fromFTP-BulkUpload.js
index 536059c9d..337e2b154 100644
--- a/server/ftp-integration/fromFTP-BulkUpload.js
+++ b/server/ftp-integration/fromFTP-BulkUpload.js
@@ -7,13 +7,14 @@ const ftpUser = require('../utils/ftpUser')
 const { errorDevEmail } = require('../email')
 const { checkJobStatus, uncaughtError } = require('../job-runner')
 const { FtpAccountManager } = require('../xpub-model')
+const path = require('path')
 
-if (!process.env.ENABLE_CRONJOB_FROMFTPBULKUPLOAD) {
+/* if (!process.env.ENABLE_CRONJOB_FROMFTPBULKUPLOAD) {
   logger.info(
     'ENABLE_CRONJOB_FROMFTPBULKUPLOAD not defined. fromFtp-BulkUpload cronjob exits.',
   )
   process.exit(0)
-}
+} */
 
 const BULK_UPLOAD_FILE_EXT = new RegExp(/\S+\.tar\.gz$/i)
 
@@ -112,27 +113,18 @@ async function checkNewFtpPackage(ftp, ftpBulkUploader, rootPathLocal) {
     ftpBulkUploader.username,
   )
   return new Promise((resolve, reject) => {
-    ftp.on('ready', () => {
-      ftp.list(rootPathFTP, false, async (err, list) => {
-        if (err) {
-          close(ftp)
-          reject(err)
-        }
-        // get array of tar.gz files only
-        const targzFiles = list.filter(
-          file => BULK_UPLOAD_FILE_EXT.test(file.name) && file.type === '-',
-        )
+    ftp.on('ready', async () => {
+      try {
+        const targzFiles = await readDirRecursive(ftp, '')
         if (!targzFiles.length) {
           resolve(0) // No files to download to local FTP space
         }
-
-        try {
-          await downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal)
-          resolve(targzFiles.length)
-        } catch (err) {
-          reject(err.message)
-        }
-      })
+        await downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal)
+        resolve(targzFiles.length)
+      } catch (err) {
+        logger.error('Rejection with Bulk FTP file download', err.message)
+        reject(err.message)
+      }
     })
     ftp.on('error', () =>
       reject(new Error('Unable to connect to Bulk Upload FTP')),
@@ -144,10 +136,10 @@ async function checkNewFtpPackage(ftp, ftpBulkUploader, rootPathLocal) {
   })
 }
 
-function downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal) {
-  return targzFiles.reduce(async (promise, file) => {
+const downloadFtpFiles = (targzFiles, ftp, rootPathFTP, rootPathLocal) =>
+  targzFiles.reduce(async (promise, file) => {
     await promise
-    const remoteFilePath = `${rootPathFTP}/${file.name}`
+    const remoteFilePath = file
     return new Promise((resolve, reject) => {
       ftp.get(remoteFilePath, (err, stream) => {
         if (err) {
@@ -157,11 +149,21 @@ function downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal) {
           logger.error(`${remoteFilePath}: No FTP stream`)
           reject(err.message)
         } else {
-          const path = `${rootPathLocal}/${file.name}`
-          const writeStream = stream.pipe(fs.createWriteStream(path))
+          const pathName = path.dirname(file)
+          const fileName = path.basename(file)
+          const filePathLocal = `${rootPathLocal}${pathName}`
+
+          // create subfolders on MSS server
+          const cmd = `mkdir -p ${filePathLocal}`
+          execSync(cmd)
+
+          const downloadFilePath = `${filePathLocal}/${fileName}`
+          const writeStream = stream.pipe(
+            fs.createWriteStream(downloadFilePath),
+          )
           writeStream.on('finish', () => {
             logger.info(
-              `Bulk Upload: FTP file has been successfully downloaded to local directory ${path}`,
+              `Bulk Upload: FTP file has been successfully downloaded to local directory ${downloadFilePath}`,
             )
             resolve()
           })
@@ -169,4 +171,58 @@ function downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal) {
       })
     })
   }, Promise.resolve())
+
+const readDirRecursive = (ftp, startDir) => {
+  const readDirQueue = []
+  const fileList = []
+
+  const readDir = dir => {
+    const getDirList = dirName =>
+      new Promise((resolve, reject) => {
+        ftp.list(dirName, false, async (err, dirList) => {
+          if (err) {
+            close(ftp)
+            reject(err)
+          }
+
+          // filter out directories which are 'dated folders'
+          const datedRegex = new RegExp(/\d{4}-\d{2}-\d{2}$/i)
+          const filteredList = dirList
+            .filter(ele => !(datedRegex.test(ele.name) && ele.type === 'd'))
+            .map(ele => ({
+              objPath: `${dirName}/${ele.name}`,
+              objType: ele.type,
+            }))
+          resolve(filteredList)
+        })
+      })
+
+    // Files are added to accumulating fileList array.
+    // Directories are added to readDirQueue.
+    const getPackages = dirList => {
+      dirList.forEach(ele => {
+        if (ele.objType === 'd') {
+          readDirQueue.push(ele.objPath)
+        }
+        if (
+          BULK_UPLOAD_FILE_EXT.test(ele.objPath.split('/').pop()) &&
+          ele.objType === '-'
+        ) {
+          // push tar.gz files
+          fileList.push(ele.objPath)
+        }
+      })
+
+      if (readDirQueue.length > 0) {
+        return readDir(readDirQueue.shift())
+      }
+
+      return fileList
+    }
+
+    return getDirList(dir).then(getPackages)
+  }
+
+  // start reading at the top
+  return readDir(startDir)
 }
diff --git a/server/ftp-integration/recursiveDownload.js b/server/ftp-integration/recursiveDownload.js
new file mode 100644
index 000000000..242ce9021
--- /dev/null
+++ b/server/ftp-integration/recursiveDownload.js
@@ -0,0 +1,246 @@
+const Client = require('ftp')
+const logger = require('@pubsweet/logger')
+const config = require('config')
+const fs = require('fs')
+const ftpUser = require('../utils/ftpUser')
+const { errorDevEmail } = require('../email')
+const { execSync } = require('child_process')
+const FtpAccount = require('../xpub-model/entities/ftpAccount/data-access')
+const async = require('async')
+
+const pathModule = require('path')
+
+/* eslint-disable */
+
+/*
+if (!process.env.ENABLE_CRONJOB_FROMFTPBULKUPLOAD) {
+  logger.info(
+    'ENABLE_CRONJOB_FROMFTP-BULKUPLOAD not defined. fromFtp-BulkUpload cronjob exits.',
+  )
+  process.exit(0)
+}
+*/
+
+const BULK_UPLOAD_FILE_EXT = new RegExp(/\S+\.tar\.gz$/i)
+
+;(async () => {
+  const ftpBulkUploaders = await FtpAccount.selectBulkUploaders()
+
+  // console.log(ftpBulkUploaders)
+
+  /*ftpBulkUploaders.forEach(async user => {
+    queue.push(user)
+  })*/
+
+  const bulkUploader = {
+    name: 'beta_plus_upload',
+    description: 'test beta_plus_upload',
+    username: 'beta_plus_upload',
+    password: 'GSFj5KC8',
+  }
+
+  queue.push(bulkUploader)
+})()
+
+const queue = async.queue(async user => {
+  logger.info(
+    `Bulk Upload: Initialising job for file download from FTP folder: ${user.username}`,
+  )
+  const ftp = new Client()
+
+  const rootFolderLocal = `${process.env.HOME}/${config.get('ftp_directory')}`
+
+  fs.access(rootFolderLocal, error => {
+    if (error) {
+      logger.error(`${rootFolderLocal} directory does not exist. ${error}`)
+      process.exit(0)
+    }
+  })
+
+  // create local directories
+  const rootPathLocal = `${rootFolderLocal}/${user.username}`
+  const cmd = `mkdir -p ${rootPathLocal}`
+  execSync(cmd)
+
+  const beforeUpdate = Date.now()
+
+  try {
+    const val = await checkNewFtpPackage(ftp, user, rootPathLocal)
+    logger.info(
+      `Bulk Upload: ${val} file(s) downloaded to local directory ${rootPathLocal} in ${Date.now() -
+        beforeUpdate} ms`,
+    )
+  } catch (err) {
+    errorDevEmail('Bulk Upload Cron Job: ', err)
+    logger.error(err)
+  }
+  close(ftp)
+  return true
+})
+
+queue.drain(() => {
+  logger.info('Cron job completed.')
+  process.exit(0)
+})
+
+queue.error((err, task) => {
+  logger.error(`Error in task ${task}: ${err}`)
+})
+
+function close(ftp) {
+  ftp.end()
+  ftp = null
+  logger.info('Ftp connection terminated')
+}
+
+process
+  .on('uncaughtException', (err, origin) => {
+    logger.error('Uncaught Exception thrown: ', err)
+    logger.error('Exception thrown at: ', origin)
+    process.exit(1)
+  })
+  .on('unhandledRejection', (reason, promise) => {
+    logger.error('Unhandled Rejection: ', reason)
+    logger.error('Rejected at promise: ', promise)
+    process.exit(1)
+  })
+
+async function checkNewFtpPackage(ftp, ftpBulkUploader, rootPathLocal) {
+  const rootPathFTP = '/'
+  const { host } = config.get('bulk-upload-ftp')
+  const { username: user, password } = await ftpUser.getFTPAccount(
+    ftpBulkUploader.username,
+  )
+
+  return new Promise((resolve, reject) => {
+    ftp.on('ready', async () => {
+      try {
+        const targzFiles = await readDirRecursive(ftp, '')
+        if (!targzFiles.length) {
+          resolve(0) // No files to download to local FTP space
+        }
+        await downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal)
+        resolve(targzFiles.length)
+      } catch (err) {
+        logger.error('Rejection with Bulk FTP file download', err)
+        reject(err)
+      }
+    })
+    ftp.on('error', () =>
+      reject(new Error('Unable to connect to Bulk Upload FTP')),
+    )
+    const keepalive = 10000
+    const connTimeout = 60000
+    const pasvTimeout = 60000
+    ftp.connect({ host, user, password, keepalive, connTimeout, pasvTimeout })
+  })
+}
+
+function downloadFtpFiles(targzFiles, ftp, rootPathFTP, rootPathLocal) {
+  let counter = 0
+  return Promise.all(
+    targzFiles.map(
+      (file, index, array) =>
+        new Promise((resolve, reject) => {
+          const remoteFilePath = file //`${rootPathFTP}/${file.name}`
+          ftp.get(remoteFilePath, (err, stream) => {
+            if (err) {
+              logger.error(remoteFilePath)
+              logger.error(err.message)
+              counter += 1
+              if (counter === array.length) {
+                reject(err.message)
+              }
+            } else if (!stream) {
+              logger.error(remoteFilePath)
+              logger.error(`No FTP get stream`)
+              counter += 1
+              if (counter === array.length) {
+                reject(err.message)
+              }
+            } else {
+              const pathName = pathModule.dirname(file)
+              const fileName = pathModule.basename(file)
+              const filePathLocal = `${rootPathLocal}${pathName}`
+
+              console.log(file, '---', pathName, '---', filePathLocal)
+
+              const cmd = `mkdir -p ${filePathLocal}`
+              execSync(cmd)
+
+              const downloadFilePath = `${filePathLocal}/${fileName}`
+              const writeStream = stream.pipe(
+                fs.createWriteStream(downloadFilePath),
+              )
+              writeStream.on('finish', () => {
+                logger.info(
+                  `Bulk Upload: FTP file has been successfully downloaded to local directory ${downloadFilePath}`,
+                )
+                resolve()
+              })
+            }
+          })
+        }),
+    ),
+  )
+}
+
+function readDirRecursive(ftp, startDir) {
+  const readDirQueue = []
+  const fileList = []
+
+  //readDir() is called and passed starting directory to read from.
+  const readDir = dir => {
+    const getDirList = readDir => {
+      return new Promise((resolve, reject) => {
+        ftp.list(readDir, false, async (err, itemList) => {
+          if (err) {
+            close(ftp)
+            reject(err)
+          }
+
+          // filter out directories which are 'dated folders'
+          const datedRegex = new RegExp(/\d{4}-\d{2}-\d{2}$/i)
+          const filteredList = itemList
+            .filter(item => !(datedRegex.test(item.name) && item.type === 'd'))
+            .map(item => {
+              return { objPath: `${readDir}/${item.name}`, objType: item.type }
+            })
+          resolve(filteredList)
+        })
+      })
+    }
+
+    // Files are added to accumulating fileList array.
+    // Directories are added to readDirQueue.
+    // If directories exist on readDirQueue, next directory is shifted off to another readDir() call and returned to parent Promise.
+    const processDirList = dirList => {
+      for (const { objPath, objType } of dirList) {
+        // if directory add to queue
+        if (objType === 'd') {
+          readDirQueue.push(objPath)
+          // continue
+        } else if (
+          BULK_UPLOAD_FILE_EXT.test(objPath.split('/').pop()) &&
+          objType === '-'
+        ) {
+          // push tar.gz files
+          fileList.push(objPath)
+        }
+      }
+
+      // if queue, process next item recursively
+      if (readDirQueue.length > 0) {
+        return readDir(readDirQueue.shift())
+      }
+
+      // finished - return file list
+      return fileList
+    }
+    // read FS objects list from directory
+    return getDirList(dir).then(processDirList)
+  }
+
+  // start reading at the top
+  return readDir(startDir)
+}
diff --git a/server/utils/ebi-ftp.js b/server/utils/ebi-ftp.js
index dc3cb2ad1..0be652461 100644
--- a/server/utils/ebi-ftp.js
+++ b/server/utils/ebi-ftp.js
@@ -114,6 +114,7 @@ module.exports.renameBulkFtpPackage = async function renameBulkFtpPackage(
   username,
   datedFolder,
   processedFilename,
+  ftpDirPath,
 ) {
   try {
     const ftp = new Client()
@@ -129,7 +130,7 @@ module.exports.renameBulkFtpPackage = async function renameBulkFtpPackage(
 
     return new Promise((resolve, reject) => {
       ftp.on('ready', () => {
-        ftp.list('/', false, async (err, list) => {
+        ftp.list(ftpDirPath, false, async (err, list) => {
           if (err) {
             close(ftp)
             reject(err)
@@ -142,22 +143,28 @@ module.exports.renameBulkFtpPackage = async function renameBulkFtpPackage(
           )[0]
           if (!ftpFile) {
             close(ftp)
-            logger.error(`${ftpPackageName}: FTP file does not exist.`)
-            reject(new Error(`${ftpPackageName}: FTP file does not exist.`))
+            logger.error(
+              `${username}/${ftpPackageName}: FTP file does not exist.`,
+            )
+            reject(
+              new Error(
+                `${username}/${ftpPackageName}: FTP file does not exist.`,
+              ),
+            )
           }
           const newFtpPath = `${datedFolder}/${processedFilename}`
 
           // processed tar.gz file moved to appropriate folder
-          ftp.rename(ftpPackageName, newFtpPath, err => {
+          ftp.rename(`${ftpDirPath}/${ftpPackageName}`, newFtpPath, err => {
             if (err) {
               logger.error(
-                `${ftpPackageName}: FTP file could not be moved to ${newFtpPath}`,
+                `${ftpPackageName}: FTP file could not be moved to ${username}/${newFtpPath}`,
               )
               close(ftp)
               reject(err)
             }
             logger.info(
-              `${ftpPackageName}: FTP file has been moved to ${newFtpPath}`,
+              `${ftpPackageName}: FTP file has been moved to ${username}/${newFtpPath}`,
             )
             close(ftp)
             resolve(true)
@@ -174,7 +181,7 @@ module.exports.renameBulkFtpPackage = async function renameBulkFtpPackage(
     })
   } catch (e) {
     logger.error(
-      `${processedFilename}: Unable to rename bulk upload package on FTP. \n ${e}`,
+      `${username}/${processedFilename}: Unable to rename bulk upload package on FTP. \n ${e}`,
     )
   }
 }
-- 
GitLab