Loading management/data/__init__.py +7 −4 Original line number Diff line number Diff line Loading @@ -187,15 +187,18 @@ class DataManagement: def cancel_all_submissions(self): with self.app.app_context(): for submission in Submission.query.all(): submission.change_state(SubmissionState.CANCELLED) self.services.submissions.write_entity(submission) self._cancel_one(submission) self.db.session.commit() def _cancel_one(self, submission): message = "Cancelling submission using command line" self.services.submissions(submission).set_state(SubmissionState.CANCELLED, message=message) def cancel_submission(self, sid): with self.app.app_context(): submission = self.services.find.submission(sid) submission.change_state(SubmissionState.CANCELLED) self.services.submissions.write_entity(submission) self._cancel_one(submission) self.db.session.commit() def clean_all_submissions(self): Loading portal/async_celery/submission_processor.py +11 −10 Original line number Diff line number Diff line Loading @@ -40,16 +40,16 @@ class SubmissionProcessor: from portal import storage_wrapper return storage_wrapper def reset_task_id(self, state=None): def reset_task_id(self, state=None, message: str = None): if state is not None: self.submission.state = state self._services.submissions.set_state(state, message=message) self.submission.async_task_id = None self._save_submission() def submission_enqueue_ended(self): log.info(f"[ASYNC] Submission enqueue ended {self.submission.log_name}: {self.submission}") self.reset_task_id(state=SubmissionState.QUEUED) self.reset_task_id(state=SubmissionState.QUEUED, message="Submission queue ended.") def get_delay_for_submission(self): log.info(f"[ASYNC] Submission delay {self.submission.log_name}: {self.submission}") Loading @@ -68,7 +68,7 @@ class SubmissionProcessor: log.info(f"[ASYNC] Submission preparation ended {self.submission.log_name}: " f"{self.submission}") self.submission.source_hash = version self.reset_task_id(state=SubmissionState.READY) self.reset_task_id(state=SubmissionState.READY, message="Submission preparation ended") def download_submission(self): file_params = self.params['file_params'] Loading Loading @@ -99,7 +99,7 @@ class SubmissionProcessor: f"{self.submission.log_name} with {file_params}") self.storage.results.create(dirname=self.submission.storage_dirname, **file_params) Path(path).unlink() self.reset_task_id(SubmissionState.FINISHED) self.reset_task_id(SubmissionState.FINISHED, message="Upload result ended") def process_submission(self): log.info(f"[ASYNC] Processing submission {self.submission.log_name}") Loading @@ -116,7 +116,7 @@ class SubmissionProcessor: task_id = self.submission.async_task_id if task_id: self.celery.control.revoke(task_id=task_id, terminate=True) self.reset_task_id(state=SubmissionState.CANCELLED) self.reset_task_id(state=SubmissionState.CANCELLED, message="Submission cancelled") # TODO: Storage clean up def _get_avail_workers(self): Loading @@ -143,7 +143,9 @@ class SubmissionProcessor: def execute_submission(self, worker: Worker): worker_client = self._services.workers(worker).worker_client self.submission.change_state(SubmissionState.IN_PROGRESS) message = "Executing submission using worker" self._services.submissions(self.submission).set_state(SubmissionState.IN_PROGRESS, message=message) self._save_submission() worker_client.execute_submission(self.submission) Loading Loading @@ -173,9 +175,8 @@ class SubmissionProcessor: return self.submission def abort_submission(self, message: str = 'Unknown error!'): self.submission.note['error'] = message self._save_submission() self.reset_task_id(SubmissionState.ABORTED) self._services.submissions(self.submission).set_state(SubmissionState.ABORTED, message=message) def _save_submission(self): self._services.submissions.write_entity(self.submission) portal/database/models.py +15 −13 Original line number Diff line number Diff line Loading @@ -862,31 +862,33 @@ class Submission(db.Model, EntityBase): created = time.simple_fmt(self.created_at) return f"{self.user.username}_{created}_{self.course.codename}_{self.project.codename}" def change_state(self, new_state): def change_state(self, new_state, message: str = None): # open to extension (state transition validation, ...) if new_state in Submission.ALLOWED_TRANSITIONS.keys(): allow = Submission.ALLOWED_TRANSITIONS[new_state] if self.state not in allow: return False self.state = new_state self.make_change(dict(state=new_state, message=message)) return True @property def message(self): if self.note and isinstance(self.note, dict): return self.note.get('message') if self.note and isinstance(self.note, str): return self.note def make_change(self, change): if self.note is None: self.note = {} if 'changes' in self.note: self.note['changes'].append(change) else: return None self.note['changes'] = [] @message.setter def message(self, value): self.note['message'] = value @property def changes(self) -> List: if self.note and 'changes' in self.note: return self.note['changes'] else: return [] def abort(self, message): self.state = SubmissionState.ABORTED self.message = message self.change_state(SubmissionState.ABORTED, message=message) @property def is_cancelled(self) -> bool: Loading portal/service/submissions.py +15 −8 Original line number Diff line number Diff line Loading @@ -5,13 +5,14 @@ import datetime import json import logging from pathlib import Path from typing import Union from werkzeug.datastructures import FileStorage from werkzeug.utils import secure_filename from portal import storage_wrapper from portal.async_celery.submission_processor import SubmissionProcessor from portal.database import queries from portal.database import queries, SubmissionState from portal.database.models import Project, Submission, User from portal.service import errors from portal.service.general import GeneralService Loading Loading @@ -140,13 +141,8 @@ class SubmissionsService(GeneralService): Returns(Submission): Updated submission """ new_state = data['state'] if self.submission.change_state(new_state): self.write_entity(self.submission) log.info(f"[UPDATE] Submission state {self.submission.log_name} " f"to {self.submission.state}") else: log.warning("[UPDATE] Submission state change is not possible") return self.submission return self.set_state(state=new_state, message='Manual state update') def cancel_submission(self): """Cancels the submission Loading Loading @@ -218,6 +214,17 @@ class SubmissionsService(GeneralService): if st_entity: st_entity.clean() def set_state(self, state: Union[str, SubmissionState], message: str = None): if isinstance(state, str): state = SubmissionState[state] if self.submission.change_state(state, message=message): self.write_entity(self.submission) log.info(f"[UPDATE] Submission state {self.submission.log_name} " f"to {self.submission.state}") else: log.warning("[UPDATE] Submission state change is not possible") return self.submission def nonempty_intersection(provided: list, required: list): if not required: Loading Loading
management/data/__init__.py +7 −4 Original line number Diff line number Diff line Loading @@ -187,15 +187,18 @@ class DataManagement: def cancel_all_submissions(self): with self.app.app_context(): for submission in Submission.query.all(): submission.change_state(SubmissionState.CANCELLED) self.services.submissions.write_entity(submission) self._cancel_one(submission) self.db.session.commit() def _cancel_one(self, submission): message = "Cancelling submission using command line" self.services.submissions(submission).set_state(SubmissionState.CANCELLED, message=message) def cancel_submission(self, sid): with self.app.app_context(): submission = self.services.find.submission(sid) submission.change_state(SubmissionState.CANCELLED) self.services.submissions.write_entity(submission) self._cancel_one(submission) self.db.session.commit() def clean_all_submissions(self): Loading
portal/async_celery/submission_processor.py +11 −10 Original line number Diff line number Diff line Loading @@ -40,16 +40,16 @@ class SubmissionProcessor: from portal import storage_wrapper return storage_wrapper def reset_task_id(self, state=None): def reset_task_id(self, state=None, message: str = None): if state is not None: self.submission.state = state self._services.submissions.set_state(state, message=message) self.submission.async_task_id = None self._save_submission() def submission_enqueue_ended(self): log.info(f"[ASYNC] Submission enqueue ended {self.submission.log_name}: {self.submission}") self.reset_task_id(state=SubmissionState.QUEUED) self.reset_task_id(state=SubmissionState.QUEUED, message="Submission queue ended.") def get_delay_for_submission(self): log.info(f"[ASYNC] Submission delay {self.submission.log_name}: {self.submission}") Loading @@ -68,7 +68,7 @@ class SubmissionProcessor: log.info(f"[ASYNC] Submission preparation ended {self.submission.log_name}: " f"{self.submission}") self.submission.source_hash = version self.reset_task_id(state=SubmissionState.READY) self.reset_task_id(state=SubmissionState.READY, message="Submission preparation ended") def download_submission(self): file_params = self.params['file_params'] Loading Loading @@ -99,7 +99,7 @@ class SubmissionProcessor: f"{self.submission.log_name} with {file_params}") self.storage.results.create(dirname=self.submission.storage_dirname, **file_params) Path(path).unlink() self.reset_task_id(SubmissionState.FINISHED) self.reset_task_id(SubmissionState.FINISHED, message="Upload result ended") def process_submission(self): log.info(f"[ASYNC] Processing submission {self.submission.log_name}") Loading @@ -116,7 +116,7 @@ class SubmissionProcessor: task_id = self.submission.async_task_id if task_id: self.celery.control.revoke(task_id=task_id, terminate=True) self.reset_task_id(state=SubmissionState.CANCELLED) self.reset_task_id(state=SubmissionState.CANCELLED, message="Submission cancelled") # TODO: Storage clean up def _get_avail_workers(self): Loading @@ -143,7 +143,9 @@ class SubmissionProcessor: def execute_submission(self, worker: Worker): worker_client = self._services.workers(worker).worker_client self.submission.change_state(SubmissionState.IN_PROGRESS) message = "Executing submission using worker" self._services.submissions(self.submission).set_state(SubmissionState.IN_PROGRESS, message=message) self._save_submission() worker_client.execute_submission(self.submission) Loading Loading @@ -173,9 +175,8 @@ class SubmissionProcessor: return self.submission def abort_submission(self, message: str = 'Unknown error!'): self.submission.note['error'] = message self._save_submission() self.reset_task_id(SubmissionState.ABORTED) self._services.submissions(self.submission).set_state(SubmissionState.ABORTED, message=message) def _save_submission(self): self._services.submissions.write_entity(self.submission)
portal/database/models.py +15 −13 Original line number Diff line number Diff line Loading @@ -862,31 +862,33 @@ class Submission(db.Model, EntityBase): created = time.simple_fmt(self.created_at) return f"{self.user.username}_{created}_{self.course.codename}_{self.project.codename}" def change_state(self, new_state): def change_state(self, new_state, message: str = None): # open to extension (state transition validation, ...) if new_state in Submission.ALLOWED_TRANSITIONS.keys(): allow = Submission.ALLOWED_TRANSITIONS[new_state] if self.state not in allow: return False self.state = new_state self.make_change(dict(state=new_state, message=message)) return True @property def message(self): if self.note and isinstance(self.note, dict): return self.note.get('message') if self.note and isinstance(self.note, str): return self.note def make_change(self, change): if self.note is None: self.note = {} if 'changes' in self.note: self.note['changes'].append(change) else: return None self.note['changes'] = [] @message.setter def message(self, value): self.note['message'] = value @property def changes(self) -> List: if self.note and 'changes' in self.note: return self.note['changes'] else: return [] def abort(self, message): self.state = SubmissionState.ABORTED self.message = message self.change_state(SubmissionState.ABORTED, message=message) @property def is_cancelled(self) -> bool: Loading
portal/service/submissions.py +15 −8 Original line number Diff line number Diff line Loading @@ -5,13 +5,14 @@ import datetime import json import logging from pathlib import Path from typing import Union from werkzeug.datastructures import FileStorage from werkzeug.utils import secure_filename from portal import storage_wrapper from portal.async_celery.submission_processor import SubmissionProcessor from portal.database import queries from portal.database import queries, SubmissionState from portal.database.models import Project, Submission, User from portal.service import errors from portal.service.general import GeneralService Loading Loading @@ -140,13 +141,8 @@ class SubmissionsService(GeneralService): Returns(Submission): Updated submission """ new_state = data['state'] if self.submission.change_state(new_state): self.write_entity(self.submission) log.info(f"[UPDATE] Submission state {self.submission.log_name} " f"to {self.submission.state}") else: log.warning("[UPDATE] Submission state change is not possible") return self.submission return self.set_state(state=new_state, message='Manual state update') def cancel_submission(self): """Cancels the submission Loading Loading @@ -218,6 +214,17 @@ class SubmissionsService(GeneralService): if st_entity: st_entity.clean() def set_state(self, state: Union[str, SubmissionState], message: str = None): if isinstance(state, str): state = SubmissionState[state] if self.submission.change_state(state, message=message): self.write_entity(self.submission) log.info(f"[UPDATE] Submission state {self.submission.log_name} " f"to {self.submission.state}") else: log.warning("[UPDATE] Submission state change is not possible") return self.submission def nonempty_intersection(provided: list, required: list): if not required: Loading