Source code for camparee.annotation_info

import collections
import itertools
import os
import argparse
import json
import numpy

##### "Plain old data" classes representing the elements inside a gene info file

[docs]class Gene: """Representation of a gene, including it's transcripts. start, end coordinates indicate the min/max of the start/end coordinates of all its transcripts """ def __init__(self, info, gene_id, chrom, strand, start, end, transcripts=None): if transcripts is None: transcripts = [] self.info = info self.chrom = chrom self.strand = strand self.gene_id = gene_id self.start = start self.end = end self.transcripts = transcripts def __eq__(self, other): return self.gene_id == other.gene_id def __repr__(self): return f"Gene({self.gene_id}, {self.chrom}, {self.strand}, {self.start}, {self.end})"
[docs]class Transcript: """ Transcript of a gene, tracks its introns, exons """ def __init__(self, info, gene_id, transcript_id, chrom, strand, start, end, exons=None, introns=None): if introns is None: introns = [] if exons is None: exons = [] self.info = info self.chrom = chrom self.strand = strand self.gene_id = gene_id self.transcript_id = transcript_id self.start = start self.end = end self.exons = exons self.introns = introns def get_gene(self): return self.info.genes[self.gene_id] def __repr__(self): return f"Transcript({self.gene_id}, {self.transcript_id}, {self.chrom}, {self.strand}, {self.start}, {self.end})" gene = property(get_gene)
[docs]class Region: """ Any genomic span of a chromosome strand = +,-, or . if not strand-specific region (eg: an intergenic region) 'comment' is any extra information to carry along, for the purposes of debugging/printing """ def __init__(self, info, chrom, strand, start, end, comment=None): self.info = info self.chrom = chrom self.strand = strand self.start = start self.end = end self.comment = comment def __repr__(self): return f"{type(self).__name__}({self.chrom}, {self.strand}, {self.start}, {self.end}, {self.comment})"
[docs]class TranscriptRegion(Region): """ Region that is part of a transcript (eg: intron or exon) """ def __init__(self, info, gene_id, transcript_id, *args): self.gene_id = gene_id self.transcript_id = transcript_id self.effective_length = 0 self.antisense_effective_length = 0 Region.__init__(self, info, *args) def get_gene(self): return self.info.genes[self.gene_id] def get_transcript(self): return self.info.transcripts[self.transcript_id] gene = property(get_gene) transcript = property(get_transcript)
[docs]class Intron(TranscriptRegion): def __init__(self, *args): TranscriptRegion.__init__(self, *args) self.mintrons = [] # List of all mintrons that this overlaps self.antisense_mintrons = [] # List of mintrons on the other strand that aren't sense for something else
[docs]class Mintron(Region): def __init__(self, info, *args): Region.__init__(self, info, *args) self.introns = [] self.primary_introns = [] self.primary_gene = None self.primary_antisense_introns = [] self.antisense_introns = [] def __repr__(self): return f"Mintron({self.chrom}, {self.strand}, {self.start}, {self.end}, {self.primary_introns}, {self.primary_gene})"
[docs]class AnnotationInfo: """ Data structure containing all the information in a gene info file. Stores genes, transcripts, intergenic regions, and exons both for easy access by ID and for quick lookup by position, using sorted lists by start position, allowing binary search.""" def __init__(self, geneinfo_file_path, chrom_lengths, flank_size=1500): """ Load geneinfo file as a large datastructure containing all the information geneinfo file is in the bed format with 1-based, inclusive coordinates for ranges For example, an Ensembl gtf converted to a bed file with the provided script works. :param geneinfo_file_path: file path to find the geneinfo file :param chrom_lengths: dictionary of {chromosome -> chromosome lengths}, obtainable from header of a SAM/BAM file """ self.chrom_lengths = chrom_lengths self.genes_by_chrom = collections.defaultdict(dict) # All genes in a chrom self.genes = dict() # All genes self.genic_regions_by_chrom = collections.defaultdict(collections.deque) # genes sorted by start position self.transcripts = dict() # All transcripts self.transcripts_by_chrom = collections.defaultdict(collections.deque) # All transcripts in a chromosome, sorted by start position self.exons_by_chrom = collections.defaultdict(collections.deque) # Exons sorted by start position with open(geneinfo_file_path) as geneinfo_file: for line in geneinfo_file: if line.startswith("#"): continue chrom, strand, tx_start, tx_end, exon_count, exon_starts, exon_ends, transcript_id, gene_id, *other = line.split('\t') tx_start, tx_end = int(tx_start), int(tx_end) exon_starts = [int(start) for start in exon_starts.split(',')] exon_ends = [int(end) for end in exon_ends.split(',')] exons = [TranscriptRegion(self, gene_id, transcript_id, chrom, strand, start, end, "exon") for start, end in zip(exon_starts, exon_ends)] intron_starts = [end + 1 for end in exon_ends[:-1]] intron_ends = [start - 1 for start in exon_starts[1:]] introns = [Intron(self, gene_id, transcript_id, chrom, strand, start, end, "intron") for start, end in zip(intron_starts, intron_ends)] transcript = Transcript(self, gene_id, transcript_id, chrom, strand, tx_start, tx_end, exons, introns) self.transcripts[transcript_id] = transcript self.transcripts_by_chrom[chrom, strand].append(transcript) if gene_id in self.genes: transcript.gene.start = min(transcript.gene.start, tx_start) transcript.gene.end = max(transcript.gene.end, tx_end) transcript.gene.transcripts.append(transcript) else: gene = Gene(self, gene_id, chrom, strand, tx_start, tx_end, transcripts=[transcript]) self.genes[gene_id] = gene self.genes_by_chrom[chrom, strand][gene_id] = gene self.genic_regions_by_chrom[chrom, strand].append(gene) self.exons_by_chrom[chrom, strand].extend(exons) # Sort regions by start position self.exons_by_chrom = {contig: sorted(exons, key=lambda x: x.start) for contig, exons in self.exons_by_chrom.items()} self.genic_regions_by_chrom = {contig: sorted(genics, key=lambda x: x.start) for contig, genics in self.genic_regions_by_chrom.items()} self.transcripts_by_chrom = {contig: sorted(transcripts, key=lambda x: x.start) for contig, transcripts in self.transcripts_by_chrom.items()} # Merge exons self.merged_exons = self.merge_regions(self.exons_by_chrom) self.merged_genics = self.merge_regions(self.genic_regions_by_chrom, merge_strands=True) # Compute the flanks of each gene (if any) # and add them to self.merged_genics and to the each Gene as a new intron (if any) self.flanked_genics = self.add_flanks(flank_size) # Get intergenic regions sorted by start self.intergenics = self.complement_regions(self.flanked_genics, by_strand=False) # Compute the mintrons and their inclusions # mintrons are each region that is neither intergenic nor exonic - pieces of intron intergenic_and_exonic = {(chrom, strand): sorted(itertools.chain(self.intergenics[chrom], self.merged_exons[chrom, strand]), key=lambda x: x.start) for chrom, strand in self.merged_exons} self.intergenic_and_exonic = self.merge_regions(intergenic_and_exonic) self.mintrons_by_chrom = self.complement_regions(self.intergenic_and_exonic, cls=Mintron) # The start,ends of the mintrons/intergenics as numpy arrays for fast lookup self.mintron_extents_by_chrom = {(chrom, strand): (numpy.array([mintron.start for mintron in mintrons]), numpy.array([mintron.end for mintron in mintrons])) for (chrom, strand), mintrons in self.mintrons_by_chrom.items()} self.intergenic_extents_by_chrom = {chrom: (numpy.array([intergenic.start for intergenic in intergenics]), numpy.array([intergenic.end for intergenic in intergenics])) for chrom, intergenics in self.intergenics.items()} # Give mintrons their annotations for (chrom, strand), transcripts in self.transcripts_by_chrom.items(): mintrons = self.mintrons_by_chrom[chrom, strand] mintron_starts = numpy.array([m.start for m in mintrons]) for transcript in transcripts: for intron in transcript.introns: # Find corresponding mintrons # first find the one that starts before us, which we may or may not intersect index_before = numpy.searchsorted(mintron_starts, intron.start, side="right") - 1 if index_before == -1: # No mintrons before our start index_before = 0 mintron_before = mintrons[index_before] if mintron_before.end < intron.start: # We do not intersect with this one # But the next one either does intersect or is past us to the right index_before = index_before + 1 # Find first mintron that is completely to our right index_after = numpy.searchsorted(mintron_starts, intron.end, side="right") if index_after == len(mintron_starts): # no mintrons after our end index_after = len(mintron_starts) mintrons_intersected = range(index_before, index_after) # possibly nothing in this range if these are equal intron.mintrons = [mintrons[i] for i in mintrons_intersected] for mintron in intron.mintrons: mintron.introns.append(intron) if mintron.primary_gene is None: mintron.primary_gene = intron.gene mintron.primary_introns = [intron] elif mintron.primary_gene == intron.gene: mintron.primary_introns.append(intron) else: if intron.gene.start > mintron.primary_gene.start: # Overwrite existing primary introns since we start closer to their mintron mintron.primary_introns = [intron] mintron.primary_gene = intron.gene else: pass # Skip if our gene starts before the primary one # Idea is if a gene lives inside another one, it should get priority over the introns # of the bigger gene since we won't want to count a read for both genes # Now add in anti-sense if necessary # Sense gets preference so annotate only if no sense annotation already exists antisense_strand = "+" if strand == "-" else "-" # Use .get() to avoid a KeyError if there are no transcripts on the given # chromosome/strand (like small chromsomes and non-standard contigs). for transcript in self.transcripts_by_chrom.get((chrom, antisense_strand), []): for intron in transcript.introns: # Find corresponding mintrons # first find the one that starts before us, which we may or may not intersect index_before = numpy.searchsorted(mintron_starts, intron.start, side="right") - 1 if index_before == -1: # No mintrons before our start index_before = 0 mintron_before = mintrons[index_before] if mintron_before.end < intron.start: # We do not intersect with this one # But the next one either does intersect or is past us to the right index_before = index_before + 1 # Find first mintron that is completely to our right index_after = numpy.searchsorted(mintron_starts, intron.end, side="right") if index_after == len(mintron_starts): # no mintrons after our end index_after = len(mintron_starts) mintrons_intersected = range(index_before, index_after) # possibly nothing in this range if these are equal intron.antisense_mintrons = [mintrons[i] for i in mintrons_intersected] for mintron in intron.antisense_mintrons: mintron.antisense_introns.append(intron) if mintron.primary_gene is None: mintron.primary_antisense_introns = [intron] mintron.primary_gene = intron.gene elif mintron.primary_gene == intron.gene: mintron.primary_antisense_introns.append(intron) elif mintron.primary_gene.strand == antisense_strand: if intron.gene.start > mintron.primary_gene.start: # Overwrite existing primary introns since we start closer to their mintron mintron.primary_antisense_introns = [intron] mintron.primary_gene = intron.gene else: pass # Skip if our gene starts before the primary one # Idea is if a gene lives inside another one, it should get priority over the introns # of the bigger gene since we won't want to count a read for both genes else: pass # Do nothing since the mintron already has sense annotations, don't give it any anti-sense # Compute effective lengths of each intron for transcript in self.transcripts.values(): for intron in transcript.introns: intron.effective_length = 0 for mintron in intron.mintrons: intron.effective_length += mintron.end - mintron.start + 1 intron.antisense_effective_length = 0 for mintron in intron.antisense_mintrons: intron.antisense_effective_length += mintron.end - mintron.start + 1
[docs] def add_flanks(self, flank_size): """ Add flanks to each genic region up to size flank_size on each end These account for reads that go past the "ends" of the gene but should be thought of as belonging to that gene rather than to intergenic regions. Flanks are added as first/last introns to every transcript, so all transcripts will have at least two introns. Sometimes there is no room for a flank to be added, in which case the flank is given start/stop coordinates of 0,0. :param flank_size: Size of flanks to add :return: flanked_genics, dictionary of genic regions, sorted by start position with flanks added """ # Look through each gene, check if contained in other genes on either side # on sides not contained, add a flank of size flank_size # up to half the size of the gap until the next exon flanked_genics_by_chrom = dict() for (chrom, strand), genes in self.genes_by_chrom.items(): genic_regions = self.genic_regions_by_chrom[chrom, strand] starts = numpy.array([g.start for g in genic_regions]) for gene in genes.values(): # Find genic region that we are included in index = numpy.searchsorted(starts, gene.start, side="right") - 1 # Always >= 0 since a gene's own genic region starts before the gene our_genic_region = genic_regions[index] # Left flank cmt = "left flank" if our_genic_region.start != gene.start: # no flank on left, since some other gene covers the left flank_start = 0 flank_end = 0 else: # Genic region on the left, extend to half that distance if index > 0: previous_genic_region = genic_regions[index - 1] mid = (previous_genic_region.end + our_genic_region.start) // 2 + 1 # midpoint (rounded down) flank_start = max(mid, gene.start - flank_size) else: flank_start = max(1, gene.start - flank_size) flank_end = our_genic_region.start - 1 if flank_start == flank_end: flank_start, flank_end = 0, 0 # no flank cmt = "no left flank" for transcript in gene.transcripts: left_flank = Intron(self, transcript.gene_id, transcript.transcript_id, chrom, strand, flank_start, flank_end, cmt) transcript.introns.insert(0, left_flank) # Right flank cmt = "right flank" if our_genic_region.end != gene.end: # no flank on right, since some other gene covers the right flank_start = 0 flank_end = 0 else: # Genic region on the right, extend to half that distance if index < len(genic_regions) - 1: next_genic_region = genic_regions[index + 1] # Always leave a gap of at least 1 bases by an extra + 1 here mid = (next_genic_region.start + our_genic_region.end) // 2 - 1 # midpoint (rounded up) flank_end = min(mid, gene.end + flank_size) else: flank_end = min(self.chrom_lengths[chrom], gene.end + flank_size) flank_start = our_genic_region.end + 1 if flank_start == flank_end: flank_start, flank_end = 0, 0 # no flank cmt = "no right flank" for transcript in gene.transcripts: right_flank = Intron(self, transcript.gene_id, transcript.transcript_id, chrom, strand, flank_start, flank_end, cmt) transcript.introns.append(right_flank) # Now repeat some of that to update the merged genic regions with their flanks flanked_genics = collections.deque() for index, region in enumerate(genic_regions): if index == 0: # no previous genic region left_start = max(1, region.start - flank_size) else: previous_region = genic_regions[index - 1] mid = (previous_region.end + region.start) // 2 + 1 left_start = max(mid, region.start - flank_size) if index == len(genic_regions) - 1: # no next genic region right_end = min(self.chrom_lengths[chrom], region.end + flank_size) else: next_region = genic_regions[index + 1] mid = (region.end + next_region.start) // 2 - 1 right_end = min(mid, region.end + flank_size) flanked_genics.append(Region(self, chrom, ".", left_start, right_end, comment="flanked genic")) flanked_genics_by_chrom[chrom] = flanked_genics return flanked_genics_by_chrom
def merge_regions(self, regions_by_chrom, merge_strands=False): merged_by_chrom = dict() for (chrom, strand), regions in regions_by_chrom.items(): # merge regions in this contig last_start = -1 last_end = -1 merged = collections.deque() merged_regions = None for region in regions: start, end = region.start, region.end if start <= last_end + 1: # Overlaps with previous start... # ... OR is adjacent to previous exon (want to join touching exons) if end > last_end: # but extends further merged_regions.append(region) merged[-1] = Region(self, chrom, strand, last_start, end, comment=merged_regions) last_end = end else: pass # Skip, contained in the existing exon else: # New exon, no overlap merged_regions = [region] merged.append(Region(self, chrom, strand, start, end, comment=merged_regions)) last_start = start last_end = end if merge_strands: merged_by_chrom[chrom] = merged else: merged_by_chrom[chrom, strand] = merged return merged_by_chrom def complement_regions(self, regions_by_chrom, by_strand=True, cls=Region): # Assumes regions_by_chrom are merged complements_by_chrom = dict() for contig, regions in regions_by_chrom.items(): if by_strand: chrom, strand = contig else: chrom = contig strand = "." complements = collections.deque() start = 1 for region in regions: if region.start == 1: # No complement on left end start = region.end + 1 continue end = region.start - 1 # Never should get start == end here since we are assuming that adjecent regions have been merged complements.append(cls(self, chrom, strand, start, end)) start = region.end + 1 if start < self.chrom_lengths[chrom]: # need one more region on the end complements.append(cls(self, chrom, strand, start, self.chrom_lengths[chrom])) if by_strand: complements_by_chrom[chrom, strand] = complements else: complements_by_chrom[chrom] = complements return complements_by_chrom