Unverified Commit bce9eba4 authored by Peter Stanko's avatar Peter Stanko
Browse files

Submissions Processor with submission service

parent b09f77c9
......@@ -47,13 +47,6 @@ class SubmissionProcessor:
log.info(f"[ASYNC] Submission enqueue ended: {self.submission}")
self.reset_task_id(state=SubmissionState.QUEUED)
def revoke_enqueue_task(self):
log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
task_id = self.submission.process_task_id
if task_id:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
def get_delay_for_submission(self):
log.debug(f"[ASYNC] Submission delay: {self.submission}")
time_wait = self.project.config.submissions_cancellation_period
......@@ -70,14 +63,6 @@ class SubmissionProcessor:
self.submission.source_hash = version
self.reset_task_id(state=SubmissionState.READY)
def revoke_storage_task(self):
log.info(f'[ASYNC] Submission storage upload cancelled {self.submission}')
task_id = self.submission.storage_task_id
if task_id:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
# TODO: Dispatch cleanup task
def download_submission(self):
log.info(f"[ASYNC] Uploading submission: {self.submission} with {self.params}")
updated_entity: UploadedEntity = self.storage. \
......@@ -106,3 +91,11 @@ class SubmissionProcessor:
log.info(f"[ASYNC] Processing submission: {self.submission}")
self.download_submission()
self.dispatch_submission_processing()
def revoke_task(self):
log.info(f'[ASYNC] Submission processing cancelled {self.submission}')
task_id = self.submission.async_task_id
if task_id:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
# TODO: Storage clean up
......@@ -13,7 +13,7 @@ from portal.service.errors import ForbiddenError, SubmissionRefusedError
from portal.service.permissions import check_client, require_client
from portal.service.projects import can_create_submission, create_project, delete_project, \
find_project_submissions, list_projects, update_project, update_project_config, update_project_test_files_hash
from portal.service.submissions import create_submission
from portal.service.submissions import SubmissionsService
from portal.tools import time
projects_namespace = Namespace('') # pylint: disable=invalid-name
......@@ -199,8 +199,8 @@ class ProjectSubmissions(Resource):
# data for Kontr processing
project_params = data.get('project_params')
project_params_string = json.dumps(project_params) or ""
new_submission = create_submission(
service = SubmissionsService()
new_submission = service.create_submission(
user=client,
project=project,
file_params=data['file_params'],
......
......@@ -11,9 +11,7 @@ from portal.service import auth, general, permissions
from portal.service.general import find_client_owner
from portal.service.permissions import check_client, require_client, require_sysadmin
from portal.service.reviews import create_review, create_review_items
from portal.service.submissions import copy_submission, delete_submission, \
update_submission_state, cancel_submission, send_file_or_zip, upload_results_to_storage, \
send_files_tree
from portal.service.submissions import SubmissionsService
submissions_namespace = Namespace('submissions')
......@@ -43,7 +41,9 @@ class SubmissionResource(Resource):
checks = [
check_client(client, course, ['read_submissions_all']),
submission_access_group(client, submission, course, ['read_submissions_groups']),
(check_client(client, course, ['read_submissions_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['read_submissions_own']) and submission.user == find_client_owner(
client))
]
permissions.require_any_check(client, checks=checks)
......@@ -55,7 +55,7 @@ class SubmissionResource(Resource):
client = auth.find_client()
permissions.require_sysadmin(client)
submission = general.find_submission(sid)
delete_submission(submission)
SubmissionsService(submission=submission).delete_submission()
return '', 204
......@@ -73,7 +73,9 @@ class SubmissionState(Resource):
checks = [
permissions.check_client(client, course, ['read_submissions_all']),
submission_access_group(client, submission, course, ['read_submissions_groups']),
(check_client(client, course, ['read_submissions_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['read_submissions_own']) and submission.user == find_client_owner(
client))
]
permissions.require_any_check(client, checks=checks)
......@@ -95,7 +97,8 @@ class SubmissionState(Resource):
data = rest_helpers.parse_request_data(
submission_state_schema, action='update_state', resource='submission'
)
update_submission_state(client, submission, data)
service = SubmissionsService(submission=submission)
service.update_submission_state(client=client, data=data)
return '', 204
......@@ -130,11 +133,13 @@ class SubmissionSourcesTree(Resource):
checks = [
check_client(client, course, ['read_submissions_all']),
submission_access_group(client, submission, course, ['read_submissions_groups']),
(check_client(client, course, ['read_submissions_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['read_submissions_own']) and submission.user == find_client_owner(
client))
]
permissions.require_any_check(client, checks=checks)
storage_submission = storage.submissions.get(submission.id)
return send_files_tree(storage_submission)
service = SubmissionsService(submission=submission)
return service.send_files_tree()
@submissions_namespace.route('/<string:sid>/files/sources')
......@@ -150,11 +155,13 @@ class SubmissionSourceFiles(Resource):
checks = [
check_client(client, course, ['read_submissions_all']),
submission_access_group(client, submission, course, ['read_submissions_groups']),
(check_client(client, course, ['read_submissions_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['read_submissions_own']) and submission.user == find_client_owner(
client))
]
permissions.require_any_check(client, checks=checks)
storage_submission = storage.submissions.get(submission.id)
return send_file_or_zip(storage_submission)
service = SubmissionsService(submission=submission)
return service.send_file_or_zip()
@submissions_namespace.route('/<string:sid>/files/test_files/tree')
......@@ -173,7 +180,8 @@ class SubmissionTestFilesTree(Resource):
]
permissions.require_any_check(client, checks=checks)
storage_entity = storage.test_files.get(submission.project.id)
return send_files_tree(storage_entity)
service = SubmissionsService(submission=submission)
return service.send_files_tree(storage_entity)
@submissions_namespace.route('/<string:sid>/files/test_files')
......@@ -192,7 +200,8 @@ class SubmissionTestFiles(Resource):
]
permissions.require_any_check(client, checks=checks)
storage_entity = storage.test_files.get(submission.project.id)
return send_file_or_zip(storage_entity)
service = SubmissionsService(submission=submission)
return service.send_file_or_zip(storage_entity)
@submissions_namespace.route('/<string:sid>/files/results/tree')
......@@ -211,7 +220,8 @@ class SubmissionResultFilesTree(Resource):
]
permissions.require_any_check(client, checks=checks)
storage_entity = storage.results.get(submission.id)
tree = send_files_tree(storage_entity)
service = SubmissionsService(submission=submission)
tree = service.send_files_tree(storage_entity)
return flask.jsonify(tree), 200
......@@ -231,7 +241,8 @@ class SubmissionResultFiles(Resource):
]
permissions.require_any_check(client, checks=checks)
storage_entity = storage.results.get(submission.id)
return send_file_or_zip(storage_entity)
service = SubmissionsService(submission=submission)
return service.send_file_or_zip(storage_entity)
@jwt_required
def post(self, sid: str):
......@@ -240,8 +251,8 @@ class SubmissionResultFiles(Resource):
# authorization
require_sysadmin(client)
# todo: authorize worker
task = upload_results_to_storage(submission)
service = SubmissionsService(submission=submission)
task = service.upload_results_to_storage()
return {'new_task': task.id}
......@@ -264,7 +275,8 @@ class SubmissionResubmit(Resource):
)
# create new submission by copying files from the source submission in
# storage
new_submission = copy_submission(source_submission, note=data['note'])
service = SubmissionsService(submission=source_submission)
new_submission = service.copy_submission(note=data['note'])
return submission_schema.dump(new_submission), 201
......@@ -284,7 +296,8 @@ class SubmissionCancel(Resource):
permissions=['resubmit_submissions'])
# create new submission by copying files from the source submission in
# storage
cancel_submission(submission)
service = SubmissionsService(submission=submission)
service.cancel_submission(submission)
return '', 204
......@@ -302,10 +315,11 @@ class SubmissionReview(Resource):
checks = [
permissions.check_client(client, course, ['read_submissions_all']),
submission_access_group(client, submission, course, ['read_submissions_groups']),
(check_client(client, course, ['read_submissions_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['read_submissions_own']) and submission.user == find_client_owner(
client))
]
permissions.require_any_check(client, checks=checks)
return review_schema.dump(submission.review)[0]
@jwt_required
......@@ -318,7 +332,8 @@ class SubmissionReview(Resource):
checks = [
permissions.check_client(client, course, ['write_reviews_all']),
submission_access_group(client, submission, course, ['write_reviews_group']),
(check_client(client, course, ['write_reviews_own']) and submission.user == find_client_owner(client))
(check_client(client, course,
['write_reviews_own']) and submission.user == find_client_owner(client))
]
permissions.require_any_check(client, checks=checks)
......
......@@ -13,8 +13,7 @@ from storage import entities
from werkzeug.utils import secure_filename
from portal import storage
from portal.async_celery import tasks
from portal.async_celery.enqueue import revoke_enqueue_task
from portal.async_celery import submission_processor, tasks
from portal.async_celery.tasks import clone_submission_files, process_submission
from portal.database.models import Project, Submission, SubmissionState, User, Worker
from portal.service import errors
......@@ -24,176 +23,176 @@ from portal.service.general import delete_entity, write_entity
log = logging.getLogger(__name__)
def create_submission(user: User, project: Project, file_params: dict,
project_params_string: str) -> Submission:
"""Creates a new submission in the database and downloads its files.
Zip and git sources are processed differently.
Args:
user(User): the submission's author
project(Project): the project the submission falls under
file_params(dict): parameters for Storage - how and where to download files
project_params_string(str): custom parameters specified by
project with their values as a JSON-encoded string
Returns(Submission): the created submission entity
"""
# adding review at submission create is not supported
new_submission = Submission(
user=user, project=project, parameters=project_params_string)
write_entity(new_submission)
# download relevant files into Storage
process_new_submission(new_submission, file_params)
return new_submission
def process_new_submission(new_submission: Submission, file_params: dict) -> AsyncResult:
project = new_submission.project
file_params['whitelist'] = project.config.file_whitelist
source = file_params['source']
if source['type'] == 'zip':
source['path'] = "todo" # TODO: download zip
task = create_process_submission_task(file_params, new_submission)
return task
def create_process_submission_task(file_params, new_submission) -> AsyncResult:
result: AsyncResult = process_submission.delay(
new_submission.id, file_params)
new_submission.storage_task_id = result.task_id
return result
def copy_submission(source: Submission, note: str) -> Submission:
"""Copies a submission. Used at resubmitting
Args:
source(Submission):
note(str): Note for the submission. Typically contains a reason for the resubmit.
Returns(Submission): Copied submission
"""
log.info(f"[COPY] Submission {source.id} to "
f"same project {source.project.id}")
new_submission = Submission(user=source.user, project=source.project,
parameters=source.parameters)
new_submission.note = note
write_entity(new_submission)
clone_submission_files.delay(
source_id=source.id, target_id=new_submission.id)
return new_submission
def delete_submission(submission: Submission):
"""Deletes a submission
Args:
submission(Submission): Submission instance
"""
log.info(f"[DELETE] Submission {submission.id}")
# TODO: dispatch action to clean the storage
delete_entity(submission)
def update_submission_state(client: Union[User, Worker], submission: Submission,
data: dict) -> Submission:
"""Updates submission's state
Args:
client(User): Client attempting the update
submission(Submission): Submission instance to update
data(dict): should contain one key, 'state' with the new value
Returns(Submission): Updated submission
"""
new_state = data['state']
if isinstance(client, User) and new_state != SubmissionState.CANCELLED:
raise ForbiddenError(uid=client.id,
note=f"User {client.id} cannot update "
f"state to other than CANCELLED.")
submission.change_state(new_state)
write_entity(submission)
log.info(f"[UPDATE] Submission state {submission.id} "
f"by {client.id} to {submission.state}")
return submission
def cancel_submission(submission: Submission):
"""Cancels the submission
Args:
submission(Submission): Submission instance
"""
submission.state = SubmissionState.CANCELLED
revoke_enqueue_task(submission=submission)
revoke_storage_task(submission=submission)
write_entity(submission)
def send_zip(storage_submission: entities.Submission):
path = storage_submission.zip_path
if not path.exists():
raise errors.PortalAPIError(400, f"Requested path does not exist: {path}")
log.debug(f"[SEND] Sending zip file for submission ({storage_submission.entity_id}): {path}")
return flask.send_file(str(path), attachment_filename=path.name)
def send_selected_file(storage_submission: entities.Submission, path_query: str):
path_query = Path(path_query)
path = (storage_submission.path / path_query)
path = path.absolute()
if not path.exists():
raise errors.PortalAPIError(400, f"Requested path does not exist: {path}")
log.debug(f"[SEND] Sending file for submission ({storage_submission.entity_id}): {path}")
return flask.send_file(str(path), attachment_filename=path.name)
def send_file_or_zip(storage_entity):
path_query = request.args.get('path')
if path_query is None:
return send_zip(storage_entity)
return send_selected_file(storage_entity, path_query)
def send_files_tree(storage_entity):
tree = storage_entity.tree()
log.debug(f"[TREE] Tree for the storage entity {storage_entity.entity_id}: {tree} ")
return tree
def upload_file_is_allowed(file):
extension = Path(file.filename).suffix
log.debug(f"[ZIP] Extension for {file.filename}: {extension}")
if not extension == '.zip':
raise errors.PortalAPIError(
400,
f"File with extension \"{extension}\" is not allowed: #{file.filename}"
)
def upload_files_to_storage(file):
filename = secure_filename(file.filename)
uploads = storage.results.workspace_path / 'uploads'
if not uploads.exists():
uploads.mkdir(parents=True)
path = uploads / filename
file.save(str(path))
log.info(f"[UPLOAD] Uploading file to {filename} to {path}")
return path
def upload_results_to_storage(submission):
path = get_upload_file_path()
task = tasks.upload_results_to_storage.delay(submission.id, path=str(path))
return task
def get_upload_file_path():
file = request.files['file']
upload_file_is_allowed(file)
path = upload_files_to_storage(file)
return path
class SubmissionsService(object):
def create_submission(self, user: User, project: Project, file_params: dict,
project_params_string: str):
"""Creates a new submission in the database and downloads its files.
Zip and git sources are processed differently.
Args:
user(User): the submission's author
project(Project): the project the submission falls under
file_params(dict): parameters for Storage - how and where to download files
project_params_string(str): custom parameters specified by
project with their values as a JSON-encoded string
Returns(Submission): the created submission entity
"""
# adding review at submission create is not supported
new_submission = Submission(
user=user, project=project, parameters=project_params_string)
write_entity(new_submission)
# download relevant files into Storage
self._submission = new_submission
self.process_new_submission(file_params)
return new_submission
def __init__(self, submission: Submission = None):
self._submission = submission
@property
def submission(self) -> Submission:
return self._submission
@property
def project(self) -> Project:
return self.submission.project
@property
def storage(self):
return storage
@property
def storage_submission(self):
return storage.submissions.get(self.submission.id)
def process_new_submission(self, file_params: dict) -> AsyncResult:
project = self.submission.project
file_params['whitelist'] = project.config.file_whitelist
source = file_params['source']
if source['type'] == 'zip':
source['path'] = "todo" # TODO: download zip
task = self.create_process_submission_task(file_params)
return task
def create_process_submission_task(self, file_params) -> AsyncResult:
result: AsyncResult = process_submission.delay(
self.submission.id, file_params)
self.submission.storage_task_id = result.task_id
return result
def copy_submission(self, note: str) -> Submission:
"""Copies a submission. Used at resubmitting
Args:
note(str): Note for the submission. Typically contains a reason for the resubmit.
Returns(Submission): Copied submission
"""
log.info(f"[COPY] Submission {self.submission.id} to "
f"same project {self.submission.project.id}")
new_submission = Submission(user=self.submission.user, project=self.project,
parameters=self.submission.parameters)
new_submission.note = note
write_entity(new_submission)
clone_submission_files.delay(
source_id=self.submission.id, target_id=new_submission.id)
return new_submission
def delete_submission(self):
"""Deletes a submission
"""
log.info(f"[DELETE] Submission {self.submission}")
# TODO: dispatch action to clean the storage
delete_entity(self.submission)
def update_submission_state(self, client: Union[User, Worker], data: dict) -> Submission:
"""Updates submission's state
Args:
client(User): Client attempting the update
data(dict): should contain one key, 'state' with the new value
Returns(Submission): Updated submission
"""
new_state = data['state']
if isinstance(client, User) and new_state != SubmissionState.CANCELLED:
raise ForbiddenError(uid=client.id,
note=f"User {client.id} cannot update "
f"state to other than CANCELLED.")
self.submission.change_state(new_state)
write_entity(self.submission)
log.info(f"[UPDATE] Submission state {self.submission.id} "
f"by {client.id} to {self.submission.state}")
return self.submission
def cancel_submission(self):
"""Cancels the submission
"""
processor = submission_processor.SubmissionProcessor(self.submission)
processor.revoke_task()
def send_zip(self, storage_submission: entities.Submission):
path = storage_submission.zip_path
if not path.exists():
raise errors.PortalAPIError(400, f"Requested path does not exist: {path}")
log.debug(
f"[SEND] Sending zip file for submission ({storage_submission.entity_id}): {path}")
return flask.send_file(str(path), attachment_filename=path.name)
def send_selected_file(self, storage_submission: entities.Submission, path_query: str):
path_query = Path(path_query)
path = (storage_submission.path / path_query)
path = path.absolute()
if not path.exists():
raise errors.PortalAPIError(400, f"Requested path does not exist: {path}")
log.debug(f"[SEND] Sending file for submission ({storage_submission.entity_id}): {path}")
return flask.send_file(str(path), attachment_filename=path.name)
def send_file_or_zip(self, storage_entity=None):
storage_entity = storage_entity or self.storage_submission
path_query = request.args.get('path')
if path_query is None:
return self.send_zip(storage_entity)
return self.send_selected_file(storage_entity, path_query)
def send_files_tree(self, storage_entity=None):
storage_entity = storage_entity or self.storage_submission
tree = storage_entity.tree()
log.debug(f"[TREE] Tree for the storage entity {storage_entity.entity_id}: {tree} ")