Source code for suvtk.taxonomy

"""
taxonomy.py
===========

This script assigns taxonomy to sequences using MMseqs2 from the ICTV nr database.
It generates taxonomy files and integrates with other modules for further processing.

Functions
---------
taxonomy(fasta_file, database, output_path, seqid, threads)
    Main command to assign taxonomy to sequences.
"""

# TODO: Change to genomad taxonomy
import os
import re
import shutil

import pandas as pd
import psutil
import rich_click as click

from suvtk import utils, virus_info


[docs] def parse_memlimit(mem_str): """ Parse memory string like '30G', '40000M', '32GB' into bytes. Returns an integer number of bytes. """ mem_str = mem_str.strip().upper() match = re.match(r"(\d+(?:\.\d+)?)([GMK]?)B?$", mem_str) if not match: raise ValueError(f"Invalid memory limit format: '{mem_str}'") value, unit = match.groups() value = float(value) if unit == "G": value *= 1024**3 elif unit == "M": value *= 1024**2 elif unit == "K": value *= 1024 # default = bytes return int(value)
@click.command(short_help="Assign virus taxonomy to sequences.") @click.option( "-i", "--input", "fasta_file", required=True, type=click.Path(exists=True), help="Input fasta file", ) @click.option( "-o", "--output", "output_path", required=True, type=click.Path(exists=False), help="Output directory", ) @click.option( "-d", "--database", "database", required=True, type=click.Path(exists=True), help="Path to the suvtk database folder.", ) @click.option( "-s", "--identity", "seqid", required=False, default=0.7, type=float, help="Minimum sequence identity for hits to be considered", ) @click.option( "-t", "--threads", "threads", required=False, default=utils.get_available_cpus(), type=int, help="Number of threads to use", ) @click.option( "--memory-limit", "memlimit", required=False, default=None, type=str, help="Memory limit for MMseqs2 (e.g. 4G, 2000M). Defaults to available system memory.", ) def taxonomy(fasta_file, database, output_path, seqid, threads, memlimit): """ This command uses MMseqs2 to assign taxonomy to sequences using protein sequences from ICTV taxa in the nr database. """ if os.path.exists(output_path): click.echo( f"Warning: Output directory '{output_path}' already exists and will be overwritten." ) os.makedirs(output_path, exist_ok=True) taxresult_path = os.path.join(output_path, "taxresults") # TODO Add error handling # TODO removing tmp before running mmseqs might be dangerous(?) if os.path.exists("tmp"): shutil.rmtree("tmp") # Auto-detect memory if not provided if not memlimit: total_mem_gb = psutil.virtual_memory().available / (1024**3) # Leave a little headroom memlimit = f"{int(total_mem_gb) - 1}G" # Validate memory limit (at least 30G) try: mem_bytes = parse_memlimit(memlimit) except ValueError as e: raise click.BadParameter(str(e)) if mem_bytes < 30 * 1024**3: raise click.BadParameter( f"Memory limit too low ({memlimit}). Must be at least 30G." ) Cmd = "mmseqs easy-taxonomy " Cmd += f"{fasta_file} " # input Cmd += os.path.join(database, "ictv_nr_db") + " " # database Cmd += f"{taxresult_path} " # output Cmd += "tmp " # tmp Cmd += "-s 7.5 --blacklist '' --tax-lineage 1 " Cmd += f"--threads {threads} " Cmd += f"--split-memory-limit {memlimit} " utils.Exec(Cmd) shutil.rmtree("tmp") taxonomy = utils.safe_read_csv(f"{taxresult_path}_lca.tsv", sep="\t", header=None) taxonomy.rename( { 0: "query", 1: "taxid", 2: "rank", 3: "name", 4: "fragments", 5: "assigned", 6: "agreement", 7: "support", 8: "lineage", }, axis=1, inplace=True, ) tophit = utils.safe_read_csv(f"{taxresult_path}_tophit_aln", sep="\t", header=None) tophit.rename( { 0: "query", 1: "target", 2: "pident", 3: "len", 4: "mismatch", 5: "gapopen", 6: "qstart", 7: "qend", 8: "tstart", 9: "tend", 10: "evalue", 11: "bits", }, axis=1, inplace=True, ) # Select top hits highest_bits_idx = tophit.groupby("query")["bits"].idxmax() top_tophit = tophit.loc[highest_bits_idx] merged = pd.merge(taxonomy, top_tophit, on="query", how="left") tax_names = [] for index, row in merged.iterrows(): if row["rank"] == "no rank": click.echo(f"No taxonomy for {row['query']}") last_known = "unclassified viruses" elif row["rank"] == "species": # Fix issue when species contains sp. last_known = row["lineage"].split(";")[-2].replace("g_", "") last_known += " sp." elif ( row["rank"] == "genus" and row["pident"] < seqid # TODO check best cutoff ): # if genus rank and sequence identity is lower than 70% (seqid) get family assignment last_known = row["lineage"].split(";")[-2].replace("f_", "") last_known += " sp." else: last_known = row["name"].strip() last_known += " sp." tax_names.append( [ row["query"], last_known, ] ) tax_df = pd.DataFrame(tax_names, columns=["contig", "taxonomy"]) tax_df.to_csv(os.path.join(output_path, "taxonomy.tsv"), sep="\t", index=False) virus_info.run_segment_info(tax_df, database, output_path) if __name__ == "__main__": taxonomy()