Skip to content
Snippets Groups Projects
Commit 092ca0bd authored by Martin Juhás's avatar Martin Juhás
Browse files

feat: Rework exercise loop to allow usage of workers

No API changes

Closes #212
parent eb3cee29
No related branches found
No related tags found
No related merge requests found
from typing import Optional
from django.conf import settings from django.conf import settings
from django.utils import timezone from django.utils import timezone
...@@ -9,15 +7,9 @@ from common_lib.utils import get_model ...@@ -9,15 +7,9 @@ from common_lib.utils import get_model
from exercise.models import Exercise from exercise.models import Exercise
from running_exercise.lib.exercise_updater import ExerciseUpdater from running_exercise.lib.exercise_updater import ExerciseUpdater
from running_exercise.lib.loop_thread import LoopThread from running_exercise.lib.loop_thread import LoopThread
from running_exercise.lib.utils import (
get_running_exercise,
)
__loop: Optional[LoopThread] = None
def _start_exercise(exercise: Exercise): def _start_exercise(exercise: Exercise):
global __loop
if exercise.finished: if exercise.finished:
raise RunningExerciseOperationException( raise RunningExerciseOperationException(
"Cannot start a finished exercise" "Cannot start a finished exercise"
...@@ -28,7 +20,9 @@ def _start_exercise(exercise: Exercise): ...@@ -28,7 +20,9 @@ def _start_exercise(exercise: Exercise):
"Cannot start a running exercise" "Cannot start a running exercise"
) )
if (running_exercise := get_running_exercise()) is not None: if (
running_exercise := Exercise.objects.filter(running=True).first()
) is not None:
raise RunningExerciseOperationException( raise RunningExerciseOperationException(
f"Cannot start an exercise while exercise id: {running_exercise.id} is running" f"Cannot start an exercise while exercise id: {running_exercise.id} is running"
) )
...@@ -41,65 +35,12 @@ def _start_exercise(exercise: Exercise): ...@@ -41,65 +35,12 @@ def _start_exercise(exercise: Exercise):
logger.info(f"resuming a previously started exercise id: {exercise.id}") logger.info(f"resuming a previously started exercise id: {exercise.id}")
exercise.save() exercise.save()
__loop = LoopThread( LoopThread(
interval_s=settings.UPDATE_INTERVAL, interval_s=settings.UPDATE_INTERVAL,
elapsed_s=exercise.elapsed_s, elapsed_s=exercise.elapsed_s,
total_time_s=exercise.config.exercise_duration * 60, total_time_s=exercise.config.exercise_duration * 60,
updater=ExerciseUpdater(exercise), updater=ExerciseUpdater(exercise),
) ).start()
__loop.start()
def _stop_exercise(exercise: Exercise):
global __loop
if not exercise.running:
raise RunningExerciseOperationException(
"Cannot stop a non-running exercise."
)
# if the backend crashes while an exercise is running,
# the loop will no longer exist, however the exercise will still
# have the running flag turned on, which would prevent
# starting any exercise, including the one that crashed
if __loop is not None:
__loop.cancel()
logger.info(f"stopping exercise id: {exercise.id}")
exercise.running = False
else:
exercise.running = False
exercise.save()
logger.info(
f"stopping exercise id: {exercise.id} after possible server crash"
)
def _move_time(exercise: Exercise, time_diff: int):
global __loop
if not exercise.running:
raise RunningExerciseOperationException(
"Exercise must be running to move time"
)
# can only happen after the server crashed during a running exercise
if __loop is None:
raise RunningExerciseOperationException(
"Cannot move time when no exercise is running"
)
__loop.elapsed_s = max(__loop.elapsed_s + time_diff, 0)
exercise.elapsed_s = __loop.elapsed_s
logger.info(
f"moving exercise id: {exercise.id} time by {time_diff} seconds"
)
def _is_running(exercise_id: int) -> bool:
global __loop
return (
__loop is not None
and __loop.updater.exercise.id == exercise_id
and __loop.is_alive()
)
class ExerciseLoop: class ExerciseLoop:
...@@ -112,15 +53,33 @@ class ExerciseLoop: ...@@ -112,15 +53,33 @@ class ExerciseLoop:
@staticmethod @staticmethod
def stop(exercise_id: int) -> Exercise: def stop(exercise_id: int) -> Exercise:
exercise = get_model(Exercise, id=exercise_id) exercise = get_model(Exercise, id=exercise_id)
_stop_exercise(exercise) if not exercise.running:
raise RunningExerciseOperationException(
"Cannot stop a non-running exercise."
)
# The exercise will stop during the next update interval
exercise.running = False
exercise.save()
logger.info(f"stopping exercise id: {exercise.id}")
return exercise return exercise
@staticmethod @staticmethod
def move_time(exercise_id: int, time_diff: int): def move_time(exercise_id: int, time_diff: int):
exercise = get_model(Exercise, id=exercise_id) exercise = get_model(Exercise, id=exercise_id)
_move_time(exercise, time_diff) if not exercise.running:
raise RunningExerciseOperationException(
"Exercise must be running to move time"
)
exercise.elapsed_s = max(exercise.elapsed_s + time_diff, 0)
exercise.save()
logger.info(
f"moving exercise id: {exercise.id} time by {time_diff} seconds"
)
return exercise return exercise
@staticmethod @staticmethod
def is_running(exercise_id: int) -> bool: def is_running(exercise_id: int) -> bool:
return _is_running(exercise_id) exercise = get_model(Exercise, id=exercise_id)
return exercise.running
...@@ -4,6 +4,7 @@ from typing import List, Dict ...@@ -4,6 +4,7 @@ from typing import List, Dict
from django.utils import timezone from django.utils import timezone
from common_lib.exceptions import RunningExerciseOperationException from common_lib.exceptions import RunningExerciseOperationException
from common_lib.logger import logger
from common_lib.subscription_handler import SubscriptionHandler from common_lib.subscription_handler import SubscriptionHandler
from common_lib.utils import get_model from common_lib.utils import get_model
from exercise.models import ( from exercise.models import (
...@@ -212,7 +213,14 @@ class ExerciseUpdater: ...@@ -212,7 +213,14 @@ class ExerciseUpdater:
self.exercise.elapsed_s = seconds self.exercise.elapsed_s = seconds
self.exercise.save() self.exercise.save()
return time.time() - start update_time = time.time() - start
logger.info(
f"update for exercise id: {self.exercise.id}"
f" elapsed time: {seconds}s"
f" update time: {update_time}s"
)
return update_time
def finish(self): def finish(self):
if not self.exercise.running: if not self.exercise.running:
...@@ -223,13 +231,4 @@ class ExerciseUpdater: ...@@ -223,13 +231,4 @@ class ExerciseUpdater:
self.exercise.running = False self.exercise.running = False
self.exercise.finished = True self.exercise.finished = True
self.exercise.save() self.exercise.save()
logger.info(f"exercise id: {self.exercise.id} has finished")
def stop(self, elapsed_s: int):
if not self.exercise.running:
raise RunningExerciseOperationException(
"Cannot stop a non-running exercise."
)
self.exercise.running = False
self.exercise.elapsed_s = elapsed_s
self.exercise.save()
from threading import Thread, Event import time
from threading import Thread
from common_lib.logger import logger from common_lib.logger import logger
from common_lib.schema_types import ExerciseEventTypeEnum from common_lib.schema_types import ExerciseEventTypeEnum
from common_lib.subscription_handler import SubscriptionHandler from common_lib.subscription_handler import SubscriptionHandler
from common_lib.utils import get_model
from exercise.models import Exercise
from running_exercise.lib.exercise_updater import ExerciseUpdater from running_exercise.lib.exercise_updater import ExerciseUpdater
...@@ -12,7 +15,6 @@ class LoopThread(Thread): ...@@ -12,7 +15,6 @@ class LoopThread(Thread):
__interval_s: int __interval_s: int
__total_time_s: int __total_time_s: int
__finished: Event
def __init__( def __init__(
self, self,
...@@ -22,81 +24,58 @@ class LoopThread(Thread): ...@@ -22,81 +24,58 @@ class LoopThread(Thread):
updater: ExerciseUpdater, updater: ExerciseUpdater,
): ):
super().__init__(daemon=True) super().__init__(daemon=True)
self.__interval_s = interval_s self.__interval_s = max(interval_s, 1)
self.elapsed_s = elapsed_s self.elapsed_s = elapsed_s
self.__total_time_s = total_time_s self.__total_time_s = total_time_s
self.updater = updater self.updater = updater
self.__finished = Event()
def cancel(self): def __exercise_loop(self):
self.__finished.set() while self.elapsed_s <= self.__total_time_s:
exercise = get_model(Exercise, id=self.updater.exercise.id)
if not exercise.running:
logger.info(
f"stopping exercise loop for exercise id: {self.updater.exercise.id}"
)
break
# This checks if move_time was called
if self.elapsed_s != exercise.elapsed_s + self.__interval_s:
self.elapsed_s = exercise.elapsed_s
update_time = self.updater.update(self.elapsed_s)
wait_time = (
self.__interval_s - update_time
if update_time < self.__interval_s
else 0
)
self.elapsed_s += self.__interval_s
time.sleep(wait_time)
def run(self): def run(self):
try: try:
SubscriptionHandler.broadcast_exercise_loop( SubscriptionHandler.broadcast_exercise_loop(
self.updater.exercise, True self.updater.exercise, True
) )
self.updater.update(self.elapsed_s)
SubscriptionHandler.broadcast_exercises( SubscriptionHandler.broadcast_exercises(
self.updater.exercise, ExerciseEventTypeEnum.modify() self.updater.exercise, ExerciseEventTypeEnum.modify()
) )
wait_time: float = self.__interval_s
while (
self.elapsed_s <= self.__total_time_s
and not self.__finished.wait(wait_time)
):
update_time = self.updater.update(self.elapsed_s)
wait_time = (
self.__interval_s - update_time
if update_time < self.__interval_s
else 0
)
# this is necessary to prevent infinite length loops with 0 interval
# I wouldn't expect this during any real deployment, however it might
# be useful during testing for max speed update times
if self.__interval_s == 0:
self.elapsed_s += 1
else:
self.elapsed_s += self.__interval_s
logger.info(
f"update loop for exercise id: {self.updater.exercise.id} elapsed time: {self.elapsed_s}s"
)
SubscriptionHandler.broadcast_exercise_loop( self.__exercise_loop()
self.updater.exercise, False
)
self.__finished.set()
if self.elapsed_s >= self.__total_time_s: if self.elapsed_s >= self.__total_time_s:
self.updater.finish() self.updater.finish()
logger.info(
f"finishing exercise loop for exercise id: {self.updater.exercise.id}"
)
else:
self.updater.stop(self.elapsed_s)
SubscriptionHandler.broadcast_exercises(
self.updater.exercise, ExerciseEventTypeEnum.modify()
)
except Exception as ex: except Exception as ex:
logger.error( logger.error(
f"loop thread exception {type(ex).__name__}: {str(ex)}" f"loop thread exception {type(ex).__name__}: {str(ex)}"
) )
for i in range(10): finally:
try: SubscriptionHandler.broadcast_exercises(
self.updater.stop(self.elapsed_s - self.__interval_s) self.updater.exercise, ExerciseEventTypeEnum.modify()
except: )
logger.error( SubscriptionHandler.broadcast_exercise_loop(
f"failed to stop exercise: {self.updater.exercise.id}" self.updater.exercise, False
) )
else:
logger.info(
f"stopped exercise: {self.updater.exercise.id} on the {i}th try"
)
break
else:
logger.error(
f"failed to stop exercise: {self.updater.exercise.id} 10 times, still running"
)
def remaining_time_s(self) -> int: def remaining_time_s(self) -> int:
return self.__total_time_s - self.elapsed_s return self.__total_time_s - self.elapsed_s
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment