Unverified Commit e9278e8b authored by Peter Stanko's avatar Peter Stanko
Browse files

Executing the worker

- New migration
- Saving all the params for the submission
- Using the worker API
parent 54872df7
Pipeline #13120 canceled with stage
......@@ -26,6 +26,7 @@ python-ldap = "*"
mockredispy = "*"
python-slugify = "*"
coloredlogs = "*"
worker-api = {editable = true, git = "https://gitlab.fi.muni.cz/grp-kontr2/worker-api"}
[dev-packages]
pylint = "*"
......
......@@ -120,22 +120,22 @@ def init_dev_data(app: Flask, db: SQLAlchemy):
student2.groups.append(tc2_students)
# submissions
tc2_sub1 = Submission(user=teacher1, project=tc2_hw01, parameters="")
tc2_sub1 = Submission(user=teacher1, project=tc2_hw01, parameters={})
tc2_sub1.state = SubmissionState.FINISHED
tc2_sub2 = Submission(user=student2, project=tc2_hw01, parameters="")
tc2_sub2 = Submission(user=student2, project=tc2_hw01, parameters={})
tc2_sub2.state = SubmissionState.ABORTED
tc1_sub_p1_cancel = Submission(
user=student2, project=tc1_hw01, parameters="")
user=student2, project=tc1_hw01, parameters={})
tc1_sub_p1_cancel.state = SubmissionState.CANCELLED
tc1_sub_p1_abort = Submission(
user=student1, project=tc1_hw01, parameters="")
user=student1, project=tc1_hw01, parameters={})
tc1_sub_p1_abort.state = SubmissionState.ABORTED
tc1_sub_p1_finished = Submission(
user=student2, project=tc1_hw01, parameters="")
user=student2, project=tc1_hw01, parameters={})
tc1_sub_p1_finished.state = SubmissionState.FINISHED
tc1_sub_p1_in_progress = Submission(
user=student1, project=tc1_hw01, parameters="")
user=student1, project=tc1_hw01, parameters={})
tc1_sub_p1_in_progress.state = SubmissionState.IN_PROGRESS
# Projects in groups
......
"""empty message
Revision ID: c21ce0e65d64
Revises: 8c3c07862a02
Create Date: 2018-09-10 20:45:32.588254
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c21ce0e65d64'
down_revision = '8c3c07862a02'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('submission', 'parameters',
existing_type=sa.TEXT(),
nullable=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('submission', 'parameters',
existing_type=sa.TEXT(),
nullable=False)
# ### end Alembic commands ###
import json
import random
from pathlib import Path
from typing import Union
from storage import UploadedEntity
from portal import logger
from portal.database import Project, Submission, SubmissionState
import portal.tools.worker_client
from portal import logger, tools
from portal.database import Project, Submission, SubmissionState, Worker
from portal.service import general
log = logger.get_logger(__name__)
......@@ -20,7 +24,7 @@ class SubmissionProcessor:
@property
def params(self) -> dict:
return self._params
return self.submission.parameters
@property
def project(self) -> Project:
......@@ -64,9 +68,10 @@ class SubmissionProcessor:
self.reset_task_id(state=SubmissionState.READY)
def download_submission(self):
log.info(f"[ASYNC] Uploading submission: {self.submission} with {self.params}")
file_params = self.params['file_params']
log.info(f"[ASYNC] Uploading submission: {self.submission} with {file_params}")
updated_entity: UploadedEntity = self.storage. \
submissions.create(entity_id=self.submission.id, **self.params)
submissions.create(entity_id=self.submission.id, **file_params)
self.submission_store_ended(version=updated_entity.version)
def clone(self, target):
......@@ -77,13 +82,13 @@ class SubmissionProcessor:
def send_to_worker(self):
# TODO: implement processing
log.info(f"[ASYNC] Sending submission to worker: {self.submission}")
if self.params is not None:
print("Not implemented")
worker = self.schedule_submission_to_worker()
self.execute_submission(worker)
def upload_result(self, path):
log.info(
f"[ASYNC] Uploading result for the submission {self.submission.id} with {self.params}")
self.storage.results.create(entity_id=self.submission.id, **self.params)
def upload_result(self, path, file_params):
log.info(f"[ASYNC] Uploading result for the submission "
f"{self.submission.id} with {file_params}")
self.storage.results.create(entity_id=self.submission.id, **file_params)
Path(path).unlink()
self.reset_task_id(SubmissionState.FINISHED)
......@@ -99,3 +104,25 @@ class SubmissionProcessor:
self.celery.control.revoke(task_id=task_id, terminate=True)
self.reset_task_id(state=SubmissionState.CANCELLED)
# TODO: Storage clean up
# TODO implement - @mdujava
# STUB: Select initialized worker
def schedule_submission_to_worker(self) -> Worker:
"""Based on the features (worker tags) and preferences in project config
schedule submission for the execution on initialized worker
Returns(Worker): Worker instance on which the submission will be executed
"""
workers = Worker.query.filter(Worker.is_initialized).all()
if not workers:
self._worker_not_available()
worker = random.choice(workers) # randomly select a worker
return worker
def execute_submission(self, worker: Worker):
worker_client = tools.worker_client.WorkerClient(worker=worker)
worker_client.execute_submission(self.submission)
def _worker_not_available(self):
log.warning(f"[PROC] Worker is no available for submission: {self.submission}")
pass
......@@ -9,9 +9,9 @@ log = get_task_logger(__name__)
@celery_app.task(name='upload-submission-to-storage')
def process_submission(new_submission_id: str, file_params: dict):
def process_submission(new_submission_id: str):
new_submission = find_submission(new_submission_id)
processor = submission_processor.SubmissionProcessor(new_submission, file_params)
processor = submission_processor.SubmissionProcessor(new_submission)
processor.process_submission()
......@@ -19,9 +19,9 @@ def process_submission(new_submission_id: str, file_params: dict):
def upload_results_to_storage(new_submission_id: str, path: str):
path = str(path)
new_submission = find_submission(new_submission_id)
processor = submission_processor.SubmissionProcessor(new_submission)
file_params = dict(source=dict(url=path, type='zip'))
processor = submission_processor.SubmissionProcessor(new_submission, file_params)
processor.upload_result(path=path)
processor.upload_result(path=path, file_params=file_params)
@celery_app.task(name='clone-submission-files')
......
......@@ -3,11 +3,13 @@ Models module where all of the models are specified
"""
import enum
import json
import uuid
from typing import List
import sqlalchemy
from flask_sqlalchemy import BaseQuery
from sqlalchemy import event
from sqlalchemy import TypeDecorator, event
from sqlalchemy.ext.hybrid import hybrid_property
from werkzeug.security import check_password_hash, generate_password_hash
......@@ -18,6 +20,22 @@ from portal.tools import time
from portal.tools.time import normalize_time
class JSONEncodedDict(TypeDecorator):
"Represents an immutable structure as a json-encoded string."
impl = sqlalchemy.Text
def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value)
return value
def process_result_value(self, value, dialect):
if value is not None:
value = json.loads(value)
return value
def _repr(instance) -> str:
"""Repr helper function
Args:
......@@ -835,12 +853,11 @@ class Submission(db.Model, EntityBase):
id = db.Column(db.String(length=36), default=lambda: str(
uuid.uuid4()), primary_key=True)
scheduled_for = db.Column(db.TIMESTAMP(timezone=True))
parameters = db.Column(db.Text, nullable=False)
parameters = db.Column(JSONEncodedDict(), nullable=True)
state = db.Column(
db.Enum(SubmissionState, name='SubmissionState'), nullable=False)
note = db.Column(db.Text)
source_hash = db.Column(db.String(64))
user_id = db.Column(db.String(36), db.ForeignKey(
'user.id', ondelete='cascade'), nullable=False)
user = db.relationship("User", back_populates="submissions", uselist=False)
......@@ -856,19 +873,19 @@ class Submission(db.Model, EntityBase):
# open to extension (state transition validation, ...)
self.state = new_state
def __init__(self, user: User, project: Project, parameters: str = None,
def __init__(self, user: User, project: Project, parameters: dict = None,
review: 'Review' = None):
"""Creates a new submission instance
Args:
user(User): User instance
project(Project): Project instance
parameters(str): Parameters for the submission
parameters(dict): Parameters for the submission
review(Review): Review instance
"""
self.user = user
self.project = project
self.review = review
self.parameters = parameters
self.parameters = parameters or {}
self.state = SubmissionState.CREATED
def __repr__(self):
......@@ -1003,6 +1020,10 @@ class Worker(EntityBase, Client):
self.state = WorkerState.CREATED
super().__init__(client_type=ClientType.WORKER)
@hybrid_property
def is_initialized(self):
return self.url is not None and self.portal_secret is not None
def __repr__(self):
return _repr(self)
......
......@@ -180,16 +180,14 @@ class ProjectSubmissions(Resource):
data = rest_helpers.parse_request_data(
submission_create_schema, action='create', resource='submission'
)
log.debug(f"[REST] Create submission: {data}")
# data for Kontr processing
project_params = data.get('project_params')
project_params_string = json.dumps(project_params) or ""
service = SubmissionsService()
new_submission = service.create_submission(
user=client,
project=project,
file_params=data['file_params'],
project_params_string=project_params_string
submission_params=data
)
return submission_schema.dump(new_submission)[0], 201
......
......@@ -12,8 +12,7 @@ from portal.rest import rest_helpers
from portal.rest.schemas import courses_schema, groups_schema, password_change_schema, \
projects_schema, reviews_schema, roles_schema, secret_schema, secrets_schema, \
submissions_schema, user_schema, user_schema_reduced, users_schema
from portal.service import auth, general, permissions, users
from portal.service.errors import ForbiddenError, PortalAPIError
from portal.service import auth, errors, general, permissions
from portal.service.general import find_client_owner
from portal.service.secrets import create_secret, delete_secret
from portal.service.users import UserService
......@@ -32,9 +31,9 @@ class UserList(Resource):
course_id = request.args.get('course')
group_id = request.args.get('group')
if group_id and not course_id:
raise PortalAPIError(400,
"Invalid filter combination at /get/users: "
"missing course id.")
raise errors.PortalAPIError(400,
"Invalid filter combination at /get/users: "
"missing course id.")
UserService()
filtered_users = UserService().find_users_filtered(
......@@ -115,7 +114,7 @@ class UserSubmissionList(Resource):
course_id = request.args.get('course')
project_ids = request.args.getlist('project')
if project_ids and not course_id:
raise PortalAPIError(400,
raise errors.PortalAPIError(400,
f"Invalid filter combination at "
f"/get/users/{uid}/submissions: missing course id.")
......@@ -296,7 +295,7 @@ def get_submissions_based_on_permissions(client, user, course_id, project_ids):
if find_client_owner(client) == user:
return user.submissions
else:
raise ForbiddenError(uid=client.id)
raise errors.ForbiddenError(uid=client.id)
def get_submissions_based_on_permissions_for_course(client, course_id, project_ids, user):
......
......@@ -143,3 +143,8 @@ class ValidationError(PortalAPIError): # currently not used
class SubmissionRefusedError(PortalAPIError):
def __init__(self, reason):
super().__init__(code=400, message=reason)
class WorkerNotAvailable(PortalError):
def __init__(self, message: str = None):
self.message = message or "Worker is not available"
......@@ -58,7 +58,7 @@ def write_entity(entity):
db.session.commit()
def find_resource(identifier: str, resource: str, query, throws):
def find_resource(identifier: str, resource: str, query, throws: bool = True):
"""Gets an instance of the resource
Args:
......
......@@ -13,10 +13,8 @@ from werkzeug.utils import secure_filename
from portal import storage
from portal.async_celery import submission_processor, tasks
from portal.async_celery.tasks import clone_submission_files, process_submission
from portal.database.models import Project, Submission, SubmissionState, User, Worker
from portal.service import errors
from portal.service.errors import ForbiddenError
from portal.service.general import delete_entity, write_entity
log = logging.getLogger(__name__)
......@@ -44,8 +42,7 @@ def upload_file_is_allowed(file):
class SubmissionsService(object):
def create_submission(self, user: User, project: Project, file_params: dict,
project_params_string: str):
def create_submission(self, user: User, project: Project, submission_params: dict):
"""Creates a new submission in the database and downloads its files.
Zip and git sources are processed differently.
......@@ -53,21 +50,21 @@ class SubmissionsService(object):
Args:
user(User): the submission's author
project(Project): the project the submission falls under
file_params(dict): parameters for Storage - how and where to download files
project_params_string(str): custom parameters specified by
submission_params(dict): custom parameters specified by
project with their values as a JSON-encoded string
Returns(Submission): the created submission entity
"""
# adding review at submission create is not supported
new_submission = Submission(
user=user, project=project, parameters=project_params_string)
user=user, project=project, parameters=submission_params)
write_entity(new_submission)
# download relevant files into Storage
self._submission = new_submission
self.process_new_submission(file_params)
self.process_new_submission()
return new_submission
def __init__(self, submission: Submission = None):
......@@ -93,18 +90,15 @@ class SubmissionsService(object):
def storage_submission(self):
return storage.submissions.get(self.submission.id)
def process_new_submission(self, file_params: dict) -> AsyncResult:
def process_new_submission(self) -> AsyncResult:
project = self.submission.project
file_params['whitelist'] = project.config.file_whitelist
source = file_params['source']
if source['type'] == 'zip':
source['path'] = "todo" # TODO: download zip
task = self.create_process_submission_task(file_params)
self.submission.parameters['file_params'] = project.config.file_whitelist
write_entity(self.submission)
task = self.create_process_submission_task()
return task
def create_process_submission_task(self, file_params) -> AsyncResult:
result: AsyncResult = process_submission.delay(
self.submission.id, file_params)
def create_process_submission_task(self) -> AsyncResult:
result: AsyncResult = tasks.process_submission.delay(self.submission.id)
self.submission.storage_task_id = result.task_id
return result
......@@ -122,7 +116,7 @@ class SubmissionsService(object):
parameters=self.submission.parameters)
new_submission.note = note
write_entity(new_submission)
clone_submission_files.delay(
tasks.clone_submission_files.delay(
source_id=self.submission.id, target_id=new_submission.id)
return new_submission
......@@ -145,9 +139,9 @@ class SubmissionsService(object):
"""
new_state = data['state']
if isinstance(client, User) and new_state != SubmissionState.CANCELLED:
raise ForbiddenError(uid=client.id,
note=f"User {client.id} cannot update "
f"state to other than CANCELLED.")
raise errors.ForbiddenError(uid=client.id,
note=f"User {client.id} cannot update "
f"state to other than CANCELLED.")
self.submission.change_state(new_state)
write_entity(self.submission)
log.info(f"[UPDATE] Submission state {self.submission.id} "
......
"""
Tools module to help with general tasks
"""
"""
\ No newline at end of file
import worker_api.client
from portal import logger
log = logger.get_logger(__name__)
class WorkerClient:
def __init__(self, worker):
self._worker = worker
self._api = None
@property
def worker(self):
return self._worker
@property
def credentials(self):
cred = dict(
url=self.worker.url,
access_token=self.worker.portal_secret,
)
return cred
@property
def api(self) -> worker_api.client.WorkerClient:
if self._api is None:
self._api = worker_api.client.WorkerClient(**self.credentials)
return self._api
def execute_submission(self, submission):
log.info(f"[WC] Executing: {submission}")
parameters = submission.parameters
parameters['id'] = submission.id
result = self.api.submissions.create(parameters, sid=submission.id)
log.debug(f"[WC] Exec call result: {result}")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment