submission_processor.py 3.82 KB
Newer Older
Peter Stanko's avatar
Peter Stanko committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
from pathlib import Path

from storage import UploadedEntity

from portal.database import Project, Submission, SubmissionState
from portal.service import general

log = logging.getLogger(__name__)


class SubmissionProcessor:
    def __init__(self, submission: Submission, params: dict = None):
        self._submission = submission
        self._params = params

    @property
    def submission(self) -> Submission:
        return self._submission

    @property
    def params(self) -> dict:
        return self._params

    @property
    def project(self) -> Project:
        return self.submission.project

    @property
    def celery(self):
        from portal.async_celery import celery_app
        return celery_app

    @property
    def storage(self):
        from portal import storage
        return storage

    def reset_task_id(self, state=None):
        if state is not None:
            self.submission.state = state

        self.submission.async_task_id = None
        general.write_entity(self.submission)

    def submission_enqueue_ended(self):
        log.info(f"[ASYNC] Submission enqueue ended: {self.submission}")
        self.reset_task_id(state=SubmissionState.QUEUED)

    def revoke_enqueue_task(self):
        log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
        task_id = self.submission.process_task_id
        if task_id:
            self.celery.control.revoke(task_id=task_id, terminate=True)
            self.reset_task_id(state=SubmissionState.CANCELLED)

    def get_delay_for_submission(self):
        log.debug(f"[ASYNC] Submission delay: {self.submission}")
        time_wait = self.project.config.submissions_cancellation_period
        return time_wait

    def dispatch_submission_processing(self):
        delay = self.get_delay_for_submission()
        args = (self.submission.id, self.params)
        from .tasks import start_processing_submission
        start_processing_submission.apply_async(args=args, countdown=delay)

    def submission_store_ended(self, version: str):
        log.info(f"[ASYNC] Submission preparation ended: {self.submission}")
        self.submission.source_hash = version
        self.reset_task_id(state=SubmissionState.READY)

    def revoke_storage_task(self):
        log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
        task_id = self.submission.storage_task_id
        if task_id:
            self.celery.control.revoke(task_id=task_id, terminate=True)
            self.reset_task_id(state=SubmissionState.CANCELLED)
        # TODO: Dispatch cleanup task

    def download_submission(self):
        log.info(f"[ASYNC] Uploading submission: {self.submission} with {self.params}")
        updated_entity: UploadedEntity = self.storage. \
            submissions.create(entity_id=self.submission.id, **self.params)
        self.submission_store_ended(version=updated_entity.version)

    def clone(self, target):
        log.info(f"[ASYNC] Cloning submission: {self.submission} to {target}")
        self.storage.submissions.clone(self.submission.id, target.id)
        self.submission_store_ended(version=self.submission.source_hash)

    def send_to_worker(self):
        # TODO: implement processing
        log.info(f"[ASYNC] Sending submission to worker: {self.submission}")
        if self.params is not None:
            print("Not implemented")

    def upload_result(self, path):
        log.info(
            f"[ASYNC] Uploading result for the submission {self.submission.id} with {self.params}")
        self.storage.results.create(entity_id=self.submission.id, **self.params)
        Path(path).unlink()
        self.reset_task_id(SubmissionState.FINISHED)

    def process_submission(self):
        log.info(f"[ASYNC] Processing submission: {self.submission}")
        self.download_submission()
        self.dispatch_submission_processing()