Commit 343f63fc authored by jedlickap's avatar jedlickap
Browse files

add python scripts

parent 4807aefb
Loading
Loading
Loading
Loading
+64 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python3
import os
import subprocess
from typing import List

from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq

from nested.core.tools.tool_interface import ToolInterface
from nested.core.tools.nester.nester_domain_finder import NesterDomainFinder	# odtud budu volat statickou metodu 'run_blastx(sequence: Seq)'
from nested.entities.domain_dante import DomainDante # tato trida dedi 'BaseElement' a ma vsechny potrebne tributy, takze ji primo vyuziji a nemusim definovat novou
from nested.output.output_objects.retro_domain_output_object import RetroDomainOutputObject
# from nested.config.config import config, args_dict_to_list  # probably not needed, since the blastx parameters are handled inside 'run_blastx()'
from nested.utils import intervals


class RetroTeDomains(ToolInterface):
    """Tool that runs blastx on a sequence and keeps the best non-overlapping hits.

    The hits come from ``NesterDomainFinder.run_blastx`` and are filtered by
    e-value and score in ``_filter_candidates``.
    """

    def __init__(self, sequence: SeqRecord):
        super().__init__(sequence)

    def run(self, sequence: Seq) -> List[DomainDante]:
        """Find domains in *sequence* and store them in ``self.output_object``.

        Side effects: writes the sequence as FASTA to
        ``/tmp/nested/retro_domain/<seqId>.fa`` and sets ``self.output_object``
        to a ``RetroDomainOutputObject`` holding the selected domains.

        Args:
            sequence: The sequence handed to ``NesterDomainFinder.run_blastx``.

        Returns:
            The selected domains, sorted by the first element of ``location``.
        """
        # exist_ok=True creates both directory levels and avoids the race
        # between an existence check and the actual creation.
        os.makedirs("/tmp/nested/retro_domain", exist_ok=True)

        with open(f"/tmp/nested/retro_domain/{self.seqId}.fa", "w+") as tmp_file:
            SeqIO.write(SeqRecord(sequence, id=self.seqId), tmp_file, "fasta")

        # NOTE: the FASTA file above is written, but blastx is given the
        # sequence object directly rather than the file path.
        # Per the original author's note, run_blastx returns a list of
        # DomainDante objects enriched with `type` and `annotation` attributes.
        candidates = NesterDomainFinder.run_blastx(sequence)

        retro_domains = self._filter_candidates(candidates)

        retro_domains.sort(key=lambda e: e.location[0])
        self.output_object = RetroDomainOutputObject(self.seqId, sequence, retro_domains)
        return retro_domains

    def _filter_candidates(self, candidates: List[DomainDante]) -> List[DomainDante]:
        """Select mutually non-overlapping candidates, preferring higher scores.

        Candidates with an e-value above 0.1 are discarded. The remainder is
        processed in order of ``(location, -score)``. A candidate is rejected
        if any already accepted domain that intersects it has a strictly
        higher score; otherwise every intersecting accepted domain is
        displaced and the candidate is accepted.

        Args:
            candidates: Domain hits to filter; the list itself is not modified.

        Returns:
            The accepted domains, in the order they were accepted.
        """
        # keep only matches with e-value 0.1 or lower
        significant = [c for c in candidates if c.evalue <= 0.1]
        # order by location, higher score first within the same location
        significant.sort(key=lambda e: (e.location, -e.score))

        retro_domains = []
        for candidate in significant:
            overlapping = [
                domain for domain in retro_domains
                if intervals.is_intersect(domain.location, candidate.location)
            ]
            # Decide on acceptance BEFORE touching the accepted list, so that a
            # rejected candidate never evicts anything.
            if any(domain.score > candidate.score for domain in overlapping):
                continue
            if overlapping:
                # Rebuild instead of calling remove() during iteration, which
                # would make the loop skip elements. Identity is used so only
                # the exact overlapping objects are dropped.
                displaced = {id(domain) for domain in overlapping}
                retro_domains = [
                    domain for domain in retro_domains
                    if id(domain) not in displaced
                ]
            retro_domains.append(candidate)

        return retro_domains
    
    
 No newline at end of file
+39 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python3
from typing import NoReturn

from nested.output.output_generators.base_output_generator import BaseOutputGenerator
from nested.output.output_objects.retro_domain_output_object import RetroDomainOutputObject
from nested.config.config import config

class RetroDomainOutputGenerator(BaseOutputGenerator):
    """Output generator that writes the elements of an output object as GFF3."""

    def __init__(self, directory: str):
        super().__init__(directory)

    def generate_output(self, output_object: RetroDomainOutputObject) -> None:
        """Write one GFF3 feature line per element of *output_object*.

        The file is created at
        ``<directory>/<sequence_id>/<sequence_id>_retro_domain.gff`` after
        ``create_data_directory`` has been called for the sequence id.

        Args:
            output_object: Provides ``sequence_id`` and ``elements``; each
                element is read for ``location``, ``frame``, ``type``,
                ``annotation``, ``evalue`` and ``score``.

        Raises:
            KeyError: If ``config["igv_colors"]`` has no entry for an
                element's ``type``.
        """
        seq_id = output_object.sequence_id
        self.create_data_directory(seq_id)

        gff_path = f"{self.directory}/{seq_id}/{seq_id}_retro_domain.gff"
        with open(gff_path, "w+") as gff:
            gff.write("##gff-version 3\n")
            for index, domain in enumerate(output_object.elements):
                start, end = domain.location[0], domain.location[1]
                strand = "+"
                # A negative first frame value marks the minus strand; the
                # coordinates are swapped for those features.
                if domain.frame[0] < 0:
                    start, end = end, start
                    strand = "-"

                attributes = ";".join((
                    f"ID=DOMAIN_{index}",
                    f"name={domain.type}",
                    f"annot={domain.annotation}",
                    f"evalue={domain.evalue}",
                    f"score={domain.score}",
                    f"color={config['igv_colors'][domain.type]}",
                ))
                columns = (
                    seq_id,
                    "retroTeDomain",
                    "polypeptide_conserved_region",
                    str(start),
                    str(end),
                    ".",      # score column is left unset
                    strand,
                    ".",      # phase column is left unset
                    attributes,
                )
                gff.write("\t".join(columns) + "\n")
+12 −0
Original line number Original line Diff line number Diff line
#!/usr/bin/env python3
from typing import List

from Bio.Seq import Seq

from nested.output.output_objects.base_output_object import BaseOutputObject
from nested.entities.domain_dante import DomainDante


class RetroDomainOutputObject(BaseOutputObject):
    """Output object bundling a sequence with its list of ``DomainDante`` elements.

    Adds nothing of its own; all three arguments are forwarded unchanged to
    ``BaseOutputObject``.
    """

    def __init__(
        self,
        sequence_id: str,
        sequence: Seq,
        elements: List[DomainDante],
    ):
        """Forward *sequence_id*, *sequence* and *elements* to the base class."""
        super().__init__(sequence_id, sequence, elements)