Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • inject/backend
1 result
Show changes
Commits on Source (137)
Showing
with 385 additions and 375 deletions
......@@ -26,14 +26,8 @@ dmypy.json
.idea
*.iml
#graphql-prototype
exercise_definition/uploaded_files
data
# definitions with .md files
definitions/
!definitions/*.md
# export/import
export_import
logs.jsonl
data
build
......@@ -23,6 +23,4 @@ dmypy.json
exercise_definition/uploaded_files
definitions/*.zip
data
# export/import
export_import
build/.env.dev
stages:
- test
- createtag
- buildimage
variables:
......@@ -32,6 +33,43 @@ unit_tests:
- poetry run mypy .
- poetry run python manage.py test
sast:
variables:
SAST_EXCLUDED_PATHS: tests, dev, definitions, docs
allow_failure: true
dependency_scanning:
variables:
DS_EXCLUDED_PATHS: tests, dev, definitions, docs
allow_failure: true
secret_detection:
allow_failure: false
create-tag:
image: python:3.8-buster
stage: createtag
variables:
VERSION_TYPE: minor
rules:
- if: $CI_COMMIT_BRANCH == "main"
when: manual
before_script:
- git config user.email "$GITLAB_USER_EMAIL"
- git config user.name "$GITLAB_USER_NAME"
script:
- old_version=$(sed -n "s/^VERSION = \"\(.*\)\"/\1/p" ttxbackend/settings.py)
- new_version=$(python tagging.py "$old_version" "$VERSION_TYPE")
- >
if [[ $? -ne 0 ]]; then
exit 1
fi
- sed "s/^VERSION = .*/VERSION = \"$new_version\"/" ttxbackend/settings.py --in-place
- git add ttxbackend/settings.py
- git commit -m "update version and create new tag"
- git tag -f "$new_version" -m "backend version $new_version"
- git push --tags https://tagger:$TAGGER_TOKEN@$CI_SERVER_HOST/$CI_PROJECT_PATH HEAD:$CI_COMMIT_REF_NAME -o ci.skip
create-image:
image: docker:20.10.16
services:
......@@ -39,9 +77,11 @@ create-image:
alias: docker
command: ["--tls=false"]
stage: buildimage
only:
refs:
- main
rules:
- if: $CI_COMMIT_BRANCH == 'main'
when: manual
- if: $CI_COMMIT_TAG =~ /^v(?:\d+.){2}(?:\d+)$/
when: manual
before_script:
- echo "Docker registry url is $CI_REGISTRY"
- echo "Docker registry username is $CI_REGISTRY_USER"
......@@ -54,3 +94,10 @@ create-image:
- docker push $IMAGE_TAG
- docker push $IMAGE_LATEST
include:
- template: Security/SAST.gitlab-ci.yml
- template: Jobs/Dependency-Scanning.gitlab-ci.yml
- template: Jobs/Secret-Detection.gitlab-ci.yml
* @xbaisa @xjuhas
* @xbaisa @xjuhas @xglosner
/definitions/* @xglosner @xdvora19 @xbaisa @xjuhas
## 0.4.0
### New features
- add endpoint for specific definition #147
- add new table in DB that holds milestone history #46
- add channels to definition and exercise #151
- add overlay to info and email alternatives #153
- configure logging for project, add `INJECT_LOGS` env variable for specifying path where logs should be saved #157
- add authorization and user management #85
- add Docker container generation to `container-registry`
- add questionnaires #165
- add learning objectives #170
- add first_name and last_name fields into csv user upload #172
- add INJECT_NOAUTH env var for AAI deactivation #166
- add user info and ip address to logs #169
- add `TeamQuestionnaireStateSubscription` for notifying instructors about questionnaire state changes #173
- add django manage command `initadmins`
- add missing LO/LA query and reached properties #187
- add name to exercises #145
- add `exercise_definition/validate` endpoint for definition validation #144
- add `validate_email_address` query for validation purposes #188
- implement formatted welcome email and modify email sender, add DOMAIN env var #185
- addition of INJECT_SECRET_KEY env variable #141
- endpoint for re-generation of user login credentials #202
- add endpoint for user deletion - accessible only to admin #199
- add new fields `username`, `first_name`, `last_name` to `users` query #195
### Changes
- rework file handling #139
- rework action logs #70
- many exercise definition types and their corresponding exercise types were changed, see [CHANGELOG](definitions/CHANGELOG.md) for more details #151
- inject categories, injects and their exercise counterparts were renamed accordingly to #113
- changed the endpoint for graphql subscriptions to `subscription` instead of `graphql` #164
- squash all migrations
- add INJECT prefix to all env variables
- make name parameter during definition upload optional #146
- rename api prefix from `ttxbackend` to `inject` #183
- set csrf cookie for `/version` endpoint
### Fixes
- set correct types and resolvers for user_set attributes in schema_types #174
- fix incorrect permissions on `questionnaireState` query #177
- fix aai decorator placement and check_function, turn on csrf check on graphql
- update performance testing tools to the newest API
- fix SendEmailInput authorization checks
- user_set resolvers are dependent on the user.group in schema_types #204
- prevent instructor promoting himself to admin #206
### Documentation updates
- removed swagger documentation available through the API, it is now available in `openapi.yml` file #176
- split main README into 2 files, README and INSTALATION
- update INSTALATION.md env variables description
- update INSTALATION.md and move poetry description back into README.md
## 0.3.0
### New features
......@@ -54,4 +109,4 @@
## 0.1.0
Initial version, no changes.
\ No newline at end of file
Initial version, no changes.
### Local variables
- `INJECT_DEBUG`: _boolean, default=false_ - Run the backend in debug mode. **Do not set in production.**
Boolean type variables use consider `true` and `yes` as truthy, and `false` and `no` as falsy.
- `INJECT_DEBUG`: _boolean, default=false_ - Run the backend in debug mode. If set, email sending is disabled and emails are printed in the console. **Do not set in production.**
- `INJECT_HOST_ADDRESSES`: _string, default=localhost_ - A comma-separated list of allowed hosts.
- `INJECT_CORS_ALLOWED_ORIGINS`: _string, default=http://localhost_ - A comma-separated list of origins (domains) that are authorized to make cross-site HTTP requests.
- `INJECT_NOAUTH`: _boolean, default=false_ - If set, authorization is turned off.
- `INJECT_EMAIL_HOST`: _string, default=""_ - SMTP server address.
- `INJECT_EMAIL_PORT`: _int, default=25_ - Port to use for the SMTP server defined in INJECT_EMAIL_HOST.
- `INJECT_EMAIL_HOST_USER`: _string, default=""_ - Username to use for authentication to the SMTP server defined in _INJECT_EMAIL_HOST_.
- `INJECT_EMAIL_HOST_PASSWORD`: _string, default=""_ - Password to use for the SMTP server defined in _INJECT_EMAIL_HOST_. This setting is used in conjunction with _INJECT_EMAIL_HOST_USER_ when authenticating to the SMTP server.
- `INJECT_EMAIL_SENDER_ADDRESS`: _string, default=""_ - The sender address for automatic emails.
- `INJECT_EMAIL_PROTOCOL`: _string, default=None_ - Optional variable for defining the preferred protocol for communication with SMTP server. The choices (values) can be _ssl_ or _tls_ (case insensitive). This variable can also be left undefined - no encryption will be used.
- `INJECT_LOGS`: _string, default=backend-logs.log_ - Path to a file where to save logs.
- `INJECT_DOMAIN`: _string, default=""_ - Domain where yours instance of the INJECT is available.
- `INJECT_SECRET_KEY`: _string_ - Used to provide cryptographic signing. Must be at least 50 long characters string.
- `INJECT_EMAIL_TIMEOUT`: _int, default=10_ - Specifies a timeout in seconds for blocking operations like the connection attempt to SMTP.
- `INJECT_MAX_UPLOAD_SIZE`: _int, default=10MB_ - Specifies the maximum body size of requests, including file uploads.
......@@ -51,4 +56,4 @@ allowed host addresses.
In Docker this can be done followingly:
```bash
docker run --name backend -e HOST_ADDRESSES="172.26.0.1,192.168.0.1" -p8000:8000 -d backend gunicorn
```
\ No newline at end of file
```
......@@ -7,7 +7,6 @@ This repository contains the INJECT project.
- [Installation and runnning the application](./INSTALLATION.md)
- [Deployment](./INSTALLATION.md#deployment)
- [Formatting with Black](#formatting-with-black)
- [Django Administration Panel](#django-administration-panel)
- [Django-nose unit tests](#django-nose-unit-tests)
### Versioning
......@@ -57,16 +56,6 @@ poetry run python manage.py runserver
To reformat code run `black .`, to check if your code is formatted properly run `black . --check` .
### Django Administration Panel
Django administration panel is a tool for the management of data created by the application.
You can create, edit and update exercise data, however,
note that these actions are in no way equivalent to the graphql queries and mutations,
which means that an exercise cannot be fully set up and started using just the admin panel.
To access the admin panel, first, run `poetry run python manage.py createsuperuser` and create an admin account.
Then visit http://localhost:8000/inject/api/v1/admin/ and log in.
## Django-nose unit tests
Project includes unit tests, which generate coverage report with the help of django-nose library.
All tests can be run with following command:
......
from django.contrib.auth.models import AnonymousUser
from rest_framework.exceptions import PermissionDenied, AuthenticationFailed
from rest_framework.request import Request
from user.models import User, UserInTeam, DefinitionAccess, InstructorOfExercise
def user_from_context(context: Request) -> User:
user: User = context.user
if isinstance(user, AnonymousUser):
raise AuthenticationFailed("Authentication failed")
return user
def team_access(context: Request, team_id: int):
user = user_from_context(context)
if user.group == User.AuthGroup.ADMIN:
return
condition = UserInTeam.objects.filter(
user_id=user.id, team_id=team_id
).exists() # Both trainees and instructors can be assigned in teams
if not condition and user.group == User.AuthGroup.INSTRUCTOR:
condition = InstructorOfExercise.objects.filter(
user_id=user.id, exercise__teams__in=[team_id]
).exists()
if not condition:
raise PermissionDenied(
f"User does not have access to this team ({team_id})"
)
def exercise_access(context: Request, exercise_id: int):
user = user_from_context(context)
if user.group == User.AuthGroup.ADMIN:
return
condition = UserInTeam.objects.filter(
user_id=user.id, user__teams__exercise_id=exercise_id
).exists() # Both trainees and instructors can be assigned in teams
if not condition and user.group == User.AuthGroup.INSTRUCTOR:
condition = InstructorOfExercise.objects.filter(
user_id=user.id, exercise_id=exercise_id
).exists()
if not condition:
raise PermissionDenied(
f"User does not have access to this exercise ({exercise_id})"
)
def definition_access(context: Request, definition_id: int):
user = user_from_context(context)
if user.group == User.AuthGroup.ADMIN:
return
condition = False
if user.group == User.AuthGroup.INSTRUCTOR:
condition = DefinitionAccess.objects.filter(
user_id=user.id, definition_id=definition_id
).exists()
# trainee should never reach this, but just to be sure
if not condition:
raise PermissionDenied(
f"User does not have access to this definition ({definition_id})"
)
from django.contrib import admin
# Register your models here.
......@@ -4,6 +4,3 @@ from django.apps import AppConfig
class AaiConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "aai"
def ready(self):
import aai.management.commands.assignperms
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.backends.db import SessionStore
from rest_framework.exceptions import AuthenticationFailed
from common_lib.logger import log_user_msg, logger
......@@ -20,7 +21,7 @@ class CustomAuthBackend(ModelBackend):
)
raise AuthenticationFailed("Username or password is missing")
try:
user = UserModel._default_manager.get_by_natural_key(username)
user: User = UserModel._default_manager.get_by_natural_key(username)
except UserModel.DoesNotExist:
# Run the default password hasher once to reduce the timing
# difference between an existing and a nonexistent user.
......@@ -31,18 +32,19 @@ class CustomAuthBackend(ModelBackend):
)
raise AuthenticationFailed("Incorrect username or password")
else:
if not user.check_password(password):
if not self.user_can_authenticate(user):
logger.error(
log_user_msg(request, user)
+ "Failed login - incorrect password"
+ "Failed login - deactivated user login attempt"
)
raise AuthenticationFailed("Incorrect username or password")
raise AuthenticationFailed("Your account has been blocked")
if not self.user_can_authenticate(user):
if not user.check_password(password):
logger.error(
log_user_msg(request, user)
+ "Failed login - deactivated user login attempt"
+ "Failed login - incorrect password"
)
raise AuthenticationFailed("Your account has been blocked")
raise AuthenticationFailed("Incorrect username or password")
SessionStore.clear_expired()
return user
from typing import Optional
from graphene import ResolveInfo
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
from rest_framework.request import HttpRequest, Request
from aai.access import user_from_context
from common_lib.logger import logger
from user.models import User
# This function is probably significantly overcomplicated
def _get_action_name(func, *args) -> str:
try:
# Should never happen
if len(args) == 0:
return "Unknown"
# I don't know a name for this variable
tmp = args[0]
# This case should happen when the decorator is on a Query
if tmp is None:
# A query function should always have qualname, but just in case
if hasattr(func, "__qualname__"):
# qualname should give us a name in the format Query.resolve<name>
return getattr(func, "__qualname__")
# Should never happen
return "Unknown"
# This case works for mutations and subscriptions where it just gives us the class name
if hasattr(tmp, "__name__"):
return getattr(tmp, "__name__")
# Last case, should only happen for views
tmp_t = type(tmp)
if hasattr(tmp_t, "__name__"):
return getattr(tmp_t, "__name__")
# Again, should never happen as all of our API options are covered
return "Unknown"
# we CANNOT throw an exception in here
except Exception:
return "Unknown"
def _get_user(*args, **kwargs) -> Optional[User]:
"""
Gets user object from known request types.
Args:
*args - non-keyworded arguments of request
**kwargs - keyword arguments of request
Returns:
Instance of User if request is authenticated else instance of AnonymousUser
"""
if len(args) == 0:
return None
request = args[-1] # get request
context: Optional[Request] = None
if isinstance(request, ResolveInfo):
context = request.context
if isinstance(request, HttpRequest) or isinstance(request, Request):
context = request
return user_from_context(context) if context is not None else None
def logged_in(func):
def wrapper(*args, **kwargs):
user = _get_user(*args, **kwargs)
if user is not None:
return func(*args, **kwargs)
raise AuthenticationFailed("Authentication failed")
return wrapper
def protected(required_group: User.AuthGroup):
"""
Decorator that checks whether user of request has required authorization group.
If not, PermissionDenied exception is raised.
Args:
required_group: minimal required authorization group to access endpoint
"""
def decorator(func):
def wrapper(*args, **kwargs):
action_name = _get_action_name(func, *args)
log_text = f"action: {action_name} | arguments: {kwargs}"
user = _get_user(*args, **kwargs)
if user is None:
log_text += f" | username: Unauthenticated"
logger.info(log_text)
raise AuthenticationFailed("Authentication failed")
log_text += f" | username: {user.username}"
if action_name != "Query.resolve_exercise_time_left":
logger.info(log_text)
if user.group >= required_group:
return func(*args, **kwargs)
raise PermissionDenied("Permission denied")
return wrapper
return decorator
default_app_config = "aai.apps.AaiConfig"
from typing import Any
import sys
from django.core.management.base import BaseCommand
from django.contrib.auth.models import Permission, Group
from django.db.models.signals import post_migrate
from django.dispatch import receiver
from aai.models import UserGroup, Perms
migrated_apps = set()
required_migrations = {"user", "aai", "django.contrib.auth"}
DONE = False
# Due to unit tests pipeline this has to be temporarly automatic
@receiver(post_migrate)
def assign_perms_after_migration(sender, **kwargs):
global DONE, migrated_apps, required_migrations
if DONE:
return
migrated_apps.add(sender.name)
if required_migrations.issubset(migrated_apps):
assign_perms()
DONE = True
def assign_perms():
# Initialize groups
trainee, _ = Group.objects.get_or_create(name=UserGroup.TRAINEE)
instructor, _ = Group.objects.get_or_create(name=UserGroup.INSTRUCTOR)
_, _ = Group.objects.get_or_create(name=UserGroup.ADMIN)
#### trainee permissions assignemnt ####
trainee_perm_names = [
Perms.view_exercise.codename,
Perms.use_tool.codename,
Perms.send_email.codename,
Perms.view_trainee_info.codename,
Perms.view_email_info.codename,
Perms.view_email.codename,
Perms.manipulate_file.codename,
]
trainee_perms = Permission.objects.filter(codename__in=trainee_perm_names)
trainee.permissions.add(*trainee_perms)
instructor.permissions.add(*trainee_perms)
trainee.save()
#### instructor permissions assignment ####
instructor_perm_names = [
Perms.update_exercise.codename,
Perms.update_definition.codename,
Perms.view_definition.codename,
Perms.view_category.codename,
Perms.view_milestone.codename,
Perms.send_injectselection.codename,
Perms.view_extendtool.codename,
Perms.view_injectselection.codename,
Perms.view_analytics.codename,
Perms.update_userassignment.codename,
Perms.view_user.codename,
Perms.update_user.codename,
]
instructor_perms = Permission.objects.filter(
codename__in=instructor_perm_names
)
instructor.permissions.add(*instructor_perms)
instructor.save()
sys.stderr.write("Predefined groups and permissions have been set in DB\n")
class Command(BaseCommand):
help = "Populate database with groups and assigns permissions"
def handle(self, *args: Any, **options: Any):
assign_perms()
# Generated by Django 3.2.24 on 2024-05-29 16:54
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('aai', '0002_alter_perms_options'),
]
operations = [
migrations.AlterModelOptions(
name='perms',
options={'default_permissions': (), 'permissions': [('update_exercise', 'Can access instructor tools for exercise manipulation'), ('view_exercise', 'Can view exercise and their info'), ('update_definition', 'Can add/delete/change definition'), ('view_definition', 'Can view definition'), ('view_category', 'Can view inject categories'), ('view_milestone', 'Can view milestones'), ('use_tool', 'Can use tool of the exercise'), ('send_injectselection', 'Can pick and send inject selection'), ('send_email', 'Can send email and execute email related operations'), ('view_trainee_info', 'Can view exercise info intedned to trainees (roles, tools...)'), ('view_extendtool', 'Can view extend tool (with responses)'), ('view_injectselection', 'Can view inject selecion options'), ('view_email_info', 'Can view info related to emails (contacts, addresses...)'), ('view_email', 'Can view email bodies and threads'), ('view_analytics', 'Can view data needed for analytics dashboard'), ('update_userassignment', 'Can (un)assign user to exercise or team'), ('view_user', 'Can view users in database'), ('manipulate_file', 'Can upload and download files during exercise'), ('update_user', 'Can add/remove/change user'), ('delete_user', 'Can delete user'), ('export_import', 'Can export and import database')]},
),
]
# Generated by Django 3.2.22 on 2024-07-10 15:36
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('aai', '0003_alter_perms_options'),
]
operations = [
migrations.DeleteModel(
name='Perms',
),
]
from typing import Optional
from enum import Enum
from django.db import models
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import Group
class UserGroup(str, Enum):
TRAINEE = "trainee"
INSTRUCTOR = "instructor"
ADMIN = "admin"
@staticmethod
def from_str(group: str) -> Optional["UserGroup"]:
lowered_group = group.lower()
if lowered_group == "trainee" or lowered_group == "t":
return UserGroup.TRAINEE
elif lowered_group == "instructor" or lowered_group == "i":
return UserGroup.INSTRUCTOR
elif lowered_group == "admin" or lowered_group == "a":
return UserGroup.ADMIN
return None
def __eq__(self, other):
if isinstance(other, Group):
return self.value == other.name
return super().__eq__(other)
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self) -> int:
return super().__hash__()
class NameHandler:
full_name: str
codename: str
def __init__(self, full_name):
self.full_name = full_name
self.codename = self.full_name.split(".")[1]
def __get__(self, instance, owner):
return self
class Perms(models.Model):
update_exercise = NameHandler("aai.update_exercise")
view_exercise = NameHandler("aai.view_exercise")
update_definition = NameHandler("aai.update_definition")
view_definition = NameHandler("aai.view_definition")
view_category = NameHandler("aai.view_category")
view_milestone = NameHandler("aai.view_milestone")
use_tool = NameHandler("aai.use_tool")
send_injectselection = NameHandler("aai.send_injectselection")
send_email = NameHandler("aai.send_email")
view_trainee_info = NameHandler("aai.view_trainee_info")
view_extendtool = NameHandler("aai.view_extendtool")
view_injectselection = NameHandler("aai.view_injectselection")
view_email_info = NameHandler("aai.view_email_info")
view_email = NameHandler("aai.view_email")
view_analytics = NameHandler("aai.view_analytics")
update_userassignment = NameHandler("aai.update_userassignment")
view_user = NameHandler("aai.view_user")
manipulate_file = NameHandler("aai.manipulate_file")
update_user = NameHandler("aai.update_user")
export_import = NameHandler("aai.export_import")
class Meta:
default_permissions = ()
permissions = [
(
"update_exercise",
"Can access instructor tools for exercise manipulation",
),
("view_exercise", "Can view exercise and their info"),
("update_definition", "Can add/delete/change definition"),
("view_definition", "Can view definition"),
("view_category", "Can view inject categories"),
("view_milestone", "Can view milestones"),
("use_tool", "Can use tool of the exercise"),
("send_injectselection", "Can pick and send inject selection"),
(
"send_email",
"Can send email and execute email related operations",
),
(
"view_trainee_info",
"Can view exercise info intedned to trainees (roles, tools...)",
),
("view_extendtool", "Can view extend tool (with responses)"),
("view_injectselection", "Can view inject selecion options"),
(
"view_email_info",
"Can view info related to emails (contacts, addresses...)",
),
("view_email", "Can view email bodies and threads"),
("view_analytics", "Can view data needed for analytics dashboard"),
(
"update_userassignment",
"Can (un)assign user to exercise or team",
),
("view_user", "Can view users in database"),
(
"manipulate_file",
"Can upload and download files during exercise",
),
("update_user", "Can add/remove/change user"),
("export_import", "Can export and import database"),
]
@classmethod
def content_type(cls):
return ContentType.objects.get_for_model(Perms)
......@@ -3,9 +3,9 @@ from django.conf import settings
from django.contrib.auth import authenticate, logout, login
from django.core.exceptions import ValidationError
from aai.utils import logged_in
from aai.decorators import logged_in
from common_lib.logger import logger, log_user_msg
from common_lib.schema_types import UserType
from common_lib.schema.types import UserType
from user.email.email_sender import send_password_change_notification
from user.models import User
......
......@@ -2,13 +2,13 @@ import os
import shutil
from django.conf import settings
from django.contrib.auth.models import AnonymousUser, Permission
from rest_framework.exceptions import PermissionDenied, AuthenticationFailed
from django.contrib.auth.models import AnonymousUser
from django.test import TestCase, override_settings
from rest_framework.exceptions import PermissionDenied, AuthenticationFailed
from rest_framework.test import APIRequestFactory
from aai.models import Perms
from aai.utils import protected, extra_protected, Check
from aai.access import team_access, exercise_access, definition_access
from aai.decorators import protected
from common_lib.test_utils import (
internal_create_exercise,
internal_upload_definition,
......@@ -26,14 +26,14 @@ from running_exercise.models import (
)
from user.models import User
TEST_DATA_STORAGE = "aai_tests_test_data"
TEST_DATA_STORAGE = "authorization_test_data"
@override_settings(
DATA_STORAGE=TEST_DATA_STORAGE,
FILE_STORAGE=os.path.join(TEST_DATA_STORAGE, "files"),
)
class AaiUtilsTests(TestCase):
class AuthorizationTests(TestCase):
definition: Definition
exercise: Exercise
exercise2: Exercise
......@@ -88,7 +88,6 @@ class AaiUtilsTests(TestCase):
cls.factory = APIRequestFactory()
cls.assignUsers()
cls.assignPerms()
cls.createData()
@classmethod
......@@ -108,20 +107,13 @@ class AaiUtilsTests(TestCase):
# Assign definition to the instructor
cls.instructor.definitions.add(cls.definition)
@classmethod
def assignPerms(cls):
p = Permission.objects.get(codename=Perms.view_exercise.codename)
cls.trainee1.group.permissions.add(p)
cls.instructor.group.permissions.add(p)
p = Permission.objects.get(codename=Perms.update_exercise.codename)
cls.instructor.group.permissions.add(p)
@classmethod
def createData(cls):
# create action log
tool: Tool = cls.definition.tools.first()
content = Content.objects.create(raw=tool.default_response, rendered="")
content = Content.objects.create(
raw=tool.default_response, rendered="", definition=cls.definition
)
details = ToolDetails.objects.create(
tool=tool,
argument="argument",
......@@ -170,9 +162,7 @@ class AaiUtilsTests(TestCase):
return 0
def test_protected_admin(self):
decorated = protected("aai.non_existent")(
self.dummy
) # admin has access
decorated = protected(User.AuthGroup.ADMIN)(self.dummy)
request = self.factory.request()
request.user = self.trainee1
......@@ -187,7 +177,7 @@ class AaiUtilsTests(TestCase):
self.assertEqual(decorated(request), 0)
def test_protected_instructor(self):
decorated = protected(Perms.update_exercise.full_name)(self.dummy)
decorated = protected(User.AuthGroup.INSTRUCTOR)(self.dummy)
request = self.factory.request()
request.user = self.trainee1
......@@ -201,7 +191,7 @@ class AaiUtilsTests(TestCase):
self.assertEqual(decorated(request), 0)
def test_protected_trainee(self):
decorated = protected(Perms.view_exercise.full_name)(self.dummy)
decorated = protected(User.AuthGroup.TRAINEE)(self.dummy)
request = self.factory.request()
request.user = self.trainee1
......@@ -213,156 +203,72 @@ class AaiUtilsTests(TestCase):
request.user = self.admin
self.assertEqual(decorated(request), 0)
def test_protected_anonoymous(self):
decorated = protected(Perms.view_exercise.full_name)(self.dummy)
def test_protected_anonymous(self):
decorated = protected(User.AuthGroup.TRAINEE)(self.dummy)
request = self.factory.request()
request.user = AnonymousUser()
with self.assertRaises(AuthenticationFailed):
decorated(request)
def test_team_protected(self):
def test_team_access(self):
request = self.factory.request()
decorated = extra_protected(Check.TEAM_ID)(self.team_dummy)
# access
# team1: trainee1, trainee2
# team2: empty
# team3: trainee3
request.user = self.trainee1
self.assertEqual(decorated(request, team_id=self.team1.id), 0)
self.assertIsNone(team_access(request, self.team1.id))
with self.assertRaises(PermissionDenied):
decorated(request, team_id=self.team2.id)
team_access(request, self.team2.id)
request.user = self.trainee2
self.assertEqual(decorated(request, team_id=self.team1.id), 0)
self.assertIsNone(team_access(request, self.team1.id))
with self.assertRaises(PermissionDenied):
decorated(request, team_id=self.team2.id)
team_access(request, self.team2.id)
request.user = self.trainee3
self.assertEqual(decorated(request, team_id=self.team3.id), 0)
self.assertIsNone(team_access(request, self.team3.id))
with self.assertRaises(PermissionDenied):
decorated(request, team_id=self.team2.id)
request.user = AnonymousUser()
with self.assertRaises(AuthenticationFailed):
decorated(request, team_id=self.team2.id)
team_access(request, self.team2.id)
def test_exercise_protected(self):
request = self.factory.request()
decorated = extra_protected(Check.EXERCISE_ID)(self.exercise_dummy)
# access
# exercise: instructor, trainee1-3
# exercise2: empty
request.user = self.instructor
self.assertEqual(decorated(request, exercise_id=self.exercise.id), 0)
self.assertIsNone(exercise_access(request, self.exercise.id))
with self.assertRaises(PermissionDenied):
decorated(request, exercise_id=self.exercise2.id)
exercise_access(request, self.exercise2.id)
request.user = self.admin
self.assertEqual(decorated(request, exercise_id=self.exercise.id), 0)
self.assertEqual(decorated(request, exercise_id=self.exercise2.id), 0)
self.assertIsNone(exercise_access(request, self.exercise.id))
self.assertIsNone(exercise_access(request, self.exercise2.id))
request.user = self.trainee1
self.assertEqual(decorated(request, exercise_id=self.exercise.id), 0)
self.assertIsNone(exercise_access(request, self.exercise.id))
with self.assertRaises(PermissionDenied):
decorated(request, exercise_id=self.exercise2.id)
request.user = AnonymousUser()
with self.assertRaises(AuthenticationFailed):
decorated(request, team_id=self.team2.id)
exercise_access(request, self.exercise2.id)
def test_definition_protected(self):
request = self.factory.request()
decorated = extra_protected(Check.DEFINITION_ID)(self.definition_dummy)
# access
# definition: instructor
request.user = self.instructor
self.assertEqual(
decorated(request, definition_id=self.definition.id), 0
) # instructor with access to definition
self.assertIsNone(definition_access(request, self.definition.id))
request.user = self.instructor2
# instructor without access to definition
with self.assertRaises(PermissionDenied):
decorated(request, definition_id=self.definition.id)
request.user = self.admin
# admin should see all definition
self.assertEqual(
decorated(request, definition_id=self.definition.id), 0
)
request.user = self.trainee1
# trainee should't see the definition
with self.assertRaises(PermissionDenied):
decorated(request, definition_id=self.definition.id)
def test_log_protected(self):
request = self.factory.request()
decorated = extra_protected(Check.LOG_ID)(self.log_dummy)
request.user = self.trainee1
# trainee1 created log -> can access it
self.assertEqual(decorated(request, log_id=self.log.id), 0)
request.user = self.trainee2
# trainee1_1 is in the same team as trainee1 who created the log thus
# can read the log as well
self.assertEqual(decorated(request, log_id=self.log.id), 0)
request.user = self.instructor
# instructor is assigned to the exercise where log was created thus can access it
self.assertEqual(decorated(request, log_id=self.log.id), 0)
definition_access(request, self.definition.id)
request.user = self.admin
# admin can see all logs
self.assertEqual(decorated(request, log_id=self.log.id), 0)
request.user = self.trainee3
# trainee3 belongs to different team of the exercise thus can't read other teams logs
with self.assertRaises(PermissionDenied):
decorated(request, log_id=self.log.id)
request.user = self.instructor2
# instructor2 is not assigned to the exercise thus can't access the log
with self.assertRaises(PermissionDenied):
decorated(request, log_id=self.log.id)
def test_thread_protected(self):
request = self.factory.request()
decorated = extra_protected(Check.THREAD_ID)(self.thread_dummy)
request.user = self.trainee1
# trainee1 created thread thus can access it
self.assertEqual(decorated(request, thread_id=self.thread.id), 0)
request.user = self.trainee2
# trainee2 is in the same team as trainee1 thus can access it
self.assertEqual(decorated(request, thread_id=self.thread.id), 0)
request.user = self.instructor
# instructor is assigned to the exercise thus can access all threads of exercise
self.assertEqual(decorated(request, thread_id=self.thread.id), 0)
request.user = self.admin
# admin can access all thread
self.assertEqual(decorated(request, thread_id=self.thread.id), 0)
request.user = self.instructor2
# instructor2 is not assigned to the exercise where thread belongs thus cant access it
with self.assertRaises(PermissionDenied):
decorated(request, thread_id=self.thread.id)
request.user = self.trainee3
# trainee3 does not belong to any participating teams of the thread
with self.assertRaises(PermissionDenied):
decorated(request, thread_id=self.thread.id)
def test_visible_protected(self):
request = self.factory.request()
decorated = extra_protected(Check.VISIBLE_ONLY)(self.visibility_dummy)
request.user = self.instructor
# instructor can see both visible only and all data
self.assertEqual(decorated(request, visible_only=False), 0)
self.assertEqual(decorated(request, visible_only=True), 0)
self.assertIsNone(definition_access(request, self.definition.id))
request.user = self.trainee1
# trainee can see only visible_only=True data
self.assertEqual(decorated(request, visible_only=True), 0)
with self.assertRaises(PermissionDenied):
decorated(request, visible_only=False)
definition_access(request, self.definition.id)