Commit a81bc6e0 authored by Ondřej Borýsek's avatar Ondřej Borýsek
Browse files

Transform InitialData methods to classmethods

parent cb58fe90
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -41,7 +41,7 @@ def download_api_examples():

@bp.route("/force_upload_init_data", methods=["GET"])
def force_upload_init_data():
    pwndoc_db_init.InitialData().upload_initial_data()
    pwndoc_db_init.InitialData.upload_initial_data()
    return basic_msg("API inited")


+1 −1
Original line number Diff line number Diff line
@@ -111,7 +111,7 @@ def single_threaded_init(app, skip_pwndoc: bool):
    if not main_thread:
        return  # todo: All threads except the main one will be responsive before the full setup is completed. User actions could cause race condition.

    pwndoc_db_init.InitialData().upload_initial_data()
    pwndoc_db_init.InitialData.upload_initial_data()
    logger.info("PwnDoc Init completed")

    pwndoc_db_init.PwnDocUpdate.setup_scan2report_plugin_folder()  # This will fail if run second time.
+34 −28
Original line number Diff line number Diff line
@@ -36,13 +36,14 @@ class InitialData:

        return main_thread

    def upload_initial_data(self):
        self._upload_universal('_api_templates.json')
        self._upload_universal('_api_data_languages.json')
        self._upload_universal('_api_data_sections.json')
        self._upload_universal('_api_data_vulnerability-categories.json')
        self._upload_universal('_api_data_audit-types.json')  # requires: templates, languages, sections
        self._upload_universal('_api_data_custom-fields.json')  # requires: everything
    @classmethod
    def upload_initial_data(cls):
        cls._upload_universal('_api_templates.json')
        cls._upload_universal('_api_data_languages.json')
        cls._upload_universal('_api_data_sections.json')
        cls._upload_universal('_api_data_vulnerability-categories.json')
        cls._upload_universal('_api_data_audit-types.json')  # requires: templates, languages, sections
        cls._upload_universal('_api_data_custom-fields.json')  # requires: everything

    @staticmethod
    def wait_until_pwndoc_user_is_ready(wait_max_x_seconds=30):
@@ -96,21 +97,23 @@ class InitialData:
        assert resp.status_code == 201, f"Init failed: {resp.text}"
        return True

    def _save_new_id_mapping(self, old_id: str, new_id: str):
    @classmethod
    def _save_new_id_mapping(cls, old_id: str, new_id: str):
        try:
            with open(self.__NEW_MAPPING_FILEPATH, encoding="utf8") as f:
            with open(cls.__NEW_MAPPING_FILEPATH, encoding="utf8") as f:
                content = json.load(f)
        except:
            content = {}

        content[old_id] = new_id
        with open(self.__NEW_MAPPING_FILEPATH, "w", encoding="utf8") as f:
        with open(cls.__NEW_MAPPING_FILEPATH, "w", encoding="utf8") as f:
            json.dump(content, f, indent=4)

    def _load_new_id_mapping(self, old_id: str) -> str:
    @classmethod
    def _load_new_id_mapping(cls, old_id: str) -> str:
        for i in range(10):
            try:
                with open(self.__NEW_MAPPING_FILEPATH, encoding="utf8") as f:
                with open(cls.__NEW_MAPPING_FILEPATH, encoding="utf8") as f:
                    content = json.load(f)
                return content.get(old_id, old_id)
            except json.decoder.JSONDecodeError:
@@ -130,25 +133,27 @@ class InitialData:
            content = json.load(f)["datas"]
        return content

    def _upload_universal(self, filename: str):
        content = self._get_data_from_init_file(filename)
    @classmethod
    def _upload_universal(cls, filename: str):
        content = cls._get_data_from_init_file(filename)

        for single_item in content:

            if filename == "_api_templates.json":
                single_item['file'] = self._load_template_file(single_item['name'] + "." + single_item['ext']).decode()
                single_item['file'] = cls._load_template_file(single_item['name'] + "." + single_item['ext']).decode()

            if filename == "_api_data_audit-types.json":
                for locale_dict in single_item.get("templates", []):
                    locale_dict["template"] = self._load_new_id_mapping(locale_dict["template"])
                    locale_dict["template"] = cls._load_new_id_mapping(locale_dict["template"])

            self._create_single_item(self._get_api_path_from_filename(filename), single_item)
            cls._create_single_item(cls._get_api_path_from_filename(filename), single_item)

    def _create_single_item(self, url: str, orig_data: dict):
    @classmethod
    def _create_single_item(cls, url: str, orig_data: dict):
        resp = session.post(url, json=orig_data)
        if resp.status_code == 201:
            if orig_data.get("_id"):
                self._save_new_id_mapping(orig_data["_id"], resp.json()["datas"]["_id"])
                cls._save_new_id_mapping(orig_data["_id"], resp.json()["datas"]["_id"])
        else:
            if not resp.json().get("data", "").endswith("already exists"):
                logger.info(f"Skipping upload duplicate: {resp.text}")
@@ -165,36 +170,37 @@ class InitialData:

    # -- later updates

    def find_custom_field_by_label(self, datas: List[dict], label: str) -> Optional[dict]:
    @staticmethod
    def find_custom_field_by_label(datas: List[dict], label: str) -> Optional[dict]:
        one_field = list(filter(lambda x: x.get('label') == label, datas))
        return one_field[0] if len(one_field) else None

    def insert_vuln_custom_field_if_doesnt_exist(self, label: str):
    @classmethod
    def insert_vuln_custom_field_if_doesnt_exist(cls, label: str):
        CUSTOM_FIELDS_FILENAME = '_api_data_custom-fields.json'

        file_datas = self._get_data_from_init_file(CUSTOM_FIELDS_FILENAME)
        local_field = self.find_custom_field_by_label(file_datas, label)
        file_datas = cls._get_data_from_init_file(CUSTOM_FIELDS_FILENAME)
        local_field = cls.find_custom_field_by_label(file_datas, label)
        if local_field is None:
            logger.warning(f"Invoked addition of new custom field {label}, however no field with such label is in {CUSTOM_FIELDS_FILENAME}. Skipping.")
            return

        # login is outside in PwnDocUpdate
        url = self._get_api_path_from_filename(CUSTOM_FIELDS_FILENAME)
        url = cls._get_api_path_from_filename(CUSTOM_FIELDS_FILENAME)
        resp = session.get(url)
        assert resp.status_code == 200, f'Python Requests auth mechanism failed for PwnDoc failed? {resp.status_code}'
        pwndoc_datas = resp.json()["datas"]
        pwndoc_field = self.find_custom_field_by_label(pwndoc_datas, label)
        pwndoc_field = cls.find_custom_field_by_label(pwndoc_datas, label)

        if pwndoc_field is None:
            logger.info(f"Adding Vulnerability custom field '{label}' to PwnDoc.")
            self._create_single_item(url, local_field)
            cls._create_single_item(url, local_field)


class PwnDocUpdate:
    @staticmethod
    def add_new_fields_if_needed():
        _id = InitialData()
        _id.insert_vuln_custom_field_if_doesnt_exist('ignore_pluginoutput_checkbox')
        InitialData.insert_vuln_custom_field_if_doesnt_exist('ignore_pluginoutput_checkbox')

    @staticmethod
    def setup_scan2report_plugin_folder():