import sys
import os
import importlib
import time
from beers_utils.constants import CONSTANTS,SUPPORTED_SCHEDULER_MODES,MAX_SEED
from camparee.camparee_constants import CAMPAREE_CONSTANTS
from beers_utils.job_monitor import JobMonitor
from camparee.camparee_utils import CampareeUtils, CampareeException
#To enable export of config parameter dictionaries to command line
import json
import subprocess
import inspect
import numpy
import camparee.transcriptomes as transcriptomes
[docs]class ExpressionPipeline:
"""
This class represents a pipeline of steps that take user supplied fastq files through alignment, variants
finding, parental genome construction, annotation, quantification and generation of transcripts and finally the
generation of packets of molecules that may be used to simulate RNA sequencing.
"""
THIRD_PARTY_SOFTWARE_DIR_PATH = os.path.join(CAMPAREE_CONSTANTS.CAMPAREE_ROOT_DIR, "third_party_software")
REQUIRED_RESOURCE_MAPPINGS = ['species_model', 'star_genome_index_directory_name',
'reference_genome_filename', 'annotation_filename', 'chr_ploidy_filename']
REQUIRED_OUTPUT_MAPPINGS = ['directory_path', 'type', 'override_sample_molecule_count', 'default_molecule_count']
def __init__(self, configuration, scheduler_mode, output_directory_path, input_samples):
self.scheduler_mode = scheduler_mode
self.scheduler_default_params = {'default_num_processors': None,
'default_memory_in_mb': None,
'default_submission_args': None}
self.samples = input_samples
self.output_directory_path = output_directory_path
log_directory_path = os.path.join(output_directory_path, CONSTANTS.LOG_DIRECTORY_NAME)
data_directory_path = os.path.join(output_directory_path, CONSTANTS.DATA_DIRECTORY_NAME)
self.data_directory_path = data_directory_path
self.log_directory_path = log_directory_path
self.create_intermediate_data_subdirectories(data_directory_path, log_directory_path)
self.log_file_path = os.path.join(log_directory_path, "expression_pipeline.log")
self.steps = {}
#Track pathes of scripts for each step. This is needed when running the
#steps from the command line, as we do when submitting to lsf.
self.__step_paths = {}
# Individual steps can provide scheduler parameters that override
# the default values.
self.__step_scheduler_param_overrides = {}
for step, props in configuration['steps'].items():
module_name, step_name = step.rsplit(".")
parameters = props["parameters"] if props and "parameters" in props else None
scheduler_parameters = props["scheduler_parameters"] if props and "scheduler_parameters" in props else None
module = importlib.import_module(f'.{module_name}', package="camparee")
step_class = getattr(module, step_name)
self.steps[step_name] = step_class(log_directory_path, data_directory_path, parameters)
self.__step_paths[step_name] = inspect.getfile(module)
self.__step_scheduler_param_overrides[step_name] = scheduler_parameters
JobMonitor.PIPELINE_STEPS[step_name] = step_class
# Validate the resources and set file and directory paths as needed.
if not self.validate_and_set_resources(configuration['resources']):
raise CampareeValidationException("The resources data is not completely valid. "
"Consult the standard error file for details.")
# Collect the data from the ref genome and chromosome ploidy files
self.reference_genome = CampareeUtils.create_genome(self.reference_genome_file_path)
self.chr_ploidy_data = CampareeUtils.create_chr_ploidy_data(self.chr_ploidy_file_path)
# Set 3rd party software paths
self.set_third_party_software()
# Validate output data and set
if not self.validate_and_set_output_data(configuration['output']):
raise CampareeValidationException("The output data is not completely valid. "
"Consult the standard error file for details.")
# Load default schduler parameters (if provided)
if self.scheduler_mode != "serial":
print(f"Running CAMPAREE using the {self.scheduler_mode} job scheduler.",
file=sys.stderr)
if configuration['setup'].get('default_scheduler_parameters'):
self.scheduler_default_params['default_num_processors'] = configuration['setup']['default_scheduler_parameters'].get('default_num_processors', None)
self.scheduler_default_params['default_memory_in_mb'] = configuration['setup']['default_scheduler_parameters'].get('default_memory_in_mb', None)
self.scheduler_default_params['default_submission_args'] = configuration['setup']['default_scheduler_parameters'].get('default_submission_args', None)
print("With the following default scheduler parameters:\n",
'\n'.join({f"\t-{key} : {value}" for key, value in self.scheduler_default_params.items()}),
file=sys.stderr)
else:
print("Running CAMPAREE in serial mode.", file=sys.stderr)
self.expression_pipeline_monitor = JobMonitor(output_directory_path=self.output_directory_path,
scheduler_name=self.scheduler_mode,
default_num_processors=self.scheduler_default_params['default_num_processors'],
default_memory_in_mb=self.scheduler_default_params['default_memory_in_mb'])
def create_intermediate_data_subdirectories(self, data_directory_path, log_directory_path):
for sample in self.samples:
os.makedirs(os.path.join(data_directory_path, f'sample{sample.sample_id}'), mode=0o0755, exist_ok=True)
os.makedirs(os.path.join(log_directory_path, f'sample{sample.sample_id}'), mode=0o0755, exist_ok=True)
[docs] def validate_and_set_output_data(self, output):
"""
Helper method to validate and set output data.
:param output: The output dictionary extracted from the configuration file.
:return: True for valid output data and False otherwise
"""
valid = True
# Insure that all required resources keys are in place. No point in continuing until this
# problem is resolved.
missing_output_keys = [item
for item in ExpressionPipeline.REQUIRED_OUTPUT_MAPPINGS
if item not in output]
if missing_output_keys:
print(f"The following required mappings were not found under 'outputs': "
f"{(',').join(missing_output_keys)}", file=sys.stderr)
return False
# Insure type mapping exists
if "type" not in output:
print(f"The required mapping 'type' was not found under 'output.", file=sys.stderr)
valid = False
else:
self.output_type = output["type"]
# Insure default_molecule_count exists and is an int
# TODO: is this redundant given the check performed with missing_output_keys above?
if "default_molecule_count" not in output:
print(f"The required mapping 'default_molecule_count' was not found under 'output.", file=sys.stderr)
valid = False
else:
self.default_molecule_count = output["default_molecule_count"]
if not isinstance(self.default_molecule_count, int) and \
self.default_molecule_count < 0:
print(f"The 'default_molecule_count' must be a positive integer - not "
f"{output['default_molecule_count']}",
file=sys.stderr)
valid = False
#Check to make sure override_sample_molecule_count is a boolean
self.override_sample_molecule_count = output["override_sample_molecule_count"]
if not isinstance(self.override_sample_molecule_count, bool):
print(f"The 'override_sample_molecule_count' must be a boolean (True/False) "
f"- not {output['override_sample_molecule_count']}",
file=sys.stderr)
valid = False
return valid
[docs] def set_third_party_software(self):
"""
Helper method to gather the names of all the 3rd party application files or directories and use them to set
all the paths needed in the pipeline. Since the third party software is shipped with this application, validation
should not be necessary. Software is identified generally by name and not specifically by filename since
filenames may contain versioning and other artefacts.
:return: the filenames for beagle, star, and kallisto, and the directory name for bowtie2.
"""
beagle_filename = [filename for filename in os.listdir(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH)
if "beagle" in filename.lower()][0]
self.beagle_file_path = os.path.join(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH, beagle_filename)
star_filename = [filename for filename in os.listdir(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH)
if "STAR" in filename][0]
self.star_file_path = os.path.join(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH, star_filename)
kallisto_filename = [filename for filename in os.listdir(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH)
if "kallisto" in filename][0]
self.kallisto_file_path = os.path.join(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH, kallisto_filename)
bowtie2_dir_name = [filename for filename in os.listdir(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH)
if "bowtie2" in filename][0]
self.bowtie2_dir_path = os.path.join(ExpressionPipeline.THIRD_PARTY_SOFTWARE_DIR_PATH, bowtie2_dir_name)
[docs] def validate_and_set_resources(self, resources):
"""
Since the resources are input file intensive, and since information about resource paths is found in the
configuration file, this method validates that all needed resource information is complete, consistent and all
input data is found.
:param resources: dictionary containing resources from the configuration file
:return: True if valid and False otherwise
"""
# TODO a some point STAR will be in third party software and may require validation
valid = True
# Insure that all required resources keys are in place. No point in continuing until this
# problem is resolved.
missing_resource_keys = [item
for item in ExpressionPipeline.REQUIRED_RESOURCE_MAPPINGS
if item not in resources]
if missing_resource_keys:
print(f"The following required mappings were not found under 'resources': "
f"{(',').join(missing_resource_keys)}", file=sys.stderr)
return False
# If user did not provide a path to the resources directory, use the
# directory contained in the CAMPAREE install path.
resources_directory_path = resources.get('directory_path', None)
if not resources_directory_path:
resources_directory_path = os.path.join(CAMPAREE_CONSTANTS.CAMPAREE_ROOT_DIR, "resources")
elif not(os.path.exists(resources_directory_path) and os.path.isdir(resources_directory_path)):
print(f"The given resources directory, {resources_directory_path}, must exist as a directory.",
file=sys.stderr)
return False
# Insure that the species model directory exists. No point in continuing util this problem is
# resolved.
species_model_directory_path = os.path.join(resources_directory_path, resources['species_model'])
if not(os.path.exists(species_model_directory_path) and os.path.isdir(species_model_directory_path)):
print(f"The species model directory, {species_model_directory_path}, must exist as a directory",
file=sys.stderr)
return False
# Insure that the reference genome file path exists and points to a file.
self.reference_genome_file_path = os.path.join(species_model_directory_path, resources['reference_genome_filename'])
if not(os.path.exists(self.reference_genome_file_path) and os.path.isfile(self.reference_genome_file_path)):
print(f"The reference genome file path, {self.reference_genome_file_path}, must exist as"
f" a file.", file=sys.stderr)
valid = False
# Insure that the chromosome ploidy file path exists and points to a file.
self.chr_ploidy_file_path = os.path.join(species_model_directory_path, resources['chr_ploidy_filename'])
if not(os.path.exists(self.chr_ploidy_file_path) and os.path.isfile(self.chr_ploidy_file_path)):
print(f"The chr ploidy file path, {self.chr_ploidy_file_path} must exist as a file", file=sys.stderr)
valid = False
# Insure that the annotations file path exists and points to a file.
self.annotation_file_path = os.path.join(species_model_directory_path, resources['annotation_filename'])
if not(os.path.exists(self.annotation_file_path) and os.path.isfile(self.annotation_file_path)):
print(f"The annotation file path, {self.annotation_file_path} must exist as a file", file=sys.stderr)
valid = False
# Insure that the star index directory exists as a directory.
self.star_index_directory_path = \
os.path.join(species_model_directory_path, resources['star_genome_index_directory_name'])
if not (os.path.exists(self.star_index_directory_path) and os.path.isdir(self.star_index_directory_path)):
print(f"The star index directory, {self.star_index_directory_path}, must exist as a directory",
file=sys.stderr)
valid = False
return valid
def validate(self):
valid = True
reference_genome_chromosomes = self.reference_genome.keys()
ploidy_chromosomes = set(self.chr_ploidy_data.keys())
if not ploidy_chromosomes.issubset(reference_genome_chromosomes):
missing_chromosomes = ' '.join(chrom for chrom in ploidy_chromosomes.difference(reference_genome_chromosomes))
reference_chroms = ' '.join(chrom for chrom in reference_genome_chromosomes)
print(f"The chromosome ploidy has chromosomes `{missing_chromosomes}` not found in the reference genome file", file=sys.stderr)
print(f"The reference genome has chromosomes {reference_chroms}", file=sys.stderr)
valid = False
if not all([step.validate() for step in self.steps.values()]):
valid = False
return valid
def execute(self):
print("Execution of the Expression Pipeline Started...")
seeds = self.generate_job_seeds()
bam_files = {}
for sample in self.samples:
#Retrieve name of BAM file associated with this sample. This is either
#the path to a user provided BAM file, or the default path the
#GenomeAlignmentStep will use to store the alignment results for this
#sample.
genome_alignment = self.steps['GenomeAlignmentStep']
bam_file = genome_alignment.get_genome_bam_path(sample)
bam_files[sample.sample_id] = bam_file
self.run_step(step_name='GenomeAlignmentStep',
sample=sample,
execute_args=[sample, self.star_index_directory_path,
self.star_file_path],
cmd_line_args=[sample, self.star_index_directory_path,
self.star_file_path])
for sample in self.samples:
bam_filename = bam_files[sample.sample_id]
self.run_step(step_name='GenomeBamIndexStep',
sample=sample,
execute_args=[sample, bam_filename],
cmd_line_args=[sample, bam_filename],
dependency_list=[f"GenomeAlignmentStep.{sample.sample_id}"])
for sample in self.samples:
bam_filename = bam_files[sample.sample_id]
seed = seeds[f"VariantsFinderStep.{sample.sample_id}"]
self.run_step(step_name='VariantsFinderStep',
sample=sample,
execute_args=[sample, bam_filename, self.chr_ploidy_data,
self.reference_genome, seed],
cmd_line_args=[sample, bam_filename, self.chr_ploidy_file_path,
self.reference_genome_file_path, seed],
dependency_list=[f"GenomeBamIndexStep.{sample.sample_id}"])
for sample in self.samples:
output_directory = os.path.join(self.data_directory_path, f"sample{sample.sample_id}")
bam_filename = bam_files[sample.sample_id]
self.run_step(step_name='IntronQuantificationStep',
sample=sample,
execute_args=[bam_filename, output_directory, self.annotation_file_path],
cmd_line_args=[bam_filename, output_directory, self.annotation_file_path],
dependency_list=[f"GenomeBamIndexStep.{sample.sample_id}"])
#TODO: do we need to depend upon the index being done? or just the alignment?
# I'm hypothesizing that some failures are being caused by indexing and quantification happening
# on the same BAM file at the same time, though I don't know why this would be a problem.
seed = seeds["VariantsCompilationStep"]
self.run_step(step_name='VariantsCompilationStep',
sample=None,
execute_args=[[sample.sample_id for sample in self.samples],
self.chr_ploidy_data, self.reference_genome, seed],
cmd_line_args=[[sample.sample_id for sample in self.samples],
self.chr_ploidy_file_path,
self.reference_genome_file_path, seed],
dependency_list=[f"VariantsFinderStep.{sample.sample_id}" for sample in self.samples])
seed = seeds["BeagleStep"]
self.run_step(step_name='BeagleStep',
sample=None,
execute_args=[self.beagle_file_path, seed],
cmd_line_args=[self.beagle_file_path, seed],
dependency_list=[f"VariantsCompilationStep"])
#TODO: We could load all of the steps in the entire pipeline into the queue
# and then just have the queue keep running until everything finishes.
# The only advantage of explicitly waiting here is that the user gets
# stdout indicating which stage is running for the pipeline.
self.expression_pipeline_monitor.monitor_until_all_jobs_completed(queue_update_interval=10)
for sample in self.samples:
print(f"Processing sample{sample.sample_id} ({sample.sample_name}...")
self.run_step(step_name='GenomeBuilderStep',
sample=sample,
execute_args=[sample, self.chr_ploidy_data, self.reference_genome],
cmd_line_args=[sample, self.chr_ploidy_file_path,
self.reference_genome_file_path],
dependency_list=[f"BeagleStep"])
for suffix in [1, 2]:
self.run_step(step_name='UpdateAnnotationForGenomeStep',
sample=sample,
execute_args=[sample, suffix, self.annotation_file_path,
self.chr_ploidy_file_path],
cmd_line_args=[sample, suffix, self.annotation_file_path,
self.chr_ploidy_file_path],
dependency_list=[f"GenomeBuilderStep.{sample.sample_id}"],
jobname_suffix=suffix)
seed = seeds[f"TranscriptQuantificatAndMoleculeGenerationStep.{sample.sample_id}"]
num_molecules_to_generate = sample.molecule_count
# If no molecule count specified for this sample, use the default count.
if not num_molecules_to_generate or self.override_sample_molecule_count:
num_molecules_to_generate = self.default_molecule_count
self.run_step(step_name='TranscriptQuantificatAndMoleculeGenerationStep',
sample=sample,
execute_args=[sample, self.kallisto_file_path, self.bowtie2_dir_path,
self.output_type, num_molecules_to_generate, seed],
cmd_line_args=[sample, self.kallisto_file_path, self.bowtie2_dir_path,
self.output_type, num_molecules_to_generate, seed],
dependency_list=[f"UpdateAnnotationForGenomeStep.{sample.sample_id}.1",
f"UpdateAnnotationForGenomeStep.{sample.sample_id}.2"])
self.expression_pipeline_monitor.monitor_until_all_jobs_completed(queue_update_interval=10)
#for _ in range(2):
# quantifier = Quantify(annotation_updates, self.alignment_filename)
# transcript_distributions.append(quantifier.quantify())
#for i, item in enumerate(genomes):
# genome = item[0]
# transcript_maker = TranscriptMaker(genome, annotation_updates[i], transcript_distributions[i], self.parameters)
# molecules.append(transcript_maker.prepare_transcripts())
#r_rna = RibosomalRNA(self.parameters)
#molecules.append(r_rna.generate_rRNA_sample())
print("Execution of the Expression Pipeline Ended")
[docs] def generate_job_seeds(self):
"""
Generate one seed per job that needs a seed, returns a dictionary mapping
job names to seeds
We generate seeds for each job since they run on separate nodes of the cluster, potentially
and so do not simply share Numpy seeds. We generate them all ahead of time so that if jobs need
to be restart, they can reuse the same seed.
"""
seeds = {}
# Seeds for jobs that don't run per sample
for job in ["VariantsCompilationStep", "BeagleStep"]:
seeds[job] = numpy.random.randint(MAX_SEED)
# Seeds for jobs that are run per sample
for job in ["VariantsFinderStep", "TranscriptQuantificatAndMoleculeGenerationStep"]:
for sample in self.samples:
seeds[f"{job}.{sample.sample_id}"] = numpy.random.randint(MAX_SEED)
return seeds
[docs] def run_step(self, step_name, sample, execute_args, cmd_line_args, dependency_list=None,
jobname_suffix=None):
"""
Helper function that runs the given step, with the given parameters. If
CAMPAREE is configured to use a scheduler/job monitor, this helper function
wraps submission of the step to the job monitor.
Parameters
----------
step_name : string
Name of the CAMPAREE step to run. It should be in the list of steps
stored in the steps dictionary.
sample : Sample
Sample to run through the step. For steps that aren't associated with
specific samples, set this to None.
execute_args : list
List of positional paramteres to pass to the execute() method for the
given step.
cmd_line_args : list
List of positional parameters to pass to the get_commandline_call()
method for the given step.
dependency_list : list
List of job names (if any) the current step depends on. Default: None.
jobname_suffix : string
Suffix to add to job submission ID. Default: None.
"""
if step_name not in list(self.steps.keys()):
raise CampareeException(f"{step_name} not in the list of loaded steps (see config file).")
step_class = self.steps[step_name]
if self.scheduler_mode == "serial":
status_msg = f"Performing {step_name}"
status_msg += f".{jobname_suffix}" if jobname_suffix else ""
status_msg += f" on sample{sample.sample_id}" if sample else ""
print(status_msg)
step_class.execute(*execute_args)
status_msg = f"Finished {step_name}"
status_msg += f".{jobname_suffix}" if jobname_suffix else ""
status_msg += f" on sample{sample.sample_id}" if sample else ""
print(status_msg + "\n")
else:
# Check if the current step has an overrides for scheduler parameters
scheduler_num_processors = None
scheduler_memory_in_mb = None
if self.__step_scheduler_param_overrides[step_name]:
scheduler_num_processors = self.__step_scheduler_param_overrides[step_name].get('num_processors',None)
scheduler_memory_in_mb = self.__step_scheduler_param_overrides[step_name].get('memory_in_mb',None)
# Check if there are any scheduler submission arguments to add
scheduler_submission_args = ""
if self.scheduler_default_params['default_submission_args']:
scheduler_submission_args = self.scheduler_default_params['default_submission_args']
stdout_log = os.path.join(step_class.log_directory_path,
f"sample{sample.sample_id}" if sample else "",
f"{step_name}{f'.{jobname_suffix}' if jobname_suffix else ''}.{self.scheduler_mode}.out")
stderr_log = os.path.join(step_class.log_directory_path,
f"sample{sample.sample_id}" if sample else "",
f"{step_name}{f'.{jobname_suffix}' if jobname_suffix else ''}.{self.scheduler_mode}.err")
scheduler_job_name = (f"{step_name}{f'.sample{sample.sample_id}_{sample.sample_name}' if sample else ''}"
f"{f'.{jobname_suffix}' if jobname_suffix else ''}")
scheduler_args = {'job_name' : scheduler_job_name,
'stdout_logfile' : stdout_log,
'stderr_logfile' : stderr_log,
'num_processors' : scheduler_num_processors,
'memory_in_mb' : scheduler_memory_in_mb,
'additional_args' : scheduler_submission_args
}
command = step_class.get_commandline_call(*cmd_line_args)
validation_attributes = step_class.get_validation_attributes(*cmd_line_args)
output_directory = os.path.join(step_class.data_directory_path,
f"sample{sample.sample_id}" if sample else "")
self.expression_pipeline_monitor.submit_new_job(job_id=f"{step_name}{f'.{sample.sample_id}' if sample else ''}"
f"{f'.{jobname_suffix}' if jobname_suffix else ''}",
job_command=command,
sample=sample,
step_name=step_name,
scheduler_arguments=scheduler_args,
validation_attributes=validation_attributes,
output_directory_path=output_directory,
system_id=None,
dependency_list=dependency_list)
@staticmethod
def main(configuration, scheduler_mode, output_directory_path, input_samples):
pipeline = ExpressionPipeline(configuration, scheduler_mode, output_directory_path, input_samples)
if not pipeline.validate():
raise CampareeValidationException("Expression Pipeline Validation Failed. "
"Consult the standard error file for details.")
pipeline.execute()
[docs]class CampareeValidationException(CampareeException):
pass