Commit aefd50d6 authored by Ivan Vanat's avatar Ivan Vanat
Browse files

fix bug in soloLtrs module

parent aef715bc
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
from typing import Type, Union, List

from nested.core.tools.tool_interface import ToolInterface
from nested.output.output_generators.base_output_generator import BaseOutputGenerator
from nested.output.output_generators.output_generator_interface import OutputGeneratorInterface


class Module:
    def __init__(self, name: str, tool: ToolInterface, element_type: Type,
                 output_generator: Union[BaseOutputGenerator, List[BaseOutputGenerator]]):
                 output_generator: Union[OutputGeneratorInterface, List[OutputGeneratorInterface]]):
        self.name = name
        self.tool = tool
        self.element_type = element_type
+25 −31
Original line number Diff line number Diff line
#!/usr/bin/env python3
import os
import subprocess
from typing import List, Type
import uuid
from typing import List, NoReturn, Generator, Set, Dict

from io import StringIO

@@ -11,9 +12,9 @@ from Bio.Blast import NCBIXML
from Bio.Blast.Applications import NcbiblastnCommandline

from nested.core.tools.tool_interface import ToolInterface
from nested.entities.base_element import BaseElement
from nested.output.output_objects.solo_ltrs_output_object import SoloLtrsOutputObject
from nested.entities.solo_ltr import SoloLtr
from nested.entities.te import TE
from nested.config.config import config
import nested.utils.intervals as intervals
import nested.utils.file as file
@@ -25,48 +26,37 @@ class SoloLtrs(ToolInterface):

    def __init__(self, sequence: SeqRecord):
        super().__init__(sequence)
        self.ltrs_fasta_path = f"{os.getcwd()}/{self.seqId}_ltrs.fa"
        self.intermediary_path = f"/tmp/nested/{str(uuid.uuid4())}/"
        self.ltrs_fasta_path = f"{self.intermediary_path}{self.seqId}_ltrs.fa"
        self.ltr_lengths = {}
        self._create_intermediary()

    def run(self, sequence: Seq) -> List[Type[BaseElement]]:
    def run(self, sequence: Seq) -> List[SoloLtr]:
        self.sequence = sequence
        self.save_spliced_sequence()
        solo_ltrs = []

        if os.path.exists(self.ltrs_fasta_path) and len(self.ltr_lengths.keys()) > 0:
            subprocess.run(["makeblastdb", "-in", f"{self.seqId}_ltrs.fa", "-dbtype", "nucl", "-out",
                            f"{self.seqId}_ltrs.fa"],
            subprocess.run(["makeblastdb", "-in", self.ltrs_fasta_path, "-dbtype", "nucl", "-out",
                            self.ltrs_fasta_path],
                           stdout=subprocess.DEVNULL)
            blast_records = self.run_blastn()
            solo_ltrs = self._filter_intersections(self._parse_blast(blast_records))
            self.cleanup()

        self.output_object = SoloLtrsOutputObject(self.seqId, self.sequence, solo_ltrs)
        self.output_object = SoloLtrsOutputObject(self.seqId, self.sequence, solo_ltrs, self.intermediary_path)

        return solo_ltrs

    def find_solo_ltrs(self, sequence):
        self.sequence = sequence
        self.save_spliced_sequence()
        if os.path.exists(self.ltrs_fasta_path) and len(self.ltr_lengths.keys()) > 0:
            subprocess.run(["makeblastdb", "-in", f"{self.seqId}_ltrs.fa", "-dbtype", "nucl", "-out",
                            f"{self.seqId}_ltrs.fa"],
                           stdout=subprocess.DEVNULL)
            blast_records = self.run_blastn()
            solo_ltrs = self._filter_intersections(self._parse_blast(blast_records))
            self.cleanup()

            return solo_ltrs

    def save_spliced_sequence(self):
        with open(f"{self.seqId}_spliced.fa", "w") as file:
    def save_spliced_sequence(self) -> NoReturn:
        with open(f"{self.intermediary_path}{self.seqId}_spliced.fa", "w") as file:
            file.write(f">{self.seqId}_spliced\n")
            file.write(str(self.sequence) + "\n")

    def create_ltr_multifasta(self, tes, seq):
    def create_ltr_multifasta(self, tes: List[TE], seq: Seq) -> NoReturn:
        offset = len(self.ltr_lengths.keys())
        num = 1
        with open(f"{self.seqId}_ltrs.fa", "a+") as file:
        with open(self.ltrs_fasta_path, "a+") as file:
            for i in range(len(self.ltr_lengths.keys()), len(self.ltr_lengths.keys()) + len(tes)):
                if (intervals.length(tes[i - offset].features["ltr_left"].location) < 100
                        or "tsr_left" not in tes[i - offset].features.keys()
@@ -80,7 +70,7 @@ class SoloLtrs(ToolInterface):
                    str(seq[tes[i - offset].features["ltr_left"].location[0]:tes[i - offset].features["ltr_left"].location[1] + 1]))
                num += 1

    def run_blastn(self):
    def run_blastn(self) -> Generator:
        args = dict(config["blastx"]["args"])
        args["db"] = self.ltrs_fasta_path.replace("|", "\|")
        args["word_size"] = 10
@@ -91,7 +81,7 @@ class SoloLtrs(ToolInterface):

        return NCBIXML.parse(StringIO(xml_out))

    def _parse_blast(self, records):
    def _parse_blast(self, records: Generator) -> List[SoloLtr]:
        result = []
        try:
            blast_record = next(records).alignments
@@ -113,7 +103,7 @@ class SoloLtrs(ToolInterface):
                    result.append(solo_ltr)
        return result

    def _filter_intersections(self, solo_ltrs):
    def _filter_intersections(self, solo_ltrs: List[SoloLtr]) -> List[SoloLtr]:
        result = set()
        work_list = set(solo_ltrs)
        for ltr in solo_ltrs:
@@ -127,22 +117,26 @@ class SoloLtrs(ToolInterface):

        return list(result)

    def _get_intersections(self, ltr, ltrs):
    def _get_intersections(self, ltr: SoloLtr, ltrs: Set[SoloLtr]) -> Dict[SoloLtr, float]:
        result = {ltr: ltr.score * ltr.bits}
        for ltr2 in ltrs:
            if intervals.intersect(ltr.location, ltr2.location):
                result[ltr2] = ltr2.score * ltr2.bits
        return result

    def _best_hit(self, intersections):
    def _best_hit(self, intersections: Dict[SoloLtr, float]) -> SoloLtr:
        return max(intersections, key=lambda x: intersections[x])

    def _update_result_with_best_hit(self, hit, result):
    def _update_result_with_best_hit(self, hit: SoloLtr, result: Set[SoloLtr]) -> NoReturn:
        intersections = self._get_intersections(hit, result)
        real_best = self._best_hit(intersections)
        result.difference(intersections.keys())
        result.add(real_best)

    def cleanup(self):
    def _create_intermediary(self) -> NoReturn:
        if not os.path.exists(self.intermediary_path):
            os.makedirs(self.intermediary_path)

    def cleanup(self) -> NoReturn:
        for extension in self.cleanup_extensions:
            file.delete_if_exists(f"{self.ltrs_fasta_path}.{extension}")
+9 −7
Original line number Diff line number Diff line
@@ -28,11 +28,13 @@ class SoloLtrsOutputGenerator(OutputGeneratorInterface):
                          + "\n")
                i += 1

        self._move_ltrs_spliced(output_object.sequence_id)
        self._move_ltrs_spliced(output_object.intermediary_path, output_object.sequence_id)

    def _move_ltrs_spliced(self, sequence_id: str):
        if os.path.exists(f"{sequence_id}_ltrs.fa"):
            shutil.move(f"{sequence_id}_ltrs.fa",
                        f"{self.directory}/{sequence_id}/{sequence_id}_ltrs.fa")
        if os.path.exists(f"{sequence_id}_spliced.fa"):
            shutil.move(f"{sequence_id}_spliced.fa", f"{self.directory}/{sequence_id}/{sequence_id}_spliced.fa")
    def _move_ltrs_spliced(self, path: str, sequence_id: str):
        ltrs_path = f"{path}{sequence_id}_ltrs.fa"
        spliced_path = f"{path}{sequence_id}_spliced.fa"

        if os.path.exists(ltrs_path):
            shutil.move(ltrs_path, f"{self.directory}/{sequence_id}/{sequence_id}_ltrs.fa")
        if os.path.exists(spliced_path):
            shutil.move(spliced_path, f"{self.directory}/{sequence_id}/{sequence_id}_spliced.fa")
+2 −1
Original line number Diff line number Diff line
@@ -8,5 +8,6 @@ from nested.entities.solo_ltr import SoloLtr


class SoloLtrsOutputObject(BaseOutputObject):
    def __init__(self, sequence_id: str, sequence: Seq, elements: List[SoloLtr]):
    def __init__(self, sequence_id: str, sequence: Seq, elements: List[SoloLtr], intermediary_path: str):
        super().__init__(sequence_id, sequence, elements)
        self.intermediary_path = intermediary_path