Verified Commit 500de1b3 authored by Peter Stanko's avatar Peter Stanko
Browse files

Major storage and async tasks refactor

parent 3730157e
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+9 −5
Original line number Diff line number Diff line
@@ -166,16 +166,20 @@ class DataFactory:
        log.debug(f"[CREATE] Project config: {project.log_name}: {config}")
        project.set_config(**config)
        self.session.flush()
        test_path = storage_wrapper.test_files.path / project.id
        test_path = storage_wrapper.test_files.path / project.storage_dirname
        if test_path.exists():
            shutil.rmtree(str(test_path))
        shutil.copytree(str(TESTS_DIR), str(test_path))
        return project

    def create_submission(self, project, user, **kwargs):
        submission = self.__create_entity(Submission, project=project, user=user, **kwargs)
        self.session.flush()
        subm_path = storage_wrapper.submissions.path / submission.id
        subm_path = storage_wrapper.submissions.path / submission.storage_dirname
        if not subm_path.exists():
            shutil.copytree(str(SOURCES_DIR), str(subm_path))
        res_path = storage_wrapper.results.path / submission.id
        res_path = storage_wrapper.results.path / submission.storage_dirname
        if not res_path.exists():
            shutil.copytree(str(RESULTS_DIR), str(res_path))
        return submission

+15 −11
Original line number Diff line number Diff line
@@ -13,11 +13,10 @@ log = logger.get_logger(__name__)


class SubmissionProcessor:
    def __init__(self, submission: Submission, params: dict = None):
    def __init__(self, submission: Submission):
        self._submission = submission
        self._params = params
        from portal.service.services_collection import ServicesCollection
        self._rest = ServicesCollection()
        self._services = ServicesCollection()

    @property
    def submission(self) -> Submission:
@@ -59,7 +58,7 @@ class SubmissionProcessor:

    def dispatch_submission_processing(self):
        delay = self.get_delay_for_submission()
        args = (self.submission.id, self.params)
        args = (self.submission.id,)
        from .tasks import start_processing_submission
        self.submission.scheduled_for = delay
        self._save_submission()
@@ -76,7 +75,7 @@ class SubmissionProcessor:
        log.info(f"[ASYNC] Uploading submission: {self.submission.log_name} with {file_params}")

        updated_entity: UploadedEntity = self.storage. \
            submissions.create(entity_id=self.submission.id, **file_params)
            submissions.create(dirname=self.submission.id, **file_params)
        self.submission_store_ended(version=updated_entity.version)

    def clone(self, target):
@@ -90,11 +89,15 @@ class SubmissionProcessor:
        worker = self.schedule_submission_to_worker()
        if worker:
            self.execute_submission(worker)
        else:
            log.warning(f"[EXEC] Worker not available to process submission: "
                        f"{self.submission.log_name}")
        self.abort_submission("No Worker available")

    def upload_result(self, path, file_params):
        log.info(f"[ASYNC] Uploading result for the submission "
                 f"{self.submission.log_name} with {file_params}")
        self.storage.results.create(entity_id=self.submission.id, **file_params)
        self.storage.results.create(dirname=self.submission.id, **file_params)
        Path(path).unlink()
        self.reset_task_id(SubmissionState.FINISHED)

@@ -118,7 +121,7 @@ class SubmissionProcessor:

    def _get_avail_workers(self):
        course = self.submission.course
        workers = self._rest.workers.find_all()
        workers = self._services.workers.find_all()
        return [worker for worker in workers
                if worker.state == WorkerState.READY and course in worker.courses]

@@ -139,7 +142,7 @@ class SubmissionProcessor:
        return worker

    def execute_submission(self, worker: Worker):
        worker_client = self._rest.workers(worker).worker_client
        worker_client = self._services.workers(worker).worker_client
        self.submission.change_state(SubmissionState.IN_PROGRESS)
        self._save_submission()
        worker_client.execute_submission(self.submission)
@@ -148,7 +151,7 @@ class SubmissionProcessor:
        log.warning(f"[PROC] Worker is no available for submission: {self.submission.log_name}")

    def process_result(self):
        storage_entity = self.storage.results.get(self.submission.id)
        storage_entity = self.storage.results.get(self.submission.storage_dirname)
        # @mdujava - here put submission processing
        return self._submission_result_processing(storage_entity)

@@ -156,7 +159,7 @@ class SubmissionProcessor:
        suite_stats = storage_entity.get('suite-stats.json')
        if not suite_stats.exists():
            log.error(f"[PROC] Suite stats for the {self.submission.log_name} have not been found.")
            raise errors.SuiteStatsNotExists(self.submission.id)
            raise errors.SuiteStatsNotExists(self.submission.storage_dirname)
        stats = json.loads(suite_stats.read_text('utf-8'))
        return self._parse_stats(stats)

@@ -171,7 +174,8 @@ class SubmissionProcessor:

    def abort_submission(self, message: str = 'Unknown error!'):
        self.submission.note['error'] = message
        self._save_submission()
        self.reset_task_id(SubmissionState.ABORTED)

    def _save_submission(self):
        self._rest.submissions.write_entity(self.submission)
        self._services.submissions.write_entity(self.submission)
+64 −22
Original line number Diff line number Diff line
from celery.utils.log import get_task_logger
from portal.storage import UploadedEntity

from portal import storage_wrapper
from portal.async_celery import celery_app, submission_processor
from portal.service.courses import CourseService
from portal.service.find import FindService
from portal.service.general import GeneralService
from portal.service.is_api_service import IsApiService
from portal.service.projects import ProjectService
from portal.service.submissions import SubmissionsService

log = get_task_logger(__name__)

"""
= Submissions related
"""


@celery_app.task(name='upload-submission-to-storage')
def process_submission(new_submission_id: str):
    find_service = FindService()
    new_submission = find_service.submission(new_submission_id)
    project = new_submission.project
    course = project.course
    log.info(f"[SUBMIT] Processing submission: {new_submission.log_name}")
    if not project.config.test_files_commit_hash:
        log.warning(f"Project test files not found: {project.log_name}")
        update_project_test_files(course_id=course.id, project_id=project.id)
        update_project_test_files(course_id=project.course.id, project_id=project.id)
    new_submission = find_service.submission(new_submission_id)
    processor = submission_processor.SubmissionProcessor(new_submission)
    processor.process_submission()


@celery_app.task(name='delete-submission')
def delete_submission(submission_id: str):
    submission = FindService().submission(submission_id)
    SubmissionsService(submission).delete()


@celery_app.task(name='archive-submission')
def archive_submission(submission_id: str):
    submission = FindService().submission(submission_id)
    SubmissionsService(submission).archive()


@celery_app.task(name='upload-results-to-storage')
def upload_results_to_storage(new_submission_id: str, path: str):
    path = str(path)
@@ -33,11 +47,7 @@ def upload_results_to_storage(new_submission_id: str, path: str):
    new_submission = find_service.submission(new_submission_id)
    log.info(f"[SUBMIT] Processing results - upload to the storage for "
             f"{new_submission.log_name}: {path}")
    processor = submission_processor.SubmissionProcessor(new_submission)
    file_params = dict(source=dict(url=path, type='zip'))
    processor.upload_result(path=path, file_params=file_params)
    submission = processor.process_result()
    SubmissionsService().write_entity(submission)
    SubmissionsService(new_submission).upload_results_to_storage(path)


@celery_app.task(name='clone-submission-files')
@@ -50,13 +60,18 @@ def clone_submission_files(source_id: str, target_id: str):


@celery_app.task(name='start-processing-submission')
def start_processing_submission(submission_id: str, submission_params):
def start_processing_submission(submission_id: str):
    submission = FindService().submission(submission_id)
    log.info(f"[SUBMIT] Processing submission - send to worker: {submission.log_name}")
    processor = submission_processor.SubmissionProcessor(submission, submission_params)
    processor = submission_processor.SubmissionProcessor(submission)
    processor.send_to_worker()


"""
= Project related
"""


@celery_app.task(name='update-project-test-files')
def update_project_test_files(course_id: str, project_id: str):
    find_service = FindService()
@@ -71,11 +86,47 @@ def update_project_test_files(course_id: str, project_id: str):
        }
    }
    try:
        _update_test_files(project, params)
        ProjectService(project).update_project_test_files(params)
    except Exception as ex:
        log.error(f"[ASYNC] Cannot update source files {project.log_name}: {ex}")


@celery_app.task(name='delete-project')
def delete_project(course_id, project_id: str):
    course = FindService().course(course_id)
    submission = FindService().project(course, project_id)
    ProjectService(submission).delete()


@celery_app.task(name='archive-project')
def archive_project(course_id, project_id: str):
    course = FindService().course(course_id)
    project = FindService().project(course, project_id)
    ProjectService(project).archive()


"""
= COURSE RELATED
"""


@celery_app.task(name='delete-course')
def delete_course(course_id):
    course = FindService().course(course_id)
    CourseService(course).delete()


@celery_app.task(name='archive-course')
def archive_course(course_id):
    course = FindService().course(course_id)
    CourseService(course).archive()


"""
= IS MUNI RELATED
"""


@celery_app.task(name='is-sync-course-seminaries')
def is_sync_seminaries(course_id):
    find_service = FindService()
@@ -90,12 +141,3 @@ def is_import_users_for_course(course_id: str, role_name: str, users_type: str):
    course = find_service.course(course_id)
    log.info(f"[ASYNC] Import course users for {course.log_name}: {users_type} -> {role_name}")
    IsApiService(course).import_users(role_name=role_name, users_type=users_type)


def _update_test_files(project, params):
    updated_entity: UploadedEntity = storage_wrapper.test_files.update(entity_id=project.id, **params)
    version = updated_entity.version
    project.config.test_files_commit_hash = version
    log.debug(f"Updated project config {project.log_name}: {project.config}")
    GeneralService().write_entity(project.config)
+9 −0
Original line number Diff line number Diff line
@@ -436,6 +436,10 @@ class Project(db.Model, EntityBase, NamedMixin):
    def namespace(self) -> str:
        return f"{self.course.codename}/{self.codename}"

    @hybrid_property
    def storage_dirname(self):
        return f"{self.course.codename}_{self.codename}"

    @hybrid_property
    def state(self) -> ProjectState:
        return self.get_state_by_timestamp()
@@ -857,6 +861,11 @@ class Submission(db.Model, EntityBase):
    def namespace(self) -> str:
        return f"{self.project.namespace}/{self.user.codename}/{self.created_at}"

    @hybrid_property
    def storage_dirname(self):
        created = time.simple_fmt(self.created_at)
        return f"{self.user.username}_{created}_{self.course.codename}_{self.project.codename}"

    def change_state(self, new_state):
        # open to extension (state transition validation, ...)
        if new_state in Submission.ALLOWED_TRANSITIONS.keys():
Loading