Source code for camparee.camparee_utils
import re
import os
import gzip
import itertools
import pandas as pd
[docs]class CampareeUtils:
"""
Utilities for steps in the CAMPAREE expression pipeline.
"""
# Line format definition for annotation file
annot_output_format = '{chrom}\t{strand}\t{txStart}\t{txEnd}\t{exonCount}\t{exonStarts}\t{exonEnds}\t{transcriptID}\t{geneID}\t{geneSymbol}\t{biotype}\n'
[docs] @staticmethod
def create_oneline_seq_fasta(input_fasta_file_path, output_oneline_fasta_file_path):
"""Helper method to convert a FASTA file containing line breaks embedded
within its sequences to a FASTA file containing each sequence on a single
line.
Parameters
----------
input_fasta_file_path : string
Path to input FASTA file with multi-line sequence data.
output_oneline_fasta_file_path : string
Path to output FASTA file to create, where single line sequences
will be stored.
Returns
-------
set
Set of unique chromosome/contig names contained in input FASTA file.
"""
# Use set to track unique chromosomes/contigs since it automatically
# handles duplicate removal.
chromosomes = set()
# Build regex to recognize FASTA header line and store chromosome/contig
# name (i.e. sequence identifier) in the first group. Chromosome/contig
# name defined as all non-space characters between ">" and the first
# whitespace character.
fasta_header_pattern = re.compile(r'>([^\s]*).*')
# Flag to denote when a FASTA sequence (and not a header) is being
# processed.
building_sequence = False
# Input FASTA lines saved directly to output file as they are processed
# to minimize the amount of data stored in memory at a given time.
with CampareeUtils.open_file(input_fasta_file_path, 'r') as input_fasta_file, \
CampareeUtils.open_file(output_oneline_fasta_file_path, 'w') as output_fasta_file:
for line in input_fasta_file:
if line.startswith(">"):
if building_sequence:
# Add newline at the end of previous sequence
output_fasta_file.write('\n')
building_sequence = False
sequence_id = re.match(fasta_header_pattern, line).group(1)
output_fasta_file.write('>' + sequence_id + '\n')
chromosomes.add(sequence_id)
else:
output_fasta_file.write(line.rstrip('\n').upper())
building_sequence = True
# Add line break to end of output oneline fasta file.
output_fasta_file.write("\n")
return chromosomes
[docs] @staticmethod
def create_genome(genome_file_path):
"""
Creates a genome dictionary from the genome file located at the provided path
(if compressed, it must have a gz extension). The filename is assumed to contain the chr sequences without
line breaks.
:param genome_file_path: path to reference genome file (either compressed or not)
:return: genome as a dictionary with the chromosomes/contigs as keys and the sequences as values.
"""
chr_pattern = re.compile(">([^\s]*).*")
genome = dict()
_, file_extension = os.path.splitext(genome_file_path)
if 'gz' in file_extension:
with gzip.open(genome_file_path, 'r') as genome_file:
for chr, seq in itertools.zip_longest(*[genome_file] * 2):
chr_match = re.match(chr_pattern, chr.decode("ascii"))
if not chr_match:
raise CampareeUtilsException(f'Cannot parse the chromosome from the fasta line {chr}.')
chr = chr_match.group(1)
genome[chr] = seq.decode("ascii").rstrip().upper()
else:
with open(genome_file_path, 'r') as reference_genome_file:
with open(genome_file_path, 'r') as genome_file:
for chr, seq in itertools.zip_longest(*[genome_file] * 2):
chr_match = re.match(chr_pattern, chr)
if not chr_match:
raise CampareeUtilsException(f'Cannot parse the chromosome from the fasta line {chr}.')
chr = chr_match.group(1)
genome[chr] = seq.rstrip().upper()
return genome
[docs] @staticmethod
def create_chr_ploidy_data(chr_ploidy_file_path):
"""
Parses the chr_ploidy_data from its tab delimited resource file into a dictionary of dictionaries like so:
{
'1': {'male': 2, 'female': 2},
'X': {'male': 1, 'female': 2}.
...
}
:param chr_ploidy_file_path: full path to the chr_ploidy data file
:return: chr_ploidy_data expressed as a dictionary of dictionary as shown above.
"""
df = pd.read_csv(chr_ploidy_file_path, sep='\t')
return df.set_index('chr').to_dict(orient='index')
@staticmethod
def compare_genome_sequence_lengths(reference_file_path, genome_1_file_path, genome_2_file_path, chromosomes):
comparison = {chromosome:[] for chromosome in chromosomes}
genome = CampareeUtils.create_genome(reference_file_path)
[comparison[chromosome].append(len(sequence)) for chromosome, sequence
in genome.items() if chromosome in chromosomes]
genome = CampareeUtils.create_genome(genome_1_file_path)
for chromosome in chromosomes:
seqeunce_length = len(genome.get(chromosome, ''))
comparison[chromosome].append(seqeunce_length)
genome = CampareeUtils.create_genome(genome_2_file_path)
for chromosome in chromosomes:
seqeunce_length = len(genome.get(chromosome, ''))
comparison[chromosome].append(seqeunce_length)
return comparison
[docs] @staticmethod
def parse_variant_line(line):
''' reads a line of a variant file from CAMPAREE'''
# sample line is: (note tabs and spaces both used)
# 1:28494 | C:1 | T:1 TOT=2 0.5,0.5 E=1.0
if line == '':
return "DONE", 0, {}
entries = line.split('\t')
loc_and_vars, total, fractions, entropy = entries
loc, *variants = loc_and_vars.split(" | ")
chromosome, position = loc.split(":")
position = int(position)
variants = {base: int(count) for base, count in [variant.split(":") for variant in variants]}
return chromosome, position, variants
[docs] @staticmethod
def convert_gtf_to_annot_file_format(input_gtf_filename, output_annot_filename):
"""Convert a GTF file to a tab-delimited annotation file with one line
per transcript. Each line in the annotation file will have the following
columns:
1 - chrom
2 - strand
3 - txStart
4 - txEnd
5 - exonCount
6 - exonStarts
7 - exonEnds
8 - transcript_id
9 - gene_id
10 - genesymbol
11 - biotype
This method derives transcript info from the "exon" lines in the GTF
file and assumes the exons are listed in the order they appear in the
transcript, as opposed to their genomic coordinates. The annotation file
will list exons in order by plus-strand coordinates, so this method
reverses the order of exons for all minus-strand transcripts.
Note, this function will add a header to the output file, marked by a
'#' character prefix.
See website for standard 9-column GTF specification:
https://useast.ensembl.org/info/website/upload/gff.html
Parameters
----------
input_gtf_filename : string
Path to GTF file to be converted annotation file format.
output_annot_filename : string
Path to output file in annotation format.
Returns
-------
set
Set of unique chromosome/contig names contained in input GTF file.
Only GTF entries of "exon" feature type contribute to this set.
"""
# Use set to track unique chromosomes/contigs since it automatically
# handles duplicate removal.
chromosomes = set()
with CampareeUtils.open_file(input_gtf_filename, 'r') as gtf_file, \
CampareeUtils.open_file(output_annot_filename, 'w') as output_annot_file:
#Print annot file header (note the '#' prefix)
output_annot_file.write("#" + CampareeUtils.annot_output_format.replace('{', '').replace('}', ''))
#Regex patterns used to extract individual attributes from the 9th
#column in the GTF file (the "attributes" column)
txid_pattern = re.compile(r'transcript_id "([^"]+)";')
geneid_pattern = re.compile(r'gene_id "([^"]+)";')
genesymbol_pattern = re.compile(r'gene_name "([^"]+)";')
biotype_pattern = re.compile(r'gene_biotype "([^"]+)";')
line_data = [] #List of line fields from current line of gtf
curr_gtf_tx = "" #transcript ID from current line of gtf
chrom = "" #Chromosome for current transcript
strand = "" #Strand for current transcript
txid = "" #ID of current transcript
geneid = "" #gene ID of current transcript
genesymbol = "None" #gene symbol of current transcript
biotype = "None" #Biotype of current transcript
ex_starts = [] #List of exon start coordinates for current transcript
ex_stops = [] #List of exon stop coordinates for current transcript
ex_count = 1 #Number of exons in current transcript
#Step through GTF until first exon entry (need to prime variables
#with data from first exon).
exon_found = False
for line in gtf_file:
line_data = line.split("\t")
#First conditional is to skip any header lines, which would
#result in a "list index out of range" error when checking the
#second conditional. Header lines don't contain any tabs, so
#there is no 3rd list element in line_data, following the split
#command.
if len(line_data) > 1 and line_data[2] == "exon":
exon_found = True
break
if not exon_found:
raise CampareeUtilsException(f"ERROR: {input_gtf_filename} contains "
f"no lines with exon feature_type.\n")
#Prime variables with data from first exon
chrom = line_data[0]
strand = line_data[6]
ex_starts.append(line_data[3])
ex_stops.append(line_data[4])
txid = txid_pattern.search(line_data[8]).group(1)
geneid = geneid_pattern.search(line_data[8]).group(1)
if genesymbol_pattern.search(line_data[8]):
genesymbol = genesymbol_pattern.search(line_data[8]).group(1)
if biotype_pattern.search(line_data[8]):
biotype = biotype_pattern.search(line_data[8]).group(1)
chromosomes.add(chrom)
#process the remainder of the GTF file
for line in gtf_file:
line = line.rstrip('\n')
line_data = line.split("\t")
if line_data[2] == "exon":
curr_gtf_tx = txid_pattern.search(line_data[8]).group(1)
#Check transcript in current line is a new transcript
if curr_gtf_tx != txid:
#Check strand of transcript. If minus, reverse exon order.
if strand == "-":
ex_starts.reverse()
ex_stops.reverse()
#Format data from previous transcript and write to annotation file
output_annot_file.write(
CampareeUtils.annot_output_format.format(
chrom=chrom,
strand=strand,
txStart=ex_starts[0],
txEnd=ex_stops[-1],
exonCount=ex_count,
exonStarts=','.join(ex_starts),
exonEnds=','.join(ex_stops),
transcriptID=txid,
geneID=geneid,
geneSymbol=genesymbol,
biotype=biotype
)
)
#Load data from new transcript into appropriate variables
txid = curr_gtf_tx
chrom = line_data[0]
strand = line_data[6]
ex_count = 1
ex_starts = [line_data[3]]
ex_stops = [line_data[4]]
geneid = geneid_pattern.search(line_data[8]).group(1)
if genesymbol_pattern.search(line_data[8]):
genesymbol = genesymbol_pattern.search(line_data[8]).group(1)
else:
genesymbol = "None"
if biotype_pattern.search(line_data[8]):
biotype = biotype_pattern.search(line_data[8]).group(1)
else:
biotype = "None"
chromosomes.add(chrom)
#This exon is strill from the same transcript.
else:
ex_starts.append(line_data[3])
ex_stops.append(line_data[4])
ex_count += 1
#Finish processing last transcript in GTF file
#Check strand. If minus, reverse exon order.
if strand == "-":
ex_starts.reverse()
ex_stops.reverse()
#Format data from last transcript and write to annotation file
output_annot_file.write(
CampareeUtils.annot_output_format.format(
chrom=chrom,
strand=strand,
txStart=ex_starts[0],
txEnd=ex_stops[-1],
exonCount=ex_count,
exonStarts=','.join(ex_starts),
exonEnds=','.join(ex_stops),
transcriptID=txid,
geneID=geneid,
geneSymbol=genesymbol,
biotype=biotype
)
)
return chromosomes
[docs] @staticmethod
def open_file(filename, mode='r'):
"""Helper method which can open gzipped files by checking the filename
for the '.gz' extension. If no '.gz' extension found, this method uses
the standard open() function.
Parameters
----------
filename : string
Name of the file to open. If this filename has a '.gz' extension,
the function uses the gzip package. If not, it uses open() function.
mode : string
Access mode (e.g. 'r' - read, 'w' - write) passed to the open function.
Note, to open a file in binary mode, need to explicitly end the mode
code with a 'b'. [DEFAULT: 'r' - open file for reading in text mode].
Returns
-------
file object
Pointer to the opened file.
"""
file_pointer = None
# gzip.open defaults to opening files in binary modes and requires
# explicit inclusion of 't' in the mode to open it as text. However,
# the built-in open() function defaults to text mode. To normalize the
# behavior between these two methods to match the built-in open(), the
# code here checks for the presence of a 'b' (for binary mode). If
# there's no 'b' or 't', the code appends a 't', so the mode will
# always default to text mode.
updated_mode = mode
if 'b' not in updated_mode.lower() and 't' not in updated_mode.lower():
updated_mode = updated_mode + 't'
if filename.endswith('.gz'):
file_pointer = gzip.open(filename=filename, mode=updated_mode)
else:
file_pointer = open(file=filename, mode=updated_mode)
return file_pointer
[docs]class CampareeUtilsException(CampareeException):
"""Base class for Camparee Utils exceptions."""
pass
if __name__ == "__main__":
#CampareeUtils.create_genome("../../resources/index_files/hg38/Homo_sapiens.GRCh38.reference_genome.fa")
chr_ploidy_data_ = CampareeUtils.create_chr_ploidy_data('../../resources/index_files/GRCh38/Homo_sapiens.GRCh38.chr_ploidy.txt')
reference_genome_file_path = '../../resources/index_files/GRCh38/Homo_sapiens.GRCh38.reference_genome.fa.gz'
data_directory_path = '../../data/pipeline_results_run89/expression_pipeline/data'
sample_data_folder = os.path.join(data_directory_path, f'sample1')
results = CampareeUtils.compare_genome_sequence_lengths(reference_genome_file_path,
os.path.join(sample_data_folder, 'custom_genome_1.fa'),
os.path.join(sample_data_folder, 'custom_genome_2.fa'),
chr_ploidy_data_.keys())
df = pd.DataFrame.from_dict(results, orient='index', columns=['Reference Genome', 'Genome 1', 'Genome 2'])
print(df)