import os
import collections
import numpy
from beers_utils.molecule_packet import MoleculePacket
from beers_utils.molecule import Molecule
from beers_utils.sample import Sample
from beers_utils.general_utils import GeneralUtils
[docs]class MoleculeMaker:
"""
MoleculeMaker generates molecules based off of gene, intron, and allelic quantification files
as well as customized genomic sequence and annotation
"""
def __init__(self, sample, sample_directory):
"""
:param sample: sample object to work on
:param sample_directory: path to the directory that contains the sample's file (i.e. the CAMPAREE data directory for the sample)
"""
self.sample = sample
intron_quant_file_path = os.path.join(sample_directory, "intron_quantifications.txt")
self.transcript_intron_quants, self.intron_quants = self.load_intron_quants(intron_quant_file_path)
gene_quant_file_path = os.path.join(sample_directory, "gene_quantifications.txt")
self.genes, self.gene_quants = self.load_gene_quants(gene_quant_file_path)
self.gene_probabilities = self.gene_quants / numpy.sum(self.gene_quants)
self.transcriptomes = [self.load_transcriptome(os.path.join(sample_directory, f"transcriptome_{i}.fa"))
for i in [1,2]]
self.annotations = [self.load_annotation(os.path.join(sample_directory, f"updated_annotation_{i}.txt"))
for i in [1,2]]
self.genomes = [self.load_genome(os.path.join(sample_directory, f"custom_genome_{i}.fa"))
for i in [1,2]]
indel_data = [self.load_indels(os.path.join(sample_directory, f"custom_genome_indels_{i}.txt"))
for i in [1,2]]
self.indels = [indels for indels, offset_data in indel_data]
self.offset_data = [offset_data for indels, offset_data in indel_data]
isoform_quant_file_path = os.path.join(sample_directory, "isoform_psi_value_quantifications.txt")
self.isoform_quants = self.load_isoform_quants(isoform_quant_file_path)
allelic_quant_file_path = os.path.join(sample_directory, "allelic_imbalance_quantifications.txt")
self.allelic_quant = self.load_allelic_quants(allelic_quant_file_path)
def load_annotation(self, file_path):
transcripts = dict()
with open(file_path) as annotation_file:
for line in annotation_file:
if line.startswith("#"):
continue # Comment/header line
chrom, strand, tx_start, tx_end, exon_count, exon_starts, exon_ends, transcript_id, gene_id, gene_sybmol, *other \
= line.split("\t")
transcripts[transcript_id] = (chrom, strand, int(tx_start), int(tx_end),
[int(start) for start in exon_starts.split(",")],
[int(end) for end in exon_ends.split(",")])
return transcripts
[docs] def load_transcriptome(self, file_path):
"""
Read in a fasta file and load is a dictionary id -> sequence
assumed one-line for the whole contig
"""
transcripts = dict()
with open(file_path) as transcriptome_file:
while True:
line = transcriptome_file.readline()
if not line:
break
assert line[0] == ">"
transcript_id, chrom, region = line[1:].strip().split(":")
sequence = transcriptome_file.readline().strip()
transcripts[transcript_id] = sequence
return transcripts
[docs] def load_genome(self, file_path):
"""
Read in a fasta file and load is a dictionary id -> sequence
"""
genome = dict()
with open(file_path) as transcriptome_file:
while True:
line = transcriptome_file.readline()
if not line:
break
assert line[0] == ">"
contig = line[1:].strip()
sequence = transcriptome_file.readline().strip()
genome[contig] = sequence
return genome
[docs] def load_indels(self, file_path):
"""
Read in the file of indel locations for a given custom genome
Store it as a dictionary chrom -> (indel_starts, indel_data)
where indel_starts is a numpy array of start locations of the indels
(i.e. 1 based coordinates of the base in the custom genome where the insertion/deletion occurs immediately after)
and indel_data is a list of tuples (indel_start, indel_type, indel_length)
where indel_type is 'I' or 'D'
The indel file is tab-separated with format "chrom:start type length" and looks like the following:
1:4897762 I 2
1:7172141 I 2
1:7172378 D 1
Assumption is that the file is sorted by start and no indels overlap
Moreover, return a dictionary chrom -> (offset_starts, offset_values)
where offset_starts is as indel_starts, a sorted numpy array with indicating the positions
where the offset (i.e. custom_genome_position - reference_genome_position values) change
and offset_values is a list in the same order indicating these values.
Note that the offset value is to be used for all bases AFTER the offset position, not on that base
"""
indels = collections.defaultdict(lambda : (collections.deque(), collections.deque()))
offset_values = collections.defaultdict(lambda : collections.deque())
current_offsets = collections.defaultdict(lambda : 0)
with open(file_path) as indel_file:
for line in indel_file:
loc, indel_type, length = line.split('\t')
chrom, start = loc.split(':')
# indel_start needs to be relative to the custom genome but the indel file has starts
# that are relative to the reference genome
indel_start = int(start) + current_offsets[chrom]
indel_length = int(length)
starts, data = indels[chrom]
starts.append(indel_start)
data.append((indel_start, indel_type, indel_length))
if indel_type == "I":
current_offsets[chrom] += indel_length
elif indel_type == "D":
current_offsets[chrom] -= indel_length
offset_values[chrom].append(current_offsets[chrom])
# We make these defaultdicts in case there are chromosomes without any indels
# in which case we don't see them in this file, but we don't want to crash on them
result = collections.defaultdict(lambda: (numpy.array([]), list()))
offset_data = collections.defaultdict(lambda: (numpy.array([]), list()))
for key, (starts, data) in indels.items():
result[key] = (numpy.array(starts), list(data))
offset_data[key] = (numpy.array(starts), offset_values[key])
return result, offset_data
[docs] def load_intron_quants(self, file_path):
"""
Load an intron quantification file as two dictionaries,
(transcript ID -> sum FPK of all introns in transcript) and
(transcript ID -> list of FPKs of each intron in transcript)
"""
transcript_intron_quants = dict() # Dictionary transcript -> FPK for all introns in the transcript, combined
intron_quants = dict() # Dictioanry transcript -> array of FPKs for each intron in the transcript
with open(file_path) as intron_quants_file:
for line in intron_quants_file:
if line.startswith("#"):
continue # Comment/header line
gene, transcript, chrom, strand, transcript_intron_reads_FPK, intron_reads_FPK = line.strip().split("\t")
transcript_intron_quants[transcript] = float(transcript_intron_reads_FPK)
intron_quants[transcript] = [float(quant) for quant in intron_reads_FPK.split(",")]
return transcript_intron_quants, intron_quants
[docs] def load_gene_quants(self, file_path):
"""
Read in a gene quantification file as two lists of gene IDs and of their read quantifications
"""
genes = []
gene_quants = []
with open(file_path) as gene_quant_file:
for line in gene_quant_file:
if line.startswith("#"):
continue # Comment/header line
gene, quant = line.strip().split("\t")
genes.append(gene)
gene_quants.append(float(quant))
return genes, numpy.array(gene_quants)
[docs] def load_allelic_quants(self, file_path):
"""
Reads allelic quantification file into a dictionary: gene_id -> (allele 1 probability, allele 2 probability)
"""
allelic_quant = dict()
with open(file_path) as allele_quant_file:
for line in allele_quant_file:
if line.startswith("#"):
continue # Comment/header line
gene, allele1, allele2 = line.split("\t")
allele1 = float(allele1)
allele2 = float(allele2)
allelic_quant[gene] = (allele1, allele2)
return allelic_quant
def make_molecule(self):
# Pick random gene
gene_index = numpy.random.choice(len(self.genes), p=self.gene_probabilities)
gene = self.genes[gene_index]
gene_quant = self.gene_quants[gene_index]
# Pick random transcript in gene
transcripts, psis = self.isoform_quants[gene]
transcript = numpy.random.choice(transcripts, p=psis)
# Pick random allele based on the gene's allelic distribution
allele_number = numpy.random.choice([1,2], p=self.allelic_quant[gene])
# Read in annotation for the chosen transcript
chrom,strand,tx_start,tx_end,starts,ends= self.annotations[allele_number - 1][transcript]
# Determine if pre_mRNA or mature mRNA
intron_quant = self.transcript_intron_quants[transcript]
# TODO: check that this gives the appropriate fraction as pre_mRNA
# previously was using intron_quant / (intron_quant + gene_quant)
# but if assuming everything is either full pre_mRNA or mature mRNA then this should be
# the right fraction, which could happen to be greater than one (!)
try:
fraction_pre_mRNA = min(intron_quant / (gene_quant), 1)
except ZeroDivisionError:
# Should not ever get here since a gene with 0 gene_quant should have 0 chance of being chosen
# however, if we do, we will just always give pre_mRNA
fraction_pre_mRNA = 1.0
pre_mRNA = numpy.random.uniform() < fraction_pre_mRNA
if pre_mRNA:
# If chosen to be pre_mRNA, overwrite the usual exon starts/ends with a single, big "exon"
starts = [tx_start]
ends = [tx_end]
# Find cigar string relative to the reference genome (i.e. custom_genome_1 or custom_genome_2)
gaps = [next_start - last_end - 1 for next_start,last_end in zip(starts[1:],ends[:-1])]
cigar = ''.join( f"{end - start + 1}M{gap}N" for start,end,gap in zip(starts[:-1],ends[:-1],gaps)) \
+ f"{ends[-1] - starts[-1] + 1}M"
ref_cigar = ''.join( f"{self.get_reference_cigar(start, end, chrom, allele_number)}{gap}N"
for start,end,gap in zip(starts[:-1],ends[:-1],gaps)) \
+ self.get_reference_cigar(starts[-1], ends[-1], chrom, allele_number)
ref_start = self.convert_genome_position_to_reference(starts[0], chrom, allele_number)
transcript_id = f"{transcript}_{allele_number}{'_pre_mRNA' if pre_mRNA else ''}"
# Build the actual sequence
chrom_sequence = self.genomes[allele_number - 1][chrom]
sequence = ''.join( chrom_sequence[start-1:end] for start,end in zip(starts, ends) )
if strand == '-':
# We always give the sequence from 5' to 3' end of the RNA molecule
# so reverse complement this
sequence = GeneralUtils.create_complement_strand(sequence)
# NOTE: cigar string stays the same since that is relative to the + strand
# TODO: for now, everything gets polyA but maybe shouldn't
polyA_tail = True
if polyA_tail:
# TODO: polyA tails should vary in length
# Add polyA tail to 3' end
sequence = sequence + "A"*200
# Soft-clip the polyA tail at the end since it shouldn't align
if strand == "+":
cigar = cigar + "200S"
ref_cigar = ref_cigar + "200S"
else:
cigar = "200S" + cigar # Relative to + strand, the A's are going on the 5' end
ref_cigar = "200S" + ref_cigar
return sequence, starts[0], cigar, ref_start, ref_cigar, strand, chrom, transcript_id
[docs] def get_reference_cigar(self, start, end, chrom, allele):
"""
Returns the cigar string for the part of the custom chromosome on the segment from start to end (inclusive, one based)
relative to the reference genome
"""
indel_starts, indel_data = self.indels[allele-1][chrom]
cigar_components = []
start_of_remaining = start
# Skip to the first relevant indel, then continue until they go past this region
index_idx = numpy.searchsorted(indel_starts, start)
for indel in indel_data[index_idx:]:
indel_start, indel_type, indel_length = indel
if indel_start >= end:
# The deletion or insertion begins immediately AFTER indel_start
# so we're done, we've gone through all indels in our exon
break
if indel_start > start_of_remaining:
# Match a region without indels
cigar_components.append(f"{indel_start - start_of_remaining + 1}M")
if indel_type == "I":
# Custom genome has an insertion right after indel_start
# Our region might end before the insertion does, so cap the length of the insertion
length = min(end, indel_start + indel_length) - indel_start
cigar_components.append(f"{length}I")
# Increment to after the indel, which includes the inserted bases
start_of_remaining = indel_start + indel_length + 1
elif indel_type == "D":
# Custom genome has a deletion right after indel_start
cigar_components.append(f"{indel_length}D")
# Advance to just after the deletion, but don't increment by the length of the deletion
# since the deleted bases don't appear in our custom genome segment
start_of_remaining = indel_start + 1
# The last chunk of matches after the last indel
if end >= start_of_remaining:
cigar_components.append(f"{end - start_of_remaining + 1}M")
return ''.join(cigar_components)
[docs] def convert_genome_position_to_reference(self, position, chrom, allele):
"""
Convert (1-indexed) position into the current (custom) genome into a position
relative to the reference genome
"""
# offset_positions is a sorted list of all offsets in this allele+chromosome
# so find where our position fits in, so offset_index -1 is the last offset strictly before position
offset_starts, offset_values = self.offset_data[allele-1][chrom]
offset_index = numpy.searchsorted(offset_starts, position)
if offset_index == 0:
# If ours is before all offsets, then we need no offset
return position
else:
offset = offset_values[offset_index-1]
return position - offset
def make_packet(self, id="packet0", N=10_000):
molecules = []
for i in range(N):
sequence, start, cigar, strand, ref_start, ref_cigar, chrom, transcript_id = self.make_molecule()
# TODO: use ref_start, ref_cigar when making Molecules
# this requires that the Molecule class include these cigar strings
mol = Molecule(Molecule.new_id(), sequence, start=1, cigar=f"{len(sequence)}M",
source_start=start, source_cigar=cigar, source_strand=strand,
transcript_id=transcript_id)
molecules.append(mol)
return MoleculePacket(id, self.sample, molecules)
[docs] def make_molecule_file(self, filepath, N=10_000):
"""
Write out molecules to a tab-separated file
Note: we write out a molecules start and cigar relative to the appropriate
custom genome, either _1 or _2 as per the transcript id
"""
with open(filepath, "w") as molecule_file:
header = "#transcript_id\tchrom\tstart\tcigar\tref_start\tref_cigar\tstrand\tsequence\n"
molecule_file.write(header)
for i in range(N):
sequence, start, cigar, ref_start, ref_cigar, strand, chrom, transcript_id = self.make_molecule()
# NOTE: Not outputing the molecules start or cigar string since those are relative to parent
# which in this case is always trivial (start=1, cigar=###M) since the molecule is new
line = "\t".join([transcript_id,
chrom,
str(start),
cigar,
str(ref_start),
ref_cigar,
strand,
sequence]
) + "\n"
molecule_file.write(line)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Molecule Maker")
parser.add_argument("sample", help="sample object (as output by repr() in single quotes)")
parser.add_argument("sample_directory", help="directory in which sample results go")
parser.add_argument("out_path", help="path to write output molecule file")
parser.add_argument("-N", help="number of molecules to make", default=10_000, type=int)
args = parser.parse_args()
# Remove single quotes and eval the sample
sample_object = eval(args.sample.strip("'"))
print("Starting to make molecules...")
mm = MoleculeMaker(sample_object, args.sample_directory)
packet = mm.make_molecule_file(args.out_path, N=args.N)
''' Example:
python molecule_maker.py 'Sample(1, "Sample 1", "/", ["ACGT", "CGCA"], gender="male")' data/_run1/sample1/ data/_run1/sample1/molecule_file -N 100
'''