import os
import sys
import collections
import argparse
import numpy
import pickle
from camparee.abstract_camparee_step import AbstractCampareeStep
from camparee.camparee_constants import CAMPAREE_CONSTANTS
from beers_utils.molecule_packet import MoleculePacket
from beers_utils.molecule import Molecule
from beers_utils.sample import Sample
from beers_utils.general_utils import GeneralUtils
[docs]class MoleculeMakerStep(AbstractCampareeStep):
"""
MoleculeMaker generates molecules based off of gene, intron, and allelic
quantification files as well as customized genomic sequence and annotation
"""
OUTPUT_OPTIONS_W_EXTENSIONS=CAMPAREE_CONSTANTS.MOLECULE_MAKER_OUTPUT_OPTIONS_W_EXTENSIONS
OUTPUT_FILENAME_PATTERN=CAMPAREE_CONSTANTS.MOLECULE_MAKER_OUTPUT_FILENAME_PATTERN
DEFAULT_MOLECULES_PER_PACKET=CAMPAREE_CONSTANTS.MOLECULE_MAKER_DEFAULT_NUM_MOLECULES_PER_PACKET
# Filename patterns for output from previous CAMPAREE steps.
_GENE_QUANT_FILENAME=CAMPAREE_CONSTANTS.TXQUANT_OUTPUT_GENE_FILENAME
_INTRON_QUANT_FILENAME=CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME
_TX_QUANT_PSI_FILENAME=CAMPAREE_CONSTANTS.TXQUANT_OUTPUT_PSI_FILENAME
_ALLELIC_IMBALANCE_FILENAME=CAMPAREE_CONSTANTS.ALLELIC_IMBALANCE_OUTPUT_FILENAME
_PARENTAL_TX_FASTA_FILENAME_PATTERN=CAMPAREE_CONSTANTS.TRANSCRIPTOME_FASTA_OUTPUT_FILENAME_PATTERN
_PARENTAL_ANNOT_FILENAME_PATTERN=CAMPAREE_CONSTANTS.UPDATEANNOT_OUTPUT_FILENAME_PATTERN
_PARENTAL_GENOME_FASTA_FILENAME_PATTERN=CAMPAREE_CONSTANTS.GENOMEBUILDER_SEQUENCE_FILENAME_PATTERN
_PARENTAL_GENOME_INDEL_FILENAME_PATTERN=CAMPAREE_CONSTANTS.GENOMEBUILDER_INDEL_FILENAME_PATTERN
def __init__(self, log_directory_path, data_directory_path, parameters=None):
"""Constructor for MoleculeMakerStep object.
Parameters
----------
data_directory_path: string
Full path to data directory
log_directory_path : string
Full path to log directory.
parameters : dict
Dictionary of other parameters specified by the config file. This
parameter is not used by this class and is retained for uniformity
with all other CAMPAREE steps.
"""
self.data_directory_path = data_directory_path
self.log_directory_path = log_directory_path
# Nearly all of the validation for this step is already performed in the
# expression_pipeline, since the output options in the config file are
# specified outside of the standard 'steps' framework.
[docs] def validate(self):
return True
def load_annotation(self, file_path):
transcripts = dict()
with open(file_path) as annotation_file:
for line in annotation_file:
if line.startswith("#"):
continue # Comment/header line
chrom, strand, tx_start, tx_end, exon_count, exon_starts, exon_ends, transcript_id, gene_id, gene_sybmol, *other \
= line.split("\t")
transcripts[transcript_id] = (chrom, strand, int(tx_start), int(tx_end),
[int(start) for start in exon_starts.split(",")],
[int(end) for end in exon_ends.split(",")])
return transcripts
[docs] def load_transcriptome(self, file_path):
"""
Read in a fasta file and load is a dictionary id -> sequence
assumed one-line for the whole contig
"""
transcripts = dict()
with open(file_path) as transcriptome_file:
while True:
line = transcriptome_file.readline()
if not line:
break
assert line[0] == ">"
transcript_id, chrom, region = line[1:].strip().split(":")
sequence = transcriptome_file.readline().strip()
transcripts[transcript_id] = sequence
return transcripts
[docs] def load_genome(self, file_path):
"""
Read in a fasta file and load is a dictionary id -> sequence
"""
genome = dict()
with open(file_path) as transcriptome_file:
while True:
line = transcriptome_file.readline()
if not line:
break
assert line[0] == ">"
contig = line[1:].strip()
sequence = transcriptome_file.readline().strip()
genome[contig] = sequence
return genome
[docs] def load_indels(self, file_path):
"""
Read in the file of indel locations for a given custom genome
Store it as a dictionary chrom -> (indel_starts, indel_data)
where indel_starts is a numpy array of start locations of the indels
(i.e. 1 based coordinates of the base in the custom genome where the insertion/deletion occurs immediately after)
and indel_data is a list of tuples (indel_start, indel_type, indel_length)
where indel_type is 'I' or 'D'
The indel file is tab-separated with format "chrom:start type length" and looks like the following:
1:4897762 I 2
1:7172141 I 2
1:7172378 D 1
Assumption is that the file is sorted by start and no indels overlap
Moreover, return a dictionary chrom -> (offset_starts, offset_values)
where offset_starts is as indel_starts, a sorted numpy array with indicating the positions
where the offset (i.e. custom_genome_position - reference_genome_position values) change
and offset_values is a list in the same order indicating these values.
Note that the offset value is to be used for all bases AFTER the offset position, not on that base
"""
indels = collections.defaultdict(lambda : (collections.deque(), collections.deque()))
offset_values = collections.defaultdict(lambda : collections.deque())
current_offsets = collections.defaultdict(lambda : 0)
with open(file_path) as indel_file:
for line in indel_file:
loc, indel_type, length = line.split('\t')
chrom, start = loc.split(':')
# indel_start needs to be relative to the custom genome but the indel file has starts
# that are relative to the reference genome
indel_start = int(start) + current_offsets[chrom]
indel_length = int(length)
starts, data = indels[chrom]
starts.append(indel_start)
data.append((indel_start, indel_type, indel_length))
if indel_type == "I":
current_offsets[chrom] += indel_length
elif indel_type == "D":
current_offsets[chrom] -= indel_length
offset_values[chrom].append(current_offsets[chrom])
# We make these defaultdicts in case there are chromosomes without any indels
# in which case we don't see them in this file, but we don't want to crash on them
result = collections.defaultdict(lambda: (numpy.array([]), list()))
offset_data = collections.defaultdict(lambda: (numpy.array([]), list()))
for key, (starts, data) in indels.items():
result[key] = (numpy.array(starts), list(data))
offset_data[key] = (numpy.array(starts), offset_values[key])
return result, offset_data
[docs] def load_intron_quants(self, file_path):
"""
Load an intron quantification file as two dictionaries,
(transcript ID -> sum FPK of all introns in transcript) and
(transcript ID -> list of FPKs of each intron in transcript)
"""
transcript_intron_quants = dict() # Dictionary transcript -> FPK for all introns in the transcript, combined
intron_quants = dict() # Dictioanry transcript -> array of FPKs for each intron in the transcript
with open(file_path) as intron_quants_file:
for line in intron_quants_file:
if line.startswith("#"):
continue # Comment/header line
gene, transcript, chrom, strand, transcript_intron_reads_FPK, intron_reads_FPK = line.strip().split("\t")
transcript_intron_quants[transcript] = float(transcript_intron_reads_FPK)
intron_quants[transcript] = [float(quant) for quant in intron_reads_FPK.split(",")]
return transcript_intron_quants, intron_quants
[docs] def load_gene_quants(self, file_path):
"""
Read in a gene quantification file as two lists of gene IDs and of their read quantifications
"""
genes = []
gene_quants = []
with open(file_path) as gene_quant_file:
for line in gene_quant_file:
if line.startswith("#"):
continue # Comment/header line
gene, quant = line.strip().split("\t")
genes.append(gene)
gene_quants.append(float(quant))
return genes, numpy.array(gene_quants)
[docs] def load_allelic_quants(self, file_path):
"""
Reads allelic quantification file into a dictionary: gene_id -> (allele 1 probability, allele 2 probability)
"""
allelic_quant = dict()
with open(file_path) as allele_quant_file:
for line in allele_quant_file:
if line.startswith("#"):
continue # Comment/header line
gene, allele1, allele2 = line.split("\t")
allele1 = float(allele1)
allele2 = float(allele2)
allelic_quant[gene] = (allele1, allele2)
return allelic_quant
def make_molecule(self):
# Pick random gene
gene_index = numpy.random.choice(len(self.genes), p=self.gene_probabilities)
gene = self.genes[gene_index]
gene_quant = self.gene_quants[gene_index]
# Pick random transcript in gene
transcripts, psis = self.isoform_quants[gene]
transcript = numpy.random.choice(transcripts, p=psis)
# Pick random allele based on the gene's allelic distribution
allele_number = numpy.random.choice([1,2], p=self.allelic_quant[gene])
# Read in annotation for the chosen transcript
chrom,strand,tx_start,tx_end,starts,ends= self.annotations[allele_number - 1][transcript]
# Determine if pre_mRNA or mature mRNA
intron_quant = self.transcript_intron_quants[transcript]
# TODO: check that this gives the appropriate fraction as pre_mRNA
# previously was using intron_quant / (intron_quant + gene_quant)
# but if assuming everything is either full pre_mRNA or mature mRNA then this should be
# the right fraction, which could happen to be greater than one (!)
try:
fraction_pre_mRNA = min(intron_quant / (gene_quant), 1)
except ZeroDivisionError:
# Should not ever get here since a gene with 0 gene_quant should have 0 chance of being chosen
# however, if we do, we will just always give pre_mRNA
fraction_pre_mRNA = 1.0
pre_mRNA = numpy.random.uniform() < fraction_pre_mRNA
if pre_mRNA:
# If chosen to be pre_mRNA, overwrite the usual exon starts/ends with a single, big "exon"
starts = [tx_start]
ends = [tx_end]
# Find cigar string relative to the reference genome (i.e. custom_genome_1 or custom_genome_2)
gaps = [next_start - last_end - 1 for next_start,last_end in zip(starts[1:],ends[:-1])]
cigar = ''.join( f"{end - start + 1}M{gap}N" for start,end,gap in zip(starts[:-1],ends[:-1],gaps)) \
+ f"{ends[-1] - starts[-1] + 1}M"
ref_cigar = ''.join( f"{self.get_reference_cigar(start, end, chrom, allele_number)}{gap}N"
for start,end,gap in zip(starts[:-1],ends[:-1],gaps)) \
+ self.get_reference_cigar(starts[-1], ends[-1], chrom, allele_number)
ref_start = self.convert_genome_position_to_reference(starts[0], chrom, allele_number)
transcript_id = f"{transcript}_{allele_number}{'_pre_mRNA' if pre_mRNA else ''}"
# Build the actual sequence
chrom_sequence = self.genomes[allele_number - 1][chrom]
sequence = ''.join( chrom_sequence[start-1:end] for start,end in zip(starts, ends) )
if strand == '-':
# We always give the sequence from 5' to 3' end of the RNA molecule
# so reverse complement this
sequence = GeneralUtils.create_complement_strand(sequence)
# NOTE: cigar string stays the same since that is relative to the + strand
# TODO: for now, everything gets polyA but maybe shouldn't
polyA_tail = True
if polyA_tail:
# TODO: polyA tails should vary in length
# Add polyA tail to 3' end
sequence = sequence + "A"*200
# Soft-clip the polyA tail at the end since it shouldn't align
if strand == "+":
cigar = cigar + "200S"
ref_cigar = ref_cigar + "200S"
else:
cigar = "200S" + cigar # Relative to + strand, the A's are going on the 5' end
ref_cigar = "200S" + ref_cigar
return sequence, starts[0], cigar, ref_start, ref_cigar, strand, chrom, transcript_id
[docs] def get_reference_cigar(self, start, end, chrom, allele):
"""Returns the cigar string for the part of the custom chromosome on the
segment from start to end (inclusive, one based) relative to the reference
genome
"""
indel_starts, indel_data = self.indels[allele-1][chrom]
cigar_components = []
start_of_remaining = start
# Skip to the first relevant indel, then continue until they go past this region
index_idx = numpy.searchsorted(indel_starts, start)
for indel in indel_data[index_idx:]:
indel_start, indel_type, indel_length = indel
if indel_start >= end:
# The deletion or insertion begins immediately AFTER indel_start
# so we're done, we've gone through all indels in our exon
break
if indel_start > start_of_remaining:
# Match a region without indels
cigar_components.append(f"{indel_start - start_of_remaining + 1}M")
if indel_type == "I":
# Custom genome has an insertion right after indel_start
# Our region might end before the insertion does, so cap the length of the insertion
length = min(end, indel_start + indel_length) - indel_start
cigar_components.append(f"{length}I")
# Increment to after the indel, which includes the inserted bases
start_of_remaining = indel_start + indel_length + 1
elif indel_type == "D":
# Custom genome has a deletion right after indel_start
cigar_components.append(f"{indel_length}D")
# Advance to just after the deletion, but don't increment by the length of the deletion
# since the deleted bases don't appear in our custom genome segment
start_of_remaining = indel_start + 1
# The last chunk of matches after the last indel
if end >= start_of_remaining:
cigar_components.append(f"{end - start_of_remaining + 1}M")
return ''.join(cigar_components)
[docs] def convert_genome_position_to_reference(self, position, chrom, allele):
"""
Convert (1-indexed) position into the current (custom) genome into a position
relative to the reference genome
"""
# offset_positions is a sorted list of all offsets in this allele+chromosome
# so find where our position fits in, so offset_index -1 is the last offset strictly before position
offset_starts, offset_values = self.offset_data[allele-1][chrom]
offset_index = numpy.searchsorted(offset_starts, position)
if offset_index == 0:
# If ours is before all offsets, then we need no offset
return position
else:
offset = offset_values[offset_index-1]
return position - offset
def make_packet(self, sample, id="packet0", N=10_000):
molecules = []
for i in range(N):
sequence, start, cigar, strand, ref_start, ref_cigar, chrom, transcript_id = self.make_molecule()
# TODO: use ref_start, ref_cigar when making Molecules
# this requires that the Molecule class include these cigar strings
mol = Molecule(Molecule.new_id(), sequence, start=1, cigar=f"{len(sequence)}M",
source_start=start, source_cigar=cigar, source_strand=strand,
transcript_id=transcript_id)
molecules.append(mol)
return MoleculePacket(id, sample, molecules)
[docs] def make_molecule_file(self, filepath, N=10_000):
"""
Write out molecules to a tab-separated file
Note: we write out a molecules start and cigar relative to the appropriate
custom genome, either _1 or _2 as per the transcript id
"""
with open(filepath, "w") as molecule_file:
header = "#transcript_id\tchrom\tstart\tcigar\tref_start\tref_cigar\tstrand\tsequence\n"
molecule_file.write(header)
for i in range(N):
sequence, start, cigar, ref_start, ref_cigar, strand, chrom, transcript_id = self.make_molecule()
# NOTE: Not outputing the molecules start or cigar string since those are relative to parent
# which in this case is always trivial (start=1, cigar=###M) since the molecule is new
line = "\t".join([transcript_id,
chrom,
str(start),
cigar,
str(ref_start),
ref_cigar,
strand,
sequence]
) + "\n"
molecule_file.write(line)
[docs] def execute(self, sample, output_type, output_molecule_count, seed=None, molecules_per_packet=None):
"""This is the main method that generates simulated molecules and saves/
exports them in the desired format. It uses the gene, transcript, intron,
and allelic imbalance distributions generated by the other CAMPAREE steps.
Parameters
----------
sample : Sample
Sample object corresponding to the input distributions. When exporting
molecule packets, this Sample object is used to instantiate the
MoleculePacket object.
output_type : string
Type of file or object used to save or export simulated molecules.
Sould be one of {', '.join(MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS.keys())}.
output_molecule_count : integer
Total number of molecules to save/export for the current Sample.
seed : integer
[OPTIONAL] Seed for random number generator. Used so repeated runs
can produce the same results.
molecules_per_packet : integer
[OPTIONAL] Maximum number of molecules in each molecule packet. Must
be positive, non-zero integer (this is not currently checked).
"""
sample_data_directory = os.path.join(self.data_directory_path, f"sample{sample.sample_id}")
log_file_path = os.path.join(self.log_directory_path, f'sample{sample.sample_id}',
CAMPAREE_CONSTANTS.MOLECULE_MAKER_LOG_FILENAME)
output_file_extension = MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS[output_type]
if seed is not None:
numpy.random.seed(seed)
if not molecules_per_packet:
molecules_per_packet=MoleculeMakerStep.DEFAULT_MOLECULES_PER_PACKET
with open(log_file_path, "w") as log_file:
print(f"Generating molecules for sample{sample.sample_id}.")
log_file.write(f"Generating molecules for sample{sample.sample_id}.\n")
log_file.write(f"Parameters:\n"
f" Output file type: {output_type}\n"
f" Output file extension: {output_file_extension}\n"
f" Num molecules to generate: {output_molecule_count}\n"
f" Num molecules per packet: {molecules_per_packet}\n"
f" Random seed value: {seed}\n")
print('Loading gene, intron, transcript PSI, and allelic imbalance'
' distributions.')
log_file.write('Loading gene, intron, transcript PSI, and allelic'
' imbalance distributions.\n')
# Read and load data from gene, intron, transcript PSI, and allelic
# imbalance distribution files.
self.genes, self.gene_quants = \
self.load_gene_quants(os.path.join(sample_data_directory,
self._GENE_QUANT_FILENAME))
self.gene_probabilities = self.gene_quants / numpy.sum(self.gene_quants)
self.transcript_intron_quants, self.intron_quants = \
self.load_intron_quants(os.path.join(sample_data_directory,
self._INTRON_QUANT_FILENAME))
self.isoform_quants = \
self.load_isoform_quants(os.path.join(sample_data_directory,
self._TX_QUANT_PSI_FILENAME))
self.allelic_quant = \
self.load_allelic_quants(os.path.join(sample_data_directory,
self._ALLELIC_IMBALANCE_FILENAME))
print('Loading annotations, transcriptome sequences, and genome sequences'
' from botr parental genomes.')
log_file.write('Loading annotations, transcriptome sequences, and genome'
' sequences from both parental genomes.\n')
# Read and load annotations, as well as full transcriptome and genome
# sequences for each parental genome. This information is used when
# generating the simulated molecule sequences.
self.transcriptomes = \
[self.load_transcriptome(os.path.join(sample_data_directory,
self._PARENTAL_TX_FASTA_FILENAME_PATTERN.format(genome_name=genome_name)))
for genome_name in [1,2]]
self.annotations = \
[self.load_annotation(os.path.join(sample_data_directory,
self._PARENTAL_ANNOT_FILENAME_PATTERN.format(genome_name=genome_name)))
for genome_name in [1,2]]
self.genomes = \
[self.load_genome(os.path.join(sample_data_directory,
self._PARENTAL_GENOME_FASTA_FILENAME_PATTERN.format(genome_name=genome_name)))
for genome_name in [1,2]]
print('Loading indel information from both parental genomes.')
log_file.write('Loading indel information from both parental genomes.\n')
# Read and load indel data for each parental genome. This information is
# used when constructing CIGAR strings mapping transcripts back to their
# locations in the original reference genome.
indel_data = \
[self.load_indels(os.path.join(sample_data_directory,
self._PARENTAL_GENOME_INDEL_FILENAME_PATTERN.format(genome_name=genome_name)))
for genome_name in [1,2]]
self.indels = [indels for indels, offset_data in indel_data]
self.offset_data = [offset_data for indels, offset_data in indel_data]
# Generate molecules and save/export them according to output type.
print('Generating molecules and saving/exporting the results.')
log_file.write('Generating molecules and saving/exporting the results.')
if output_type == "packet":
# TODO: potentially rounds down the number of molecules to make
num_packets = output_molecule_count // molecules_per_packet
for i in range(1,num_packets+1):
print(f" Generating packet {i} of {num_packets}")
log_file.write(f" Generating packet {i} of {num_packets}\n")
packet = self.make_packet(sample=sample, id=f"sample{sample.sample_id}.{i}")
molecule_packet_filename = os.path.join(sample_data_directory,
self.OUTPUT_FILENAME_PATTERN.format(output_type=output_type,
packet_num=i,
extension=output_file_extension))
with open(molecule_packet_filename, "wb") as out_file:
pickle.dump(packet, out_file)
elif output_type == "file":
print("Generating molecule file.")
log_file.write("Generating molecule file.")
molecule_output_filename = os.path.join(sample_data_directory,
self.OUTPUT_FILENAME_PATTERN.format(output_type=output_type,
packet_num="",
extension=output_file_extension))
self.make_molecule_file(filepath=molecule_output_filename,
N = output_molecule_count)
log_file.write("\nALL DONE!\n")
[docs] def get_commandline_call(self, sample, output_type, output_molecule_count,
seed=None, molecules_per_packet=None):
"""Prepare command to execute the MoleculeMakerStep from the command line,
given all of the arugments used to run the execute() function.
Parameters
----------
sample : Sample
Sample object corresponding to the input distributions. When exporting
molecule packets, this Sample object is used to instantiate the
MoleculePacket object.
output_type : string
Type of file or object used to save or export simulated molecules.
Sould be one of {', '.join(MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS.keys())}.
output_molecule_count : integer
Total number of molecules to save/export for the current Sample.
seed : integer
[OPTIONAL] Seed for random number generator. Used so repeated runs
can produce the same results.
molecules_per_packet : integer
[OPTIONAL] Maximum number of molecules in each molecule packet. Must
be positive, non-zero integer (this is not currently checked).
Returns
-------
string
Command to execute on the command line. It will perform the same
operations as a call to execute() with the same parameters.
"""
#Retrieve path to the allelic_imbalance_quant.py script.
molecule_maker_step_path = os.path.realpath(__file__)
#If the above command returns a string with a "pyc" extension, instead
#of "py", strip off "c" so it points to this script.
molecule_maker_step_path = molecule_maker_step_path.rstrip('c')
command = (f" python {molecule_maker_step_path}"
f" --log_directory_path {self.log_directory_path}"
f" --data_directory_path {self.data_directory_path}"
f" --sample '{repr(sample)}'"
f" --output_type {output_type}"
f" --output_molecule_count {output_molecule_count}")
if seed is not None:
command += f" --seed {seed}"
if molecules_per_packet:
command += f" --molecules_per_packet {molecules_per_packet}"
return command
[docs] def get_validation_attributes(self, sample, output_type, output_molecule_count,
seed=None, molecules_per_packet=None):
"""Prepare attributes required by is_output_valid() function to validate
output generated by the MoleculeMakerStep job.
Parameters
----------
sample : Sample
Sample object corresponding to the input distributions. When exporting
molecule packets, this Sample object is used to instantiate the
MoleculePacket object.
output_type : string
Type of file or object used to save or export simulated molecules.
Sould be one of {', '.join(MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS.keys())}.
output_molecule_count : integer
Total number of molecules to save/export for the current Sample.
seed : integer
[OPTIONAL] Seed for random number generator. Used so repeated runs
can produce the same results. [Note: this parameter is captured just
so get_validation_attributes() accepts the same arguments as
get_commandline_call(). It is not used here.]
molecules_per_packet : integer
[OPTIONAL] Maximum number of molecules in each molecule packet. Must
be positive, non-zero integer (this is not currently checked).
Returns
-------
dict
A MoleculeMakerStep job's data_directory, log_directory, corresponding
sample ID, output file type, output molecule count, and the number of
molecules per packet.
"""
validation_attributes = {}
validation_attributes['data_directory'] = self.data_directory_path
validation_attributes['log_directory'] = self.log_directory_path
validation_attributes['sample_id'] = sample.sample_id
validation_attributes['output_type'] = output_type
validation_attributes['output_molecule_count'] = output_molecule_count
validation_attributes['molecules_per_packet'] = molecules_per_packet
return validation_attributes
[docs] @staticmethod
def is_output_valid(validation_attributes):
"""Check if output of MoleculeMakerStep for a specific job/execution is
correctly formed and valid, given a job's data directory, log directory,
sample ID, output file type, output molecule count, and the number of
molecules per packet (if provided). Prepare these attributes for a given
job using the get_validation_attributes() method.
Parameters
----------
validation_attributes : dict
A job's data_directory, log_directory, corresponding sample_id,
output file type, output molecule count, and the number of molecules
per packet (if provided).
Returns
-------
boolean
True - MoleculeMakerStep output files were created and are well formed.
False - MoleculeMakerStep output files do not exist or are missing data.
"""
data_directory_path = validation_attributes['data_directory']
log_directory_path = validation_attributes['log_directory']
sample_id = validation_attributes['sample_id']
output_type = validation_attributes['output_type']
output_molecule_count = validation_attributes['output_molecule_count']
molecules_per_packet = validation_attributes.get('molecules_per_packet')
output_file_extension = MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS[output_type]
if not molecules_per_packet:
molecules_per_packet = MoleculeMakerStep.DEFAULT_MOLECULES_PER_PACKET
valid_output = False
# Construct output filenames/paths
sample_data_directory = os.path.join(data_directory_path, f"sample{sample_id}")
log_file_path = os.path.join(log_directory_path, f'sample{sample_id}',
CAMPAREE_CONSTANTS.MOLECULE_MAKER_LOG_FILENAME)
# Check existence of output files. If output type is molecule packet,
# there will be multiple output files and this code will check that they
# all exist (based on number of molecules per packet).
all_molecule_files_exist = False
if output_type == "packet":
total_num_packets = output_molecule_count // molecules_per_packet
num_packets_exist = 0
for i in range(1, total_num_packets+1):
molecule_packet_file = os.path.join(sample_data_directory,
MoleculeMakerStep.OUTPUT_FILENAME_PATTERN.format(output_type=output_type,
packet_num=i,
extension=output_file_extension))
if os.path.isfile(molecule_packet_file):
num_packets_exist += 1
if total_num_packets == num_packets_exist:
all_molecule_files_exist = True
elif output_type == "file":
molecule_file = os.path.join(sample_data_directory,
MoleculeMakerStep.OUTPUT_FILENAME_PATTERN.format(output_type=output_type,
packet_num="",
extension=output_file_extension))
all_molecule_files_exist = os.path.isfile(molecule_file)
# TODO: Report reason why out validation failed.
if all_molecule_files_exist and os.path.isfile(log_file_path):
#Read last line in log file
line = ""
with open(log_file_path, "r") as log_file:
for line in log_file:
line = line.rstrip()
if line == "ALL DONE!":
valid_output = True
return valid_output
[docs] @staticmethod
def main():
"""Entry point into script. Parses the argument list to obtain all the
files needed and feeds them to the class constructor. Calls the appropriate
methods thereafter.
"""
parser = argparse.ArgumentParser(description='Generate simulated molecules and export/save.')
parser.add_argument('-l', '--log_directory_path', required=True,
help="Path to log directory.")
parser.add_argument('-d', '--data_directory_path', required=True,
help='Path to data directory')
parser.add_argument('--sample', required=True,
help='String representation of a Sample object.')
parser.add_argument('--output_type', required=True,
help=f"Type of molecule output ({', '.join(MoleculeMakerStep.OUTPUT_OPTIONS_W_EXTENSIONS.keys())}).")
parser.add_argument('--output_molecule_count', type=int, required=True,
help='Number of molecules to generate.')
parser.add_argument('--seed', type=int, default=None, required=False,
help='Seed value for random number generator.')
parser.add_argument('--molecules_per_packet', type=int, default=None, required=False,
help='Number of molecules per molecule packet. '
'Only used if output_type set to "packet".')
args = parser.parse_args()
sample = eval(args.sample)
molecule_maker = MoleculeMakerStep(log_directory_path=args.log_directory_path,
data_directory_path=args.data_directory_path)
molecule_maker.execute(sample=sample,
output_type=args.output_type,
output_molecule_count=args.output_molecule_count,
seed=args.seed,
molecules_per_packet=args.molecules_per_packet)
if __name__ == "__main__":
sys.exit(MoleculeMakerStep.main())