Unverified Commit b09f77c9 authored by Peter Stanko's avatar Peter Stanko
Browse files

Submission processor support

parent 688649bf
import logging
from portal.async_celery import celery_app
from portal.async_celery.helpers import reset_task_id
from portal.database.models import Submission, SubmissionState
log = logging.getLogger(__name__)
def submission_enqueue_ended(submission: Submission):
# TODO: TBD
log.info(f"[ASYNC] Submission enqueue ended: {submission}")
reset_task_id(submission=submission, task_id_type='process_task_id',
state=SubmissionState.QUEUED)
def revoke_enqueue_task(submission: Submission):
log.info(f'[ASYNC] Submission storage upload cancelled {submission}')
task_id = submission.process_task_id
if task_id:
celery_app.control.revoke(task_id=task_id, terminate=True)
reset_task_id(submission=submission, task_id_type='process_task_id',
state=SubmissionState.CANCELLED)
from portal.database.models import Submission
from portal.service.general import write_entity
def reset_task_id(submission: Submission, task_id_type: str, state=None):
if state is not None:
submission.state = state
setattr(submission, task_id_type, None)
write_entity(submission)
import logging
from portal.async_celery import celery_app
from portal.async_celery.helpers import reset_task_id
from portal.database.models import Submission, SubmissionState
log = logging.getLogger(__name__)
def submission_store_ended(submission: Submission, version: str):
# TODO: TBD
log.info(f"[ASYNC] Submission preparation ended: {submission}")
submission.source_hash = version
reset_task_id(submission=submission, task_id_type='storage_task_id',
state=SubmissionState.READY)
def revoke_storage_task(submission: Submission):
log.info(f'[ASYNC] Submission storage upload cancelled {submission}')
task_id = submission.storage_task_id
if task_id:
celery_app.control.revoke(task_id=task_id, terminate=True)
reset_task_id(submission=submission, task_id_type='storage_task_id',
state=SubmissionState.CANCELLED)
# TODO: Dispatch cleanup task
import logging
from pathlib import Path
from storage import UploadedEntity
from portal.database import Project, Submission, SubmissionState
from portal.service import general
log = logging.getLogger(__name__)
class SubmissionProcessor:
def __init__(self, submission: Submission, params: dict = None):
self._submission = submission
self._params = params
@property
def submission(self) -> Submission:
return self._submission
@property
def params(self) -> dict:
return self._params
@property
def project(self) -> Project:
return self.submission.project
@property
def celery(self):
from portal.async_celery import celery_app
return celery_app
@property
def storage(self):
from portal import storage
return storage
def reset_task_id(self, state=None):
if state is not None:
self.submission.state = state
self.submission.async_task_id = None
general.write_entity(self.submission)
def submission_enqueue_ended(self):
log.info(f"[ASYNC] Submission enqueue ended: {self.submission}")
self.reset_task_id(state=SubmissionState.QUEUED)
def revoke_enqueue_task(self):
log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
task_id = self.submission.process_task_id
if task_id:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
def get_delay_for_submission(self):
log.debug(f"[ASYNC] Submission delay: {self.submission}")
time_wait = self.project.config.submissions_cancellation_period
return time_wait
def dispatch_submission_processing(self):
delay = self.get_delay_for_submission()
args = (self.submission.id, self.params)
from .tasks import start_processing_submission
start_processing_submission.apply_async(args=args, countdown=delay)
def submission_store_ended(self, version: str):
log.info(f"[ASYNC] Submission preparation ended: {self.submission}")
self.submission.source_hash = version
self.reset_task_id(state=SubmissionState.READY)
def revoke_storage_task(self):
log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
task_id = self.submission.storage_task_id
if task_id:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
# TODO: Dispatch cleanup task
def download_submission(self):
log.info(f"[ASYNC] Uploading submission: {self.submission} with {self.params}")
updated_entity: UploadedEntity = self.storage. \
submissions.create(entity_id=self.submission.id, **self.params)
self.submission_store_ended(version=updated_entity.version)
def clone(self, target):
log.info(f"[ASYNC] Cloning submission: {self.submission} to {target}")
self.storage.submissions.clone(self.submission.id, target.id)
self.submission_store_ended(version=self.submission.source_hash)
def send_to_worker(self):
# TODO: implement processing
log.info(f"[ASYNC] Sending submission to worker: {self.submission}")
if self.params is not None:
print("Not implemented")
def upload_result(self, path):
log.info(
f"[ASYNC] Uploading result for the submission {self.submission.id} with {self.params}")
self.storage.results.create(entity_id=self.submission.id, **self.params)
Path(path).unlink()
self.reset_task_id(SubmissionState.FINISHED)
def process_submission(self):
log.info(f"[ASYNC] Processing submission: {self.submission}")
self.download_submission()
self.dispatch_submission_processing()
from pathlib import Path
from celery.utils.log import get_task_logger
from storage import UploadedEntity
from portal import storage
from portal.async_celery import celery_app
from portal.async_celery.storage import submission_store_ended
from portal.database import SubmissionState
from portal.service import general
from portal.service.general import find_submission, find_project, find_course, write_entity
from portal.async_celery import celery_app, submission_processor
from portal.service.general import find_course, find_project, find_submission, write_entity
log = get_task_logger(__name__)
@celery_app.task(name='upload-submission-to-storage')
def upload_submission_to_storage(new_submission_id: str, file_params: dict):
def process_submission(new_submission_id: str, file_params: dict):
new_submission = find_submission(new_submission_id)
log.info(
f"[ASYNC] Uploading submission: {new_submission} with {file_params}")
updated_entity: UploadedEntity = storage.submissions.create(entity_id=new_submission.id, **file_params)
# TODO: Upload ended -> change state
submission_store_ended(submission=new_submission, version=updated_entity.version)
processor = submission_processor.SubmissionProcessor(new_submission, file_params)
processor.process_submission()
@celery_app.task(name='upload-results-to-storage')
......@@ -28,30 +20,23 @@ def upload_results_to_storage(new_submission_id: str, path: str):
path = str(path)
new_submission = find_submission(new_submission_id)
file_params = dict(source=dict(url=path, type='zip'))
log.info(
f"[ASYNC] Uploading result for the submission {new_submission.id} with {file_params}")
entity = storage.results.create(entity_id=new_submission.id, **file_params)
Path(path).unlink()
new_submission.state = SubmissionState.FINISHED
general.write_entity(new_submission)
processor = submission_processor.SubmissionProcessor(new_submission, file_params)
processor.upload_result(path=path)
@celery_app.task(name='clone-submission-files')
def clone_submission_files(source_id: str, target_id: str):
source = find_submission(source_id)
target = find_submission(target_id)
log.info(f"[ASYNC] Cloning submission: {source} to {target}")
storage.submissions.clone(source.id, target.id)
submission_store_ended(target, version=source.source_hash)
processor = submission_processor.SubmissionProcessor(source)
processor.clone(target)
@celery_app.task(name='start-processing-submission')
def start_processing_submission(submission_id: str, submission_params):
submission = find_submission(submission_id)
log.info(f"[ASYNC] Processing submission: {submission}")
# TODO: implement processing
if submission_params is not None:
print("Not implemented")
processor = submission_processor.SubmissionProcessor(submission, submission_params)
processor.send_to_worker()
@celery_app.task(name='update-project-test-files')
......@@ -67,5 +52,4 @@ def update_project_test_files(course_id: str, project_id: str):
}
updated_entity: UploadedEntity = storage.test_files.update(entity_id=project.id, **params)
project.config.test_files_commit_hash = updated_entity.version
write_entity(project)
......@@ -143,7 +143,7 @@ class Secret(db.Model):
client = db.relationship(
"Client", back_populates="secrets", uselist=False)
def __init__(self, name: str, value: str=None, expires_at=None):
def __init__(self, name: str, value: str = None, expires_at=None):
# TODO: check date/timestamp
self.name = name
self.value = generate_password_hash(value)
......@@ -844,8 +844,7 @@ class Submission(db.Model, EntityBase):
review = db.relationship("Review", back_populates="submission", cascade="all, delete-orphan",
passive_deletes=True, uselist=False)
storage_task_id = db.Column(db.String(length=36))
process_task_id = db.Column(db.String(length=36))
async_task_id = db.Column(db.String(length=36))
def change_state(self, new_state):
# open to extension (state transition validation, ...)
......@@ -1016,17 +1015,20 @@ def receive_set(**kw):
users_groups = db.Table("users_groups", db.Model.metadata,
db.Column("user_id", db.String(36),
db.ForeignKey('user.id', ondelete='cascade')),
db.Column("group_id", db.String(36), db.ForeignKey('group.id', ondelete='cascade')))
db.Column("group_id", db.String(36),
db.ForeignKey('group.id', ondelete='cascade')))
clients_roles = db.Table("clients_roles", db.Model.metadata,
db.Column("client_id", db.String(36),
db.ForeignKey('client.id', ondelete='cascade')),
db.Column("role_id", db.String(36), db.ForeignKey('role.id', ondelete='cascade')))
db.Column("role_id", db.String(36),
db.ForeignKey('role.id', ondelete='cascade')))
projects_groups = db.Table("projects_groups", db.Model.metadata,
db.Column("project_id", db.String(36),
db.ForeignKey('project.id', ondelete='cascade')),
db.Column("group_id", db.String(36), db.ForeignKey('group.id', ondelete='cascade')))
db.Column("group_id", db.String(36),
db.ForeignKey('group.id', ondelete='cascade')))
Client.roles = db.relationship("Role", secondary="clients_roles")
User.groups = db.relationship("Group", secondary="users_groups")
......
......@@ -6,8 +6,8 @@ import logging
from pathlib import Path
from typing import Union
from celery.result import AsyncResult
import flask
from celery.result import AsyncResult
from flask import request
from storage import entities
from werkzeug.utils import secure_filename
......@@ -15,9 +15,8 @@ from werkzeug.utils import secure_filename
from portal import storage
from portal.async_celery import tasks
from portal.async_celery.enqueue import revoke_enqueue_task
from portal.async_celery.storage import revoke_storage_task
from portal.async_celery.tasks import clone_submission_files, upload_submission_to_storage
from portal.database.models import Worker, Project, Submission, SubmissionState, User
from portal.async_celery.tasks import clone_submission_files, process_submission
from portal.database.models import Project, Submission, SubmissionState, User, Worker
from portal.service import errors
from portal.service.errors import ForbiddenError
from portal.service.general import delete_entity, write_entity
......@@ -47,31 +46,23 @@ def create_submission(user: User, project: Project, file_params: dict,
write_entity(new_submission)
# download relevant files into Storage
save_submission_to_storage(new_submission, file_params)
enqueue_submission(new_submission, submission_params=file_params)
process_new_submission(new_submission, file_params)
return new_submission
def save_submission_to_storage(new_submission: Submission, file_params: dict) -> AsyncResult:
def process_new_submission(new_submission: Submission, file_params: dict) -> AsyncResult:
project = new_submission.project
file_params['whitelist'] = project.config.file_whitelist
source = file_params['source']
if source['type'] == 'zip':
source['path'] = "todo" # TODO: download zip
task = create_save_task(file_params, new_submission)
task = create_process_submission_task(file_params, new_submission)
return task
def enqueue_submission(new_submission: Submission, submission_params: dict):
project = new_submission.project
time_wait = project.config.submissions_cancellation_period
result: AsyncResult = tasks.start_processing_submission \
.delay(new_submission.id, submission_params)
def create_save_task(file_params, new_submission) -> AsyncResult:
result: AsyncResult = upload_submission_to_storage.delay(
def create_process_submission_task(file_params, new_submission) -> AsyncResult:
result: AsyncResult = process_submission.delay(
new_submission.id, file_params)
new_submission.storage_task_id = result.task_id
return result
......@@ -181,7 +172,7 @@ def upload_file_is_allowed(file):
raise errors.PortalAPIError(
400,
f"File with extension \"{extension}\" is not allowed: #{file.filename}"
)
)
def upload_files_to_storage(file):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment