Source code for camparee.camparee_utils
import re
from io import StringIO
import os
import gzip
import itertools
import pandas as pd
[docs]class CampareeUtils:
"""
Utilities for steps in the CAMPAREE expression pipeline.
"""
# Line format definition for annotation file
annot_output_format = '{chrom}\t{strand}\t{txStart}\t{txEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{transcriptID}\t{geneID}\t{geneSymbol}\t{biotype}\n'
[docs] @staticmethod
def edit_reference_genome(reference_genome_file_path, edited_reference_genome_file_path):
"""
Helper method to convert a reference genome file containing line breaks embedded within its
sequences to a reference genome file containing each seqeuence on a single line.
:param reference_genome_file_path: Path to reference geneome file having multi-line sequence data
:param edited_reference_genome_file_path: Path to reference genome file to create with single line sequence
data.
"""
reference_genome = dict()
fasta_chromosome_pattern = re.compile(">([^\s]*).*")
chromosome, sequence = '', None
building_sequence = False
with open(reference_genome_file_path, 'r') as reference_genome_file:
for line in reference_genome_file:
if line.startswith(">"):
if building_sequence:
reference_genome[chromosome] = sequence.getvalue()
sequence.close()
chromosome_match = re.match(fasta_chromosome_pattern, line)
chromosome = chromosome_match.group(1)
building_sequence = True
sequence = StringIO()
continue
elif building_sequence:
sequence.write(line.rstrip('\n').upper())
with open(edited_reference_genome_file_path, 'w') as edited_reference_genome_file:
for chr, seq in reference_genome.items():
edited_reference_genome_file.write(f">{chr}\n")
edited_reference_genome_file.write(f"{seq}\n")
[docs] @staticmethod
def create_genome(genome_file_path):
"""
Creates a genome dictionary from the genome file located at the provided path
(if compressed, it must have a gz extension). The filename is assumed to contain the chr sequences without
line breaks.
:param genome_file_path: path to reference genome file (either compressed or not)
:return: genome as a dictionary with the chromosomes/contigs as keys and the sequences as values.
"""
chr_pattern = re.compile(">([^\s]*).*")
genome = dict()
_, file_extension = os.path.splitext(genome_file_path)
if 'gz' in file_extension:
with gzip.open(genome_file_path, 'r') as genome_file:
for chr, seq in itertools.zip_longest(*[genome_file] * 2):
chr_match = re.match(chr_pattern, chr.decode("ascii"))
if not chr_match:
raise CampareeUtilsException(f'Cannot parse the chromosome from the fasta line {chr}.')
chr = chr_match.group(1)
genome[chr] = seq.decode("ascii").rstrip().upper()
else:
with open(genome_file_path, 'r') as reference_genome_file:
with open(genome_file_path, 'r') as genome_file:
for chr, seq in itertools.zip_longest(*[genome_file] * 2):
chr_match = re.match(chr_pattern, chr)
if not chr_match:
raise CampareeUtilsException(f'Cannot parse the chromosome from the fasta line {chr}.')
chr = chr_match.group(1)
genome[chr] = seq.rstrip().upper()
return genome
[docs] @staticmethod
def create_chr_ploidy_data(chr_ploidy_file_path):
"""
Parses the chr_ploidy_data from its tab delimited resource file into a dictionary of dictionaries like so:
{
'1': {'male': 2, 'female': 2},
'X': {'male': 1, 'female': 2}.
...
}
:param chr_ploidy_file_path: full path to the chr_ploidy data file
:return: chr_ploidy_data expressed as a dictionary of dictionary as shown above.
"""
df = pd.read_csv(chr_ploidy_file_path, sep='\t')
return df.set_index('chr').to_dict(orient='index')
@staticmethod
def compare_genome_sequence_lengths(reference_file_path, genome_1_file_path, genome_2_file_path, chromosomes):
comparison = {chromosome:[] for chromosome in chromosomes}
genome = CampareeUtils.create_genome(reference_file_path)
[comparison[chromosome].append(len(sequence)) for chromosome, sequence
in genome.items() if chromosome in chromosomes]
genome = CampareeUtils.create_genome(genome_1_file_path)
for chromosome in chromosomes:
seqeunce_length = len(genome.get(chromosome, ''))
comparison[chromosome].append(seqeunce_length)
genome = CampareeUtils.create_genome(genome_2_file_path)
for chromosome in chromosomes:
seqeunce_length = len(genome.get(chromosome, ''))
comparison[chromosome].append(seqeunce_length)
return comparison
[docs] @staticmethod
def parse_variant_line(line):
''' reads a line of a variant file from CAMPAREE'''
# sample line is: (note tabs and spaces both used)
# 1:28494 | C:1 | T:1 TOT=2 0.5,0.5 E=1.0
if line == '':
return "DONE", 0, {}
entries = line.split('\t')
loc_and_vars, total, fractions, entropy = entries
loc, *variants = loc_and_vars.split(" | ")
chromosome, position = loc.split(":")
position = int(position)
variants = {base: int(count) for base, count in [variant.split(":") for variant in variants]}
return chromosome, position, variants
[docs] @staticmethod
def convert_gtf_to_annot_file_format(gtf_filename):
"""Convert a GTF file to a tab-delimited annotation file with one line
per transcript. Each line in the annotation file will have the following
columns:
1 - chrom
2 - strand
3 - txStart
4 - txEnd
5 - exonCount
6 - exonStarts
7 - exonEnds
8 - transcript_id
9 - gene_id
10 - genesymbol
11 - biotype
This method derives transcript info from the "exon" lines in the GTF
file and assumes the exons are listed in the order they appear in the
transcript, as opposed to their genomic coordinates. The annotation file
will list exons in order by plus-strand coordinates, so this method
reverses the order of exons for all minus-strand transcripts.
Note, this function will add a header to the output file, marked by a
'#' character prefix.
See website for standard 9-column GTF specification:
https://useast.ensembl.org/info/website/upload/gff.html
Parameters
----------
gtf_filename : string
Path to GTF file to be converted annotation file format.
Returns
-------
string
Name of the annotation file produced from the GTF file.
"""
#Note, as-is this statement won't expand "~" in the path to point to
#home directory.
output_annot_filename = os.path.splitext(gtf_filename)[0] + ".annotation.txt"
with open(gtf_filename, 'r') as gtf_file, \
open(output_annot_filename, 'w') as output_annot_file:
#Print annot file header (note the '#' prefix)
output_annot_file.write("#" + CampareeUtils.annot_output_format.replace('{', '').replace('}', ''))
#Regex patterns used to extract individual attributes from the 9th
#column in the GTF file (the "attributes" column)
txid_pattern = re.compile(r'transcript_id "([^"]+)";')
geneid_pattern = re.compile(r'gene_id "([^"]+)";')
genesymbol_pattern = re.compile(r'gene_name "([^"]+)";')
biotype_pattern = re.compile(r'gene_biotype "([^"]+)";')
line_data = [] #List of line fields from current line of gtf
curr_gtf_tx = "" #transcript ID from current line of gtf
chrom = "" #Chromosome for current transcript
strand = "" #Strand for current transcript
txid = "" #ID of current transcript
geneid = "" #gene ID of current transcript
genesymbol = "None" #gene symbol of current transcript
biotype = "None" #Biotype of current transcript
ex_starts = [] #List of exon start coordinates for current transcript
ex_stops = [] #List of exon stop coordinates for current transcript
ex_count = 1 #Number of exons in current transcript
#Step through GTF until first exon entry (need to prime variables
#with data from first exon).
exon_found = False
for line in gtf_file:
line_data = line.split("\t")
#First conditional is to skip any header lines, which would
#result in a "list index out of range" error when checking the
#second conditional. Header lines don't contain any tabs, so
#there is no 3rd list element in line_data, following the split
#command.
if len(line_data) > 1 and line_data[2] == "exon":
exon_found = True
break
if not exon_found:
raise CampareeUtilsException('ERROR: {gtf_file} contains no lines with exon feature_type.\n'.format(gtf_file=gtf_filename))
#Prime variables with data from first exon
chrom = line_data[0]
strand = line_data[6]
ex_starts.append(line_data[3])
ex_stops.append(line_data[4])
txid = txid_pattern.search(line_data[8]).group(1)
geneid = geneid_pattern.search(line_data[8]).group(1)
if genesymbol_pattern.search(line_data[8]):
genesymbol = genesymbol_pattern.search(line_data[8]).group(1)
if biotype_pattern.search(line_data[8]):
biotype = biotype_pattern.search(line_data[8]).group(1)
#process the remainder of the GTF file
for line in gtf_file:
line = line.rstrip('\n')
line_data = line.split("\t")
if line_data[2] == "exon":
curr_gtf_tx = txid_pattern.search(line_data[8]).group(1)
#Check transcript in current line is a new transcript
if curr_gtf_tx != txid:
#Check strand of transcript. If minus, reverse exon order.
if strand == "-":
ex_starts.reverse()
ex_stops.reverse()
#Format data from previous transcript and write to annotation file
output_annot_file.write(
CampareeUtils.annot_output_format.format(
chrom=chrom,
strand=strand,
txStart=ex_starts[0],
txEnd=ex_stops[-1],
exonCount=ex_count,
exonStarts=','.join(ex_starts),
exonEnds=','.join(ex_stops),
transcriptID=txid,
geneID=geneid,
geneSymbol=genesymbol,
biotype=biotype
)
)
#Load data from new transcript into appropriate variables
txid = curr_gtf_tx
chrom = line_data[0]
strand = line_data[6]
ex_count = 1
ex_starts = [line_data[3]]
ex_stops = [line_data[4]]
geneid = geneid_pattern.search(line_data[8]).group(1)
if genesymbol_pattern.search(line_data[8]):
genesymbol = genesymbol_pattern.search(line_data[8]).group(1)
else:
genesymbol = "None"
if biotype_pattern.search(line_data[8]):
biotype = biotype_pattern.search(line_data[8]).group(1)
else:
biotype = "None"
#This exon is strill from the same transcript.
else:
ex_starts.append(line_data[3])
ex_stops.append(line_data[4])
ex_count += 1
#Finish processing last transcript in GTF file
#Check strand. If minus, reverse exon order.
if strand == "-":
ex_starts.reverse()
ex_stops.reverse()
#Format data from last transcript and write to annotation file
output_annot_file.write(
CampareeUtils.annot_output_format.format(
chrom=chrom,
strand=strand,
txStart=ex_starts[0],
txEnd=ex_stops[-1],
exonCount=ex_count,
exonStarts=','.join(ex_starts),
exonEnds=','.join(ex_stops),
transcriptID=txid,
geneID=geneid,
geneSymbol=genesymbol,
biotype=biotype
)
)
return output_annot_filename
[docs]class CampareeUtilsException(CampareeException):
"""Base class for Camparee Utils exceptions."""
pass
if __name__ == "__main__":
#CampareeUtils.create_genome("../../resources/index_files/hg38/Homo_sapiens.GRCh38.reference_genome.fa")
chr_ploidy_data_ = CampareeUtils.create_chr_ploidy_data('../../resources/index_files/GRCh38/Homo_sapiens.GRCh38.chr_ploidy.txt')
reference_genome_file_path = '../../resources/index_files/GRCh38/Homo_sapiens.GRCh38.reference_genome.fa.gz'
data_directory_path = '../../data/pipeline_results_run89/expression_pipeline/data'
sample_data_folder = os.path.join(data_directory_path, f'sample1')
results = CampareeUtils.compare_genome_sequence_lengths(reference_genome_file_path,
os.path.join(sample_data_folder, 'custom_genome_1.fa'),
os.path.join(sample_data_folder, 'custom_genome_2.fa'),
chr_ploidy_data_.keys())
df = pd.DataFrame.from_dict(results, orient='index', columns=['Reference Genome', 'Genome 1', 'Genome 2'])
print(df)