diff --git a/convert_gvf_to_vcf/assistingconverter.py b/convert_gvf_to_vcf/assistingconverter.py index 95e2dde..f0738f9 100644 --- a/convert_gvf_to_vcf/assistingconverter.py +++ b/convert_gvf_to_vcf/assistingconverter.py @@ -1,5 +1,5 @@ """ -This is an assistant converter to help convert gvf attributes +This is contains functions to assist the conversion of gvf attributes """ import os from convert_gvf_to_vcf.logger import logger @@ -95,5 +95,7 @@ def convert_gvf_attributes_to_vcf_values(column9_of_gvf, else: logger.info(f"catching attribute keys for review at a later date {attrib_key} {attrib_value}") catching_for_review.append(attrib_key) - info_string = ''.join(f'{key}={value};' for key, value in vcf_info_values.items()).rstrip(';') - return gvf_attribute_dictionary, info_string, vcf_format_values + # info_string = ''.join(f'{key}={value};' for key, value in vcf_info_values.items()).rstrip(';') + # print(type(vcf_info_values)) + # print(vcf_info_values) + return gvf_attribute_dictionary, vcf_info_values, vcf_format_values diff --git a/convert_gvf_to_vcf/convertGVFtoVCF.py b/convert_gvf_to_vcf/convertGVFtoVCF.py index 8efb55d..a336019 100644 --- a/convert_gvf_to_vcf/convertGVFtoVCF.py +++ b/convert_gvf_to_vcf/convertGVFtoVCF.py @@ -1,17 +1,16 @@ import argparse import os -from convert_gvf_to_vcf.utils import read_pragma_mapper, \ - read_in_gvf_file, \ - read_yaml, generate_symbolic_allele_dict +from convert_gvf_to_vcf.utils import read_pragma_mapper, read_in_gvf_file from convert_gvf_to_vcf.vcfline import VcfLine from convert_gvf_to_vcf.logger import set_up_logging, logger - +from convert_gvf_to_vcf.lookup import Lookup # setting up paths to useful directories convert_gvf_to_vcf_folder = os.path.dirname(__file__) etc_folder = os.path.join(convert_gvf_to_vcf_folder, 'etc') +# the functions below relate to the VCF header (Part 1) def generate_vcf_header_structured_lines(header_type, mapping_attribute_dict): - """ Generates a dictionary of all possible standard structured lines for INFO/FILTER/FORMAT/ALT + """ Generates a dictionary of all possible standard structured lines for INFO/FILTER/FORMAT/ALT. :param header_type: type of header file to read i.e. ALT, FILTER, INFO or FORMAT :param mapping_attribute_dict: dictionary of all attributes :return: dictionary of all possible standard structured lines keys for the header type @@ -19,6 +18,7 @@ def generate_vcf_header_structured_lines(header_type, mapping_attribute_dict): all_possible_lines = {} for attribute in mapping_attribute_dict: + # Formatting the header string for FILTER, INFO or FORMAT and storing in a dictionary if mapping_attribute_dict[attribute].get(header_type) is not None and header_type != "ALT": header_string = (f'##{header_type}=' f'') all_possible_lines[mapping_attribute_dict[attribute][header_type]["FieldKey"]] = header_string + # Formatting the header string for ALT and storing in a dictionary elif mapping_attribute_dict[attribute].get(header_type) is not None and header_type == "ALT": if mapping_attribute_dict[attribute][header_type]["FieldKey"] is not None: header_string = (f'##{header_type}=' @@ -36,9 +37,9 @@ def generate_vcf_header_structured_lines(header_type, mapping_attribute_dict): pass return all_possible_lines -def generate_custom_unstructured_meta_line(vcf_unstructured_key, - vcf_unstructured_value): - """ Generates a formatted unstructured metainformation line using a custom key value pair. +def generate_vcf_header_unstructured_line(vcf_unstructured_key, + vcf_unstructured_value): + """ Generates a formatted unstructured metainformation line using a custom key value pair e.g. "##key=value" :param vcf_unstructured_key: key for custom unstructured metainformation line :param vcf_unstructured_value: value for custom unstructured metainformation line :return: custom_unstructured_string @@ -46,57 +47,10 @@ def generate_custom_unstructured_meta_line(vcf_unstructured_key, custom_unstructured_string = f"##{vcf_unstructured_key}={vcf_unstructured_value}" return custom_unstructured_string - -def parse_pragma(pragma_to_parse, delimiter): - """ Parses pragma and returns name and value of the pragma. - :param pragma_to_parse: pragma - :param delimiter: to split by - :return: pragma_name, pragma_value: key and value of pragma - """ - try: - pragma_tokens = pragma_to_parse.split(delimiter) - pragma_name = pragma_tokens[0] - if len(pragma_tokens) >= 2: - pragma_value = ''.join(map(str, pragma_tokens[1:])) - # elif len(pragma_tokens) == 1: - # pragma_value = ''.join(map(str, pragma_tokens[0])) - else: - pragma_value = None - logger.warning("WARNING: no value for the following pragma %s", pragma_to_parse) - return pragma_name, pragma_value - except ValueError: - logger.error("Skipping this, can't be parsed %s", pragma_to_parse) - -def get_pragma_name_and_value(pragma_to_parse, delimiter, pragma_list, pragma_name_to_vcf_dict): - """Get pragma name and value and its corresponding VCF header key. - :param pragma_to_parse: pragma that will be parsed - :param delimiter: the separator - :param pragma_list: list of pragmas to search through - :param pragma_name_to_vcf_dict: dictionary pragma name and its vcf entry - :return vcf_header_key, pragma_name, pragma_value - """ - pragma_name, pragma_value = parse_pragma(pragma_to_parse, delimiter) - if pragma_name in pragma_list: - vcf_header_key = pragma_name_to_vcf_dict.get(pragma_name) - else: - vcf_header_key = None - return vcf_header_key, pragma_name, pragma_value - -def get_pragma_tokens(pragma_value, first_delimiter, second_delimiter): - """Get pragma tokens for nested pragmas - :param pragma_value: value to parse - :param first_delimiter: first separator - :param second_delimiter: second separtor - :return pragma_tokens - """ - initial_list = pragma_value.split(first_delimiter) - pragma_tokens = [] - for element in initial_list: - pragma_tokens = element.split(second_delimiter) - return pragma_tokens - -def generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, - standard_lines_dictionary): +def generate_vcf_header_metainfo(gvf_pragmas, + gvf_non_essential, + list_of_vcf_objects, + standard_lines_dictionary): """ Generates a list of metainformation lines for the VCF header :param gvf_pragmas: list of gvf pragmas to convert :param gvf_non_essential: list of non-essential gvf pragmas to convert @@ -111,17 +65,21 @@ def generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, unique_info_lines_to_add = [] unique_filter_lines_to_add = [] unique_format_lines_to_add = [] + #### # MANDATORY: file format for VCF - pragmas_to_add.append(generate_custom_unstructured_meta_line("fileformat", "VCFv4.4")) + pragmas_to_add.append(generate_vcf_header_unstructured_line("fileformat", "VCFv4.4")) + #Go through essential pragmas #TODO: list of pragmas to add:reference=file, contig, phasing,INFO# list_of_pragma = ["##file-date", "##gff-version", "##gvf-version", "##species", "##genome-build"] pragma_to_vcf_map = read_pragma_mapper(os.path.join(etc_folder, 'pragma_mapper.tsv')) for pragma in gvf_pragmas: vcf_header_key, pragma_name, pragma_value = get_pragma_name_and_value(pragma, " ", list_of_pragma, pragma_to_vcf_map) - pragmas_to_add.append(generate_custom_unstructured_meta_line(vcf_header_key, pragma_value)) + pragmas_to_add.append(generate_vcf_header_unstructured_line(vcf_header_key, pragma_value)) for vcf_obj in list_of_vcf_objects: - pragmas_to_add.append(generate_custom_unstructured_meta_line("source", vcf_obj.source)) + pragmas_to_add.append(generate_vcf_header_unstructured_line("source", vcf_obj.source)) + #### + #### # Go through non-essential pragmas list_of_non_essential_pragma = ["#sample", "#Study_accession", "#Study_type", "#Display_name", "#Publication" "#Study", "#Assembly_name", "#subject"] @@ -129,13 +87,14 @@ def generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, vcf_header_key, pragma_name, pragma_value = get_pragma_name_and_value(non_essential_pragma, ": ", list_of_non_essential_pragma, pragma_to_vcf_map) if pragma_name.startswith("#Publication"): publication_tokens = get_pragma_tokens(pragma_value, ";", "=") - pragmas_to_add.append(generate_custom_unstructured_meta_line(publication_tokens[0], publication_tokens[1])) + pragmas_to_add.append(generate_vcf_header_unstructured_line(publication_tokens[0], publication_tokens[1])) elif pragma_name == "#Study": study_tokens = get_pragma_tokens(pragma_value, ";", "=") - pragmas_to_add.append(generate_custom_unstructured_meta_line(study_tokens[0], study_tokens[1])) + pragmas_to_add.append(generate_vcf_header_unstructured_line(study_tokens[0], study_tokens[1])) else: if vcf_header_key is not None: - pragmas_to_add.append(generate_custom_unstructured_meta_line(vcf_header_key, pragma_value)) + pragmas_to_add.append(generate_vcf_header_unstructured_line(vcf_header_key, pragma_value)) + #### # populating sample headers if pragma_name.startswith("#sample"): list_of_sample_information = pragma_value.split(";") @@ -149,7 +108,7 @@ def generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, if sample not in seen_sample_names: seen_sample_names.add(sample) uniq_sample_name.append(sample) - + ### unique_pragmas_to_add = list(dict.fromkeys(pragma for pragma in pragmas_to_add if pragma not in unique_pragmas_to_add)) unique_alt_lines_to_add = list(dict.fromkeys(alt_line for alt_line in standard_lines_dictionary["ALT"] if alt_line not in unique_alt_lines_to_add)) unique_info_lines_to_add = list(dict.fromkeys(info_line for info_line in standard_lines_dictionary["INFO"] if info_line not in unique_info_lines_to_add)) @@ -158,9 +117,9 @@ def generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, return unique_pragmas_to_add, uniq_sample_name, unique_alt_lines_to_add, unique_info_lines_to_add, unique_filter_lines_to_add, unique_format_lines_to_add -# step 10 +# the function below relates to the VCF headerline (Part 2) def generate_vcf_header_line(samples): - """ Generates the VCF header line + """ Generates the VCF header line using the nine mandatory headers and the sample names. :param samples: list of samples, these will appear in the header line :return: vcf_header: a string """ @@ -170,96 +129,154 @@ def generate_vcf_header_line(samples): vcf_header = '\t'.join(vcf_header_fields) return vcf_header -def gvf_features_to_vcf_objects(gvf_lines_obj_list, - assembly_file, mapping_attribute_dict, - symbolic_allele_dictionary - ): +# the functions below relate to the GVF header +def parse_pragma(pragma_to_parse, delimiter): + """ Parses pragma and returns name and value of the pragma. + :param pragma_to_parse: pragma + :param delimiter: to split by + :return: pragma_name, pragma_value: key and value of pragma + """ + try: + pragma_tokens = pragma_to_parse.split(delimiter) + pragma_name = pragma_tokens[0] + if len(pragma_tokens) >= 2: + pragma_value = ''.join(map(str, pragma_tokens[1:])) + # elif len(pragma_tokens) == 1: + # pragma_value = ''.join(map(str, pragma_tokens[0])) + else: + pragma_value = None + logger.warning(f"WARNING: no value for the following pragma {pragma_to_parse}") + return pragma_name, pragma_value + except ValueError: + logger.error(f"Skipping this, can't be parsed {pragma_to_parse}") + +def get_pragma_name_and_value(pragma_to_parse, delimiter, pragma_list, pragma_name_to_vcf_dict): + """Get pragma name and value and its corresponding VCF header key. + :param pragma_to_parse: pragma that will be parsed + :param delimiter: the separator + :param pragma_list: list of pragmas to search through + :param pragma_name_to_vcf_dict: dictionary pragma name and its vcf entry + :return vcf_header_key, pragma_name, pragma_value + """ + pragma_name, pragma_value = parse_pragma(pragma_to_parse, delimiter) + if pragma_name in pragma_list: + vcf_header_key = pragma_name_to_vcf_dict.get(pragma_name) + else: + vcf_header_key = None + return vcf_header_key, pragma_name, pragma_value + +def get_pragma_tokens(pragma_value, first_delimiter, second_delimiter): + """Get pragma tokens for nested pragmas + :param pragma_value: value to parse + :param first_delimiter: first separator + :param second_delimiter: second separtor + :return pragma_tokens + """ + initial_list = pragma_value.split(first_delimiter) + pragma_tokens = [] + for element in initial_list: + pragma_tokens = element.split(second_delimiter) + return pragma_tokens + +# This is the main conversion logic +def convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, reference_lookup): """ Creates VCF objects from GVF feature lines and stores the VCF objects. :param gvf_lines_obj_list: list of GVF feature line objects - :param assembly_file: FASTA file to assembly - :param mapping_attribute_dict: dictionary of attributes - :param symbolic_allele_dictionary: symbolic_allele_dictionary + :param reference_lookup: an object that stores important dictionaries to be used for reference lookups. :return: standard_header_lines, vcf_data_lines, list_of_vcf_objects: header lines for this VCF, datalines for this VCF and a list of VCF objects """ - vcf_data_lines = {} # DICTIONARY OF LISTS + vcf_data_lines = {} # DICTIONARY OF LISTS, {Chromosome_Pos: [VCF line object]} list_of_vcf_objects = [] - # standard meta-information lines for this VCF file + # Create data structure to store the header lines for this VCF file (standard meta-information lines) standard_header_lines ={ "ALT": [], "INFO": [], "FILTER": [], "FORMAT": [], } + #TODO: place the all_header_lines_per_type_dict into the reference_lookup. + + # Create data structure to store all possible outcomes for header lines (for fields ALT, INFO, FILTER, FORMAT) all_header_lines_per_type_dict = { - htype: generate_vcf_header_structured_lines(htype, mapping_attribute_dict) for htype in ["ALT", "INFO", "FILTER", "FORMAT"] + htype: generate_vcf_header_structured_lines(htype, reference_lookup.mapping_attribute_dict) for htype in ["ALT", "INFO", "FILTER", "FORMAT"] } - # create a vcf object for every feature line in the GVF (1:1) - # add the newly created vcf object to the vcf data line it belongs to - # (1:many; key=chrom_pos; 1 key: many vcf objects) + # Create a vcf object for every feature line in the GVF (1:1) for gvf_featureline in gvf_lines_obj_list: + #NOTE: this is the main Logic of the code vcf_object = VcfLine(gvf_featureline, - mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, standard_header_lines, - all_header_lines_per_type_dict) - - + all_header_lines_per_type_dict, + reference_lookup) + # Store VCF object in the list list_of_vcf_objects.append(vcf_object) + + # vcf_object.key is formatted as follows: Chromosome_Pos if vcf_object.key in vcf_data_lines: + # Add VCF object to the dictionary of lists vcf_data_lines[vcf_object.key].append(vcf_object) else: + # Get it into a format where the VCF object can be added to the dictionary of lists vcf_data_line_objects_list = [vcf_object] vcf_data_lines[vcf_object.key] = vcf_data_line_objects_list - # check the number of objects to see if they are merged - # for key in vcf_data_lines.keys(): - # vcf_obj_list = vcf_data_lines[key] - # print("for", key, " the number of vcf objects is: ", len(vcf_obj_list)) + # Returns the header of the VCF file, the datalines of the VCF file, and the object. return standard_header_lines, vcf_data_lines, list_of_vcf_objects -def format_sample_values(sample_name_dict_format_kv, list_of_sample_names): - """ Creates a partial vcf data line of sample format values. - :param sample_name_dict_format_kv: dictionary of sample names => sample format value - :param list_of_sample_names: list of sample names - :return: sample_format_values_string: formatted string +# The functions below relate to the VCF objects +def compare_vcf_objects(list_of_vcf_objects): + """ Compares VCF objects in the list with the VCF object before it. Returns boolean values. + :params: list_of_vcf_objects: list of vcf objects + :return: comparison_results: list of booleans. For future reference, if True, this will determine merging lines; if False, this will determine use of the previous line. """ - sample_format_value_tokens = [] - for sample in list_of_sample_names: - if sample in sample_name_dict_format_kv: - format_value = sample_name_dict_format_kv[sample] - sample_format_value_tokens.append(':'.join(format_value.values())) + comparison_results = [] + # For each vcf line object, compare with the previous vcf line object in the list + for index in range(1, len(list_of_vcf_objects)): + current_vcf_object = list_of_vcf_objects[index] + previous_vcf_object = list_of_vcf_objects[index - 1] + # Determines the VCF line objects as equal based on the CHROM, POS and REF being the same (__eq__ in Vcfline) + if current_vcf_object == previous_vcf_object: + comparison_results.append(True) # This will use require merging. else: - format_value = "." # set to missing value - sample_format_value_tokens.append(format_value) - sample_format_values_string = '\t'.join(sample_format_value_tokens) - return sample_format_values_string + comparison_results.append(False) # No merging required. Use previous object. + return comparison_results -def format_vcf_datalines(list_of_vcf_objects, list_of_sample_names): - """ Iterates through a list of VCF objects and sample names and formats them as a VCF dataline. - :param list_of_vcf_objects: list of vcf objects - :param list_of_sample_names: list of sample names - :return: formatted_vcf_datalines: list of formatted vcf datalines +def merge_vcf_objects(previous, current, list_of_sample_names): + """ Merge VCF objects. + :params: previous: previous VCF line object + :params: current: current VCF line object + :params: list_of_sample_names: sample names + :return: merged_object + """ + merged_object = previous.merge(current, list_of_sample_names) + return merged_object + +def keep_vcf_objects(previous, list_of_sample_names): + """ Keep VCF objects. + :params: previous VCF line object + :return: kept_object + """ + kept_object = previous.keep(list_of_sample_names) + return kept_object + +def determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_results, list_of_sample_names): + """ Runs through the list of VCF objects and its corresponding comparison result. + If True, merge parts of the vcf object together. If False, use the previous object + :params: list_of_vcf_objects: list of vcf line objects + :return: merge_or_kept_objects: list of vcf line objects that have either been merged or kept as is. """ - formatted_vcf_datalines = [] - for vcf_obj in list_of_vcf_objects: - sample_name_dict_format_kv = vcf_obj.format_dict - sample_format_values_string = format_sample_values(sample_name_dict_format_kv, list_of_sample_names) - vcf_info_string = ";".join([inf for inf in vcf_obj.info if inf is not None]) - vcf_line = (f"{vcf_obj.chrom}\t" - f"{vcf_obj.pos}\t" - f"{vcf_obj.id}\t" - f"{vcf_obj.ref}\t" - f"{vcf_obj.alt}\t" - f"{vcf_obj.qual}\t" - f"{vcf_obj.filter}\t" - #f"{vcf_obj.info}\t" - f"{vcf_info_string}\t" - f"{vcf_obj.format}\t" - f"{sample_format_values_string}" - ) - formatted_vcf_datalines.append(vcf_line) - return formatted_vcf_datalines + merge_or_kept_objects = [] + # start at 1 to ensure the first element has a previous object + for index, compare_result in enumerate(comparison_results, start=1): + # Merge if the previous and current VCF object are the same (compare_result is True) + if compare_result: + merged_object = merge_vcf_objects(list_of_vcf_objects[index - 1], list_of_vcf_objects[index], list_of_sample_names) + # Keep previous if previous and current VCF object are different (compare_result is False) + else: + # keep the previous VCF line object + kept_object = keep_vcf_objects(list_of_vcf_objects[index - 1], list_of_sample_names) + merge_or_kept_objects.append(kept_object) + merge_or_kept_objects.append(list_of_vcf_objects[-1]) + return merge_or_kept_objects def main(): # Parse command line arguments @@ -270,11 +287,13 @@ def main(): parser.add_argument("--log", help="Path to log file") args = parser.parse_args() + # Set up logging functionality if args.log: log_path = set_up_logging(args.log) else: log_path = set_up_logging() + # Log the inputs and outputs. logger.info("Running the GVF to VCF converter") logger.info(f"The provided input file is: {args.gvf_input}") logger.info(f"The provided output file is: {args.vcf_output}") @@ -284,29 +303,33 @@ def main(): assert os.path.isfile(assembly_file), "Assembly file does not exist" logger.info(f"The log file is {log_path}") - # custom meta-information lines for this VCF file + # Read input file and separate out its components logger.info(f"Reading in the following GVF input: {args.gvf_input}") gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(args.gvf_input) - # store attributes and symbolic alleles - mapping_attribute_dict = read_yaml(os.path.join(etc_folder, "attribute_mapper.yaml")) - logger.info("Reading in the attributes file: " + "attribute_mapper.yaml") - symbolic_allele_dictionary = generate_symbolic_allele_dict(mapping_attribute_dict) + # Creating lookup object to store important dictionaries and log what has been stored. + reference_lookup = Lookup(assembly_file) + logger.info("Creating the reference lookup object.") + logger.info("Storing the attributes file: attribute_mapper.yaml") + logger.info("Storing the symbolic allele dictionary.") + logger.info(f"Storing the assembly file: {assembly_file}") + logger.info("Storing the IUPAC ambiguity dictionary.") + + # Convert each feature line in the GVF file to a VCF object (stores all the data for a line in the VCF file). + # NOTE: Main Logic lives here. ( header_lines, - vcf_data_lines, + vcf_data_lines, #TODO: check if this can be removed list_of_vcf_objects - ) = gvf_features_to_vcf_objects(gvf_lines_obj_list, - assembly_file, - mapping_attribute_dict, - symbolic_allele_dictionary - ) - - + ) = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, reference_lookup) logger.info(f"Writing to the following VCF output: {args.vcf_output}") logger.info("Generating the VCF header and the meta-information lines") with open(args.vcf_output, "w") as vcf_output: + # Preparation work: + # Store the VCF metainformation and ensure preservation of important GVF data. + # This information will be useful when creating the VCF header. + # TODO: refactor function generate_vcf_metainfo ( unique_pragmas_to_add, samples, @@ -314,8 +337,13 @@ def main(): unique_info_lines_to_add, unique_filter_lines_to_add, unique_format_lines_to_add - ) = generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, header_lines) + ) = generate_vcf_header_metainfo(gvf_pragmas, + gvf_non_essential, + list_of_vcf_objects, + header_lines) logger.info(f"Total number of samples in this VCF: {len(samples)}") + + # Part 1 of VCF file: Write the VCF header. This will include perserved data from the GVF file. for pragma in unique_pragmas_to_add: vcf_output.write(f"{pragma}\n") for alt_lines in unique_alt_lines_to_add: @@ -326,12 +354,21 @@ def main(): vcf_output.write(f"{filter_lines}\n") for format_lines in unique_format_lines_to_add: vcf_output.write(f"{format_lines}\n") + + # Part 2 of VCF file: Write the VCF header line. This is the nine mandatory fields with its sample names. header_fields = generate_vcf_header_line(samples) vcf_output.write(f"{header_fields}\n") + + # Part 3 of VCF file: Write the VCF data lines. This will contain info about the position in the genome, + # its variants and genotype information per sample. logger.info("Generating the VCF datalines") - formatted_vcf_datalines = format_vcf_datalines(list_of_vcf_objects, samples) - for line in formatted_vcf_datalines: - vcf_output.write(f"{line}\n") + # Each GVF feature has been converted to a VCF object so begin comparing and merging the VCF objects. + comparison_flags = compare_vcf_objects(list_of_vcf_objects) # Identifies which VCF objects to merge + merge_or_kept_vcf_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, comparison_flags, samples) + # Write the VCF objects as data lines in the VCF file. + for vcf_line_object in merge_or_kept_vcf_objects: + vcf_output.write(str(vcf_line_object) + "\n") + # vcf_output.write("\t".join(str(val) for val in line) + "\n") vcf_output.close() logger.info("GVF to VCF conversion complete") diff --git a/convert_gvf_to_vcf/lookup.py b/convert_gvf_to_vcf/lookup.py new file mode 100644 index 0000000..272ed3a --- /dev/null +++ b/convert_gvf_to_vcf/lookup.py @@ -0,0 +1,21 @@ +import os + +from convert_gvf_to_vcf.utils import read_yaml, generate_symbolic_allele_dict, build_iupac_ambiguity_code + +# setting up paths to useful directories +convert_gvf_to_vcf_folder = os.path.dirname(__file__) +etc_folder = os.path.join(convert_gvf_to_vcf_folder, 'etc') + + +class Lookup: + """ + The class is responsible for the storage of look up dictionaries or files for a VCF file. + """ + def __init__(self, assembly_file): + self.mapping_attribute_dict = read_yaml(os.path.join(etc_folder, "attribute_mapper.yaml")) + self.symbolic_allele_dictionary = generate_symbolic_allele_dict(self.mapping_attribute_dict) + self.assembly_file = assembly_file + self.iupac_ambiguity_dictionary = build_iupac_ambiguity_code() + # self.all_possible_vcf_header_lines_dictionary={ + # htype: generate_vcf_header_structured_lines(htype, self.mapping_attribute_dict) for htype in ["ALT", "INFO", "FILTER", "FORMAT"] + # } diff --git a/convert_gvf_to_vcf/utils.py b/convert_gvf_to_vcf/utils.py index 171435e..6a7b374 100644 --- a/convert_gvf_to_vcf/utils.py +++ b/convert_gvf_to_vcf/utils.py @@ -1,4 +1,4 @@ -# this file contains readers +"""This contains readers and utilities""" import os import yaml @@ -34,6 +34,30 @@ def read_pragma_mapper(pragma_mapper_file): pragma_to_vcf_header[pragma] = vcf_header return pragma_to_vcf_header +def read_in_gvf_file(gvf_input): + """ Reads in the user provided GVF file. + :param gvf_input: arguments.gvf_input : The input GVF file + :return: + - gvf_pragmas: list of pragma lines (start with ## at the top of GVF file) + - gvf_non_essential: list of non essential pragma (start with # near the top of GVF file) + - gvf_lines_obj_list: list of objects where each object represents a GVF feature line + """ + gvf_pragmas = [] # list of pragma lines starting with: ## + gvf_non_essential = [] # list of non-essential lines starting with: # + gvf_lines_obj_list = [] # list of objects when reading in gvf files, one object represents a gvf line + + with open(gvf_input) as gvf_file: + for line in gvf_file: + if line.startswith("##"): + gvf_pragmas.append(line.rstrip()) + elif line.startswith("#"): + gvf_non_essential.append(line.rstrip()) + else: + f_list = line.rstrip().split("\t") + line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) + gvf_lines_obj_list.append(line_object) + return gvf_pragmas, gvf_non_essential, gvf_lines_obj_list + def generate_symbolic_allele_dict(mapping_dictionary): """Reads in mapping dictionary and returns a symbolic allele dictionary. :param mapping_dictionary: mapping dictionary @@ -41,7 +65,8 @@ def generate_symbolic_allele_dict(mapping_dictionary): """ symbolic_allele_dict = {} for attribute in mapping_dictionary: - header_type= "ALT" + # Symbolic alleles refer only to the header type "ALT". + header_type = "ALT" if mapping_dictionary[attribute].get(header_type) is not None: if mapping_dictionary[attribute].get(header_type).get("FieldKey") is not None: name = attribute @@ -54,27 +79,23 @@ def generate_symbolic_allele_dict(mapping_dictionary): symbolic_allele_dict.setdefault(name, []).append(description) return symbolic_allele_dict - -def read_in_gvf_file(gvf_input): - """ Reads in the user provided GVF file. - :param gvf_input: arguments.gvf_input - :return: gvf_pragmas, gvf_non_essential, gvf_lines_obj_list +def build_iupac_ambiguity_code(): + """ Builds dictionary for the iupac ambiguity code. + :return: iupac_ambiguity_dictionary: iupac code as key, list of values as value """ - gvf_pragmas = [] # list of pragma lines starting with: ## - gvf_non_essential = [] # list of non-essential lines starting with: # - features = [] - gvf_lines_obj_list = [] # list of objects when reading in gvf files, one object represents a gvf line - - with open(gvf_input) as gvf_file: - for line in gvf_file: - if line.startswith("##"): - gvf_pragmas.append(line.rstrip()) - elif line.startswith("#"): - gvf_non_essential.append(line.rstrip()) - else: - features.append(line.rstrip()) - for feature in features: - f_list = feature.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) - gvf_lines_obj_list.append(line_object) - return gvf_pragmas, gvf_non_essential, gvf_lines_obj_list \ No newline at end of file + # see PMID: 20202974 (Table 1) for the official list + iupac_codes = ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] + R = ["A", "G"] + Y = ["C", "T"] + M = ["A", "C"] + K = ["G", "T"] + S = ["C", "G"] + W = ["A", "T"] + H = ["A", "C", "T"] + B = ["C", "G", "T"] + V = ["A", "C", "G"] + D = ["A", "G", "T"] + N = ["A", "C", "G", "T"] + iupac_values = [R, Y, M, K, S, D, W, H, B, V, D, N] + iupac_ambiguity_dictionary = dict(zip(iupac_codes, iupac_values)) + return iupac_ambiguity_dictionary diff --git a/convert_gvf_to_vcf/vcfline.py b/convert_gvf_to_vcf/vcfline.py index b9344f7..f1dec12 100644 --- a/convert_gvf_to_vcf/vcfline.py +++ b/convert_gvf_to_vcf/vcfline.py @@ -1,6 +1,8 @@ """ -The purpose of this file is to populate for each field of a VCF line (and perform any necessary calculations to achieve this) +The purpose of this file is to populate for each field of a VCF line (and perform any modifications/calculations to achieve this) """ + + from Bio import SeqIO from convert_gvf_to_vcf.assistingconverter import convert_gvf_attributes_to_vcf_values @@ -24,53 +26,69 @@ def extract_reference_allele(fasta_file, chromosome_name, position, end): return reference_allele class VcfLine: + """ + This class is responsible for the storing and merging of the fields of a VCF dataline. + + A VCF dataline is defined in the VCF specification as: + - containing information about a position in the genome + - genotype information on samples for each position. + """ def __init__(self, gvf_feature_line_object, - mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, field_lines_dictionary, - all_possible_lines_dictionary): - - self.vcf_value, self.info_string, self.format_dict = convert_gvf_attributes_to_vcf_values(gvf_feature_line_object.attributes, mapping_attribute_dict, field_lines_dictionary, all_possible_lines_dictionary) + all_possible_lines_dictionary, #TODO: place this in reference + reference_lookup + ): + # Attributes which store important key-values dicts + (self.vcf_value_from_gvf_attribute, # used to populate the VCF fields. This is a dict of non-converted GVF attribute keys and their values. + self.vcf_values_for_info, # a dict that stores INFO key-values to form VCF line. This includes converted GVF attribute keys (+ other SV INFO). + self.vcf_values_for_format # a dict of FORMAT key-values for each sample to form VCF line + ) = convert_gvf_attributes_to_vcf_values(gvf_feature_line_object.attributes, reference_lookup.mapping_attribute_dict, field_lines_dictionary, all_possible_lines_dictionary) - # ATTRIBUTES - self.assembly = assembly_file - self.symbolic_allele_dictionary = symbolic_allele_dictionary - self.iupac_ambiguity_dictionary = self.build_iupac_ambiguity_code() - # GVF + # Attributes which might form useful parts of INFO field in VCF lines (useful information from GVF) self.source = gvf_feature_line_object.source self.so_type = gvf_feature_line_object.feature_type #currently column 3 of gvf, but could be an attribute so perhapsVCF: INFO or FORMAT? self.end = int(gvf_feature_line_object.end) self.phase = gvf_feature_line_object.phase # this is always a placeholder '.' - # VCF DATALINE + # Attributes which are required to generate a VCF DATALINE + # MANDATORY VCF FIELD 1 self.chrom = gvf_feature_line_object.seqid + # MANDATORY VCF FIELD 2 self.pos = int(gvf_feature_line_object.start) - self.id = self.vcf_value["ID"] # attributes: ID - self.length = self.end - self.pos - self.qual = gvf_feature_line_object.score # see EVA-3879: this is always '.' - self.filter = "." # this is always a placeholder '.'; perhaps could add s50. - - # INFO - self.key = self.chrom + "_" + str(self.pos) - self.info = [] - self.info.append(self.info_string) + # MANDATORY VCF FIELD 3 + self.id = self.vcf_value_from_gvf_attribute["ID"] # attributes: ID + # note ref and alt are calculated below (fields 4 and 5) + # MANDATORY VCF FIELD 6 + self.qual = gvf_feature_line_object.score # see EVA-3879: this is always '.' + # MANDATORY VCF FIELD 7 + self.filter = "." # this is always a placeholder '.' + # forms MANDATORY VCF FIELD 8 + self.info_dict = {} # dict that stores all INFO key-values (including INFO from merged lines and SV INFO). # calculated last - self.ref = self.get_ref() - self.alt = self.get_alt(field_lines_dictionary, all_possible_lines_dictionary) - - self.sample_name = self.vcf_value["sample_name"] # this should be each samples names format value # sample names needs to be populated in attributes - # # higher priority - if self.format_dict: - list_of_format_keys = [format_key for format_value in self.format_dict.values() for format_key in format_value.keys()] - self.format = ":".join(list_of_format_keys) + self.length = self.end - self.pos # required for INFO fields- SVLEN and END + # MANDATORY VCF FIELD 4 + self.ref = self.get_ref(reference_lookup) + # MANDATORY VCF FIELD 5 + self.alt = self.get_alt(field_lines_dictionary, + all_possible_lines_dictionary, + reference_lookup) + # useful for conversion of vcf lines + self.key = self.chrom + "_" + str(self.pos) # required in main logic convert_gvf_features_to_vcf_objects + + # presence of dict that stores FORMAT key-val per sample, store ordered list of FORMAT keys, else, use placeholder. + self.format_keys = [] + if self.vcf_values_for_format: + set_of_format_keys = set([format_key for format_value in self.vcf_values_for_format.values() for format_key in format_value.keys()]) + self.format_keys = self.order_format_keys(set_of_format_keys) # a list of ordered format keys else: - self.format = "." #TODO: this is temporary, when the multiple VCF lines are merged this will be filled in + self.format_keys.append(".") #TODO: this is temporary, when the multiple VCF lines are merged this will be filled in + self.list_of_format_values_per_sample = [] - def add_padded_base(self, ref, alt, placed_before : bool): - """ Adds padded base to REF and ALT allele + # Functions which are responsible for token generation/population for the VCF line + def add_padded_base(self, ref, alt, placed_before : bool, assembly_file): + """ Adds a padded base to the REF and ALT allele of a VCF line. :param ref: reference allele :param alt: alt allele :param placed_before: padded base is placed before ref or alt True or False @@ -79,7 +97,7 @@ def add_padded_base(self, ref, alt, placed_before : bool): if placed_before: padded_base_pos = self.pos - 1 self.pos = padded_base_pos - padded_base = extract_reference_allele(self.assembly, self.chrom, self.pos, self.end) + padded_base = extract_reference_allele(assembly_file, self.chrom, self.pos, self.end) ref = padded_base + ref if alt == ".": alt = padded_base @@ -88,7 +106,7 @@ def add_padded_base(self, ref, alt, placed_before : bool): elif not placed_before: padded_base_pos = self.pos + 1 new_end = self.end + 1 - padded_base = extract_reference_allele(self.assembly, self.chrom, padded_base_pos, new_end) + padded_base = extract_reference_allele(assembly_file, self.chrom, padded_base_pos, new_end) ref = ref + padded_base if alt == ".": alt = padded_base @@ -99,29 +117,8 @@ def add_padded_base(self, ref, alt, placed_before : bool): padded_base = None return padded_base, self.pos, ref, alt - def build_iupac_ambiguity_code(self): - """ Builds dictionary for the iupac ambiguity code - :return: iupac_ambiguity_dictionary: iupac code as key, list of values as value - """ - # see PMID: 20202974 (Table 1) for the official list - iupac_codes = ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] - R = ["A", "G"] - Y = ["C", "T"] - M = ["A", "C"] - K = ["G", "T"] - S = ["C", "G"] - W = ["A", "T"] - H = ["A", "C", "T"] - B = ["C", "G", "T"] - V = ["A", "C", "G"] - D = ["A", "G", "T"] - N = ["A", "C", "G", "T"] - iupac_values = [R, Y, M, K, S, D, W, H, B, V, D, N] - iupac_ambiguity_dictionary = dict(zip(iupac_codes, iupac_values)) - return iupac_ambiguity_dictionary - def convert_iupac_ambiguity_code(self, iupac_ambiguity_dictionary, ref_to_convert): - """ Converts the REF allele if it contains IUPAC ambiguity cod + """ If the REF allele of a VCF line contains an IUPAC ambiguity code, converts it. :param iupac_ambiguity_dictionary: dictionary of IUPAC ambiguity code and a list of values :param ref_to_convert: reference allele to be converted :return: self.ref @@ -136,13 +133,14 @@ def convert_iupac_ambiguity_code(self, iupac_ambiguity_dictionary, ref_to_conver converted_ref = converted_ref + converted_base return converted_ref - def check_ref(self, ref_allele_to_be_checked): - """ Checks whether a reference allele meets the requirements of the VCF specification + def check_ref(self, ref_allele_to_be_checked, reference_lookup): + """ Checks whether a reference allele meets the requirements of the VCF specification. :param ref_allele_to_be_checked: reference allele to check :return: checked_reference_allele: reference allele that meets the requirements of the VCF specification""" if isinstance(ref_allele_to_be_checked, str): if not all(bases in ref_allele_to_be_checked for bases in ["A", "C", "G", "T", "N"]): - checked_reference_allele = self.convert_iupac_ambiguity_code(self.iupac_ambiguity_dictionary, ref_allele_to_be_checked) + # checked_reference_allele = self.convert_iupac_ambiguity_code(self.build_iupac_ambiguity_code(), ref_allele_to_be_checked) + checked_reference_allele = self.convert_iupac_ambiguity_code(reference_lookup.iupac_ambiguity_dictionary, ref_allele_to_be_checked) else: checked_reference_allele = ref_allele_to_be_checked else: @@ -150,30 +148,31 @@ def check_ref(self, ref_allele_to_be_checked): checked_reference_allele = "." return checked_reference_allele - def get_ref(self): + def get_ref(self, reference_lookup): """ Gets the reference allele from attributes column or if not found, returns "." :return: reference allele """ - if "Reference_seq" in self.vcf_value.keys(): - reference_allele = self.vcf_value["Reference_seq"] + assembly_file = reference_lookup.assembly_file + if "Reference_seq" in self.vcf_value_from_gvf_attribute.keys(): + reference_allele = self.vcf_value_from_gvf_attribute["Reference_seq"] else: - if self.assembly: - reference_allele = extract_reference_allele(self.assembly, self.chrom, self.pos, self.end) + if assembly_file: + reference_allele = extract_reference_allele(assembly_file, self.chrom, self.pos, self.end) else: print("WARNING: No reference provided. Placeholder inserted for Reference allele.") reference_allele = "." if reference_allele != ".": - reference_allele = self.check_ref(reference_allele) + reference_allele = self.check_ref(reference_allele, reference_lookup) return reference_allele - - def generate_symbolic_allele(self, field_lines_dictionary, all_possible_lines_dictionary): - """ Generates the symbolic allele and stores the corresponding metainformation lines. Also determines if variant is precise or imprecise. + def generate_symbolic_allele(self, field_lines_dictionary, all_possible_lines_dictionary, symbolic_allele_dictionary): + """ Generates the symbolic allele and stores the corresponding metainformation lines. + Also determines if variant is precise or imprecise. :param field_lines_dictionary: lines for ALT, INFO, etc. :param all_possible_lines_dictionary: all possible lines :return: symbolic_allele, self.info, lines_standard_ALT, lines_standard_INFO """ - symbolic_allele_id = self.symbolic_allele_dictionary[self.so_type][1] + symbolic_allele_id = symbolic_allele_dictionary[self.so_type][1] symbolic_allele = f'<{symbolic_allele_id}>' lines_standard_alt = field_lines_dictionary["ALT"] @@ -183,85 +182,91 @@ def generate_symbolic_allele(self, field_lines_dictionary, all_possible_lines_di if symbolic_allele_id in all_possible_alt_lines: lines_standard_alt.append(all_possible_alt_lines[symbolic_allele_id]) - - info_svlen = None + info_svlen_key = "SVLEN" + info_svlen_value = None if self.length: - info_svlen = "SVLEN=" + str(self.length) + info_svlen_value = str(self.length) - start_range_lower_bound = self.vcf_value["Start_range"][0] - start_range_upper_bound = self.vcf_value["Start_range"][1] - end_range_lower_bound = self.vcf_value["End_range"][0] - end_range_upper_bound = self.vcf_value["End_range"][1] + start_range_lower_bound = self.vcf_value_from_gvf_attribute["Start_range"][0] + start_range_upper_bound = self.vcf_value_from_gvf_attribute["Start_range"][1] + end_range_lower_bound = self.vcf_value_from_gvf_attribute["End_range"][0] + end_range_upper_bound = self.vcf_value_from_gvf_attribute["End_range"][1] # setting up fields to be inserted into INFO - info_end = None - info_imprecise = None - info_cipos = None - info_ciend = None + info_end_key = "END" + info_end_value = None + info_imprecise_key = "IMPRECISE" + info_imprecise_value = None + info_cipos_key = "CIPOS" + info_cipos_value = None + info_ciend_key = "CIEND" + info_ciend_value = None if start_range_lower_bound == "." or start_range_upper_bound == "." or end_range_lower_bound == "." or end_range_upper_bound == ".": is_imprecise = False - info_end = "END=" + str(self.pos + len(self.ref) - 1) + info_end_value = str(self.pos + len(self.ref) - 1) else: is_imprecise = True - info_imprecise = "IMPRECISE" + info_imprecise_value = "IMPRECISE" cipos_lower_bound = int(start_range_lower_bound) - self.pos cipos_upper_bound = int(start_range_upper_bound) - self.pos - info_cipos = "CIPOS=" + str(cipos_lower_bound) + "," + str(cipos_upper_bound) + info_cipos_value = str(cipos_lower_bound) + "," + str(cipos_upper_bound) ciend_lower_bound = int(start_range_lower_bound) - self.pos ciend_upper_bound = int(start_range_upper_bound) - self.pos - info_ciend = "CIEND=" + str(ciend_lower_bound) + "," + str(ciend_upper_bound) + info_ciend_value = str(ciend_lower_bound) + "," + str(ciend_upper_bound) if symbolic_allele == "": - info_end ="END=" + str( self.pos + len(self.ref) - 1 ) + info_end_value = str( self.pos + len(self.ref) - 1 ) elif symbolic_allele in {"", "", "", ""}: - info_end = "END=" + str(self.pos + self.length) + info_end_value = str(self.pos + self.length) elif symbolic_allele == "<*>": - info_end = "END=" + str(self.pos + len(self.ref)) + info_end_value = str(self.pos + len(self.ref)) else: print("Cannot identify symbolic allele") - # for all variants (precise and imprecise) - self.info.append(info_end) + # Set up INFO values for structural variants and store in the info_dict + self.info_dict[info_end_key] = info_end_value + self.info_dict[info_imprecise_key] = info_imprecise_value + self.info_dict[info_cipos_key] = info_cipos_value + self.info_dict[info_ciend_key] = info_ciend_value + self.info_dict[info_svlen_key] = info_svlen_value + + # for all variants (precise and imprecise) store INFO lines for the header lines_standard_info.append(all_possible_info_lines["END"]) - self.info.append(info_svlen) lines_standard_info.append(all_possible_info_lines["SVLEN"]) # for imprecise variants only if is_imprecise: - self.info.append(info_imprecise) lines_standard_info.append(all_possible_info_lines["IMPRECISE"]) - self.info.append(info_cipos) lines_standard_info.append(all_possible_info_lines["CIPOS"]) - self.info.append(info_ciend) lines_standard_info.append(all_possible_info_lines["CIEND"]) - return symbolic_allele, self.info, lines_standard_alt, lines_standard_info + return symbolic_allele, self.info_dict, lines_standard_alt, lines_standard_info - def get_alt(self, field_lines_dictionary, all_possible_lines_dictionary): + def get_alt(self, field_lines_dictionary, all_possible_lines_dictionary, reference_lookup): """ Gets the ALT allele for the VCF file :param field_lines_dictionary: store INFO,ALT, FILTER, FORMAT lines :param all_possible_lines_dictionary: dictionary of all possible ALT, INFO, FORMAT, FILTER lines :return: symbolic_allele, self.info, lines_standard_ALT, lines_standard_INFO """ - if any(base in self.vcf_value["Variant_seq"] for base in ["A", "C", "G", "T", "N"]): - alterative_allele = self.vcf_value["Variant_seq"] - elif self.vcf_value["Variant_seq"] == '.': - symbolic_allele, self.info, lines_standard_alt, lines_standard_info = self.generate_symbolic_allele(field_lines_dictionary, all_possible_lines_dictionary) + if any(base in self.vcf_value_from_gvf_attribute["Variant_seq"] for base in ["A", "C", "G", "T", "N"]): + alterative_allele = self.vcf_value_from_gvf_attribute["Variant_seq"] + elif self.vcf_value_from_gvf_attribute["Variant_seq"] == '.': + symbolic_allele, self.info_dict, lines_standard_alt, lines_standard_info = self.generate_symbolic_allele(field_lines_dictionary, all_possible_lines_dictionary, reference_lookup.symbolic_allele_dictionary) if symbolic_allele is None: alterative_allele = "." - elif (self.vcf_value["Variant_seq"] == "." or self.vcf_value["Variant_seq"] == "-") and symbolic_allele is not None: + elif (self.vcf_value_from_gvf_attribute["Variant_seq"] == "." or self.vcf_value_from_gvf_attribute["Variant_seq"] == "-") and symbolic_allele is not None: alterative_allele = symbolic_allele # add padded bases if self.pos == 1: #print("pos, ref, alt",self.pos,self.ref, alterative_allele) - padded_base, self.pos, self.ref, self.alt = self.add_padded_base(self.ref, alterative_allele, False) - self.ref = self.check_ref(self.ref) + padded_base, self.pos, self.ref, self.alt = self.add_padded_base(self.ref, alterative_allele, False, reference_lookup.assembly_file) + self.ref = self.check_ref(self.ref, reference_lookup) else: #print("pos, ref, alt", self.pos,self.ref, alterative_allele) - padded_base, self.pos, self.ref, self.alt = self.add_padded_base(self.ref, alterative_allele, True) - self.ref = self.check_ref(self.ref) + padded_base, self.pos, self.ref, self.alt = self.add_padded_base(self.ref, alterative_allele, True, reference_lookup.assembly_file) + self.ref = self.check_ref(self.ref, reference_lookup) else: alterative_allele = "." print("Cannot identify symbolic allele. Variant type is not supported.") @@ -271,5 +276,195 @@ def get_alt(self, field_lines_dictionary, all_possible_lines_dictionary): return alterative_allele def __str__(self): - string_to_return = '\t'.join((self.chrom, self.pos, self.key, self.qual, self.filter, self.info, self.source, self.phase, self.end, self.so_type, self.sample_name, self.format)) + """ Creates and formats the VCF line. + :return: string_to_return - the VCF line as a string + """ + string_to_return = '\t'.join((self.chrom, + str(self.pos), + self.id, + self.ref, + self.alt, + self.qual, + self.filter, + self.format_info_string(), + ":".join(self.format_keys) if isinstance(self.format_keys, list) else self.format_keys, + '\t'.join(self.list_of_format_values_per_sample) + )) return string_to_return + + def __eq__(self, other_vcf_line): + """ Compares equality of PARTS of the VcfLine objects. + :param: other_vcf_line: another object to compare equality with + """ + if isinstance(other_vcf_line, VcfLine): + return (self.chrom == other_vcf_line.chrom) and (self.pos == other_vcf_line.pos) and (self.ref == other_vcf_line.ref) + return False + + def merge_and_add(self, previous_element, current_element, delimiter): + """ Merges fields of a VCF line. If field is the same, use current element. If different, merge with delimiter. + :param: previous_element + :param: current_element + :param: delimiter + :return: merged element + """ + if previous_element == current_element: + merged_element = current_element + else: + merged_element = delimiter.join((previous_element, current_element)) + return merged_element + # functions responsible for FORMAT are below + def order_format_keys(self, set_of_format_keys): + """Stores the FORMAT keys of the VCF line in the correct order by anchoring GT as the first key. + :param: set_of_format_keys: format keys in a set + :return: anchored_list_of_keys: list of ordered keys + """ + anchored_list_of_format_keys = [] + if 'GT' in set_of_format_keys: + anchored_list_of_format_keys.append("GT") + set_of_format_keys.discard('GT') + anchored_list_of_format_keys.extend(set_of_format_keys) + return anchored_list_of_format_keys + + def merge_format_keys(self, other_vcf_line): + """ Storing and merging of FORMAT keys of a VCF line in a list. + :param: other_vcf_line: the other VCF line to merge with + """ + merged_format_keys = set() + # this_keys = self.format_keys.split(":") + # other_keys = other_vcf_line.format_keys.split(":") + for this_key in self.format_keys: + merged_format_keys.add(this_key) + for other_key in other_vcf_line.format_keys: + merged_format_keys.add(other_key) + list_of_merged_format_key = self.order_format_keys(merged_format_keys) + self.format_keys = list_of_merged_format_key + other_vcf_line.format_keys = list_of_merged_format_key + + def combine_format_values_by_sample(self, format_tag_and_values_per_sample, list_of_sample_names): + """ Creates list of format values for each sample for the vcf data line. + :param format_tag_and_values_per_sample: nested dictionary {sample_name: {format_tag:formatvalue}}. + :param list_of_sample_names: list of sample names + :return: list_of_format_values_per_sample: a list e.g. ['.:3', '.:.', '.:.', '0:1:3'] (in the VCF file, this would be the tab-separated values under the sample name) + """ + # Creates the list of FORMAT keys so we can get its corresponding value later + set_of_format_keys = {key for sample in format_tag_and_values_per_sample for key in + format_tag_and_values_per_sample[sample]} + list_of_format_key = self.order_format_keys(set_of_format_keys) + # Generate string. For present samples, get its format value. For missing samples, populate with a missing value. + for sample in list_of_sample_names: + if sample in format_tag_and_values_per_sample: + format_value_list = [] + for key in list_of_format_key: + format_value_list.append(format_tag_and_values_per_sample.get(sample, + '.').get(key, + '.')) # adds missing values if not found + self.list_of_format_values_per_sample.append(":".join(format_value_list)) + else: + self.list_of_format_values_per_sample.append(':'.join(['.' for key in list_of_format_key] or ['.'])) + return self.list_of_format_values_per_sample + # functions responsible for INFO are below + + def fill_merge_dicts(self, merged_info_dict, key, previous_line_info_value, current_line_info_value): + """ Logic for merging info dicts + :param: merged_info_dict: merged dictionary + :param: key: key for merged info dict + :param: previous_line_info_value + :param: current_line_info_value + :return: merged info dict + """ + if previous_line_info_value is None and current_line_info_value is None: + pass + elif previous_line_info_value == current_line_info_value: + merged_info_dict[key] = previous_line_info_value + else: + if previous_line_info_value is None: + merged_info_dict[key] = current_line_info_value + elif current_line_info_value is None: + merged_info_dict[key] = previous_line_info_value + else: + merged_info_dict[key] = f"{previous_line_info_value},{current_line_info_value}" + return merged_info_dict + + + def merge_info_dicts(self, other_vcf_line): + """ Merges and stores the INFO dictionaries for the INFO field of a VCF line. + :param: other_vcf_line + """ + # Create data structure to merge the INFO dict of this VCF line and the other_vcf_line + merged_info_dict = {} + # Step 1: Merge from vcf_values_for_info + # Aim is to store converted GVF attributes + # vcf_values_for_info is a dict that stores INFO key-values to form VCF line. This includes converted GVF attribute keys (+ other SV INFO). + for key in self.vcf_values_for_info.keys() | other_vcf_line.vcf_values_for_info.keys(): + this_info_value = self.vcf_values_for_info.get(key) + other_info_value = other_vcf_line.vcf_values_for_info.get(key) + merged_info_dict = self.fill_merge_dicts(merged_info_dict, key,this_info_value,other_info_value) + + # Step 2: Merge from info_dict + # Aim is to store SV INFO + # info_dict = dict that stores all INFO key-values (including INFO from merged lines and SV INFO). + for info_dict_key in self.info_dict.keys() | other_vcf_line.info_dict.keys(): + this_info_dict_value = self.info_dict.get(info_dict_key) + other_info_dict_value = other_vcf_line.info_dict.get(info_dict_key) + merged_info_dict = self.fill_merge_dicts(merged_info_dict,info_dict_key, this_info_dict_value,other_info_dict_value) + + # Remove the ID + key_to_remove = "ID" + if key_to_remove in merged_info_dict: + del merged_info_dict[key_to_remove] + + # Store merged info dict for this VCF line and the other VCF line. + self.info_dict = merged_info_dict + other_vcf_line.info_dict = merged_info_dict + + + def format_info_string(self): + """ Creates a formatted INFO string using the INFO dictionary. Anchors ID to start of the string. + :return: info_string: formatted INFO string for use in VCF line + """ + # Ensure ID is the first key + anchored_key = "ID" + if anchored_key in self.info_dict: + self.info_dict = {anchored_key:self.info_dict.pop(anchored_key), **self.info_dict} + # Remove None values + if "IMPRECISE" in self.info_dict and self.info_dict.get('IMPRECISE') is None: + del self.info_dict["IMPRECISE"] + if "CIPOS" in self.info_dict and self.info_dict.get('CIPOS') is None: + del self.info_dict["CIPOS"] + if "CIEND" in self.info_dict and self.info_dict.get('CIEND') is None: + del self.info_dict["CIEND"] + # Format the string + info_string = ";".join(f"{key}={value}" if key != "IMPRECISE" else f"{value}" for key,value in self.info_dict.items()) + return info_string + + # MERGE OR KEEP below + def merge(self, other_vcf_line, list_of_sample_names): + """ Merging of the fields of a VCF line (ID, ALT, FILTER, INFO, FORMAT, FORMATvalues). + :param: other_vcf_line : other VCF line to merge with + :param: list_of_sample_names: list of sample names to help with creating format values by sample + """ + # Merging ID, ALT and FILTER first + merged_id = self.merge_and_add(self.id, other_vcf_line.id, ";") + merged_alt = self.merge_and_add(self.alt, other_vcf_line.alt, ",") + merged_filter = self.merge_and_add(self.filter, other_vcf_line.filter, ";") + + self.id = other_vcf_line.id = merged_id + self.alt = other_vcf_line.alt = merged_alt + self.filter = other_vcf_line.filter = merged_filter + # Merging INFO using info_dict + self.merge_info_dicts(other_vcf_line) + # Merging FORMAT keys - these go under FORMAT + self.merge_format_keys(other_vcf_line) + # Merging FORMAT values - these go under the Sample + merged_format_dict = self.vcf_values_for_format | other_vcf_line.vcf_values_for_format + self.vcf_values_for_format = merged_format_dict + other_vcf_line.vcf_values_for_format = merged_format_dict + + self.list_of_format_values_per_sample = self.combine_format_values_by_sample(self.vcf_values_for_format, list_of_sample_names) + other_vcf_line.list_of_format_values_per_sample = other_vcf_line.combine_format_values_by_sample(other_vcf_line.vcf_values_for_format, list_of_sample_names) + return self + + + def keep(self, list_of_sample_names): + self.list_of_format_values_per_sample = self.combine_format_values_by_sample(self.vcf_values_for_format, list_of_sample_names) + return self diff --git a/tests/test_assisting_converter.py b/tests/test_assisting_converter.py new file mode 100644 index 0000000..c722c7e --- /dev/null +++ b/tests/test_assisting_converter.py @@ -0,0 +1,29 @@ +#TODO: 3 tests +import os +import unittest + +from convert_gvf_to_vcf.utils import read_yaml, generate_symbolic_allele_dict + + +class TestAssistingConverter(unittest.TestCase): + def setUp(self): + input_folder = os.path.dirname(__file__) + self.input_file = os.path.join(input_folder, "input", "zebrafish.gvf") + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + # the inputs below are INFO attribute files + self.etc_folder = os.path.join(self.input_folder_parent, "etc") + self.mapping_attribute_dict = read_yaml( + os.path.join(self.etc_folder, 'attribute_mapper.yaml')) # formerly attributes_mapper and INFOattributes + self.etc_folder = os.path.join(self.input_folder_parent, "etc") + self.symbolic_allele_dictionary = generate_symbolic_allele_dict(self.mapping_attribute_dict) + self.output_file = os.path.join(input_folder, "input", "a.vcf") + self.assembly = os.path.join(input_folder, "input", "zebrafish.fa") + + def test_generate_custom_structured_meta_line(self): + pass + + def test_get_gvf_attributes(self): + pass + + def test_convert_gvf_attributes_to_vcf_values(self): + pass \ No newline at end of file diff --git a/tests/test_convert_gvf_to_vcf.py b/tests/test_convert_gvf_to_vcf.py index 83a2511..bcb49af 100644 --- a/tests/test_convert_gvf_to_vcf.py +++ b/tests/test_convert_gvf_to_vcf.py @@ -1,391 +1,60 @@ +#TODO: 5 test import os.path import unittest +from convert_gvf_to_vcf.lookup import Lookup #from convert_gvf_to_vcf.utils import read_file -from convert_gvf_to_vcf.convertGVFtoVCF import generate_custom_unstructured_meta_line, read_in_gvf_file, \ - gvf_features_to_vcf_objects, format_vcf_datalines, \ - generate_vcf_metainfo, generate_vcf_header_structured_lines, \ - generate_vcf_header_line, \ - format_sample_values, read_yaml, read_pragma_mapper, generate_symbolic_allele_dict - +from convert_gvf_to_vcf.convertGVFtoVCF import generate_vcf_header_unstructured_line, read_in_gvf_file, convert_gvf_features_to_vcf_objects, generate_vcf_header_metainfo, generate_vcf_header_line, compare_vcf_objects, determine_merge_or_keep_vcf_objects from convert_gvf_to_vcf.vcfline import VcfLine -from convert_gvf_to_vcf.gvffeature import GvfFeatureline + class TestConvertGVFtoVCF(unittest.TestCase): def setUp(self): + # Prepare Directories + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + self.etc_folder = os.path.join(self.input_folder_parent, "etc") input_folder = os.path.dirname(__file__) + # Prepare Inputs self.input_file = os.path.join(input_folder, "input", "zebrafish.gvf") self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) - # the inputs below are INFO attribute files - self.etc_folder = os.path.join(self.input_folder_parent, "etc") - self.mapping_attribute_dict = read_yaml( - os.path.join(self.etc_folder, 'attribute_mapper.yaml')) # formerly attributes_mapper and INFOattributes - self.etc_folder = os.path.join(self.input_folder_parent, "etc") - self.symbolic_allele_dictionary = generate_symbolic_allele_dict(self.mapping_attribute_dict) + # Prepare Outputs self.output_file = os.path.join(input_folder, "input", "a.vcf") + # Prepare References self.assembly = os.path.join(input_folder, "input", "zebrafish.fa") + self.reference_lookup = Lookup(self.assembly) + # self.mapping_attribute_dict = read_yaml( + # os.path.join(self.etc_folder, 'attribute_mapper.yaml')) # formerly attributes_mapper and INFOattributes + # self.symbolic_allele_dictionary = generate_symbolic_allele_dict(self.mapping_attribute_dict) - def test_read_yaml(self): - test_yaml_dictionary = read_yaml(os.path.join(self.etc_folder, 'attribute_mapper.yaml')) - assert len(test_yaml_dictionary) > 0 - - def test_read_pragma_mapper(self): - pragma_to_vcf_header = read_pragma_mapper(os.path.join(self.etc_folder, 'pragma_mapper.tsv')) - assert len(pragma_to_vcf_header) > 0 - - def test_read_mapping_dictionary(self): - symbolic_allele_dictionary = generate_symbolic_allele_dict(self.mapping_attribute_dict) - assert len(symbolic_allele_dictionary) > 0 - - def test_read_in_gvf_file(self): - gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) - assert len(gvf_pragmas) > 1 - assert len(gvf_non_essential) > 1 - assert len(gvf_lines_obj_list) > 1 - - def test_gvf_features_to_vcf_objects(self): - gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - header_lines_for_this_vcf, vcf_data_lines, list_of_vcf_objects = gvf_features_to_vcf_objects(gvf_lines_obj_list, - assembly_file,self.mapping_attribute_dict, self.symbolic_allele_dictionary) - assert len(gvf_pragmas) > 1 - assert len(gvf_non_essential) > 1 - assert len(gvf_lines_obj_list) > 1 - assert len(header_lines_for_this_vcf) > 1 - assert len(vcf_data_lines) > 1 - assert len(list_of_vcf_objects) > 1 - - def test_add_padded_base(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) - - mapping_attribute_dict = self.mapping_attribute_dict - - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - - # Dictionary for all possible VCF meta-information lines - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", mapping_attribute_dict) - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", mapping_attribute_dict) - - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - - v = VcfLine(line_object, - mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - - test_ref = "A" - test_alt = "T" - padded_base, pos, ref, alt = v.add_padded_base(test_ref, test_alt, True) - assert padded_base is not None - assert pos is not None - assert ref is not None - assert alt is not None - - def test_build_iupac_ambiguity_code(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], - f_list[7], f_list[8]) - - mapping_attribute_dict = self.mapping_attribute_dict - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - - # Dictionary for all possible VCF meta-information lines - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) - - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - - my_ipuac_dictionary = v.build_iupac_ambiguity_code() - assert len(my_ipuac_dictionary) > 0 - - def test_convert_iupac_ambiguity_code(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], - f_list[7], f_list[8]) - - mapping_attribute_dict = self.mapping_attribute_dict - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - # Dictionary for all possible VCF meta-information lines - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) - - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - - - my_ipuac_dictionary = v.build_iupac_ambiguity_code() - ref_to_convert = "TAGD" - converted_ref_allele = v.convert_iupac_ambiguity_code(my_ipuac_dictionary, ref_to_convert) - assert converted_ref_allele not in ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] - - def test_check_ref(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], - f_list[7], f_list[8]) - - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - # Dictionary for all possible VCF meta-information lines - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) - - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - self.mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - - reference_allele_to_check = "TGCR" - new_ref = v.check_ref(reference_allele_to_check) - iupac_code = ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] - assert all(code not in new_ref for code in iupac_code) - - def test_get_ref(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) - - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - # Dictionary for all possible VCF meta-information lines - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) - - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - self.mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - reference_allele = v.get_ref() - assert len(reference_allele) != 0 - assert reference_allele == 'TA' - - def test_generate_symbolic_allele(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 81 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=77,78;End_range=80,81;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly - - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - # Dictionary for all possible VCF meta-information lines - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - self.mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - (output_symbolic_allele, info_field, output_lines_standard_alt, output_lines_standard_info) = v.generate_symbolic_allele(standard_lines_dictionary, all_possible_lines_dictionary) - assert output_symbolic_allele == '' - print(info_field) - assert info_field == ['ID=1;NAME=nssv1412199;ALIAS=CNV28955;VARCALLSOID=SO:0001743;SVCID=CNV28955;REMAP=.98857;VARSEQ=.', 'END=81', 'SVLEN=4', 'IMPRECISE', 'CIPOS=0,1', 'CIEND=0,1', 'END=80', 'SVLEN=4', 'IMPRECISE', 'CIPOS=1,2', 'CIEND=1,2'] - assert output_lines_standard_alt == ['##ALT=', '##ALT='] - print(output_lines_standard_info) - assert output_lines_standard_info == ['##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO='] - - - - def test_get_alt(self): - gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 81 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=77,78;End_range=80,81;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." - f_list = gvf_feature_line.split("\t") - line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) + def test_generate_vcf_header_structured_lines(self): + pass + def test_generate_custom_unstructured_meta_line(self): + formatted_string = generate_vcf_header_unstructured_line("test_string_key", "test_string_value") + assert formatted_string == "##test_string_key=test_string_value" - symbolic_allele_dictionary = self.symbolic_allele_dictionary - assembly_file = self.assembly + def test_parse_pragma(self): + pass - # standard structured meta-information lines for this VCF file - lines_standard_alt = [] - lines_standard_info = [] - lines_standard_filter = [] - lines_standard_format = [] - # merging - standard_lines_dictionary = { - "ALT": lines_standard_alt, - "INFO": lines_standard_info, - "FILTER": lines_standard_filter, - "FORMAT": lines_standard_format, - } - # Dictionary for all possible VCF meta-information lines - all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.mapping_attribute_dict) - all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.mapping_attribute_dict) - all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.mapping_attribute_dict) - all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.mapping_attribute_dict) + def test_get_pragma_name_and_value(self): + pass - all_possible_lines_dictionary = { - "ALT": all_possible_alt_lines, - "INFO": all_possible_info_lines, - "FILTER": all_possible_filter_lines, - "FORMAT": all_possible_format_lines, - } - v = VcfLine(line_object, - self.mapping_attribute_dict, - symbolic_allele_dictionary, - assembly_file, - standard_lines_dictionary, - all_possible_lines_dictionary) - alt_allele = v.get_alt(standard_lines_dictionary, all_possible_lines_dictionary) - assert alt_allele == '' + def test_get_pragma_tokens(self): + pass - def test_generate_vcf_metainformation(self): + def test_generate_vcf_metainfo(self): gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) - - header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = gvf_features_to_vcf_objects(gvf_lines_obj_list, - self.assembly, self.mapping_attribute_dict, self.symbolic_allele_dictionary) + ( + header_standard_lines_dictionary, + vcf_data_lines, + list_of_vcf_objects + ) = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, self.reference_lookup) print("standard lines", header_standard_lines_dictionary) (unique_pragmas_to_add, sample_names, unique_alt_lines_to_add, unique_info_lines_to_add, - unique_filter_lines_to_add, unique_format_lines_to_add) = generate_vcf_metainfo( + unique_filter_lines_to_add, unique_format_lines_to_add) = generate_vcf_header_metainfo( gvf_pragmas, gvf_non_essential, list_of_vcf_objects, header_standard_lines_dictionary ) print(unique_pragmas_to_add) @@ -406,35 +75,70 @@ def test_generate_vcf_header_line(self): header_fields = generate_vcf_header_line(['JenMale6', 'Wilds2-3', 'Zon9', 'JenMale7']) assert header_fields == '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tJenMale6\tWilds2-3\tZon9\tJenMale7' - def test_format_sample_values(self): + def test_gvf_features_to_vcf_objects(self): gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + assembly_file = self.assembly + # standard structured meta-information lines for this VCF file - header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = gvf_features_to_vcf_objects(gvf_lines_obj_list, - self.assembly, self.mapping_attribute_dict, self.symbolic_allele_dictionary) ( - unique_pragmas_to_add, samples, unique_alt_lines_to_add, unique_info_lines_to_add, - unique_filter_lines_to_add, unique_format_lines_to_add - ) = generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, - header_standard_lines_dictionary) - for vcf_obj in list_of_vcf_objects: - sample_name_dict_format_kv = vcf_obj.format_dict - sample_format_values_string = format_sample_values(sample_name_dict_format_kv, samples) - assert isinstance(sample_format_values_string, str) + header_lines_for_this_vcf, + vcf_data_lines, + list_of_vcf_objects + ) = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, self.reference_lookup) + assert len(gvf_pragmas) > 1 + assert len(gvf_non_essential) > 1 + assert len(gvf_lines_obj_list) > 1 + assert len(header_lines_for_this_vcf) > 1 + assert len(vcf_data_lines) > 1 + assert len(list_of_vcf_objects) > 1 - def test_format_vcf_datalines(self): + def test_compare_vcf_objects(self): gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) - header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = gvf_features_to_vcf_objects(gvf_lines_obj_list, self.assembly, self.mapping_attribute_dict, self.symbolic_allele_dictionary) - ( - unique_pragmas_to_add, samples, unique_alt_lines_to_add, unique_info_lines_to_add, - unique_filter_lines_to_add, unique_format_lines_to_add - ) = generate_vcf_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, header_standard_lines_dictionary) - formatted_vcf_datalines = format_vcf_datalines(list_of_vcf_objects, samples) - print(formatted_vcf_datalines) - assert formatted_vcf_datalines == ['chromosome1\t1\t1\tAC\t\t.\t.\tID=1;NAME=nssv1412199;ALIAS=CNV28955;VARCALLSOID=SO:0001743;SVCID=CNV28955;REMAP=.98857;VARSEQ=.;END=1;SVLEN=1\t.\t.\t.\t.\t.', 'chromosome1\t76\t1\tTAA\t\t.\t.\tID=1;NAME=nssv1412199;ALIAS=CNV28955;VARCALLSOID=SO:0001743;SVCID=CNV28955;REMAP=.98857;VARSEQ=.;END=78;SVLEN=1;IMPRECISE;CIPOS=776537,776837;CIEND=776537,776837\t.\t.\t.\t.\t.', 'chromosome1\t126\t12\tCGTACGGTACG\t\t.\t.\tID=12;NAME=nssv1406143;ALIAS=CNV22899;VARCALLSOID=SO:0001743;SVCID=CNV22899;REMAP=.87402;VARSEQ=.;END=131;SVLEN=5\t.\t.\t.\t.\t.', 'chromosome1\t127\t13\tGTACGTACG\t\t.\t.\tID=13;NAME=nssv1389474;ALIAS=CNV6230;VARCALLSOID=SO:0001742;SVCID=CNV6230;REMAP=.69625;VARSEQ=.;END=131;SVLEN=4\t.\t.\t.\t.\t.', 'chromosome1\t127\t14\tGTACGTACG\t\t.\t.\tID=14;NAME=nssv1388955;ALIAS=CNV5711;VARCALLSOID=SO:0001742;SVCID=CNV5711;REMAP=.85344;VARSEQ=.;AC=3;END=131;SVLEN=4\t.\t.\t.\t.\t.', 'chromosome1\t127\t14\tGTT\t\t.\t.\tID=14;NAME=nssv1388955;ALIAS=CNV5711;VARCALLSOID=SO:0001742;SVCID=CNV5711;REMAP=.85344;VARSEQ=.;AC=3;DBXREF=mydata;AD=3;END=128;SVLEN=1\tAD\t3\t.\t.\t.', 'chromosome1\t127\t14\tGTT\t\t.\t.\tID=14;NAME=nssv1388955;ALIAS=CNV5711;VARCALLSOID=SO:0001742;SVCID=CNV5711;REMAP=.85344;VARSEQ=.;AC=3;DBXREF=mydata;AD=3;END=128;SVLEN=1\tAD:GT\t.\t.\t.\t3:0:1'] + header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, self.reference_lookup) + # compare object, if equal, True, if not equal, False # (next function will make true = current and merge; false= previous) + expected_flags_for_list_of_vcf_objects = [False, # line 1 vs 2 + False, # line 2 vs 3 + False, # line 3 vs 4 + True, # line 4 vs 5 + False, # line 5 vs 6 + True # line 6 vs 7 + ] + actual_flags_for_list_of_vcf_objects = compare_vcf_objects(list_of_vcf_objects) + assert actual_flags_for_list_of_vcf_objects == expected_flags_for_list_of_vcf_objects + + def test_merge_vcf_objects(self): + # gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + # header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = gvf_features_to_vcf_objects( + # gvf_lines_obj_list, self.reference_lookup) + # list_of_samples = ['JenMale6', 'Wilds2-3', 'Zon9', 'JenMale7'] + # # use lines 4 and 5 of gvf file + # previous = list_of_vcf_objects[3] # line 4 + # current = list_of_vcf_objects[4] #line 5 + # merged_object = merge_vcf_objects(previous, current, list_of_samples) + # to_check = ('chromosome1', 127, '13;14', 'GTACGTACG', '', '.', '.', 'ID=13,14;SVCID=CNV6230,CNV5711;ALIAS=CNV6230,CNV5711;END=131;NAME=nssv1389474,nssv1388955;VARCALLSOID=SO:0001742;AC=3;SVLEN=4;REMAP=.69625,.85344;VARSEQ=.', '.', '.\t.\t.\t.') + # assert merged_object == to_check #TODO: the info_string is different each time, ensure order is preserved + pass + + def test_keep_vcf_objects(self): + pass + + def test_determine_merge_or_keep_vcf_objects(self): + gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, self.reference_lookup) + list_of_samples = ['JenMale6', 'Wilds2-3', 'Zon9', 'JenMale7'] + flags_for_list_of_vcf_objects = compare_vcf_objects(list_of_vcf_objects) + merged_or_kept_objects = determine_merge_or_keep_vcf_objects(list_of_vcf_objects, flags_for_list_of_vcf_objects, list_of_samples) + for j in flags_for_list_of_vcf_objects: + print(j) + for i in merged_or_kept_objects: + print(i) + assert len(merged_or_kept_objects) == 5 # 3 kept + 2 merged + # check variant 13 and 14 have been merged + assert merged_or_kept_objects[3].id == "13;14" + assert merged_or_kept_objects[3].info_dict["NAME"] == "nssv1389474,nssv1388955" + + - def test_generate_custom_unstructured_metainfomation_line(self): - formatted_string = generate_custom_unstructured_meta_line("test_string_key", "test_string_value") - assert formatted_string == "##test_string_key=test_string_value" if __name__ == '__main__': unittest.main() diff --git a/tests/test_logger.py b/tests/test_logger.py new file mode 100644 index 0000000..d803b70 --- /dev/null +++ b/tests/test_logger.py @@ -0,0 +1,16 @@ +#TODO: 1 test +import os +import unittest + +class TestLogger(unittest.TestCase): + def setUp(self): + input_folder = os.path.dirname(__file__) + self.input_file = os.path.join(input_folder, "input", "zebrafish.gvf") + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + # the inputs below are INFO attribute files + self.etc_folder = os.path.join(self.input_folder_parent, "etc") + self.output_file = os.path.join(input_folder, "input", "a.vcf") + self.assembly = os.path.join(input_folder, "input", "zebrafish.fa") + + def test_set_up_logging(self): + pass diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..c1c6720 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,46 @@ +# TODO: 1 test to complete +import os +import unittest + +from convert_gvf_to_vcf.convertGVFtoVCF import generate_vcf_header_structured_lines +from convert_gvf_to_vcf.gvffeature import GvfFeatureline +from convert_gvf_to_vcf.utils import read_yaml, read_pragma_mapper, generate_symbolic_allele_dict, read_in_gvf_file +from convert_gvf_to_vcf.vcfline import VcfLine +from convert_gvf_to_vcf.lookup import Lookup + +class TestUtils(unittest.TestCase): + def setUp(self): + # Prepare Directories + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + self.etc_folder = os.path.join(self.input_folder_parent, "etc") + input_folder = os.path.dirname(__file__) + # Prepare Inputs + self.input_file = os.path.join(input_folder, "input", "zebrafish.gvf") + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + # Prepare Outputs + self.output_file = os.path.join(input_folder, "input", "a.vcf") + # Prepare References + self.assembly = os.path.join(input_folder, "input", "zebrafish.fa") + self.reference_lookup = Lookup(self.assembly) + + + def test_read_yaml(self): + test_yaml_dictionary = read_yaml(os.path.join(self.etc_folder, 'attribute_mapper.yaml')) + assert len(test_yaml_dictionary) > 0 + + def test_read_pragma_mapper(self): + pragma_to_vcf_header = read_pragma_mapper(os.path.join(self.etc_folder, 'pragma_mapper.tsv')) + assert len(pragma_to_vcf_header) > 0 + + def test_read_mapping_dictionary(self): + symbolic_allele_dictionary = generate_symbolic_allele_dict(self.reference_lookup.mapping_attribute_dict) + assert len(symbolic_allele_dictionary) > 0 + + def test_read_in_gvf_file(self): + gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + assert len(gvf_pragmas) > 1 + assert len(gvf_non_essential) > 1 + assert len(gvf_lines_obj_list) > 1 + + def test_build_iupac_ambiguity_code(self): + pass \ No newline at end of file diff --git a/tests/test_vcfline.py b/tests/test_vcfline.py new file mode 100644 index 0000000..9fb4f6a --- /dev/null +++ b/tests/test_vcfline.py @@ -0,0 +1,155 @@ +#TODO: 9 tests +import os +import unittest + +from convert_gvf_to_vcf.convertGVFtoVCF import generate_vcf_header_structured_lines, convert_gvf_features_to_vcf_objects, \ + generate_vcf_header_metainfo +from convert_gvf_to_vcf.gvffeature import GvfFeatureline +from convert_gvf_to_vcf.utils import read_yaml, generate_symbolic_allele_dict, read_in_gvf_file +from convert_gvf_to_vcf.vcfline import VcfLine +from convert_gvf_to_vcf.lookup import Lookup + + +class TestVcfline(unittest.TestCase): + def setUp(self): + # Prepare Directories + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + self.etc_folder = os.path.join(self.input_folder_parent, "etc") + input_folder = os.path.dirname(__file__) + # Prepare Inputs + self.input_file = os.path.join(input_folder, "input", "zebrafish.gvf") + self.input_folder_parent = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..', 'convert_gvf_to_vcf')) + # Prepare Outputs + self.output_file = os.path.join(input_folder, "input", "a.vcf") + # Prepare References + self.assembly = os.path.join(input_folder, "input", "zebrafish.fa") + self.reference_lookup = Lookup(self.assembly) + # Set up GVF line object + gvf_feature_line = "chromosome1 DGVa copy_number_loss 77 78 . + . ID=1;Name=nssv1412199;Alias=CNV28955;variant_call_so_id=SO:0001743;parent=nsv811094;Start_range=.,776614;End_range=786127,.;submitter_variant_call_id=CNV28955;sample_name=Wilds2-3;remap_score=.98857;Variant_seq=." + f_list = gvf_feature_line.split("\t") + gvf_line_object = GvfFeatureline(f_list[0], f_list[1], f_list[2], f_list[3], f_list[4], f_list[5], f_list[6], f_list[7], f_list[8]) + # Set up of data structures + # Dictionary of standard structured meta-information lines for this VCF file + lines_standard_alt = [] + lines_standard_info = [] + lines_standard_filter = [] + lines_standard_format = [] + self.standard_lines_dictionary = { + "ALT": lines_standard_alt, + "INFO": lines_standard_info, + "FILTER": lines_standard_filter, + "FORMAT": lines_standard_format, + } + + # Dictionary for all possible VCF meta-information lines + all_possible_info_lines = generate_vcf_header_structured_lines("INFO", self.reference_lookup.mapping_attribute_dict) + all_possible_alt_lines = generate_vcf_header_structured_lines("ALT", self.reference_lookup.mapping_attribute_dict) + all_possible_filter_lines = generate_vcf_header_structured_lines("FILTER", self.reference_lookup.mapping_attribute_dict) + all_possible_format_lines = generate_vcf_header_structured_lines("FORMAT", self.reference_lookup.mapping_attribute_dict) + self.all_possible_lines_dictionary = { + "ALT": all_possible_alt_lines, + "INFO": all_possible_info_lines, + "FILTER": all_possible_filter_lines, + "FORMAT": all_possible_format_lines, + } + + self.v = VcfLine(gvf_line_object, + self.standard_lines_dictionary, + self.all_possible_lines_dictionary, + self.reference_lookup) + + def test_add_padded_base(self): + test_ref = "A" + test_alt = "T" + padded_base, pos, ref, alt = self.v.add_padded_base(test_ref, test_alt, True, self.assembly) + assert padded_base is not None + assert pos is not None + assert ref is not None + assert alt is not None + + def test_convert_iupac_ambiguity_code(self): + ref_to_convert = "TAGD" + converted_ref_allele = self.v.convert_iupac_ambiguity_code(self.reference_lookup.iupac_ambiguity_dictionary, ref_to_convert) + assert converted_ref_allele not in ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] + + def test_check_ref(self): + + reference_allele_to_check = "TGCR" + new_ref = self.v.check_ref(reference_allele_to_check, self.reference_lookup) + iupac_code = ["R", "Y", "M", "K", "S", "D", "W", "H", "B", "V", "D", "N"] + assert all(code not in new_ref for code in iupac_code) + + def test_get_ref(self): + reference_allele = self.v.get_ref(self.reference_lookup) + assert len(reference_allele) != 0 + assert reference_allele == 'TA' + + def test_generate_symbolic_allele(self): + (output_symbolic_allele, + info_field, + output_lines_standard_alt, + output_lines_standard_info) = self.v.generate_symbolic_allele(self.standard_lines_dictionary, + self.all_possible_lines_dictionary, + self.reference_lookup.symbolic_allele_dictionary) + assert output_symbolic_allele == '' + print(info_field) + assert info_field == {'END': '78', 'IMPRECISE': None, 'CIPOS': None, 'CIEND': None, 'SVLEN': '1'} + assert output_lines_standard_alt == ['##ALT=', '##ALT='] + assert output_lines_standard_info == ['##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO=', '##INFO='] + + def test_get_alt(self): + alt_allele = self.v.get_alt(self.standard_lines_dictionary, self.all_possible_lines_dictionary, self.reference_lookup) + assert alt_allele == '' + + def test__str__(self): + pass + + def test_merge_and_add(self): + # previous="1" + # current ="2" + # delimiter =";" + # merged_string = merge_and_add(previous, current, delimiter) + # assert len(merged_string) > 1 + pass + + def test_put_GT_format_key_first(self): + pass + + def test_format_sample_values(self): + gvf_pragmas, gvf_non_essential, gvf_lines_obj_list = read_in_gvf_file(self.input_file) + # standard structured meta-information lines for this VCF file + ( + header_standard_lines_dictionary, vcf_data_lines, list_of_vcf_objects + ) = convert_gvf_features_to_vcf_objects(gvf_lines_obj_list, self.reference_lookup) + ( + unique_pragmas_to_add, samples, unique_alt_lines_to_add, unique_info_lines_to_add, + unique_filter_lines_to_add, unique_format_lines_to_add + ) = generate_vcf_header_metainfo(gvf_pragmas, gvf_non_essential, list_of_vcf_objects, + header_standard_lines_dictionary) + for vcf_obj in list_of_vcf_objects: + sample_name_dict_format_kv = vcf_obj.vcf_values_for_format + # sample_format_values_string = format_sample_values(sample_name_dict_format_kv, samples) + sample_format_values_list = vcf_obj.combine_format_values_by_sample(sample_name_dict_format_kv, samples) + assert isinstance(sample_format_values_list, list) + number_of_tokens_should_have = len(samples) + actual_number_of_tokens = len(sample_format_values_list) + assert actual_number_of_tokens == number_of_tokens_should_have, f"must have {number_of_tokens_should_have}" + assert sample_format_values_list == ['.:.', '.:.', '.:.', '0:1:3'], "List must match expected value" + + def test_info_list_to_dict(self): + pass + + def test_merge_info_dicts(self): + pass + + def test_merge_info_string(self): + pass + + def test_merge_format_keys(self): + pass + + def test_merge(self): + pass + + def test_keep(self): + pass