Commit 8b993eeb authored by Ivan Vanat's avatar Ivan Vanat
Browse files

WIP: input sequence windows

parent 3fedaecf
Loading
Loading
Loading
Loading
+50 −30
Original line number Diff line number Diff line
@@ -3,20 +3,21 @@ import os
import click
import glob
import shutil
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from subprocess import CalledProcessError
from typing import NoReturn, List

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

from nested.config.config import config
from nested.core.tools.ltr.discovery_tool import DiscoveryTool
from nested.core.executor import Executor
from nested.core.sequence_analyzer import SequenceAnalyzer
from nested.core.module_management.module_manager import ModuleManager


available_modules = "\n".join(f"{module} - {config['tool_order'][module]}" for module in config["tool_order"].keys())


@@ -36,21 +37,36 @@ available_modules = "\n".join(f"{module} - {config['tool_order'][module]}" for m
@click.option("--module_order", "-mo", default="", help=f"String containing space separated module numbers in order "
                                                        f"they should be run. Available modules in config file:\n"
                                                        f"{available_modules}")
@click.option("--window_size", "-ws", type=int, default=0)
@click.option("--window_overlap", "-wo", type=int, default=50000)
def main(input_fasta: str, sketch: bool, format: str, output_fasta_offset: int,
         output_folder: str, initial_threshold: int, threshold_multiplier: float,
         threads: int, discovery_tool: str, module_order: str) -> int:
         threads: int, discovery_tool: str, module_order: str,
         window_size: int, window_overlap: int) -> int:
    check_ram(input_fasta, threads)
    check_permissions(output_folder, os.W_OK | os.X_OK)
    number_of_errors = [0]
    start_time = datetime.now()
    futuress = []

    with futures.ThreadPoolExecutor(threads) as executor:
    with ThreadPoolExecutor(threads) as executor:
        for sequence in SeqIO.parse(input_fasta, "fasta"):
            futuress.append(executor.submit(process_sequence, sequence, sketch, format, output_fasta_offset,
                                            output_folder, initial_threshold, threshold_multiplier, number_of_errors,
                                            discovery_tool, module_order))
        futures.wait(futuress)
            process_sequence(window, sketch, format, output_fasta_offset,
                             output_folder, initial_threshold, threshold_multiplier,
                             number_of_errors, discovery_tool, module_order,
                             window_size, window_overlap, executor)
            # if window_size > 0:
            #     for window in to_windows(sequence, window_size, window_overlap):
            #         futuress.append(executor.submit(process_sequence, window, sketch, format, output_fasta_offset,
            #                                         output_folder, initial_threshold, threshold_multiplier,
            #                                         number_of_errors,
            #                                         discovery_tool, module_order))
            # else:
            #     futuress.append(executor.submit(process_sequence, sequence, sketch, format, output_fasta_offset,
            #                                 output_folder, initial_threshold, threshold_multiplier, number_of_errors,
            #                                 discovery_tool, module_order))
        executor.shutdown(wait=True, cancel_futures=False)
        # futures.wait(futuress)

    shutil.rmtree("/tmp/nested/", ignore_errors=True)
    end_time = datetime.now()
@@ -60,32 +76,36 @@ def main(input_fasta: str, sketch: bool, format: str, output_fasta_offset: int,
    return 0


def process_sequence(sequence: Seq, sketch: bool, format: str,
def process_sequence(sequence: SeqRecord, sketch: bool, format: str,
                     output_fasta_offset: int, output_folder: str, initial_threshold: int,
                     threshold_multiplier: float, errors: List[int],
                     discovery_tool: str, module_order: str) -> NoReturn:
                     discovery_tool: str, module_order: str,
                     window_size: int, window_overlap: int, executor: ThreadPoolExecutor) -> NoReturn:
    sequence.id = sequence.id.replace("/", "--")
    arguments = locals()
    modules = ModuleManager(arguments)
    seq_start_time = datetime.now()
    strlen = 15
    print(f"Processing {sequence.id}")
    try:
        executor = Executor(sequence, modules.modules)
        executor.run_main_loop()

        seq_end_time = datetime.now()
        print(f"Processing {sequence.id}: DONE [{seq_end_time - seq_start_time}]")
    except KeyboardInterrupt:
        raise
    except CalledProcessError as ex:
        errors[0] += 1
        print(f"Processing {sequence.id[:strlen]}: SUBPROCESS ERROR: {ex}")
    except Exception as ex:
        errors[0] += 1
        print(f"Processing {sequence.id[:strlen]}: UNEXPECTED ERROR: {ex}")
    finally:
        cleanup()
    # modules = ModuleManager(arguments)
    analyzer = SequenceAnalyzer(executor, sequence, arguments, window_size, window_overlap, errors)
    analyzer.analyze()

    # seq_start_time = datetime.now()
    # strlen = 15
    # print(f"Processing {sequence.id}")
    # try:
    #     executor = Executor(sequence, modules.modules)
    #     executor.run_main_loop()
    #
    #     seq_end_time = datetime.now()
    #     print(f"Processing {sequence.id}: DONE [{seq_end_time - seq_start_time}]")
    # except KeyboardInterrupt:
    #     raise
    # except CalledProcessError as ex:
    #     errors[0] += 1
    #     print(f"Processing {sequence.id[:strlen]}: SUBPROCESS ERROR: {ex}")
    # except Exception as ex:
    #     errors[0] += 1
    #     print(f"Processing {sequence.id[:strlen]}: UNEXPECTED ERROR: {ex}")
    # finally:
    #     cleanup()


def check_permissions(path: str, permissions: int) -> NoReturn:
+1 −0
Original line number Diff line number Diff line
@@ -5,6 +5,7 @@ from Bio import SeqRecord

from nested.entities.base_element import BaseElement
from nested.core.module_management.module import Module
from nested.core.tools.nester.nester import Nester


class Executor:
+80 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
import glob
import os

from typing import List, Dict, NoReturn
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
from concurrent import futures
from datetime import datetime
from subprocess import CalledProcessError

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

import nested.utils.window_utils as window_utils
from nested.core.module_management.module_manager import ModuleManager
from nested.core.executor import Executor

from nested.core.window import Window
from nested.entities.base_element import BaseElement


class SequenceAnalyzer:
    def __init__(self, threadpool_executor: ThreadPoolExecutor, sequence: SeqRecord, modules_arguments: Dict[str, any],
                 window_size: int, window_overlap: int, errors: List[int]):
        sequence.id = sequence.id.replace("/", "--")
        self.jobs: List[Future] = []
        self.elements: List[List[BaseElement]] = []
        self.threadpool_executor = threadpool_executor
        self.sequence = sequence
        self.modules_arguments = modules_arguments
        self.window_size = window_size
        self.window_overlap = window_overlap
        self.errors = errors

    def analyze(self):
        seq_start_time = datetime.now()
        print(f"Processing {self.sequence.id}")

        for index, window in enumerate(window_utils.to_windows(self.sequence, self.window_size, self.window_overlap)):
            job = self.threadpool_executor.submit(self._process_window, window, index)
            self.jobs.append(job)
        futures.wait(self.jobs)

        # output
        for module in self.modules:
            if hasattr(module.output_generator, "__iter__"):
                for generator in module.output_generator:
                    generator.generate_output(module.tool.output_object)
            else:
                module.output_generator.generate_output(module.tool.output_object)

        seq_end_time = datetime.now()
        print(f"Processing {self.sequence.id}: DONE [{seq_end_time - seq_start_time}]")

    def _process_window(self, window: Window, index: int) -> NoReturn:
        strlen = 15
        try:
            modules_manager = ModuleManager(self.modules_arguments)
            executor = Executor(window.sequence, modules_manager.modules)
            elements = executor.run_main_loop()

            for element in elements:
                element.location[0] += window.offset

            self.elements += elements
        except KeyboardInterrupt:
            raise
        except CalledProcessError as ex:
            self.errors[0] += 1
            print(f"Processing {window.sequence.id[:strlen]} window {index + 1}: SUBPROCESS ERROR: {ex}")
        except Exception as ex:
            self.errors[0] += 1
            print(f"Processing {window.sequence.id[:strlen]} window {index + 1}: UNEXPECTED ERROR: {ex}")
        finally:
            self._cleanup()

    def _cleanup(self) -> NoReturn:
        for file in glob.glob("*ltrs.fa"):
            os.remove(file)

nested/core/window.py

0 → 100644
+7 −0
Original line number Diff line number Diff line
from Bio.SeqRecord import SeqRecord


class Window:
    def __init__(self, sequence: SeqRecord, offset: int):
        self.sequence = sequence
        self.offset = offset
+36 −0
Original line number Diff line number Diff line
from Bio.SeqRecord import SeqRecord
from typing import Generator, List

from nested.core.window import Window
from nested.entities.base_element import BaseElement


def to_windows(sequence: SeqRecord, window_size: int, window_overlap: int) -> Generator[Window]:
    start = 0
    end = window_size

    while end < len(sequence):
        seqrecord = SeqRecord(sequence.seq[start:end], sequence.id)
        yield Window(seqrecord, start)
        start = end - window_overlap
        end = start + window_size

    yield Window(SeqRecord(sequence.seq[start:len(sequence)], sequence.id), start)


def merge_windows(window1: List[BaseElement], window2: List[BaseElement]) -> List[BaseElement]:
    if window1[-1].location[0] < window2[-1].location[0]:
        first, second = window1, window2
    else:
        first, second = window2, window1

    first_end_index, second_start_index = find_first_common_element_indexes(first, second)

    return first[:first_end_index] + second[second_start_index:]


def find_first_common_element_indexes(window1: List[BaseElement], window2: List[BaseElement]) -> tuple[int, int]:
    for i in range(len(window1)):
        for j in range(len(window2)):
            if window1[i].location == window2[i].location:
                return i, j