Loading nested/cli/nester.py +2 −1 Original line number Original line Diff line number Diff line Loading @@ -32,6 +32,7 @@ def main(input_fasta, sketch, format, output_fasta_offset): #try: #try: nester = Nester(sequence, i) nester = Nester(sequence, i) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset, format=format) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset, format=format) sketcher.create_solo_ltr_gff(nester.solo_ltrs) if sketch: if sketch: if format != 'default': if format != 'default': sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset) Loading nested/core/domain_dante.py 0 → 100644 +86 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 from io import StringIO from Bio.Blast import NCBIXML from Bio.Blast.Applications import NcbiblastxCommandline from nested.config.config import config class DomainDante(object): """Class representing domain parsed from blastx output. Atrributes: evalue (float): evalue (same as blastx output) frame (tuple): frame (same as blastx output) score (float): score (same as blastx output) title (str): title (same as blastx output) location (list): location [from, to] (from > to for negative strand) type (str): type of domain (AP, GAG, INT, ...) """ def __init__(self, evalue=None, frame=None, score=None, title=None, location=None, domainType=None, annotation=None): self.evalue = evalue self.frame = frame self.score = score self.title = title self.location = location self.type = domainType self.annotation = annotation def __str__(self): strlen = 15 lines = ['{{title = {a}{b},'.format(a=self.title[:strlen], b='...' if len(self.title) > strlen else ''), ' type = {},'.format(self.type), ' location = {},'.format(self.location), ' evalue = {},'.format(self.evalue), ' frame = {},'.format(self.frame), ' score = {}}}'.format(self.score)] return '\n'.join(lines) def run_blastx(sequence): """Run blastx and get the list of found domains Arguments: sequence (Bio.Seq.Seq): sequence Returns: list[Domain]: list of matched domains """ domains = [] args = dict(config['blastx']['args']) args['query'] = "DANTE" blastx_cline = NcbiblastxCommandline(**config['blastx']['args']) xml_out, stderr = blastx_cline(stdin=str(sequence)) #output = open("xml_output_blast.xml", "a+").write(str(xml_out)) blast_records = NCBIXML.parse(StringIO(xml_out)) try: blast_record = next(blast_records).alignments except ValueError: return domains if blast_record: for alignment in blast_record: for hsp in alignment.hsps: domain = DomainDante(evalue=hsp.expect, frame=hsp.frame, score=hsp.score, title=alignment.title) if hsp.frame[0] < 0: # complementary strand domain.location = [hsp.query_start + 3 * (hsp.align_length - hsp.gaps), hsp.query_start] else: domain.location = [hsp.query_start, hsp.query_start + 3 * (hsp.align_length - hsp.gaps)] #domain.type = alignment.title.split(' ')[1].split('_')[0] domain.type = alignment.title.split('__')[0].split('-')[1] domain.annotation = alignment.title.split('__')[1] #print(domain.type, domain.location) domains.append(domain) return domains nested/core/nester.py +25 −2 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 #!/usr/bin/env python3 import math from nested.utils import intervals from nested.utils import intervals from nested.core.gene import Gene from nested.core.gene import Gene from nested.core.nested_element import NestedElement from nested.core.nested_element import NestedElement from nested.core.solo_ltrs import SoloLtrs from nested.logging.logger import NesterLogger from nested.logging.logger import NesterLogger from nested.config.config import config from nested.config.config import config Loading @@ -21,6 +24,7 @@ class Nester(object): self.seqid = sequence.id self.seqid = sequence.id self.sequence = sequence.seq self.sequence = sequence.seq self.nested_element = None self.nested_element = None self.solo_ltrs = SoloLtrs(sequence.id) self._iteration = 0 self._iteration = 0 self._logger = NesterLogger('{}/{}'.format(config['logdir'], self.seqid)) self._logger = NesterLogger('{}/{}'.format(config['logdir'], self.seqid)) self._find_nesting() self._find_nesting() Loading @@ -28,6 +32,9 @@ class Nester(object): def _find_nesting(self): def _find_nesting(self): nested_list = self._get_unexpanded_transposon_list(self.sequence) # find list of nested transposons nested_list = self._get_unexpanded_transposon_list(self.sequence) # find list of nested transposons nested_list = self._expand_transposon_list(nested_list) nested_list = self._expand_transposon_list(nested_list) nested_list = self._filter_nested_list(nested_list) self.nested_element = NestedElement(self.seqid, self.sequence, nested_list) self.nested_element = NestedElement(self.seqid, self.sequence, nested_list) def _get_unexpanded_transposon_list(self, sequence): def _get_unexpanded_transposon_list(self, sequence): Loading @@ -38,7 +45,12 @@ class Nester(object): if not candidates: if not candidates: best_candidate = gene.get_best_candidate() best_candidate = gene.get_best_candidate() if not best_candidate: if not best_candidate: return [] # call blastn here self.solo_ltrs.find_solo_ltrs(sequence) self.solo_ltrs.solo_ltrs.sort(key=lambda x: x.location[0], reverse=True) print(len(self.solo_ltrs.solo_ltrs)) return self.solo_ltrs.solo_ltrs # return [] candidates = [best_candidate] candidates = [best_candidate] # sort by score (from highest) # sort by score (from highest) Loading @@ -64,6 +76,8 @@ class Nester(object): # crop from sequence # crop from sequence cropped_sequence = sequence cropped_sequence = sequence self.solo_ltrs.create_ltr_multifasta(nested_list, sequence) for element in nested_list: for element in nested_list: cropped_sequence = cropped_sequence[:(element.location[0])] + cropped_sequence[ cropped_sequence = cropped_sequence[:(element.location[0])] + cropped_sequence[ (element.location[1]):] #originally:"(element.location[0] - 1)" and "(element.location[1] + 1):]" (element.location[1]):] #originally:"(element.location[0] - 1)" and "(element.location[1] + 1):]" Loading Loading @@ -96,3 +110,12 @@ class Nester(object): nested_list[j].tsr_right = intervals.expand(nested_list[i].location, nested_list[j].tsr_right = intervals.expand(nested_list[i].location, nested_list[j].tsr_right) nested_list[j].tsr_right) return nested_list return nested_list def _filter_nested_list(self, nested_list): result = [] for te in nested_list: if not math.isnan(te.ltr_left_location[0]): result.append(te) return result nested/core/solo_ltr.py 0 → 100644 +21 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 from nested.core.te import TE class SoloLtr(TE): def __init__(self, location = None, score = None, bits = None): TE.__init__(self) self.location = location self.score = score self.bits = bits self.ltr_left_location = [float('nan'), float('nan')] self.ltr_right_location = [float('nan'), float('nan')] self.tsr_left = [float('nan'), float('nan')] self.tsr_right = [float('nan'), float('nan')] self.features = { "domains": [], "ppt": [float('nan'), float('nan')], "pbs": [float('nan'), float('nan')] } nested/core/solo_ltrs.py 0 → 100644 +125 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 import os import subprocess import math from io import StringIO from Bio import SeqIO from Bio.Blast import NCBIXML from Bio.Blast.Applications import NcbiblastnCommandline from nested.config.config import config from nested.core.solo_ltr import SoloLtr import nested.utils.intervals as intervals class SoloLtrs(object): def __init__(self, seqId): self.sequence = "" self.seqId = seqId self.solo_ltrs = [] self.ltrs_fasta_path = '{}/ltrs.fa'.format(os.getcwd()) self.ltr_lengths = {} def find_solo_ltrs(self, sequence): self.sequence = sequence #open("seq_test.fa", 'w').write(str(self.sequence)) if os.path.exists(self.ltrs_fasta_path): subprocess.run(["makeblastdb", "-in", "ltrs.fa", "-dbtype", "nucl", "-out", "ltrs.fa"], stdout=subprocess.DEVNULL) blast_records = self.run_blastn() self._filter_intersections(self._parse_blast(blast_records)) #print("final:", len(self.solo_ltrs)) self.cleanup() def create_ltr_multifasta(self, tes, seq): offset = len(self.ltr_lengths.keys()) with open("ltrs.fa", 'a+') as file: for i in range(len(self.ltr_lengths.keys()), len(self.ltr_lengths.keys()) + len(tes)): if (intervals.length(tes[i - offset].ltr_left_location) < 100 or math.isnan(tes[i - offset].tsr_left[0]) or math.isnan(tes[i - offset].tsr_right[0])): continue file.write(">ltr_" + str(i) + "\n") file.write(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1]) + "\n") self.ltr_lengths["ltr_" + str(i)] = len(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1])) def run_blastn(self): args = dict(config['blastx']['args']) args['db'] = self.ltrs_fasta_path args['word_size'] = 10 args['evalue'] = 1e-5 #args['query'] = "seq_test.fa" blastn_cline = NcbiblastnCommandline(**args) # stdin=str(self.sequence) <- capnut do blastn_cline xml_out, stderr = blastn_cline(stdin=str(self.sequence)) output = open("out.xml", 'w') output.write(str(xml_out)) output.close() return NCBIXML.parse(StringIO(xml_out)) def _parse_blast(self, records): result = [] try: blast_record = next(records).alignments except ValueError: return result if blast_record: for alignment in blast_record: for hsp in alignment.hsps: if not (hsp.align_length / self.ltr_lengths[alignment.hit_def] >= 0.95): continue solo_ltr = SoloLtr(score = hsp.score, bits = hsp.bits) if hsp.frame[0] < 0: #complementary strand solo_ltr.location = [hsp.query_start, hsp.query_start - hsp.align_length] else: solo_ltr.location = [hsp.query_start, hsp.query_start + hsp.align_length] result.append(solo_ltr) print(len(result)) return result def _filter_intersections(self, solo_ltrs): result = set() work_list = set(solo_ltrs) for ltr in solo_ltrs: if ltr not in work_list: continue intersections = self._get_intersections(ltr, work_list) work_list = work_list.difference(intersections.keys()) best_hit = self._best_hit(intersections) self._update_result_with_best_hit(best_hit, result) self.solo_ltrs = list(result) def _get_intersections(self, ltr, ltrs): result = { ltr: ltr.score * ltr.bits } for ltr2 in ltrs: if intervals.intersect(ltr.location, ltr2.location): result[ltr2] = ltr2.score * ltr2.bits return result def _best_hit(self, intersections): return max(intersections, key=lambda x: intersections[x]) def _update_result_with_best_hit(self, hit, result): intersections = self._get_intersections(hit, result) real_best = self._best_hit(intersections) result.difference(intersections.keys()) result.add(real_best) def cleanup(self): if os.path.isfile("ltrs.fa.nhr"): os.remove("ltrs.fa.nhr") if os.path.isfile("ltrs.fa.nin"): os.remove("ltrs.fa.nin") if os.path.isfile("ltrs.fa.nsq"): os.remove("ltrs.fa.nsq") Loading
nested/cli/nester.py +2 −1 Original line number Original line Diff line number Diff line Loading @@ -32,6 +32,7 @@ def main(input_fasta, sketch, format, output_fasta_offset): #try: #try: nester = Nester(sequence, i) nester = Nester(sequence, i) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset, format=format) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset, format=format) sketcher.create_solo_ltr_gff(nester.solo_ltrs) if sketch: if sketch: if format != 'default': if format != 'default': sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset) sketcher.create_gff(nester.nested_element, output_fasta_offset=output_fasta_offset) Loading
nested/core/domain_dante.py 0 → 100644 +86 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 from io import StringIO from Bio.Blast import NCBIXML from Bio.Blast.Applications import NcbiblastxCommandline from nested.config.config import config class DomainDante(object): """Class representing domain parsed from blastx output. Atrributes: evalue (float): evalue (same as blastx output) frame (tuple): frame (same as blastx output) score (float): score (same as blastx output) title (str): title (same as blastx output) location (list): location [from, to] (from > to for negative strand) type (str): type of domain (AP, GAG, INT, ...) """ def __init__(self, evalue=None, frame=None, score=None, title=None, location=None, domainType=None, annotation=None): self.evalue = evalue self.frame = frame self.score = score self.title = title self.location = location self.type = domainType self.annotation = annotation def __str__(self): strlen = 15 lines = ['{{title = {a}{b},'.format(a=self.title[:strlen], b='...' if len(self.title) > strlen else ''), ' type = {},'.format(self.type), ' location = {},'.format(self.location), ' evalue = {},'.format(self.evalue), ' frame = {},'.format(self.frame), ' score = {}}}'.format(self.score)] return '\n'.join(lines) def run_blastx(sequence): """Run blastx and get the list of found domains Arguments: sequence (Bio.Seq.Seq): sequence Returns: list[Domain]: list of matched domains """ domains = [] args = dict(config['blastx']['args']) args['query'] = "DANTE" blastx_cline = NcbiblastxCommandline(**config['blastx']['args']) xml_out, stderr = blastx_cline(stdin=str(sequence)) #output = open("xml_output_blast.xml", "a+").write(str(xml_out)) blast_records = NCBIXML.parse(StringIO(xml_out)) try: blast_record = next(blast_records).alignments except ValueError: return domains if blast_record: for alignment in blast_record: for hsp in alignment.hsps: domain = DomainDante(evalue=hsp.expect, frame=hsp.frame, score=hsp.score, title=alignment.title) if hsp.frame[0] < 0: # complementary strand domain.location = [hsp.query_start + 3 * (hsp.align_length - hsp.gaps), hsp.query_start] else: domain.location = [hsp.query_start, hsp.query_start + 3 * (hsp.align_length - hsp.gaps)] #domain.type = alignment.title.split(' ')[1].split('_')[0] domain.type = alignment.title.split('__')[0].split('-')[1] domain.annotation = alignment.title.split('__')[1] #print(domain.type, domain.location) domains.append(domain) return domains
nested/core/nester.py +25 −2 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 #!/usr/bin/env python3 import math from nested.utils import intervals from nested.utils import intervals from nested.core.gene import Gene from nested.core.gene import Gene from nested.core.nested_element import NestedElement from nested.core.nested_element import NestedElement from nested.core.solo_ltrs import SoloLtrs from nested.logging.logger import NesterLogger from nested.logging.logger import NesterLogger from nested.config.config import config from nested.config.config import config Loading @@ -21,6 +24,7 @@ class Nester(object): self.seqid = sequence.id self.seqid = sequence.id self.sequence = sequence.seq self.sequence = sequence.seq self.nested_element = None self.nested_element = None self.solo_ltrs = SoloLtrs(sequence.id) self._iteration = 0 self._iteration = 0 self._logger = NesterLogger('{}/{}'.format(config['logdir'], self.seqid)) self._logger = NesterLogger('{}/{}'.format(config['logdir'], self.seqid)) self._find_nesting() self._find_nesting() Loading @@ -28,6 +32,9 @@ class Nester(object): def _find_nesting(self): def _find_nesting(self): nested_list = self._get_unexpanded_transposon_list(self.sequence) # find list of nested transposons nested_list = self._get_unexpanded_transposon_list(self.sequence) # find list of nested transposons nested_list = self._expand_transposon_list(nested_list) nested_list = self._expand_transposon_list(nested_list) nested_list = self._filter_nested_list(nested_list) self.nested_element = NestedElement(self.seqid, self.sequence, nested_list) self.nested_element = NestedElement(self.seqid, self.sequence, nested_list) def _get_unexpanded_transposon_list(self, sequence): def _get_unexpanded_transposon_list(self, sequence): Loading @@ -38,7 +45,12 @@ class Nester(object): if not candidates: if not candidates: best_candidate = gene.get_best_candidate() best_candidate = gene.get_best_candidate() if not best_candidate: if not best_candidate: return [] # call blastn here self.solo_ltrs.find_solo_ltrs(sequence) self.solo_ltrs.solo_ltrs.sort(key=lambda x: x.location[0], reverse=True) print(len(self.solo_ltrs.solo_ltrs)) return self.solo_ltrs.solo_ltrs # return [] candidates = [best_candidate] candidates = [best_candidate] # sort by score (from highest) # sort by score (from highest) Loading @@ -64,6 +76,8 @@ class Nester(object): # crop from sequence # crop from sequence cropped_sequence = sequence cropped_sequence = sequence self.solo_ltrs.create_ltr_multifasta(nested_list, sequence) for element in nested_list: for element in nested_list: cropped_sequence = cropped_sequence[:(element.location[0])] + cropped_sequence[ cropped_sequence = cropped_sequence[:(element.location[0])] + cropped_sequence[ (element.location[1]):] #originally:"(element.location[0] - 1)" and "(element.location[1] + 1):]" (element.location[1]):] #originally:"(element.location[0] - 1)" and "(element.location[1] + 1):]" Loading Loading @@ -96,3 +110,12 @@ class Nester(object): nested_list[j].tsr_right = intervals.expand(nested_list[i].location, nested_list[j].tsr_right = intervals.expand(nested_list[i].location, nested_list[j].tsr_right) nested_list[j].tsr_right) return nested_list return nested_list def _filter_nested_list(self, nested_list): result = [] for te in nested_list: if not math.isnan(te.ltr_left_location[0]): result.append(te) return result
nested/core/solo_ltr.py 0 → 100644 +21 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 from nested.core.te import TE class SoloLtr(TE): def __init__(self, location = None, score = None, bits = None): TE.__init__(self) self.location = location self.score = score self.bits = bits self.ltr_left_location = [float('nan'), float('nan')] self.ltr_right_location = [float('nan'), float('nan')] self.tsr_left = [float('nan'), float('nan')] self.tsr_right = [float('nan'), float('nan')] self.features = { "domains": [], "ppt": [float('nan'), float('nan')], "pbs": [float('nan'), float('nan')] }
nested/core/solo_ltrs.py 0 → 100644 +125 −0 Original line number Original line Diff line number Diff line #!/usr/bin/env python3 import os import subprocess import math from io import StringIO from Bio import SeqIO from Bio.Blast import NCBIXML from Bio.Blast.Applications import NcbiblastnCommandline from nested.config.config import config from nested.core.solo_ltr import SoloLtr import nested.utils.intervals as intervals class SoloLtrs(object): def __init__(self, seqId): self.sequence = "" self.seqId = seqId self.solo_ltrs = [] self.ltrs_fasta_path = '{}/ltrs.fa'.format(os.getcwd()) self.ltr_lengths = {} def find_solo_ltrs(self, sequence): self.sequence = sequence #open("seq_test.fa", 'w').write(str(self.sequence)) if os.path.exists(self.ltrs_fasta_path): subprocess.run(["makeblastdb", "-in", "ltrs.fa", "-dbtype", "nucl", "-out", "ltrs.fa"], stdout=subprocess.DEVNULL) blast_records = self.run_blastn() self._filter_intersections(self._parse_blast(blast_records)) #print("final:", len(self.solo_ltrs)) self.cleanup() def create_ltr_multifasta(self, tes, seq): offset = len(self.ltr_lengths.keys()) with open("ltrs.fa", 'a+') as file: for i in range(len(self.ltr_lengths.keys()), len(self.ltr_lengths.keys()) + len(tes)): if (intervals.length(tes[i - offset].ltr_left_location) < 100 or math.isnan(tes[i - offset].tsr_left[0]) or math.isnan(tes[i - offset].tsr_right[0])): continue file.write(">ltr_" + str(i) + "\n") file.write(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1]) + "\n") self.ltr_lengths["ltr_" + str(i)] = len(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1])) def run_blastn(self): args = dict(config['blastx']['args']) args['db'] = self.ltrs_fasta_path args['word_size'] = 10 args['evalue'] = 1e-5 #args['query'] = "seq_test.fa" blastn_cline = NcbiblastnCommandline(**args) # stdin=str(self.sequence) <- capnut do blastn_cline xml_out, stderr = blastn_cline(stdin=str(self.sequence)) output = open("out.xml", 'w') output.write(str(xml_out)) output.close() return NCBIXML.parse(StringIO(xml_out)) def _parse_blast(self, records): result = [] try: blast_record = next(records).alignments except ValueError: return result if blast_record: for alignment in blast_record: for hsp in alignment.hsps: if not (hsp.align_length / self.ltr_lengths[alignment.hit_def] >= 0.95): continue solo_ltr = SoloLtr(score = hsp.score, bits = hsp.bits) if hsp.frame[0] < 0: #complementary strand solo_ltr.location = [hsp.query_start, hsp.query_start - hsp.align_length] else: solo_ltr.location = [hsp.query_start, hsp.query_start + hsp.align_length] result.append(solo_ltr) print(len(result)) return result def _filter_intersections(self, solo_ltrs): result = set() work_list = set(solo_ltrs) for ltr in solo_ltrs: if ltr not in work_list: continue intersections = self._get_intersections(ltr, work_list) work_list = work_list.difference(intersections.keys()) best_hit = self._best_hit(intersections) self._update_result_with_best_hit(best_hit, result) self.solo_ltrs = list(result) def _get_intersections(self, ltr, ltrs): result = { ltr: ltr.score * ltr.bits } for ltr2 in ltrs: if intervals.intersect(ltr.location, ltr2.location): result[ltr2] = ltr2.score * ltr2.bits return result def _best_hit(self, intersections): return max(intersections, key=lambda x: intersections[x]) def _update_result_with_best_hit(self, hit, result): intersections = self._get_intersections(hit, result) real_best = self._best_hit(intersections) result.difference(intersections.keys()) result.add(real_best) def cleanup(self): if os.path.isfile("ltrs.fa.nhr"): os.remove("ltrs.fa.nhr") if os.path.isfile("ltrs.fa.nin"): os.remove("ltrs.fa.nin") if os.path.isfile("ltrs.fa.nsq"): os.remove("ltrs.fa.nsq")