Loading portal/async_celery/__init__.py +2 −0 Original line number Diff line number Diff line Loading @@ -25,3 +25,5 @@ def init_app(app: Flask) -> Flask: celery_app.Task = ContextTask return app import portal.async_celery.periodic_tasks No newline at end of file portal/async_celery/async_manager.py 0 → 100644 +43 −0 Original line number Diff line number Diff line from portal.database import Submission, SubmissionState from portal.service.rest import RestService from portal.tools import time class AsyncManager: def __init__(self): self._rest = RestService() @property def rest(self) -> RestService: return self._rest def abort_submissions_sched_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.READY, param='created_at', msg_add='Schedule phase') def abort_submission_checkout_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.CREATED, param='scheduled_for', msg_add='Checkout phase') def abort_submission_proc_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.IN_PROGRESS, param='scheduled_for', msg_add='Process phase') def _proc_expired(self, submission: Submission, limit, param='scheduled_for'): req_time = getattr(submission, param) schedule_time = time.normalize_time(req_time) curr = time.current_time() exp_time = limit + schedule_time return exp_time <= curr def _abort_submission(self, sub: Submission, message): sub.abort(message=message) self.rest.submissions.write_entity(sub) def abort_submission_any_exceeded(self, limit, state, param, msg_add: str, msg=None): msg = msg or "Submission has not been processed in time limit: " msg += msg_add submissions = Submission.query.filter(Submission.state == state).all() submissions = [sub for sub in submissions if self._proc_expired(sub, limit, param=param)] for sub in submissions: self._abort_submission(sub, msg) portal/async_celery/periodic_tasks.py 0 → 100644 +21 −0 Original line number Diff line number Diff line import datetime from portal.async_celery import celery_app @celery_app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(30*60, abort_non_proc_submissions.s( datetime.timedelta(hours=2) ), name='Abort non processed submissions after 2 hours') @celery_app.task def abort_non_proc_submissions(limit): from portal.async_celery.async_manager import AsyncManager mgr = AsyncManager() mgr.abort_submissions_sched_exceeded(limit) mgr.abort_submission_proc_exceeded(limit) mgr.abort_submission_checkout_exceeded(limit) portal/async_celery/tasks.py +1 −1 Original line number Diff line number Diff line Loading @@ -77,7 +77,7 @@ def update_project_test_files(course_id: str, project_id: str): try: _update_test_files(rest_service, project, params) except Exception as ex: log.error(f"[ASYNC] Cannot update source files {ex}") log.error(f"[ASYNC] Cannot update source files {project.log_name}: {ex}") def _update_test_files(rest_service, project, params): Loading portal/database/models.py +18 −0 Original line number Diff line number Diff line Loading @@ -902,6 +902,23 @@ class Submission(db.Model, EntityBase): # open to extension (state transition validation, ...) self.state = new_state @property def message(self): if self.note and isinstance(self.note, dict): return self.note.get('message') if self.note and isinstance(self.note, str): return self.note else: return None @message.setter def message(self, value): self.note['message'] = value def abort(self, message): self.state = SubmissionState.ABORTED self.message = message @property def is_cancelled(self) -> bool: return self.state in [SubmissionState.CANCELLED, SubmissionState.ABORTED] Loading Loading @@ -929,6 +946,7 @@ class Submission(db.Model, EntityBase): self.review = review self.parameters = parameters or {} self.state = SubmissionState.CREATED self.note = {} def __repr__(self): return _repr(self) Loading Loading
portal/async_celery/__init__.py +2 −0 Original line number Diff line number Diff line Loading @@ -25,3 +25,5 @@ def init_app(app: Flask) -> Flask: celery_app.Task = ContextTask return app import portal.async_celery.periodic_tasks No newline at end of file
portal/async_celery/async_manager.py 0 → 100644 +43 −0 Original line number Diff line number Diff line from portal.database import Submission, SubmissionState from portal.service.rest import RestService from portal.tools import time class AsyncManager: def __init__(self): self._rest = RestService() @property def rest(self) -> RestService: return self._rest def abort_submissions_sched_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.READY, param='created_at', msg_add='Schedule phase') def abort_submission_checkout_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.CREATED, param='scheduled_for', msg_add='Checkout phase') def abort_submission_proc_exceeded(self, limit): self.abort_submission_any_exceeded(limit=limit, state=SubmissionState.IN_PROGRESS, param='scheduled_for', msg_add='Process phase') def _proc_expired(self, submission: Submission, limit, param='scheduled_for'): req_time = getattr(submission, param) schedule_time = time.normalize_time(req_time) curr = time.current_time() exp_time = limit + schedule_time return exp_time <= curr def _abort_submission(self, sub: Submission, message): sub.abort(message=message) self.rest.submissions.write_entity(sub) def abort_submission_any_exceeded(self, limit, state, param, msg_add: str, msg=None): msg = msg or "Submission has not been processed in time limit: " msg += msg_add submissions = Submission.query.filter(Submission.state == state).all() submissions = [sub for sub in submissions if self._proc_expired(sub, limit, param=param)] for sub in submissions: self._abort_submission(sub, msg)
portal/async_celery/periodic_tasks.py 0 → 100644 +21 −0 Original line number Diff line number Diff line import datetime from portal.async_celery import celery_app @celery_app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(30*60, abort_non_proc_submissions.s( datetime.timedelta(hours=2) ), name='Abort non processed submissions after 2 hours') @celery_app.task def abort_non_proc_submissions(limit): from portal.async_celery.async_manager import AsyncManager mgr = AsyncManager() mgr.abort_submissions_sched_exceeded(limit) mgr.abort_submission_proc_exceeded(limit) mgr.abort_submission_checkout_exceeded(limit)
portal/async_celery/tasks.py +1 −1 Original line number Diff line number Diff line Loading @@ -77,7 +77,7 @@ def update_project_test_files(course_id: str, project_id: str): try: _update_test_files(rest_service, project, params) except Exception as ex: log.error(f"[ASYNC] Cannot update source files {ex}") log.error(f"[ASYNC] Cannot update source files {project.log_name}: {ex}") def _update_test_files(rest_service, project, params): Loading
portal/database/models.py +18 −0 Original line number Diff line number Diff line Loading @@ -902,6 +902,23 @@ class Submission(db.Model, EntityBase): # open to extension (state transition validation, ...) self.state = new_state @property def message(self): if self.note and isinstance(self.note, dict): return self.note.get('message') if self.note and isinstance(self.note, str): return self.note else: return None @message.setter def message(self, value): self.note['message'] = value def abort(self, message): self.state = SubmissionState.ABORTED self.message = message @property def is_cancelled(self) -> bool: return self.state in [SubmissionState.CANCELLED, SubmissionState.ABORTED] Loading Loading @@ -929,6 +946,7 @@ class Submission(db.Model, EntityBase): self.review = review self.parameters = parameters or {} self.state = SubmissionState.CREATED self.note = {} def __repr__(self): return _repr(self) Loading