Source code for camparee.kallisto

import os
import argparse
import subprocess

from camparee.abstract_camparee_step import AbstractCampareeStep
from camparee.camparee_utils import CampareeException
from camparee.camparee_constants import CAMPAREE_CONSTANTS
from beers_utils.sample import Sample

# TODO: Add support for additional command line arguments to pass to kallisto commands.

class KallistoIndexStep(AbstractCampareeStep):
    """Wrapper around generating a kallisto transcriptome index.

    The step builds one kallisto index per sample / genome suffix pair and
    records everything it does in a per-job log file. The final log line
    ("ALL DONE!") is the marker is_output_valid() uses to decide whether a
    job ran to completion.
    """

    KALLISTO_INDEX_DIR_PATTERN = CAMPAREE_CONSTANTS.KALLISTO_INDEX_DIR_PATTERN
    KALLISTO_INDEX_FILENAME_PATTERN = CAMPAREE_CONSTANTS.KALLISTO_INDEX_FILENAME_PATTERN
    KALLISTO_INDEX_LOG_FILENAME_PATTERN = CAMPAREE_CONSTANTS.KALLISTO_INDEX_LOG_FILENAME_PATTERN

    # The basic kallisto command used to generate transcriptome indexes.
    # NOTE: the formatted command is run through the shell, so the paths
    # substituted here must not contain whitespace or shell metacharacters.
    BASE_KALLISTO_INDEX_COMMAND = ('{kallisto_bin_path} index'
                                   ' --index={kallisto_index_file}'
                                   ' {transcriptome_fasta}')

    def __init__(self, log_directory_path, data_directory_path, parameters=None):
        """Constructor for KallistoIndexStep object.

        Parameters
        ----------
        log_directory_path : string
            Full path to log directory.
        data_directory_path : string
            Full path to data directory.
        parameters : dict
            Dictionary of other parameters specified by the config file. This
            parameter is not used by this class and is retained for uniformity
            with all other CAMPAREE steps.

        """
        self.data_directory_path = data_directory_path
        self.log_directory_path = log_directory_path

    def validate(self):
        """Always returns True; this step has no parameters to validate."""
        return True

    def execute(self, sample_id, genome_suffix, kallisto_bin_path, transcriptome_fasta_path):
        """Build kallisto index from the given FASTA file of transcripts.

        Parameters
        ----------
        sample_id : string
            Identifier for sample corresponding to reference transcriptome.
            Used to construct index and log paths for this specific kallisto
            execution.
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto executable binary.
        transcriptome_fasta_path : string
            Path to the FASTA file of transcripts, used as the basis for the
            kallisto index. This is generally prepared by the
            TranscriptomeFastaPreparationStep.

        Raises
        ------
        CampareeException
            If the kallisto index command exits with a non-zero status. The
            original subprocess.CalledProcessError is chained as the cause.

        """
        kallisto_index_dir_path = os.path.join(
            self.data_directory_path,
            f'sample{sample_id}',
            KallistoIndexStep.KALLISTO_INDEX_DIR_PATTERN.format(genome_name=genome_suffix))
        kallisto_index_file_path = os.path.join(
            kallisto_index_dir_path,
            KallistoIndexStep.KALLISTO_INDEX_FILENAME_PATTERN.format(genome_name=genome_suffix))
        log_file_path = os.path.join(
            self.log_directory_path,
            f'sample{sample_id}',
            KallistoIndexStep.KALLISTO_INDEX_LOG_FILENAME_PATTERN.format(genome_name=genome_suffix))

        with open(log_file_path, 'w') as log_file:

            print(f"Building kallisto indexes for transcriptome {genome_suffix} "
                  f"of sample{sample_id}.")
            log_file.write(f"Building kallisto indexes for transcriptome {genome_suffix} "
                           f"of sample{sample_id}.\n")

            log_file.write(f"Parameters:\n"
                           f"    kallisto binary path: {kallisto_bin_path}\n"
                           f"    kallisto index directory: {kallisto_index_dir_path}\n"
                           f"    kallisto index file: {kallisto_index_file_path}\n"
                           f"    input transcriptome FASTA: {transcriptome_fasta_path}\n")

            log_file.write("Create kallisto index directory.\n")
            # Attempt the creation and handle the "already there" case, rather
            # than checking first, so a directory appearing between the check
            # and the mkdir cannot crash the step.
            try:
                os.mkdir(kallisto_index_dir_path)
            except FileExistsError:
                if not os.path.isdir(kallisto_index_dir_path):
                    # Something that is not a directory occupies the path.
                    raise
                log_file.write("kallisto index directory already exists.\n")

            kallisto_command = KallistoIndexStep.BASE_KALLISTO_INDEX_COMMAND.format(
                kallisto_bin_path=kallisto_bin_path,
                kallisto_index_file=kallisto_index_file_path,
                transcriptome_fasta=transcriptome_fasta_path)

            print(f"Running kallisto with command: {kallisto_command}")
            print(f"For full kallisto index output see {log_file_path}")
            log_file.write(f"Running kallisto with command: {kallisto_command}.\n\n")
            log_file.write("kallisto index output follows:\n")

            try:
                kallisto_result = subprocess.run(
                    kallisto_command, shell=True, check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,  # Redirect stderr to stdout.
                    encoding="ascii",
                    # Render any non-ASCII byte in kallisto's output as an
                    # escape sequence instead of raising UnicodeDecodeError
                    # after kallisto itself has finished.
                    errors="backslashreplace")
            except subprocess.CalledProcessError as kallisto_index_exception:
                log_file.write("\n*****ERROR: kallist index command failed:\n")
                log_file.write(f"\tExit code: {kallisto_index_exception.returncode}\n")
                log_file.write("\n*****STDOUT:\n")
                log_file.write(f"{kallisto_index_exception.stdout}\n")
                # stderr was merged into stdout above, so this attribute is
                # None; the section is kept so the log layout is unchanged.
                log_file.write("\n*****STDERR:\n")
                log_file.write(f"{kallisto_index_exception.stderr}\n")
                raise CampareeException(f"\nkallisto index process failed. "
                                        f"For full details see {log_file_path}\n") \
                    from kallisto_index_exception

            print("Finished generating kallisto index.\n")
            log_file.write(f"{kallisto_result.stdout}\n")
            log_file.write("Finished generating kallisto index.\n")
            # Completion marker checked by is_output_valid().
            log_file.write("ALL DONE!\n")

    def get_commandline_call(self, sample_id, genome_suffix, kallisto_bin_path, transcriptome_fasta_path):
        """
        Prepare command to execute the KallistoIndexStep from the command line,
        given all of the arguments used to run the execute() function.

        Parameters
        ----------
        sample_id : string
            Identifier for sample corresponding to reference transcriptome.
            Used to construct index and log paths for this specific kallisto
            execution.
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto binary.
        transcriptome_fasta_path : string
            Path to the FASTA file of transcripts, used as the basis for the
            kallisto index. This is generally prepared by the
            TranscriptomeFastaPreparationStep.

        Returns
        -------
        string
            Command to execute on the command line. It will perform the same
            operations as a call to execute() with the same parameters.

        """
        # Retrieve path to the kallisto.py script.
        kallisto_step_path = os.path.realpath(__file__)
        # If the above returns a path with a "pyc" extension instead of "py",
        # drop the trailing "c" so it points to this script. Only an actual
        # ".pyc" suffix is touched.
        if kallisto_step_path.endswith('.pyc'):
            kallisto_step_path = kallisto_step_path[:-1]

        # The values are interpolated without quoting, so they must not
        # contain whitespace or shell metacharacters.
        command = (f" python {kallisto_step_path} index"
                   f" --log_directory_path {self.log_directory_path}"
                   f" --data_directory_path {self.data_directory_path}"
                   f" --sample_id {sample_id}"
                   f" --genome_suffix {genome_suffix}"
                   f" --kallisto_bin_path {kallisto_bin_path}"
                   f" --transcriptome_fasta_file_path {transcriptome_fasta_path}")

        return command

    def get_validation_attributes(self, sample_id, genome_suffix, kallisto_bin_path, transcriptome_fasta_path):
        """
        Prepare attributes required by is_output_valid() function to validate
        output generated by the KallistoIndexStep job.

        Parameters
        ----------
        sample_id : string
            Identifier for sample corresponding to reference transcriptome.
            Used to construct index and log paths for this specific kallisto
            execution.
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto binary. [Note: this parameter is captured just
            so get_validation_attributes() accepts the same arguments as
            get_commandline_call(). It is not used here.]
        transcriptome_fasta_path : string
            Path to the FASTA file of transcripts, used as the basis for the
            kallisto index. [Note: this parameter is captured just so
            get_validation_attributes() accepts the same arguments as
            get_commandline_call(). It is not used here.]

        Returns
        -------
        dict
            A KallistoIndexStep job's data_directory, log_directory,
            corresponding sample ID, and genome_suffix.

        """
        return {
            'data_directory': self.data_directory_path,
            'log_directory': self.log_directory_path,
            'sample_id': sample_id,
            'genome_suffix': genome_suffix,
        }

    @staticmethod
    def is_output_valid(validation_attributes):
        """
        Check if output of KallistoIndexStep for a specific job/execution is
        correctly formed and valid, given a job's data directory, log directory,
        sample ID, and genome suffix. Prepare these attributes for a given job
        using the get_validation_attributes() method.

        Parameters
        ----------
        validation_attributes : dict
            A job's data_directory, log_directory, corresponding sample_id,
            and genome_suffix used when creating the kallisto index.

        Returns
        -------
        boolean
            True  - The index file and log file both exist and the last line
                    of the log is the "ALL DONE!" completion marker.
            False - Either file is missing or the log lacks the marker.

        """
        data_directory_path = validation_attributes['data_directory']
        log_directory_path = validation_attributes['log_directory']
        sample_id = validation_attributes['sample_id']
        genome_suffix = validation_attributes['genome_suffix']

        # Construct output filenames/paths
        kallisto_index_file_path = os.path.join(
            data_directory_path,
            f'sample{sample_id}',
            KallistoIndexStep.KALLISTO_INDEX_DIR_PATTERN.format(genome_name=genome_suffix),
            KallistoIndexStep.KALLISTO_INDEX_FILENAME_PATTERN.format(genome_name=genome_suffix))
        log_file_path = os.path.join(
            log_directory_path,
            f'sample{sample_id}',
            KallistoIndexStep.KALLISTO_INDEX_LOG_FILENAME_PATTERN.format(genome_name=genome_suffix))

        if not (os.path.isfile(kallisto_index_file_path) and
                os.path.isfile(log_file_path)):
            return False

        # Walk to the last line of the log; an empty log leaves "".
        last_line = ""
        with open(log_file_path, "r") as log_file:
            for last_line in log_file:
                pass

        return last_line.rstrip() == "ALL DONE!"

    @staticmethod
    def main(cmd_args):
        """
        Entry point into class. Used when script is executed/submitted via the
        command line with the 'index' subcommand.

        Parameters
        ----------
        cmd_args : argparse.Namespace
            Parsed arguments providing log_directory_path,
            data_directory_path, sample_id, genome_suffix, kallisto_bin_path,
            and transcriptome_fasta_file_path.

        """
        kallisto_index = KallistoIndexStep(log_directory_path=cmd_args.log_directory_path,
                                           data_directory_path=cmd_args.data_directory_path)
        kallisto_index.execute(sample_id=cmd_args.sample_id,
                               genome_suffix=cmd_args.genome_suffix,
                               kallisto_bin_path=cmd_args.kallisto_bin_path,
                               transcriptome_fasta_path=cmd_args.transcriptome_fasta_file_path)
class KallistoQuantStep(AbstractCampareeStep):
    """Wrapper around quantifying transcript-level counts with kallisto.

    The step runs "kallisto quant" for one sample / genome suffix pair against
    the index located via the KallistoIndexStep path patterns, and records
    everything it does in a per-job log file. The final log line ("ALL DONE!")
    is the marker is_output_valid() uses to decide whether a job ran to
    completion.
    """

    KALLISTO_QUANT_DIR_PATTERN = CAMPAREE_CONSTANTS.KALLISTO_QUANT_DIR_PATTERN
    KALLISTO_ABUNDANCE_FILENAME = CAMPAREE_CONSTANTS.KALLISTO_ABUNDANCE_FILENAME
    KALLISTO_QUANT_LOG_FILENAME_PATTERN = CAMPAREE_CONSTANTS.KALLISTO_QUANT_LOG_FILENAME_PATTERN

    # The basic kallisto command used for transcript-level quants.
    # NOTE: the formatted command is run through the shell, so the paths
    # substituted here must not contain whitespace or shell metacharacters.
    BASE_KALLISTO_QUANT_COMMAND = ('{kallisto_bin_path} quant'
                                   ' --index={kallisto_index_file}'
                                   ' --output-dir={kallisto_output_dir}'
                                   ' {read_files}')

    def __init__(self, log_directory_path, data_directory_path, parameters=None):
        """Constructor for KallistoQuantStep object.

        Parameters
        ----------
        log_directory_path : string
            Full path to log directory.
        data_directory_path : string
            Full path to data directory.
        parameters : dict
            Dictionary of other parameters specified by the config file. This
            parameter is not used by this class and is retained for uniformity
            with all other CAMPAREE steps.

        """
        self.data_directory_path = data_directory_path
        self.log_directory_path = log_directory_path

    def validate(self):
        """Always returns True; this step has no parameters to validate."""
        return True

    def execute(self, sample, genome_suffix, kallisto_bin_path):
        """Use kallisto to generate transcript-level quantifications from fastq
        files for a given sample.

        Parameters
        ----------
        sample : Sample
            Sample containing paths for FASTQ files for quantification. Its
            sample_id and fastq_file_paths attributes are read here.
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto executable binary.

        Raises
        ------
        CampareeException
            If the kallisto quant command exits with a non-zero status. The
            original subprocess.CalledProcessError is chained as the cause.

        """
        kallisto_index_file_path = os.path.join(
            self.data_directory_path,
            f'sample{sample.sample_id}',
            KallistoIndexStep.KALLISTO_INDEX_DIR_PATTERN.format(genome_name=genome_suffix),
            KallistoIndexStep.KALLISTO_INDEX_FILENAME_PATTERN.format(genome_name=genome_suffix))
        kallisto_output_path = os.path.join(
            self.data_directory_path,
            f'sample{sample.sample_id}',
            KallistoQuantStep.KALLISTO_QUANT_DIR_PATTERN.format(genome_name=genome_suffix))
        log_file_path = os.path.join(
            self.log_directory_path,
            f'sample{sample.sample_id}',
            KallistoQuantStep.KALLISTO_QUANT_LOG_FILENAME_PATTERN.format(genome_name=genome_suffix))

        read_files = ' '.join(sample.fastq_file_paths)

        with open(log_file_path, 'w') as log_file:

            print(f"Running kallisto quantification for transcriptome {genome_suffix} "
                  f"of sample{sample.sample_id}")
            log_file.write(f"Running kallisto quantification for transcriptome "
                           f"{genome_suffix} of sample{sample.sample_id}.\n")

            log_file.write(f"Parameters:\n"
                           f"    kallisto binary path: {kallisto_bin_path}\n"
                           f"    kallisto index file: {kallisto_index_file_path}\n"
                           f"    kallisto output directory: {kallisto_output_path}\n"
                           f"    read files: {read_files}\n")

            log_file.write("Create kallisto quantification output directory.\n")
            # A re-run or resubmitted job finds the directory left by the
            # previous attempt; tolerate that (as KallistoIndexStep does)
            # instead of crashing with FileExistsError.
            try:
                os.mkdir(kallisto_output_path)
            except FileExistsError:
                if not os.path.isdir(kallisto_output_path):
                    # Something that is not a directory occupies the path.
                    raise
                log_file.write("kallisto quantification output directory already exists.\n")

            kallisto_command = KallistoQuantStep.BASE_KALLISTO_QUANT_COMMAND.format(
                kallisto_bin_path=kallisto_bin_path,
                kallisto_index_file=kallisto_index_file_path,
                kallisto_output_dir=kallisto_output_path,
                read_files=read_files)

            print(f"Running kallisto with command: {kallisto_command}")
            print(f"For full kallisto quantification output see {log_file_path}")
            log_file.write(f"Running kallisto with command: {kallisto_command}.\n\n")
            log_file.write("kallisto quantification output follows:\n")

            try:
                kallisto_result = subprocess.run(
                    kallisto_command, shell=True, check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,  # Redirect stderr to stdout.
                    encoding="ascii",
                    # Render any non-ASCII byte in kallisto's output as an
                    # escape sequence instead of raising UnicodeDecodeError
                    # after kallisto itself has finished.
                    errors="backslashreplace")
            except subprocess.CalledProcessError as kallisto_quant_exception:
                log_file.write("\n*****ERROR: kallist quant command failed:\n")
                log_file.write(f"\tExit code: {kallisto_quant_exception.returncode}\n")
                log_file.write("\n*****STDOUT:\n")
                log_file.write(f"{kallisto_quant_exception.stdout}\n")
                # stderr was merged into stdout above, so this attribute is
                # None; the section is kept so the log layout is unchanged.
                log_file.write("\n*****STDERR:\n")
                log_file.write(f"{kallisto_quant_exception.stderr}\n")
                raise CampareeException(f"\nkallisto quant process failed. "
                                        f"For full details see {log_file_path}\n") \
                    from kallisto_quant_exception

            print("Finished kallisto quantification.\n")
            log_file.write(f"{kallisto_result.stdout}\n")
            log_file.write("Finished kallisto quantification.\n")
            # Completion marker checked by is_output_valid().
            log_file.write("ALL DONE!\n")

    def get_commandline_call(self, sample, genome_suffix, kallisto_bin_path):
        """
        Prepare command to execute the KallistoQuantStep from the command line,
        given all of the arguments used to run the execute() function.

        Parameters
        ----------
        sample : Sample
            Sample containing paths to FASTQ files for quantification. Its
            repr() is embedded in the command and rebuilt by main().
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto executable binary.

        Returns
        -------
        string
            Command to execute on the command line. It will perform the same
            operations as a call to execute() with the same parameters.

        """
        # Retrieve path to the kallisto.py script.
        kallisto_step_path = os.path.realpath(__file__)
        # If the above returns a path with a "pyc" extension instead of "py",
        # drop the trailing "c" so it points to this script. Only an actual
        # ".pyc" suffix is touched.
        if kallisto_step_path.endswith('.pyc'):
            kallisto_step_path = kallisto_step_path[:-1]

        # NOTE(review): the sample repr is wrapped in single quotes, so a repr
        # that itself contains a single quote would break the command line.
        # The other values are interpolated without quoting and must not
        # contain whitespace or shell metacharacters.
        command = (f" python {kallisto_step_path} quant"
                   f" --log_directory_path {self.log_directory_path}"
                   f" --data_directory_path {self.data_directory_path}"
                   f" --sample '{repr(sample)}'"
                   f" --genome_suffix {genome_suffix}"
                   f" --kallisto_bin_path {kallisto_bin_path}")

        return command

    def get_validation_attributes(self, sample, genome_suffix, kallisto_bin_path):
        """
        Prepare attributes required by is_output_valid() function to validate
        output generated by the KallistoQuantStep job.

        Parameters
        ----------
        sample : Sample
            Sample containing paths to FASTQ files for quantification. Only
            its sample_id is used here.
        genome_suffix : string
            Suffix to identify the parent/allele of the transcriptome. Should
            be 1 or 2. This same suffix is appended to all output
            files/directories.
        kallisto_bin_path : string
            Path to the kallisto executable binary. [Note: this parameter is
            captured just so get_validation_attributes() accepts the same
            arguments as get_commandline_call(). It is not used here.]

        Returns
        -------
        dict
            A KallistoQuantStep job's data_directory, log_directory,
            corresponding sample ID, and genome_suffix.

        """
        return {
            'data_directory': self.data_directory_path,
            'log_directory': self.log_directory_path,
            'sample_id': sample.sample_id,
            'genome_suffix': genome_suffix,
        }

    @staticmethod
    def is_output_valid(validation_attributes):
        """
        Check if output of KallistoQuantStep for a specific job/execution is
        correctly formed and valid, given a job's data directory, log directory,
        sample ID, and genome suffix. Prepare these attributes for a given job
        using the get_validation_attributes() method.

        Parameters
        ----------
        validation_attributes : dict
            A job's data_directory, log_directory, corresponding sample_id,
            and genome_suffix used when generating transcript-level
            quantifications.

        Returns
        -------
        boolean
            True  - The abundance file and log file both exist and the last
                    line of the log is the "ALL DONE!" completion marker.
            False - Either file is missing or the log lacks the marker.

        """
        data_directory_path = validation_attributes['data_directory']
        log_directory_path = validation_attributes['log_directory']
        sample_id = validation_attributes['sample_id']
        genome_suffix = validation_attributes['genome_suffix']

        # Construct output filenames/paths
        kallisto_output_file_path = os.path.join(
            data_directory_path,
            f'sample{sample_id}',
            KallistoQuantStep.KALLISTO_QUANT_DIR_PATTERN.format(genome_name=genome_suffix),
            KallistoQuantStep.KALLISTO_ABUNDANCE_FILENAME)
        log_file_path = os.path.join(
            log_directory_path,
            f'sample{sample_id}',
            KallistoQuantStep.KALLISTO_QUANT_LOG_FILENAME_PATTERN.format(genome_name=genome_suffix))

        if not (os.path.isfile(kallisto_output_file_path) and
                os.path.isfile(log_file_path)):
            return False

        # Walk to the last line of the log; an empty log leaves "".
        last_line = ""
        with open(log_file_path, "r") as log_file:
            for last_line in log_file:
                pass

        return last_line.rstrip() == "ALL DONE!"

    @staticmethod
    def main(cmd_args):
        """
        Entry point into class. Used when script is executed/submitted via the
        command line with the 'quant' subcommand.

        Parameters
        ----------
        cmd_args : argparse.Namespace
            Parsed arguments providing log_directory_path,
            data_directory_path, sample, genome_suffix, and kallisto_bin_path.

        """
        # Rebuilds the Sample from the repr() string that
        # get_commandline_call() embeds in the command; requires the Sample
        # name imported from beers_utils.sample.
        # SECURITY NOTE(review): eval() executes arbitrary Python from the
        # --sample argument. Only pass values produced by
        # get_commandline_call(); never expose this to untrusted input.
        sample = eval(cmd_args.sample)
        kallisto_quant = KallistoQuantStep(log_directory_path=cmd_args.log_directory_path,
                                           data_directory_path=cmd_args.data_directory_path)
        kallisto_quant.execute(sample=sample,
                               genome_suffix=cmd_args.genome_suffix,
                               kallisto_bin_path=cmd_args.kallisto_bin_path)
if __name__ == '__main__':
    # Prepare and process command line arguments. The setup below allows for
    # entry into either the KallistoIndexStep main() function or the
    # KallistoQuantStep main() function based on which subcommand is specified
    # at the command line.

    parser = argparse.ArgumentParser(
        description='Command line wrapper around kallisto index creation and quantification.')
    subparsers = parser.add_subparsers(help="Choose one of the following:",
                                       dest="RUN_MODE",
                                       metavar="RUN_MODE")
    subparsers.required = True

    # One entry per subcommand: its name, the text used for both its help and
    # description, the main() method that receives the parsed arguments, and
    # the required options as (flags, help) pairs in display order.
    subcommand_table = (
        ('index',
         "Create kallisto index from transcriptome FASTA.",
         KallistoIndexStep.main,
         ((('-l', '--log_directory_path'), 'Directory in which to save logging files.'),
          (('-d', '--data_directory_path'), 'Directory in which to save output files.'),
          (('--sample_id',), 'Sample ID associated with input genome.'),
          (('--genome_suffix',), 'Suffix identifying parent/allele of source genome.'),
          (('--kallisto_bin_path',), 'Full path to kallisto executable binary'),
          (('--transcriptome_fasta_file_path',), 'Input transcriptome in FASTA format.'))),
        ('quant',
         "Run kallisto transcript-level quantification.",
         KallistoQuantStep.main,
         ((('-l', '--log_directory_path'), 'Directory in which to save logging files.'),
          (('-d', '--data_directory_path'), 'Directory in which to save output files.'),
          (('--sample',), 'String representation of a Sample object.'),
          (('--genome_suffix',), 'Suffix identifying parent/allele of source genome.'),
          (('--kallisto_bin_path',), 'Full path to kallisto executable binary'))),
    )

    for subcommand_name, summary, entry_point, option_specs in subcommand_table:
        subcommand_parser = subparsers.add_parser(subcommand_name,
                                                  help=summary,
                                                  description=summary)
        # Send arguments for this subcommand to the matching step's main().
        subcommand_parser.set_defaults(func=entry_point)
        required_group = subcommand_parser.add_argument_group('Required named arguments')
        for option_flags, option_help in option_specs:
            required_group.add_argument(*option_flags, required=True, help=option_help)

    args = parser.parse_args()
    args.func(args)