import collections
import itertools
import os
import sys
import argparse
import json
import numpy
import pysam
from camparee.annotation_info import AnnotationInfo
from camparee.abstract_camparee_step import AbstractCampareeStep
from camparee.camparee_constants import CAMPAREE_CONSTANTS
[docs]class IntronQuantificationStep(AbstractCampareeStep):
def __init__(self, log_directory_path, data_directory_path, parameters):
#TODO: I dont thing the data directory or the log directory are ever
# used in the code below. Should we remove them? Or adapt the code
# to make use of them instead?
self.log_directory_path = log_directory_path
self.data_directory_path = data_directory_path
self.info = None
self.intron_normalized_antisense_counts = collections.Counter()
self.intron_normalized_counts = collections.Counter()
self.transcript_intron_antisense_counts = collections.Counter()
self.transcript_intron_counts = collections.Counter()
self.flank_size = parameters["flank_size"]
self.intergenic_read_counts = collections.defaultdict(collections.Counter)
self.forward_read_is_sense = parameters["forward_read_is_sense"]
[docs] def validate(self):
# TODO: do we need proper validation?
return True
[docs] def execute(self, aligned_file_path, output_directory, geneinfo_file_path):
# (chrom, strand) -> {(transcriptID, intron_number) -> read_count}
intron_read_counts = collections.Counter()
intron_antisense_read_counts = collections.Counter()
# chrom -> {intron -> read count}
# Open BAM file with pysam
# NOTE: use the check_sq=False flag since sometimes pysam complains erroneously about BAM headers
# even though the header appears fine in samtools
print(f"Opening alignment file {aligned_file_path}")
with pysam.AlignmentFile(aligned_file_path, "rb") as alignments:
chrom_lengths = dict(zip(alignments.references, alignments.lengths))
# Read in the annotation information
self.info = AnnotationInfo(geneinfo_file_path, chrom_lengths)
print(f"Read in annotation info file {geneinfo_file_path}")
unpaired_reads = dict()
# Go through all reads, and compare
skipped_chromosomes = []
for read in alignments.fetch(until_eof=True):
# Use only uniquely mapped reads with both pairs mapped
if read.is_unmapped or not read.is_proper_pair or not read.get_tag("NH") == 1:
continue
try:
mate = unpaired_reads[read.query_name]
except KeyError:
# mate not cached for processing, so cache this one
unpaired_reads[read.query_name] = read
continue
# Read is paired to 'mate', so we process both together now
# So remove the mate from the cache since we're done with it
del unpaired_reads[read.query_name]
chrom = read.reference_name
# Check annotations are available for that chromosome
if chrom not in self.info.intergenics:
if chrom not in skipped_chromosomes:
print(f"Alignment from chromosome {chrom} skipped")
skipped_chromosomes.append(chrom)
continue
# According to the SAM file specification, this CAN fail but I don't understand why it would
# so just throw this assert in to verify that it doesn't, at least for now
assert read.is_reverse != mate.is_reverse
# Figure out the fragment's strand - depends on whether the forward or reverse reads are 'sense'
read1_reverse_aligned = (read.is_reverse and read.is_read1) or (not read.is_reverse and read.is_read2)
if self.forward_read_is_sense:
strand = "-" if read1_reverse_aligned else "+"
else:
strand = "+" if read1_reverse_aligned else "-"
antisense_strand = "-" if strand is "+" else "+"
# Use get() method to prevent KeyError if any of these data structures
# are empty. This can occur for small chromosomes and non-standard
# contigs.
mintron_starts, mintron_ends = self.info.mintron_extents_by_chrom.get((chrom, strand), [None, None])
antisense_mintron_starts, antisense_mintron_ends = self.info.mintron_extents_by_chrom.get((chrom, antisense_strand), [None, None])
intergenic_starts, intergenic_ends = self.info.intergenic_extents_by_chrom.get(chrom, [None, None])
mintrons_touched = set()
antisense_mintrons_touched = set()
intergenics_touched = set()
# check what regions our blocks touch
for start, end in itertools.chain(read.get_blocks(), mate.get_blocks()):
# NOTE: pysam is working in 0-based, half-open coordinates! We are using 1-based
start = start + 1
# If any mintrons were found in the current chromosome/strand
if mintron_starts is not None and mintron_ends is not None:
# We may intersect this mintron, depending on where its end lies
# or this could be -1 if we start before all mintrons
last_mintron_before = numpy.searchsorted(mintron_starts, start, side="right") - 1
# We definitely do not intersect this mintron, it starts after our end
first_mintron_after = numpy.searchsorted(mintron_starts, end, side="right")
# but all between the two, we do intersect
if last_mintron_before == -1:
# No introns start before us
mintrons_touched.update(range(0, first_mintron_after))
elif mintron_ends[last_mintron_before] >= start:
# We do intersect the last one to start before us
mintrons_touched.update(range(last_mintron_before, first_mintron_after))
else:
# We only start intersecting the first one after that
mintrons_touched.update(range(last_mintron_before + 1, first_mintron_after))
# If any antisense mintrons were found in the current chromosome/strand
if antisense_mintron_starts is not None and antisense_mintron_ends is not None:
# Now do the same thing as above for antisense regions
last_antisense_mintron_before = numpy.searchsorted(antisense_mintron_starts, start, side="right") - 1
first_antisense_mintron_after = numpy.searchsorted(antisense_mintron_starts, end, side="right")
if last_antisense_mintron_before == -1:
antisense_mintrons_touched.update(range(0, first_antisense_mintron_after))
elif antisense_mintron_ends[last_antisense_mintron_before] >= start:
antisense_mintrons_touched.update(range(last_antisense_mintron_before, first_antisense_mintron_after))
else:
antisense_mintrons_touched.update(range(last_antisense_mintron_before + 1, first_antisense_mintron_after))
# If any intergenic regions were found in the current chromosome
# (should only be an issue for small contigs, some prokaryotic
# genomes, and other less common cases)
if intergenic_starts is not None and intergenic_ends is not None:
# Now do the same thing as above for intergenic regions
last_intergenic_before = numpy.searchsorted(intergenic_starts, start, side="right") - 1
first_intergenic_after = numpy.searchsorted(intergenic_starts, end, side="right")
if last_intergenic_before == -1:
intergenics_touched.update(range(0, first_intergenic_after))
elif intergenic_ends[last_intergenic_before] >= start:
intergenics_touched.update(range(last_intergenic_before, first_intergenic_after))
else:
intergenics_touched.update(range(last_intergenic_before + 1, first_intergenic_after))
# Accumulate the reads
for mintron_index in mintrons_touched:
for intron in self.info.mintrons_by_chrom[chrom, strand][mintron_index].primary_introns:
intron_read_counts[intron] += 1
for antisense_mintron_index in antisense_mintrons_touched:
for intron in self.info.mintrons_by_chrom[chrom, antisense_strand][antisense_mintron_index].primary_antisense_introns:
intron_antisense_read_counts[intron] += 1
for intergenic in intergenics_touched:
self.intergenic_read_counts[chrom][intergenic] += 1
# Normalize reads by effective transcript lengths
for intron, count in intron_read_counts.items():
effective_count = count / intron.effective_length * 1000 # FPK (fragments per kilo-base)
self.intron_normalized_counts[intron] = effective_count
# TODO: is this the right normalization factor for antisense reads?
# currently use the total length of all antisense mintrons, but this seems wrong....
antisense_count = intron_antisense_read_counts[intron]
if antisense_count > 0:
self.intron_normalized_antisense_counts[intron] = antisense_count / intron.antisense_effective_length * 1000
# Now remove counts from non-primary introns from mintrons, under the assumption that
# if two introns overlap, we want to "subtract out" one of them from the overlap
# We process introns in order of their transcript start: the idea being that we are modifying their
# intron counts, so we had better be consistent as we process, going from 5' to 3' ends
for contig, transcripts in self.info.transcripts_by_chrom.items():
for transcript in transcripts:
for intron in transcript.introns:
count = self.intron_normalized_counts.get(intron, 0)
if count == 0:
continue
for mintron in intron.mintrons:
if intron not in mintron.primary_introns:
for other_intron in mintron.primary_introns:
other_counts = self.intron_normalized_counts[other_intron]
# Remove the double-counts but never give negative expression
self.intron_normalized_counts[other_intron] = min(other_counts - count, 0)
# Same for anti-sense
antisense_count = self.intron_normalized_counts.get(intron, 0)
for antisense_mintron in intron.antisense_mintrons:
if intron not in antisense_mintron.primary_antisense_introns:
for other_intron in antisense_mintron.primary_antisense_introns:
other_counts = self.intron_normalized_antisense_counts[other_intron]
# Remove the double-counts but never give negative expression
self.intron_normalized_antisense_counts[other_intron] = min(other_counts - antisense_count, 0)
# Transcript-level intron quantifications
for transcript_id, transcript in self.info.transcripts.items():
# flanks are first-and-last introns
# But for now we do not use them to quantify the total transcript-level
# intron counts since they are not really introns but are very long and so
# throw off the intron length normalization
non_flank_introns = transcript.introns[1:-1]
sense_counts = 0
antisense_counts = 0
total_length = 0
for intron in non_flank_introns:
# We use normalized_counts here and then "unnormalize" them by effective length since we have
# already performed the correction of the above section (removing non-primary intron counts)
sense_counts += self.intron_normalized_counts[intron] * intron.effective_length
antisense_counts += self.intron_normalized_antisense_counts[intron] * intron.effective_length
total_length += intron.effective_length
if total_length == 0:
self.transcript_intron_counts[transcript_id] = 0
self.transcript_intron_antisense_counts[transcript_id] = 0
else:
self.transcript_intron_counts[transcript_id] = sense_counts / total_length
self.transcript_intron_antisense_counts[transcript_id] = antisense_counts / total_length
# Write out the results to output file
# SENSE INTRON OUTPUT
output_file_path = os.path.join(output_directory, CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME)
with open(output_file_path, "w") as output_file:
output_file.write("#gene_id\ttranscript_id\tchr\tstrand\ttranscript_intron_reads_FPK\tintron_reads_FPK\n")
# take transcripts from all chromosomes and combine them, sorting by gene id and then transcript id
transcript_ids = sorted((transcript.gene.gene_id, id, transcript) for id, transcript in self.info.transcripts.items())
for (gene_id, transcript_id, transcript) in transcript_ids:
total_count = self.transcript_intron_counts[transcript_id]
intron_counts = [str(self.intron_normalized_counts[intron]) for intron in transcript.introns]
output_file.write('\t'.join([gene_id,
transcript_id,
transcript.chrom,
transcript.strand,
str(total_count),
','.join(intron_counts),
]) + '\n')
# ANTISENSE INTRON OUTPUT
output_file_path = os.path.join(output_directory, CAMPAREE_CONSTANTS.INTRON_OUTPUT_ANTISENSE_FILENAME)
with open(output_file_path, "w") as output_file:
output_file.write("#gene_id\ttranscript_id\tchr\tstrand\ttranscript_intron_reads_FPK\tintron_reads_FPK\n")
# take transcripts from all chromosomes and combine them, sorting by gene id and then transcript id
transcript_ids = sorted((transcript.gene.gene_id, id, transcript) for id, transcript in self.info.transcripts.items())
for (gene_id, transcript_id, transcript) in transcript_ids:
total_count = self.transcript_intron_antisense_counts[transcript_id]
intron_counts = [str(self.intron_normalized_antisense_counts[intron]) for intron in transcript.introns]
output_file.write('\t'.join([gene_id,
transcript_id,
transcript.chrom,
transcript.strand,
str(total_count),
','.join(intron_counts),
]) + '\n')
# TODO: do we need to normalize intergenic regions?
# Not clear that just dividing by their length is right since usually you have just
# bits and pieces expressed throughout
output_intergenic_file_path = os.path.join(output_directory, CAMPAREE_CONSTANTS.INTERGENIC_OUTPUT_FILENAME)
with open(output_intergenic_file_path, "w") as output_file:
output_file.write("#chromosome\tintergenic_region_number\tstart\tend\treads_FPK\n")
# take transcripts from all chromosomes and combine them, sorting by gene id and then transcript id
chroms_sorted = sorted(self.info.intergenics.keys())
for chrom in chroms_sorted:
intergenics = self.info.intergenics[chrom]
for i, intergenic in enumerate(intergenics):
count = self.intergenic_read_counts[chrom][i]
output_file.write('\t'.join([chrom,
str(i),
str(intergenic.start),
str(intergenic.end),
str(count),
]) + '\n')
[docs] def get_commandline_call(self, aligned_file_path, output_directory, geneinfo_file_path):
"""
Prepare command to execute the IntronQuantification from the command line,
given all of the arugments used to run the execute() method.
Parameters
----------
aligned_file_path : string
Path to BAM file aligned to genome.
output_directory : string
Directory where the following output files will be saved:
{CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME},
{CAMPAREE_CONSTANTS.INTRON_OUTPUT_ANTISENSE_FILENAME},
{CAMPAREE_CONSTANTS.INTERGENIC_OUTPUT_FILENAME}.
geneinfo_file_path : string
Geneinfo file in BED format with 1-based, inclusive coordinates.
Returns
-------
string
Command to execute on the command line. It will perform the same
operations as a call to execute() with the same parameters.
"""
#Retrieve path to the intron_quant.py script.
intron_quant_path = os.path.realpath(__file__)
#If the above command returns a string with a "pyc" extension, instead
#of "py", strip off "c" so it points to this script.
intron_quant_path = intron_quant_path.rstrip('c')
intron_quant_params = {}
intron_quant_params['forward_read_is_sense'] = self.forward_read_is_sense
intron_quant_params['flank_size'] = self.flank_size
command = (f"python {intron_quant_path}"
f" --log_directory_path {self.log_directory_path}"
f" --data_directory_path {self.data_directory_path}"
f" --bam_file {aligned_file_path}"
f" --output_directory {output_directory}"
f" --info_file {geneinfo_file_path}"
f" --parameters '{json.dumps(intron_quant_params)}'")
return command
[docs] def get_validation_attributes(self, aligned_file_path, output_directory, geneinfo_file_path):
"""
Prepare attributes required by is_output_valid() function to validate
output generated the IntronQuantification job.
Parameters
----------
aligned_file_path : string
Path to BAM file aligned to genome. [Note: this parameter is
captured just so get_validation_attributes() accepts the same
arguments as get_commandline_call(). It is not used here.]
output_directory : string
Directory where the following output files are saved:
{CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME},
{CAMPAREE_CONSTANTS.INTRON_OUTPUT_ANTISENSE_FILENAME},
{CAMPAREE_CONSTANTS.INTERGENIC_OUTPUT_FILENAME}.
geneinfo_file_path : string
Geneinfo file in BED format with 1-based, inclusive coordinates.
[Note: this parameter is captured just so get_validation_attributes()
accepts the same arguments as get_commandline_call(). It is not used
here.]
Returns
-------
dict
A IntronQuantification job's output_directory.
"""
validation_attributes = {"output_directory" : output_directory}
return validation_attributes
[docs] @staticmethod
def main():
"""
Entry point into script. Allows script to be executed/submitted via the
command line.
"""
parser = argparse.ArgumentParser(description='Command line wrapper around'
' the intron quantifier')
parser.add_argument("--log_directory_path", help="directory to output logging files to", default=None)
parser.add_argument("--data_directory_path", help="data directory folder (unused)", default=None)
parser.add_argument("-b", "--bam_file", help="BAM or SAM file of strand-specific genomic alignment")
parser.add_argument("-i", "--info_file", help="Geneinfo file in BED format with 1-based, inclusive coordinates")
parser.add_argument("-o", "--output_directory", help=f"Directory where to output files {CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME}, {CAMPAREE_CONSTANTS.INTRON_OUTPUT_ANTISENSE_FILENAME}, {CAMPAREE_CONSTANTS.INTERGENIC_OUTPUT_FILENAME}")
parser.add_argument("--forward_read_is_sense", help="Set if forward read is sense. Default is False. Strand-specificity is assumed", action="store_const", const=True, default=False)
parser.add_argument("--parameters", help="jsonified config parameters", default='{}')
args = parser.parse_args()
print("Starting Intron/Intergenic Quantification step")
parameters = json.loads(args.parameters)
intron_quant = IntronQuantificationStep(args.log_directory_path, args.data_directory_path, parameters=parameters)
intron_quant.execute(args.bam_file, args.output_directory, args.info_file)
[docs] @staticmethod
def is_output_valid(validation_attributes):
#TODO: Add more thorought checks? Also check logging when that's added
# to the rest of the script.
valid_output = False
output_directory = validation_attributes['output_directory']
#Check for the existence of the 3 output files.
if os.path.isfile(os.path.join(output_directory, CAMPAREE_CONSTANTS.INTRON_OUTPUT_FILENAME)) and \
os.path.isfile(os.path.join(output_directory, CAMPAREE_CONSTANTS.INTRON_OUTPUT_ANTISENSE_FILENAME)) and \
os.path.isfile(os.path.join(output_directory, CAMPAREE_CONSTANTS.INTERGENIC_OUTPUT_FILENAME)):
valid_output = True
return valid_output
if __name__ == '__main__':
sys.exit(IntronQuantificationStep.main())
"""
Example:
python intron_quant.py -b "/Users/tgbrooks/BEERS2.0/data/baby_sample1.bam" -i "/Users/tgbrooks/BEERS2.0/data/baby_genome.mm9/annotations" -o .
"""