Source code for camparee.beagle

import subprocess
import argparse
import sys
import os
import json # Required to package and read beagle options for command line call

from camparee.abstract_camparee_step import AbstractCampareeStep
from camparee.camparee_utils import CampareeException
from camparee.camparee_constants import CAMPAREE_CONSTANTS

class BeagleStep(AbstractCampareeStep):
    """
    CAMPAREE step that runs the Beagle jar from the command line.

    The step reads the file produced by the VariantsCompilationStep from the
    data directory, runs Beagle on it, and writes Beagle's console output to
    a log file in the log directory.
    """

    #TODO: This is actually just the prefix of the output file. Beagle will generate
    #      two files in the data directory using this prefix: "beagle.vcf.log" and
    #      "beagle.vcf.vcf.gz". We should probably update the prefix so it at least
    #      doesn't contain the ".vcf" extension. However, before we do this, we need
    #      to make sure all downstream steps would use the updated filename.
    #TODO: Also, beagle creates its own log file. We should probably either suppress
    #      this or use beagle's own log file in place of the logging we currently do
    #      (which offers completely redundant information). Or, we update our own
    #      logging to track different things than beagle's native logging.
    BEAGLE_OUTPUT_FILENAME = CAMPAREE_CONSTANTS.BEAGLE_OUTPUT_PREFIX
    #Beagle takes input from the VariantsCompilationStep. This will make sure the
    #input filename matches up with the name used by the VariantsCompilationStep.
    #(The "INTPUT" spelling is kept because the attribute name is part of the
    #class's public interface.)
    BEAGLE_INTPUT_FILENAME = CAMPAREE_CONSTANTS.VARIANTS_COMPILATION_OUTPUT_FILENAME
    #Name of file where script logging stored.
    BEAGLE_LOG_FILENAME = CAMPAREE_CONSTANTS.BEAGLE_LOG_FILENAME

    def __init__(self, log_directory_path, data_directory_path, parameters=None):
        """
        Store the directories and the extra Beagle options for this step.

        Parameters
        ----------
        log_directory_path : string
            Directory in which the step's log file is written.
        data_directory_path : string
            Directory containing the Beagle input file and receiving the
            Beagle output files.
        parameters : dict
            Extra Beagle options, given as option name -> value. Each entry
            is passed to Beagle as "name=value". Defaults to no extra options.
        """
        self.data_directory_path = data_directory_path
        self.log_directory_path = log_directory_path
        #A fresh dict is created per instance when no parameters are given, so
        #instances never share (and accidentally mutate) one default dict.
        self.beagle_cmd_options = {} if parameters is None else parameters

    def validate(self):
        """
        Check that the user-supplied Beagle options do not collide with the
        options this step sets itself ("gt", "out" and "seed").

        Returns
        -------
        boolean
            True  - None of the supplied options is reserved.
            False - A reserved option was supplied. A message naming the
                    first offending option is printed.
        """
        invalid_beagle_parameters = ["gt", "out", "seed"]
        for key, value in self.beagle_cmd_options.items():
            if key in invalid_beagle_parameters:
                print(f"Beagle parameter {key} with value {value} cannot be"
                      f" used as a Beagle option since the value is already"
                      f" determined by the bealge.py script.")
                return False
        return True

    def execute(self, beagle_jar_path, seed=None):
        """
        Entry point into the beagle step. This ends up running the Beagle jar
        from the command line.

        Parameters
        ----------
        beagle_jar_path : string
            Path to the beagle JAR file.
        seed : int
            Seed for random number generator. Used so repeated runs will produce
            the same results.

        Raises
        ------
        CampareeException
            If the Beagle command exits with a non-zero status. The failure
            details are written to the log file before raising.
        """
        input_file_path = os.path.join(self.data_directory_path,
                                       BeagleStep.BEAGLE_INTPUT_FILENAME)
        output_file_path = os.path.join(self.data_directory_path,
                                        BeagleStep.BEAGLE_OUTPUT_FILENAME)
        log_file_path = os.path.join(self.log_directory_path,
                                     BeagleStep.BEAGLE_LOG_FILENAME)

        command = f"java -jar {beagle_jar_path} gt={input_file_path} out={output_file_path}"
        if seed is not None:
            command += f" seed={seed}"
        if self.beagle_cmd_options:
            #The leading space is required here: without it the first option
            #would be glued to the "out=" argument whenever no seed is given.
            command += " " + " ".join(
                f"{key}={value}" for key, value in self.beagle_cmd_options.items()
            )

        with open(log_file_path, "w") as log_file:

            #TODO update the output here to make proper use of Python's logging
            #     module and functionality (it manages dual writing to both
            #     console and logging files).

            print(f"Calling beagle with command: {command}")
            print(f"For full Beagle output see {log_file_path}")

            log_file.write(f"Calling beagle with command: {command}.\n\n")
            log_file.write("Beagle output follows:\n")

            try:
                beagle_result = subprocess.run(command, shell=True, check=True,
                                               stdout=subprocess.PIPE,
                                               stderr=subprocess.STDOUT, #Redirect stderr to stdout.
                                               encoding="ascii")
            except subprocess.CalledProcessError as beagle_run_exception:
                log_file.write("\n*****ERROR: Beagle command failed:\n")
                log_file.write(f"\tExit code: {beagle_run_exception.returncode}\n")
                log_file.write("\n*****STDOUT:\n")
                log_file.write(f"{beagle_run_exception.stdout}\n")
                #Since stderr is merged into stdout above, the exception's
                #stderr attribute is None; Beagle's error text is in STDOUT.
                log_file.write("\n*****STDERR:\n")
                log_file.write(f"{beagle_run_exception.stderr}\n")
                #Chain the original exception so the traceback keeps the
                #underlying subprocess failure.
                raise CampareeException(f"\nBeagle process failed. "
                                        f"For full details see {log_file_path}\n") \
                    from beagle_run_exception

            print(f"Finished running Beagle.\n")
            log_file.write(f"{beagle_result.stdout}\n")
            log_file.write(f"\nFinished running Beagle.\n")
            #is_output_valid() looks for this exact line at the end of the log.
            log_file.write("ALL DONE!\n")

    def get_commandline_call(self, beagle_jar_path, seed=None):
        """
        Prepare command to execute the BeagleStep from the command line, given
        all of the arguments used to run the execute() function.

        Parameters
        ----------
        beagle_jar_path : string
            Path to the beagle JAR file.
        seed : int
            Seed for random number generator. Used so repeated runs will produce
            the same results.

        Returns
        -------
        string
            Command to execute on the command line. It will perform the same
            operations as a call to execute() with the same parameters.
        """
        #Retrieve path to the beagle.py script.
        beagle_step_path = os.path.realpath(__file__)
        #If the above command returns a string with a "pyc" extension, instead
        #of "py", strip off "c" so it points to this script. Only the one
        #suffix character is removed, and only when the suffix is ".pyc".
        if beagle_step_path.endswith(".pyc"):
            beagle_step_path = beagle_step_path[:-1]

        command = (f" python {beagle_step_path}"
                   f" --log_directory_path {self.log_directory_path}"
                   f" --data_directory_path {self.data_directory_path}"
                   f" --beagle_jar_path {beagle_jar_path}")
        if seed is not None:
            command += f" --seed {seed}"
        if self.beagle_cmd_options:
            #The options travel as a single JSON string, which main() decodes.
            command += f" --beagle_parameters '{json.dumps(self.beagle_cmd_options)}'"

        return command

    def get_validation_attributes(self, beagle_jar_path, seed=None):
        """
        Prepare attributes required by is_output_valid() function to validate
        output generated the BeagleStep job.

        Parameters
        ----------
        beagle_jar_path : string
            Path to the beagle JAR file. [Note: this parameter is captured just
            so get_validation_attributes() accepts the same arguments as
            get_commandline_call(). It is not used here.]
        seed : int
            Seed for random number generator. Used so repeated runs will produce
            the same results. [Note: this parameter is captured just so
            get_validation_attributes() accepts the same arguments as
            get_commandline_call(). It is not used here.]

        Returns
        -------
        dict
            A BeagleStep run's data_directory and log_directory.
        """
        validation_attributes = {}
        validation_attributes['data_directory'] = self.data_directory_path
        validation_attributes['log_directory'] = self.log_directory_path
        return validation_attributes

    @staticmethod
    def main():
        """
        Entry point into script. Allows script to be executed/submitted via the
        command line.
        """

        parser = argparse.ArgumentParser(description='Command line wrapper around'
                                                     ' the Beagle step')
        parser.add_argument('--log_directory_path', required=True,
                            help="Directory in which to save logging files.")
        parser.add_argument('--data_directory_path', required=True,
                            help="Directory in which to save output files.")
        parser.add_argument('--beagle_jar_path', required=True,
                            help="Path to Beagle jar file.")
        parser.add_argument('--seed', type=int, required=False, default=None,
                            help='Seed value for random number generator.')
        parser.add_argument('--beagle_parameters', required=False, default=None,
                            help="Jsonified Beagle parameters.")
        args = parser.parse_args()

        #get_commandline_call() leaves out --beagle_parameters when there are
        #no extra options, so the flag may be absent. json.loads() cannot
        #decode None, so fall back to an empty set of options in that case.
        parameters = {}
        if args.beagle_parameters is not None:
            parameters = json.loads(args.beagle_parameters)

        beagle_step = BeagleStep(log_directory_path=args.log_directory_path,
                                 data_directory_path=args.data_directory_path,
                                 parameters=parameters)
        beagle_step.execute(beagle_jar_path=args.beagle_jar_path,
                            seed=args.seed)

    @staticmethod
    def is_output_valid(validation_attributes):
        """
        Check if output of BeagleStep for a specific job/execution is correctly
        formed and valid, given the run's data and log directories. Prepare
        these attributes using the get_validation_attributes() method.

        Parameters
        ----------
        validation_attributes : dict
            A CAMPAREE run's data_directory and log_directory.

        Returns
        -------
        boolean
            True  - BeagleStep output files were created and are well formed.
            False - BeagleStep output files do not exist or are missing data.
        """
        data_directory = validation_attributes['data_directory']
        log_directory = validation_attributes['log_directory']

        valid_output = False

        #The way beagle is set to run above, it generates an output file based
        #on the given filename prefix, with a ".vcf.gz" suffix added.
        output_file_path = os.path.join(data_directory,
                                        BeagleStep.BEAGLE_OUTPUT_FILENAME + ".vcf.gz")
        log_file_path = os.path.join(log_directory,
                                     BeagleStep.BEAGLE_LOG_FILENAME)

        if os.path.isfile(output_file_path) and os.path.isfile(log_file_path):
            #Read last line in beagle log file
            line = ""
            with open(log_file_path, "r") as log_file:
                for line in log_file:
                    line = line.rstrip()
            #execute() writes this marker as the final line of a successful run.
            if line == "ALL DONE!":
                valid_output = True

        return valid_output
if __name__ == "__main__":
    #Run the command line wrapper and hand whatever it returns to the
    #interpreter as the process exit status.
    exit_status = BeagleStep.main()
    sys.exit(exit_status)