import os
import pysam
import re
import sys
from collections import namedtuple
from operator import attrgetter, itemgetter
import math
from io import StringIO
from prettytable import PrettyTable
from beers_utils.constants import CONSTANTS
from camparee.abstract_camparee_step import AbstractCampareeStep
from camparee.camparee_constants import CAMPAREE_CONSTANTS
#Imports required for main() method.
import argparse
import json
from beers_utils.sample import Sample
from camparee.camparee_utils import CampareeUtils
import numpy
Read = namedtuple('Read', ['position', 'description'])
Read.__doc__ = """
A named tuple that possesses all the attributes of a variant
type: match (M), deletion (D), insertion (I)
chromosome: chrN
position: position on ref genome
description: description of the variant (e.g., C, IAA, D5, etc.)
"""
[docs]class VariantsFinderStep(AbstractCampareeStep):
"""
This class creates a text file listing variants for those locations in the reference genome having variants.
The variants include snps and indels with the number of reads attributed to each variant.
The relevant bam-formatted input file is expected to be indexed and sorted.
This script outputs a file that gives the full breakdown at each
location in the genome of the number of A's, C's, G's and T's as
well as the number of each size of insertion and deletion.
If it's an insertion the sequence of the insertion itself is given.
So for example a line of output like the following means
29 reads had a C in that location and three reads had an insertion of TTT.
chr1:10128503 | C:29 | ITTT:3
Note that only the top two variants are kept and of those the lesser variant's counts must meet
certain user criteria (minimum threshold, read total count) to be considered a variant. Single
reads that match the corresponding base in the reference genome are not variants and as such are not kept.
"""
DEFAULT_MIN_THRESHOLD = 0.03
name = "Variants Finder Step"
def __init__(self, log_directory_path, data_directory_path, parameters = dict()):
self.data_directory_path = data_directory_path
self.entropy_sort = parameters.get("sort_by_entropy", False)
self.min_abundance_threshold = parameters.get('min_threshold', VariantsFinderStep.DEFAULT_MIN_THRESHOLD)
self.clip_at_start_pattern = re.compile(r"(^\d+)[SH]")
self.clip_at_end_pattern = re.compile(r"\d+[SH]$")
self.variant_pattern = re.compile(r"(\d+)([NMID])")
self.indel_pattern = re.compile(r"\|([^|]+)")
self.log_directory_path = log_directory_path
[docs] def validate(self):
valid = True
if not (0 < self.min_abundance_threshold < 1):
print(f"The min_threshold, {self.min_abundance_threshold} must be between 0 and 1 exclusive.",
file=sys.stderr)
valid = False
return valid
[docs] def remove_clips(self, cigar, sequence):
"""
Remove soft and hard clips at the beginning and end of the cigar string and remove soft and hard clips at
the beginning of the seq as well. Modified cigar string and sequence are returned
:param cigar: raw cigar string from read
:param sequence: raw sequence string from read
:return: tuple of modified cigar and sequence strings (sans clips)
"""
clip_at_start = re.search(self.clip_at_start_pattern, cigar)
if clip_at_start:
cigar = re.sub(self.clip_at_start_pattern, "", cigar)
sequence = sequence[int(clip_at_start.group(1)):]
cigar = re.sub(self.clip_at_end_pattern, "", cigar)
return cigar, sequence
[docs] def call_variants(self, chromosome, reads):
"""
Parses the reads dictionary (read named tuple:read count) for each chromosome - position to create
a line with the variants and their counts delimited by pipes. Dumping each chromosome's worth of
data at a time is done to avoid too sizable a dictionary. Additionally, if the user requests a sort by entropy,
this function will do that ordering and send that data to stdout.
:param chromosome: chromosome under consideration here
:param reads: dictionary of reads to read counts
"""
# variants list
variants = []
# Initializing the variable that will hold position information objects
position_info = None
# This dictionary is only used if the user requests that the read lines be sorted by entropy
entropy_map = dict()
# Iterate over the reads in the dictionary of variants to read counts sorted by the read position
for read in sorted(reads.keys(), key=attrgetter('position')):
# Initial iteration - set up position information object.
if not position_info:
position_info = PositionInfo(chromosome, read.position)
# If the new position differs from the position of the position information currently being
# consolidated, dump the current position information to the variants file if it is determined to
# contain at least one variant. In either case, create a new position information object for the new
# position.
if read.position != position_info.position:
self.identify_variant(position_info, variants)
# If the sort by entropy option is selected, also add to the entropy map dictionary the position
# information entropy, keyed by the line content but only if the total number of reads exceeds the
# depth cutoff.
if self.entropy_sort and position_info.get_total_reads() >= int(self.depth_cutoff):
entropy_map[position_info.__str__()] = position_info.calculate_entropy()
position_info = PositionInfo(chromosome, read.position)
# Add the read description and read count to the position information
position_info.add_read(read.description, reads[read])
# Now that the reads are exhausted for this chromosome, dump the data from the current position information
# object to the file. Note that there may be no variants.
self.identify_variant(position_info, variants)
# If the user selected the sort by entropy option, other the entropy_map entries in descending order
# of entropy and print to std out.
if self.entropy_sort:
sorted_entropies = sorted(entropy_map.items(), key=itemgetter(1), reverse=True)
for key, value in sorted_entropies:
print(key, end='')
return variants
[docs] def identify_variant(self, position_info, variants):
"""
Helper method to filter position reads to identify variants
:param position_info: position being evaluated
:param variants: growing list of variants to which this position may be added if it contains variants
"""
if position_info:
reference_base = self.reference_genome[position_info.chromosome][position_info.position - 1]
position_info.filter_reads(self.min_abundance_threshold, reference_base)
if position_info:
variants.append(position_info)
[docs] def collect_reads(self, chromosome):
"""
Iterate over the input txt file containing cigar, seq, start location, chromosome for each read and consolidate
reads for each position on the genome.
"""
reads = dict()
current_read_components = []
current_start = None
for line in self.alignment_file.fetch(chromosome):
# Remove unaligned reads, reverse reads, and non-unique alignments
if line.is_unmapped or not line.is_read1 or line.get_tag(tag="NH") != 1:
continue
# Alignment Segment reference_start is zero-based - so adding 1 to conform to convention.
start = line.reference_start + 1
sequence = line.query_sequence.upper()
cigar = line.cigarstring
cigar, sequence = self.remove_clips(cigar, sequence)
current_pos_in_genome = int(start)
loc_on_read = 1
# Dropping duplicate reads (assumed to be PCR artifacts)
if not current_start:
current_start = start
current_read_components.append((cigar, sequence))
elif start != current_start:
current_start = start
current_read_components = []
current_read_components.append((cigar, sequence))
elif (cigar, sequence) in current_read_components:
continue
else:
current_read_components.append((cigar, sequence))
# Iterate over the variant types and lengths in the cigar string
for match in re.finditer(self.variant_pattern, cigar):
length = int(match.group(1))
read_type = match.group(2)
# Skip over N type reads since these generally represent a read bracketing an intron
if read_type == "N":
current_pos_in_genome += length
continue
# For a match, record all the snps at the each location continuously covered by this read type
if read_type == "M":
stop = current_pos_in_genome + length
while current_pos_in_genome < stop:
location = current_pos_in_genome
# Skip any read that contains an N or n in the sequence base
base = sequence[loc_on_read - 1]
if 'N' not in base:
key = Read(location, base)
reads[key] = reads.get(key, 0) + 1
loc_on_read += 1
current_pos_in_genome += 1
continue
# For a deletion, designate the read named tuple description with a Dn where n is the
# length of the deletion starting at this position. In this way, subsequent reads having a
# deletion of the same length at the same position will be added to this key.
if read_type == "D":
location = current_pos_in_genome
key = Read(location, f'D{length}')
reads[key] = reads.get(key, 0) + 1
current_pos_in_genome += length
continue
# For an insert, designate the read named tuple description with an Ib+ where b+ are the
# bases to a inserted starting with this position. In this way, subsequent reads having an
# insertion of the same bases at the same position will be added to this key.
if read_type == "I":
location = current_pos_in_genome
insertion_sequence = sequence[loc_on_read - 1: loc_on_read - 1 + length]
# Skip any read that contains an N or n in the insertion sequence
if 'N' not in insertion_sequence:
key = Read(location, f'I{insertion_sequence}')
reads[key] = reads.get(key, 0) + 1
loc_on_read += length
return reads
[docs] def execute(self, sample, alignment_file_path, chr_ploidy_data, reference_genome, seed=None, chromosomes=None):
"""
Entry point into variants_finder. Iterates over the chromosomes in the list provided by the chr_ploidy_data
keys to pick out variants. Chromosomes that are not pertainent to the sample's gender are skipped. If no
sample gender is specified, only those chromosomes that have the same ploidy for both genders are processed.
:param sample: The sample for which the variants for to be found
:param chr_ploidy_data: dictionary of chromosomes as keys and a dictionary of male/female ploidy as values.
:param reference_genome: A dictionary representation of the reference genome
:param seed: Seed for random number generator
:param chromosomes: A listing of chromosomes to replace the list obtained from the alignment file. Used for
debugging purposes.
"""
variants_file_path = os.path.join(self.data_directory_path, f'sample{sample.sample_id}',
CAMPAREE_CONSTANTS.VARIANTS_FINDER_OUTPUT_FILENAME)
log_file_path = os.path.join(self.log_directory_path, f'sample{sample.sample_id}',
CAMPAREE_CONSTANTS.VARIANTS_FINDER_LOG_FILENAME)
self.alignment_file = pysam.AlignmentFile(alignment_file_path, "rb")
self.chromosomes = chromosomes if chromosomes else chr_ploidy_data.keys()
self.reference_genome = reference_genome
if seed is not None:
numpy.random.seed(seed)
self.filter_chromosome_list(sample, chr_ploidy_data)
log_table = PrettyTable()
log_table.field_names =['chromosome','chromosome length','# positions with variants',
'# variants having no ref base variant','# positions having 1 variant',
'# positions having 2 variants']
log_table.align['chromosome length'] = 'r'
log_table.align['# positions with variants'] = 'r'
log_table.align['# positions having no ref base variant'] = 'r'
log_table.align['# positions having 1 variant'] = 'r'
log_table.align['# positions having 2 variants'] = 'r'
row_totals = [0, 0, 0, 0, 0]
for chromosome in self.chromosomes:
print(f"Finding variants for chromosome {chromosome}")
variants = self.call_variants(chromosome, self.collect_reads(chromosome))
self.load_variants(variants, variants_file_path)
variants_without_ref_base = len([variant for variant in variants if not variant.contains_reference_base])
pos_with_one_variant = len([variant for variant in variants if len(variant.reads) == 1])
pos_with_two_variants = len([variant for variant in variants if len(variant.reads) == 2])
row_values = [len(self.reference_genome[chromosome]), len(variants),
variants_without_ref_base, pos_with_one_variant, pos_with_two_variants]
row_totals = [sum(item) for item in zip(row_totals, row_values)]
row_values = [chromosome] + row_values
log_table.add_row(row_values)
row_totals = ['Totals'] + row_totals
log_table.add_row(row_totals)
with open(log_file_path, 'w') as log_file:
log_file.write(log_table.get_string())
log_file.write('\nALL DONE!\n')
[docs] def filter_chromosome_list(self, sample, chr_ploidy_data):
"""
Culls from the chromosome list, those chromosomes that are either not relevant given the sample gender or
not relevant because no sample gender was provided.
:param sample: subject sample which contains gender information
:param chr_ploidy_data: dictionary of chromosomes as keys and a dictionary of male/female ploidy as values.
"""
gender = sample.gender
if not gender:
self.chromosomes = [chr_ for chr_ in self.chromosomes
if chr_ploidy_data[chr_][CONSTANTS.MALE_GENDER] ==
chr_ploidy_data[chr_][CONSTANTS.FEMALE_GENDER] != 0]
else:
self.chromosomes = [chr_ for chr_ in self.chromosomes if chr_ploidy_data[chr_][gender] != 0]
[docs] def load_variants(self, variants, variants_file_path):
"""
Load the variants to a file in the user's designated output directory one chromosome at a time. The filename
has the stem of the alignment filename suffixed with _variants.txt
:param variants: variants list for one chromosome.
"""
with open(variants_file_path, 'a') as variants_file:
for variant in variants:
variants_file.write(variant.__str__())
[docs] def get_commandline_call(self, sample, alignment_file_path, chr_ploidy_file_path, reference_genome_file_path, seed=None):
"""
Prepare command to execute the VariantsFinder from the command line, given
all of the arugments used to run the execute() function.
Parameters
----------
sample : Sample
Sample for which variants will be called.
alignment_file_path : string
Path to BAM file which will be parsed.
chr_ploidy_file_path : string
File that maps chromosome names to their male/female ploidy.
reference_genome_file_path : string
File that maps chromosome names in reference to nucleotide sequence.
seed : integer
Seed for random number generator. Used to repeated runs will produce
the same results.
Returns
-------
string
Command to execute on the command line. It will perform the same
operations as a call to execute() with the same parameters.
"""
#Retrieve path to the variants_finder.py script.
variant_finder_path = os.path.realpath(__file__)
#If the above command returns a string with a "pyc" extension, instead
#of "py", strip off "c" so it points to this script.
variant_finder_path = variant_finder_path.rstrip('c')
variant_finder_params = {}
variant_finder_params['sort_by_entropy'] = self.entropy_sort
variant_finder_params['min_threshold'] = self.min_abundance_threshold
command = (f" python {variant_finder_path}"
f" --log_directory_path {self.log_directory_path}"
f" --data_directory_path {self.data_directory_path}"
f" --config_parameters '{json.dumps(variant_finder_params)}'"
f" --sample '{repr(sample)}'"
f" --bam_filename {alignment_file_path}"
f" --chr_ploidy_file_path {chr_ploidy_file_path}"
f" --reference_genome_file_path {reference_genome_file_path}")
if seed is not None:
command += f" --seed {seed}"
return command
[docs] def get_validation_attributes(self, sample, alignment_file_path, chr_ploidy_file_path, reference_genome_file_path, seed=None):
"""
Prepare attributes required by is_output_valid() function to validate
output generated the VariantsFinder job corresponding to the given sample.
Parameters
----------
sample : Sample
Sample for which variants will be called.
alignment_file_path : string
Path to BAM file which will be parsed. [Note: this parameter is
captured just so get_validation_attributes() accepts the same
arguments as get_commandline_call(). It is not used here.]
chr_ploidy_file_path : string
File that maps chromosome names to their male/female ploidy. [Note:
this parameter is captured just so get_validation_attributes()
accepts the same arguments as get_commandline_call(). It is not used
here.]
reference_genome_file_path : string
File that maps chromosome names in reference to nucleotide sequence.
[Note: this parameter is captured just so get_validation_attributes()
accepts the same arguments as get_commandline_call(). It is not used
here.]
seed : integer
Seed for random number generator. Used to repeated runs will produce
the same results. [Note: this parameter is captured just so
get_validation_attributes() accepts the same arguments as
get_commandline_call(). It is not used here.]
Returns
-------
dict
A VariantsFinder job's data_directory, log_directory, and sample_id.
"""
validation_attributes = {}
validation_attributes['data_directory'] = self.data_directory_path
validation_attributes['log_directory'] = self.log_directory_path
validation_attributes['sample_id'] = sample.sample_id
return validation_attributes
[docs] @staticmethod
def main():
"""
Entry point into script. Allows script to be executed/submitted via the
command line.
"""
parser = argparse.ArgumentParser(description='Command line wrapper around'
' the variant finder')
parser.add_argument('--log_directory_path')
parser.add_argument('--data_directory_path')
parser.add_argument('--config_parameters')
parser.add_argument('--sample')
parser.add_argument('--bam_filename')
parser.add_argument('--chr_ploidy_file_path')
parser.add_argument('--reference_genome_file_path')
parser.add_argument('--seed', type=int, default=None)
args = parser.parse_args()
config_parameters = json.loads(args.config_parameters)
variants_finder = VariantsFinderStep(args.log_directory_path,
args.data_directory_path,
config_parameters)
sample = eval(args.sample)
reference_genome = CampareeUtils.create_genome(args.reference_genome_file_path)
chr_ploidy_data = CampareeUtils.create_chr_ploidy_data(args.chr_ploidy_file_path)
variants_finder.execute(sample,
args.bam_filename,
chr_ploidy_data,
reference_genome,
args.seed)
[docs] @staticmethod
def is_output_valid(validation_attributes):
"""
Check if output of VariantsFinder for a specific job/execution is
correctly formed and valid, given a job's data directory, log directory,
and sample id. Prepare these attributes for a given sample's jobs using
the get_validation_attributes() method.
Parameters
----------
validation_attributes : dict
A job's data_directory, log_directory, and sample_id.
Returns
-------
boolean
True - VariantsFinder output files were created and are well formed.
False - VariantsFinder output files do not exist or are missing data.
"""
data_directory = validation_attributes['data_directory']
log_directory = validation_attributes['log_directory']
sample_id = validation_attributes['sample_id']
valid_output = False
variants_outfile_path = os.path.join(data_directory, f"sample{sample_id}",
CAMPAREE_CONSTANTS.VARIANTS_FINDER_OUTPUT_FILENAME)
variants_logfile_path = os.path.join(log_directory, f"sample{sample_id}",
CAMPAREE_CONSTANTS.VARIANTS_FINDER_LOG_FILENAME)
if os.path.isfile(variants_outfile_path) and \
os.path.isfile(variants_logfile_path):
#Read last line in variants_finder log file
line = ""
with open(variants_logfile_path, "r") as variants_log_file:
for line in variants_log_file:
line = line.rstrip()
if line == "ALL DONE!":
valid_output = True
return valid_output
[docs]class PositionInfo:
"""
This class is meant to capture all the read data associated with a particular chromsome and position on the
genome. It is used to ascertain whether this position actually holds a variant. If it does, the data is
formatted into a string to be written into the variants file.
"""
def __init__(self, chromosome, position):
self.chromosome = chromosome
self.position = position
self.reads = []
self.reference_base = None
self.contains_reference_base = None
def add_read(self, description, read_count):
self.reads.append((description, read_count))
def get_total_reads(self):
return sum([read[1] for read in self.reads])
def get_abundances(self):
return [read[1] / self.get_total_reads() for read in self.reads]
[docs] def calculate_entropy(self):
"""
Use the top two abundances (if two) of the variants for the given position to compute an entropy. If
only one abundance is given, return 0.
:return: entropy for the given position
"""
abundances = self.get_abundances()
if len(abundances) < 2:
return 0
# cloning the abundances list since lists are mutable.
abundances_copy = abundances.copy()
# Retrieve the highest abundance, then remove it and retrieve the highest abundance again to get the
# next highest.
max_abundances = [max(abundances_copy)]
abundances_copy.remove(max_abundances[0])
max_abundances.append(max(abundances_copy))
# In the event that the second abundance reads nearly 0, just return 0
if max_abundances[1] < 0.0001:
return 0
# Use a scale factor to normalize to the two abundances used to calculate entropy
scale = 1 / sum(max_abundances)
max_abundances = [scale * max_abundance for max_abundance in max_abundances]
return -1 * max_abundances[0] * math.log2(max_abundances[0]) - max_abundances[1] * math.log2(max_abundances[1])
[docs] def filter_reads(self, min_abundance_threshold, reference_base):
"""
Filters out from this position, reads that are not considered true variants. Any reads with read counts of
only 1 are excluded to start with. At most, only the top two remaining reads are retained. The lesser of those
two reads may also be removed if it does not satisfy the minimum abundance threshold criterion. The minimum
abundance threshold criterion specifies that the percent contribution of the lesser variant reads to the total
reads be equal or greater than the threshold provided. In the event of a tie for one of both of those top
two slots, preference is given to the reference base if it is included in the tie. If at any point in
filtering, only one read remains and its description matches the reference base, it is removed, leaving no
variants. Once complete, the reads for this position object contain only true variants (which may include
the reference base if there is one another true variant).
:param min_abundance_threshold: criterion for minimum abundance threshold
:param reference_base: the base of the reference genome at this position.
"""
self.reference_base = reference_base
variants = []
# Remove all reads with read counts of no more than 1
filtered_reads = [read for read in self.reads if read[1] > 1]
# If only a single read remains, remove if it matches the reference base - there are no variants
if len(filtered_reads) == 1:
if filtered_reads[0][0] == reference_base:
variants = []
else:
variants = filtered_reads
# If multiple reads remain, retain only the top 2 reads
elif len(filtered_reads) > 1:
candidate_variants = {filtered_read[0]: filtered_read[1] for filtered_read in filtered_reads}
for _ in range(2):
max_read = max(candidate_variants.items(), key=itemgetter(1))[0]
variants.append((max_read, candidate_variants[max_read]))
del candidate_variants[max_read]
# Get reference read, if it exists
reference_read = [(description, read_count) for description, read_count in filtered_reads
if description == reference_base]
# If the reference read exists and is not among the variants but has the same number of counts as
# the second place read, replace the second place read with it.
if reference_read and reference_read[0] not in variants and reference_read[0][1] == variants[-1][1]:
variants[-1] = reference_read[0]
# Remove the second place read if it does not satisfy the min_abundance_threshold criterion
total_reads = sum(read_count for _, read_count in variants)
if variants[1][1] / total_reads < min_abundance_threshold:
if variants[0][0] != reference_base:
variants = [variants[0]]
else:
variants = []
# The only position reads remaining are variants
if variants:
self.contains_reference_base = any([variant[0] == reference_base for variant in variants])
self.reads = variants
def __bool__(self):
"""
The object's truth value depends on the presence of reads. Following a filter_reads step, that value will
essentially depend on the presence of variants.
:return: True if reads (variants) are present and false otherwise.
"""
return True if self.reads else False
def __str__(self):
"""
Provides a string representation that may be used to dump to a file.
:return: string representation
"""
abundances = [str(round(abundance, 2)) for abundance in self.get_abundances()]
s = StringIO()
s.write(f'{self.chromosome}:{self.position}')
for read in self.reads:
s.write(f' | {read[0]}:{read[1]}')
s.write(f"\tTOT={self.get_total_reads()}")
annotated_abundances = []
for read, abundance in zip(self.reads, abundances):
if read[0] == self.reference_base:
annotated_abundances.append(f'r{abundance}')
else:
annotated_abundances.append(abundance)
s.write(f"\t{','.join(annotated_abundances)}")
s.write(f"\tE={self.calculate_entropy()}\n")
return s.getvalue()
if __name__ == "__main__":
sys.exit(VariantsFinderStep.main())