Skip to content
Snippets Groups Projects
Commit 3dafa070 authored by Pavel Jedlicka's avatar Pavel Jedlicka
Browse files

Merge branch 'feature/custom-discovery-tool' into 'master'

Feature/custom discovery tool

See merge request !23
parents d693a0de b69bdbc2
No related branches found
No related tags found
1 merge request!23Feature/custom discovery tool
Showing
with 566 additions and 87 deletions
......@@ -7,3 +7,8 @@ tmp
images
results
config.yml
LTR_Finder/
build/
nested.egg-info/
dist/
.vscode/
LTR_Finder @ eefef6a6
Subproject commit eefef6a6046338f0f69ef8003077c0dabcaedf8e
......@@ -141,6 +141,7 @@ Options:
-s, --sketch_only If true, nesting is not computed. Genes are sketched
only from existing gff files.
-d, --data_folder TEXT Output data folder.
-dt --discovery-tool TEXT Used discovery tool. Default is LTR-finder.
--help Show this message and exit.
```
......@@ -162,3 +163,11 @@ Options:
-d, --data_folder TEXT Output data folder.
--help Show this message and exit.
```
## Adding new LTR discovery tool
* Implement run() method in new TE class based on core/TE_template.py
* Add new value to core/DiscoveryTool.py including any aliases, make sure the numeric value is different from already implemented tools and that it is the same for all of your aliases. (used in calling nester ``-dt <value you set>``)
* Import your newly implemented class in utils/DependencyResolver.py and add it to ``valueToClassDict`` attribute. (key is numeric value you set in core/DiscoveryTool.py)
......@@ -14,7 +14,17 @@ ltr:
w: 2 #don't change, parsing format required!
a: *prosite_path
s: *tRNAdb_path
ltr_harvest:
path: 'gt'
command: 'ltrharvest'
args:
similar: 40
maxlenltr: 3000
maxdistltr: 20000
mintsd: 4
maxtsd: 10
gt:
path: 'gt'
command: 'sketch'
......
#!/usr/bin/env python3
import os
import sys
import click
import glob
from concurrent import futures
......@@ -12,7 +11,8 @@ from Bio import SeqIO
from nested.core.nester import Nester
from nested.output.sketcher import Sketcher
from nested.core.sequence_thread import SequenceThread
from nested.core.discovery_tool import DiscoveryTool
from nested.utils.dependency_resolver import DependencyResolver
@click.command()
......@@ -24,30 +24,39 @@ from nested.core.sequence_thread import SequenceThread
@click.option('--initial_threshold', '-t', type=int, default=500, help='Initial threshold value.')
@click.option('--threshold_multiplier', '-m', type=float, default=1, help='Threshold multiplier.')
@click.option('--threads', '-n', type=int, default=1, help='Number of threads')
def main(input_fasta, sketch, format, output_fasta_offset, output_folder, initial_threshold, threshold_multiplier, threads):
@click.option('--discovery_tool', '-dt', default=DiscoveryTool.LTR_finder.name,
type=click.Choice(list(map(lambda val: val, DiscoveryTool.__members__))),
help='Determines which tool is used for retrotransoson discovery. Default: LTR_finder')
def main(input_fasta, sketch, format, output_fasta_offset, output_folder, initial_threshold,
threshold_multiplier, threads, discovery_tool):
check_ram(input_fasta, threads)
check_permissions(output_folder, os.W_OK | os.X_OK)
number_of_errors = [0]
start_time = datetime.now()
sequences = list(SeqIO.parse(open(input_fasta), 'fasta'))
sketcher = Sketcher()
sketcher = Sketcher()
futuress = []
dependencyResolver = DependencyResolver(discovery_tool)
with futures.ThreadPoolExecutor(threads) as executor:
for sequence in sequences:
futuress.append(executor.submit(process_sequence, sequence, sketcher, sketch, format, output_fasta_offset, output_folder, initial_threshold, threshold_multiplier, number_of_errors))
for sequence in SeqIO.parse(input_fasta, 'fasta'):
futuress.append(executor.submit(process_sequence, sequence, sketcher,
sketch, format, output_fasta_offset, output_folder,
initial_threshold, threshold_multiplier, number_of_errors,
dependencyResolver))
futures.wait(futuress)
end_time = datetime.now()
print('Total time: {}'.format(end_time - start_time))
print('Number of errors: {}'.format(number_of_errors[0]))
def process_sequence(sequence, sketcher, sketch, format, output_fasta_offset, output_folder, initial_threshold, threshold_multiplier, errors):
def process_sequence(sequence, sketcher, sketch, format, output_fasta_offset, output_folder, initial_threshold, threshold_multiplier, errors, dependency_resolver):
sequence.id = sequence.id.replace('/', '--')
seq_start_time = datetime.now()
strlen = 15
print("Processing {}".format(sequence.id))
try:
nester = Nester(sequence, initial_threshold, threshold_multiplier)
nester = Nester(sequence, initial_threshold, threshold_multiplier, dependency_resolver)
sketcher.create_gff(nester.nested_element, dirpath=output_folder, output_fasta_offset=output_fasta_offset, format=format)
sketcher.create_solo_ltr_gff(nester.solo_ltrs, dirpath=output_folder)
sketcher.create_solo_ltr_gff(nester.solo_ltrs, dirpath=output_folder)
sketcher.create_trf_gff(nester.trf, nester.seqid, dirpath=output_folder)
if sketch:
if format != 'default':
sketcher.create_gff(nester.nested_element, dirpath=output_folder, output_fasta_offset=output_fasta_offset)
......@@ -57,14 +66,14 @@ def process_sequence(sequence, sketcher, sketch, format, output_fasta_offset, ou
except KeyboardInterrupt:
cleanup()
raise
except CalledProcessError:
except CalledProcessError as ex:
cleanup()
errors[0] += 1
print('Processing {}: SUBPROCESS ERROR'.format(sequence.id[:strlen]))
except Exception:
print('Processing {}: SUBPROCESS ERROR: {}'.format(sequence.id[:strlen], ex))
except Exception as ex:
cleanup()
errors[0] += 1
print('Processing {}: UNEXPECTED ERROR:'.format(sequence.id[:strlen]), sys.exc_info()[0])
print('Processing {}: UNEXPECTED ERROR: {}'.format(sequence.id[:strlen], ex))
def check_permissions(path, permissions):
path = os.getcwd() if len(path) == 0 else path
......@@ -76,6 +85,16 @@ def check_permissions(path, permissions):
except Exception:
return
def check_ram(input_file, n):
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
mem_gib = mem_bytes/(1024**3)
seq_sizes = []
for seq in SeqIO.parse(input_file, 'fasta'):
seq_sizes.append(len(seq) / (1024**3))
seq_sizes.sort()
if (sum(seq_sizes[-n:]) * 15) > (0.8 * mem_gib):
print("\033[93m" + "During the analysis memory usage may exceed 80% of system's total physical memory." + "\033[0m")
def cleanup():
for file in glob.glob("*ltrs.fa"):
os.remove(file)
......
#!/usr/bin/env python3
from enum import Enum
class DiscoveryTool(Enum):
LTR_finder = 1
LTRharvest = 2
#add aliases with same value as the tool above
finder = 1
harvest = 2
#!/usr/bin/env python3
from nested.core.nester import Nester
from nested.core.tandem_repeat import TandemRepeat
from nested.core.tools.tandem_repeat_finder import TandemRepeatFinder
class Executor():
def __init__(self, sequence):
self.sequence = sequence
self.tool_order = [TandemRepeatFinder(), Nester()]
def run_main_loop(self):
elements = []
for tool in self.tool_order:
elements += tool.run(self.sequence)
def _crop_sequence(self, elements):
for element in elements:
self.sequence = self.sequence[:element.location[0]] + self.sequence[element.location[1]:]
\ No newline at end of file
#!/usr/bin/env python3
from nested.core.te import run_ltr_finder
#from nested.core.te import run_ltr_finder
# from nested.core.domain import run_blastx
# from nested.core.domain_dante import run_last
from nested.core.domain_dante import run_blastx
from nested.scoring.graph import Graph
from nested.utils import intervals
from nested.utils.dependency_resolver import DependencyResolver
class Gene(object):
......@@ -20,10 +21,11 @@ class Gene(object):
"""
def __init__(self, seqid=None, sequence=None):
def __init__(self, dependency_resolver, seqid=None, sequence=None):
self.seqid = seqid
self.sequence = sequence
self.te_list = run_ltr_finder(seqid, sequence)
self.te_list = dependency_resolver.discovery_tool.run(seqid, sequence)
#self.te_list = run_ltr_finder(seqid, sequence)
self.domain_list = run_blastx(sequence)
# self.domain_list = run_last(sequence)
self.scores = []
......
......@@ -8,9 +8,11 @@ from nested.core.nested_element import NestedElement
from nested.core.solo_ltrs import SoloLtrs
from nested.logging.logger import NesterLogger
from nested.config.config import config
from nested.core.tool_interface import ToolInterface
import nested.core.tandem_repeat as tandemRepeatFinder
class Nester(object):
class Nester(ToolInterface):
"""Class represent nesting in sequence, recursivelly find best evaluated transposon and crop it until no new transposons found
Attributes:
......@@ -19,7 +21,7 @@ class Nester(object):
nested_element (NestedElement): nested element
"""
def __init__(self, sequence, threshold, multiplier):
def __init__(self, sequence, threshold, multiplier, dependency_resolver):
self.seqid = sequence.id
self.sequence = sequence.seq
self.nested_element = None
......@@ -28,18 +30,28 @@ class Nester(object):
self._logger = NesterLogger('{}/{}'.format(config['logdir'], self.seqid))
self.threshold = threshold
self.multiplier = multiplier
self.dependency_resolver = dependency_resolver
self.trf = []
self._find_nesting()
def _find_nesting(self):
nested_list = self._get_unexpanded_transposon_list(self.sequence, self.threshold) # find list of nested transposons
nested_list = self._expand_transposon_list(nested_list)
nested_list = self._filter_nested_list(nested_list)
# tandem repeat finder
self.trf = tandemRepeatFinder.run(self.seqid, self.sequence)
nested_list = self.trf[:]
cropped_sequence = self._crop_sequence(nested_list, self.sequence)
nested_list += self._get_unexpanded_transposon_list(cropped_sequence, self.threshold) # find list of nested transposons
nested_list = self._expand_transposon_list(nested_list)
print(len(nested_list))
nested_list = self._filter_nested_list(nested_list)
print(len(nested_list))
self.nested_element = NestedElement(self.seqid, self.sequence, nested_list)
def _get_unexpanded_transposon_list(self, sequence, threshold):
# recursivelly find and crop best evaluated transposon, return unexpanded list of found transposons
self._iteration += 1
gene = Gene(self.seqid, sequence)
gene = Gene(self.dependency_resolver, self.seqid, sequence)
candidates = gene.get_candidates_above_threshold(threshold)
if not candidates:
best_candidate = gene.get_best_candidate()
......@@ -96,9 +108,11 @@ class Nester(object):
nested_list[j].ltr_right_location)
for domain in nested_list[j].features['domains']:
domain.location = intervals.expand(nested_list[i].location, domain.location)
nested_list[j].features['ppt'] = intervals.expand(nested_list[i].location,
if 'ppt' in nested_list[j].features.keys():
nested_list[j].features['ppt'] = intervals.expand(nested_list[i].location,
nested_list[j].features['ppt'])
nested_list[j].features['pbs'] = intervals.expand(nested_list[i].location,
if 'pbs' in nested_list[j].features.keys():
nested_list[j].features['pbs'] = intervals.expand(nested_list[i].location,
nested_list[j].features['pbs'])
nested_list[j].tsr_left = intervals.expand(nested_list[i].location,
nested_list[j].tsr_left)
......@@ -114,4 +128,10 @@ class Nester(object):
result.append(te)
return result
def _crop_sequence(self, elements, sequence):
cropped = sequence
for element in elements:
cropped = cropped[:element.location[0]] + cropped[element.location[1]:]
return cropped
......@@ -41,6 +41,7 @@ class SoloLtrs(object):
def create_ltr_multifasta(self, tes, seq):
offset = len(self.ltr_lengths.keys())
num = 1
with open("{}_ltrs.fa".format(self.seqId), 'a+') as file:
for i in range(len(self.ltr_lengths.keys()), len(self.ltr_lengths.keys()) + len(tes)):
if (intervals.length(tes[i - offset].ltr_left_location) < 100
......@@ -48,9 +49,10 @@ class SoloLtrs(object):
or math.isnan(tes[i - offset].tsr_right[0])):
continue
file.write(">ltr_" + str(i) + "\n")
file.write(">ltr_" + str(num) + "\n")
file.write(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1]) + "\n")
self.ltr_lengths["ltr_" + str(i)] = len(str(seq[tes[i - offset].ltr_left_location[0]:tes[i - offset].ltr_left_location[1] + 1]))
num += 1
def run_blastn(self):
args = dict(config['blastx']['args'])
......@@ -114,6 +116,7 @@ class SoloLtrs(object):
result.difference(intersections.keys())
result.add(real_best)
# refactor this
def cleanup(self):
if os.path.isfile("{}.nhr".format(self.ltrs_fasta_path)):
os.remove("{}.nhr".format(self.ltrs_fasta_path))
......@@ -121,4 +124,13 @@ class SoloLtrs(object):
os.remove("{}.nin".format(self.ltrs_fasta_path))
if os.path.isfile("{}.nsq".format(self.ltrs_fasta_path)):
os.remove("{}.nsq".format(self.ltrs_fasta_path))
if os.path.isfile("{}.ndb".format(self.ltrs_fasta_path)):
os.remove("{}.ndb".format(self.ltrs_fasta_path))
if os.path.isfile("{}.not".format(self.ltrs_fasta_path)):
os.remove("{}.not".format(self.ltrs_fasta_path))
if os.path.isfile("{}.ntf".format(self.ltrs_fasta_path)):
os.remove("{}.ntf".format(self.ltrs_fasta_path))
if os.path.isfile("{}.nto".format(self.ltrs_fasta_path)):
os.remove("{}.nto".format(self.ltrs_fasta_path))
#!/usr/bin/env python3
import os
import subprocess
from itertools import islice
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from nested.core.te import TE
from nested.utils import intervals
class TandemRepeat(TE):
def __init__(self, loc, period_size, copies, matches, indels, scor, entropy, monomer):
super().__init__(location=loc, score=scor, ltr_left_location=[float('nan'), float('nan')])
self.matches = matches
self.indels = indels
self.entropy = entropy
self.period_size = period_size
self.copies = copies
self.monomer = monomer
def is_better(self, other):
if not isinstance(other, TandemRepeat):
return False
score = 0
score += self.matches > other.matches
score += self.indels < other.indels
score += self.score > other.score
score += self.entropy > other.entropy
if score == 2:
return intervals.length(self.location) > intervals.length(other.location)
return score > 2
def __str__(self):
lines = ['location: {}'.format(self.location),
'period size: {}'.format(self.period_size),
'number of copies: {}'.format(self.copies),
'% of matches: {}'.format(self.matches),
'% of indels: {}'.format(self.indels),
'score: {}'.format(self.score),
'entropy: {}'.format(self.entropy)]
return os.linesep.join(lines)
def run(seqid, sequence):
if not os.path.exists('/tmp/nested'):
os.makedirs('/tmp/nested')
if not os.path.exists('/tmp/nested/trf'):
os.makedirs('/tmp/nested/trf')
with open('/tmp/nested/trf/{}.fa'.format(seqid), 'w+') as tmp_file:
SeqIO.write(SeqRecord(sequence, id=seqid),
tmp_file,
'fasta')
process = subprocess.Popen(
['trf', '/tmp/nested/trf/{}.fa'.format(seqid), str(2), str(5), str(7),
str(80), str(10), str(50), str(2000), '-m', '-d', '-h'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
stdout, stderr = process.communicate()
repeats = filter_candidates(get_candidates(seqid))
repeats.sort(key=lambda r: r.location[0], reverse=True)
return repeats
def get_candidates(seqid):
candidates = []
with open('{}.fa.2.5.7.80.10.50.2000.dat'.format(seqid)) as file:
for entry in islice(file, 15, None):
split = entry.split(' ')
candidates.append(TandemRepeat([int(split[0]), int(split[1])], int(split[2]), float(split[3]),
int(split[5]), int(split[6]), int(split[7]), float(split[12]), split[-1]))
cleanup()
return candidates
def filter_candidates(candidates):
candidates.sort(key=lambda r: r.location[0])
repeats = []
for candidate in candidates:
add = True
for candidate2 in candidates:
if candidate2.location[0] > candidate.location[1]:
break
if intervals.contains(candidate.location, candidate2.location):
continue
elif intervals.intersect(candidate.location, candidate2.location):
if not candidate.is_better(candidate2):
add = False
break
for repeat in repeats:
if repeat.location[0] > candidate.location[1]:
break
if intervals.contains(repeat.location, candidate.location):
add = False
break
if add:
repeats.append(candidate)
return repeats
def cleanup():
if os.path.isfile("{}.fa.2.5.7.80.10.50.2000.dat".format(seqid)):
os.remove("{}.fa.2.5.7.80.10.50.2000.dat".format(seqid))
if os.path.isfile("{}.fa.2.5.7.80.10.50.2000.mask".format(seqid)):
os.remove("{}.fa.2.5.7.80.10.50.2000.mask".format(seqid))
......@@ -27,7 +27,7 @@ class TE(object):
def __init__(self, ppt=[0, 0], pbs=[0, 0], location=[0, 0],
ltr_left_location=[0, 0], ltr_right_location=[0, 0],
tsr_left=[0, 0], tsr_right=[0, 0], features={}, score=None):
tsr_left=[0, 0], tsr_right=[0, 0], features={'domains': []}, score=None):
self.ppt = ppt
self.pbs = pbs
self.location = location
......@@ -49,58 +49,58 @@ class TE(object):
# ' features = {},'.format(self.features),
' score = {}}}'.format(self.score)]
return '\n'.join(lines)
def run_ltr_finder(seqid, sequence):
"""Run LTR finder on sequence and return list of transposons
Arguments:
seqid (str): sequence id
sequence (Bio.Seq.Seq): sequence
tmp_dir (str): Auxiliary existing directory
Returns:
list[TE]: list of found ltr pairs as a TE class
"""
transposons = []
if not os.path.exists('/tmp/nested'):
os.makedirs('/tmp/nested')
if not os.path.exists('/tmp/nested/ltr'):
os.makedirs('/tmp/nested/ltr')
with open('/tmp/nested/ltr/{}.fa'.format(seqid), 'w+') as tmp_file: # prepare tmp fasta file for ltr finder
SeqIO.write(SeqRecord(sequence, id=seqid),
tmp_file,
'fasta')
# call LTR finder and feed stdin to it
process = subprocess.Popen(
[config['ltr']['path']] + args_dict_to_list(config['ltr']['args']) + ['/tmp/nested/ltr/{}.fa'.format(seqid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# os.remove('/tmp/nested/ltr/{}.fa'.format(seqid))
stdout, stderr = process.communicate()
parsed_output = parse_raw_output(stdout)
for entry in parsed_output:
params = {
'ppt': entry['ppt'],
'pbs': entry['pbs'],
'location': entry['location'],
'ltr_left_location': entry['ltr_left_location'],
'ltr_right_location': entry['ltr_right_location'],
'tsr_right': entry['tsr_right'],
'tsr_left': entry['tsr_left'],
}
transposons.append(TE(**params))
return transposons
@staticmethod
def run(seqid, sequence):
"""Run LTR finder on sequence and return list of transposons
Arguments:
seqid (str): sequence id
sequence (Bio.Seq.Seq): sequence
tmp_dir (str): Auxiliary existing directory
Returns:
list[TE]: list of found ltr pairs as a TE class
"""
transposons = []
if not os.path.exists('/tmp/nested'):
os.makedirs('/tmp/nested')
if not os.path.exists('/tmp/nested/ltr'):
os.makedirs('/tmp/nested/ltr')
with open('/tmp/nested/ltr/{}.fa'.format(seqid), 'w+') as tmp_file: # prepare tmp fasta file for ltr finder
SeqIO.write(SeqRecord(sequence, id=seqid),
tmp_file,
'fasta')
# call LTR finder and feed stdin to it
process = subprocess.Popen(
[config['ltr']['path']] + args_dict_to_list(config['ltr']['args']) + ['/tmp/nested/ltr/{}.fa'.format(seqid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# os.remove('/tmp/nested/ltr/{}.fa'.format(seqid))
stdout, stderr = process.communicate()
parsed_output = parse_raw_output(stdout)
for entry in parsed_output:
params = {
'ppt': entry['ppt'],
'pbs': entry['pbs'],
'location': entry['location'],
'ltr_left_location': entry['ltr_left_location'],
'ltr_right_location': entry['ltr_right_location'],
'tsr_right': entry['tsr_right'],
'tsr_left': entry['tsr_left'],
}
transposons.append(TE(**params))
return transposons
def parse_raw_output(raw_output):
"""Parse raw output from LTR finder
......
#!/usr/bin/env python3
import os
import subprocess
import re
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from nested.config.config import config, args_dict_to_list
class TE_harvest(object):
"""Class representing TE. Every location is in format [from, to].
Attributes:
ppt (list): location of ppt
pbs (list): location of pbs
location (list): position of TE in gene
ltr_left_location (list): location of left ltr
ltr_right_location (list): location of right ltr
tsr_left (list): location of left tsr
tsr_right (list): location of right tsr
features (dict): dictionary of features assigned to TE (i.e. list of domains on the optimal path in graph)
score (float): evaluated score of TE
"""
def __init__(self, ppt=[0, 0], pbs=[0, 0], location=[0, 0],
ltr_left_location=[0, 0], ltr_right_location=[0, 0],
tsr_left=[0, 0], tsr_right=[0, 0], features={}, score=None):
self.ppt = ppt
self.pbs = pbs
self.location = location
self.ltr_left_location = ltr_left_location
self.ltr_right_location = ltr_right_location
self.tsr_left = tsr_left
self.tsr_right = tsr_right
self.features = features
self.score = score
def __str__(self):
lines = ['{{location = {},'.format(self.location),
' left ltr = {},'.format(self.ltr_left_location),
' right ltr = {},'.format(self.ltr_right_location),
' left tsr = {},'.format(self.tsr_left),
' right tsr = {},'.format(self.tsr_right),
' ppt = {},'.format(self.ppt),
' pbs = {},'.format(self.pbs),
# ' features = {},'.format(self.features),
' score = {}}}'.format(self.score)]
return '\n'.join(lines)
@staticmethod
def run(seqid, sequence):
"""Run LTR finder on sequence and return list of transposons
Arguments:
seqid (str): sequence id
sequence (Bio.Seq.Seq): sequence
tmp_dir (str): Auxiliary existing directory
Returns:
list[TE]: list of found ltr pairs as a TE class
"""
transposons = []
if not os.path.exists('/tmp/nested'):
os.makedirs('/tmp/nested')
if not os.path.exists('/tmp/nested/ltr'):
os.makedirs('/tmp/nested/ltr')
with open('/tmp/nested/ltr/{}.fa'.format(seqid), 'w+') as tmp_file: # prepare tmp fasta file for ltr finder
SeqIO.write(SeqRecord(sequence, id=seqid),
tmp_file,
'fasta')
process = subprocess.Popen(['gt', 'suffixerator', '-db', '/tmp/nested/ltr/{}.fa'.format(seqid),
'-indexname', '/tmp/nested/ltr/{}.fa'.format(seqid), '-tis', '-suf', '-lcp', '-des', '-ssp', '-sds', '-dna'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
process.communicate()
# call LTR harvest
process = subprocess.Popen(
[config['ltr_harvest']['path']] + [config['ltr_harvest']['command']] +
args_dict_to_list(config['ltr_harvest']['args']) +
['-index', '/tmp/nested/ltr/{}.fa'.format(seqid)] +
['-longoutput'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
parsed_output = parse_raw_output(stdout)
for entry in parsed_output:
params = {
'ppt': entry['ppt'],
'pbs': entry['pbs'],
'location': entry['location'],
'ltr_left_location': entry['ltr_left_location'],
'ltr_right_location': entry['ltr_right_location'],
'tsr_right': entry['tsr_right'],
'tsr_left': entry['tsr_left'],
}
transposons.append(TE_harvest(**params))
return transposons
def parse_raw_output(raw_output):
"""Parse raw output from LTR harvest
Arguments:
raw_output (str): ltr finder raw output (args -w 2)
Returns:
dict: parsed raw output
"""
entries = raw_output.decode('utf-8').split(os.linesep)[13:-1]
result = []
for entry in entries:
split_line = entry.split(' ')
transposon = { "location": [int(split_line[0]), int(split_line[1])],
"ltr_left_location": [int(split_line[3]), int(split_line[4])],
"ltr_right_location": [int(split_line[8]), int(split_line[9])],
"ppt": [float("nan"), float("nan")],
"pbs": [float("nan"), float("nan")]
}
try:
tsr_right = [int(split_line[1]) + 1, int(split_line[1]) + int(split_line[7])]
tsr_left = [int(split_line[0]) - int(split_line[7]), int(split_line[0]) - 1]
except:
tsr_right = [float("nan"), float("nan")]
tsr_left = [float("nan"), float("nan")]
transposon["tsr_right"] = tsr_right
transposon["tsr_left"] = tsr_left
result.append(transposon)
return result
#!/usr/bin/env python3
import os
import subprocess
import re
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from nested.config.config import config, args_dict_to_list
class TE(object):
"""Class representing TE. Every location is in format [from, to].
Attributes:
ppt (list): location of ppt
pbs (list): location of pbs
location (list): position of TE in gene
ltr_left_location (list): location of left ltr
ltr_right_location (list): location of right ltr
tsr_left (list): location of left tsr
tsr_right (list): location of right tsr
features (dict): dictionary of features assigned to TE (i.e. list of domains on the optimal path in graph)
score (float): evaluated score of TE
"""
def __init__(self, ppt=[0, 0], pbs=[0, 0], location=[0, 0],
ltr_left_location=[0, 0], ltr_right_location=[0, 0],
tsr_left=[0, 0], tsr_right=[0, 0], features={}, score=None):
self.ppt = ppt
self.pbs = pbs
self.location = location
self.ltr_left_location = ltr_left_location
self.ltr_right_location = ltr_right_location
self.tsr_left = tsr_left
self.tsr_right = tsr_right
self.features = features
self.score = score
def __str__(self):
lines = ['{{location = {},'.format(self.location),
' left ltr = {},'.format(self.ltr_left_location),
' right ltr = {},'.format(self.ltr_right_location),
' left tsr = {},'.format(self.tsr_left),
' right tsr = {},'.format(self.tsr_right),
' ppt = {},'.format(self.ppt),
' pbs = {},'.format(self.pbs),
# ' features = {},'.format(self.features),
' score = {}}}'.format(self.score)]
return '\n'.join(lines)
@staticmethod
def run(seqid, sequence):
"""Run discovery tool of choice on sequence and return list of transposons
Arguments:
seqid (str): sequence id
sequence (Bio.Seq.Seq): sequence
Returns:
list[TE]: list of found ltr pairs as a TE class
"""
#!/usr/bin/env python3
class ToolInterface():
def run(self, sequence):
"""Runs selected tool, if any arguments are needed they are taken from config.yml or constructor
Returns: collection of BaseElement
"""
pass
\ No newline at end of file
#!/usr/bin/env python3
from nested.core.tool_interface import ToolInterface
class TandemRepeatFinder(ToolInterface):
def __init__(self):
pass
def run(self, sequence):
pass
\ No newline at end of file
......@@ -6,6 +6,7 @@ import subprocess
from nested.config.config import config, args_dict_to_list
from nested.output.gff import GFFMaker
from nested.output.solo_gff import SoloGFFMaker
from nested.output.trf_gff import TrfGFFMaker
DEFAULT_DIRPATH = 'data'
......@@ -13,6 +14,7 @@ class Sketcher(object):
def __init__(self):
self._gff_maker = GFFMaker()
self._solo_gff_maker = SoloGFFMaker()
self._trf_gff_maker = TrfGFFMaker()
self._gff_path = ''
def create_gff(self, nested_element, dirpath, output_fasta_offset=0, format='default'):
......@@ -23,14 +25,23 @@ class Sketcher(object):
path = os.path.join(dirpath, DEFAULT_DIRPATH)
self._solo_gff_maker.create_solo_gff(solo_ltrs, path)
self._solo_gff_maker.move_ltrs_spliced(solo_ltrs.seqId, path)
def create_trf_gff(self, trf, seqId, dirpath):
path = os.path.join(dirpath, DEFAULT_DIRPATH)
self._trf_gff_maker.create_gff(trf, seqId, path)
def sketch(self, id, dirpath):
path = os.path.join(dirpath, DEFAULT_DIRPATH)
print(path)
null = open(os.devnull, 'w')
process = subprocess.check_output(
[config['gt']['path'], config['gt']['command']] +
try:
process = subprocess.check_output(
[config['gt']['path'], config['gt']['command']] +
args_dict_to_list(config['gt']['args']) +
['{}/{}/{}.png'.format(path, id, id)] +
['{}/{}/{}.gff'.format(path, id, id)], stderr=null)
except Exception:
command = " ".join([config['gt']['path'], config['gt']['command']] +
args_dict_to_list(config['gt']['args']) +
['{}/{}/{}.png'.format(path, id, id)] +
['{}/{}/{}.gff'.format(path, id, id)], stderr=null)
['{}/{}/{}.gff'.format(path, id, id)])
raise Exception("Command \'{}\' returned non-zero status code. Problem with package genometools.".format(command))
#!/usr/bin/env python3
class TrfGFFMaker(object):
def __init__(self):
pass
def create_gff(self, trf, seqId, dirpath):
with open("{}/{}/{}_trf.gff".format(dirpath, seqId, seqId), 'w+') as gff:
i = 0
gff.write("##gff-version 3\n")
for repeat in trf:
gff.write(seqId + '\t'
+ "feature\ttandem_repeat\t"
+ str(repeat.location[0]) + '\t'
+ str(repeat.location[1]) + '\t'
+ ".\t.\t.\t"
+ "ID=tandem_repeat_" + str(i)
+ ";monomer_count=" + str(repeat.copies)
+ ";monomer=" + repeat.monomer
+ '\n')
i += 1
\ No newline at end of file
......@@ -83,8 +83,8 @@ class Graph(object):
for domain in domain_list:
if domain.type not in ['GAG', 'PROT', 'INT', 'RT', 'RH', 'aRH', 'CHD', 'CHDCR']:
continue
if (intervals.compare(te.ltr_left_location, domain.location) != 1
and intervals.compare(domain.location, te.ltr_right_location) != 1):
if (intervals.compare(te.ltr_left_location, sorted(domain.location)) != 1
and intervals.compare(sorted(domain.location), te.ltr_right_location) != 1):
domain_dict[domain.type].append(domain)
i += 1
i = 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment