Commit 3499b40f authored by Ondřej Borýsek's avatar Ondřej Borýsek
Browse files

Fix variable import: from x import y -> import x

"from x import y" is equivalent to  "import x; y = x.y"
This can cause problems with order import if later modified from different script.
parent 5ba1bb50
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -29,7 +29,8 @@ from api_templates import download_templates_from_pwndoc_to_scan2report
from pwndoc_api import login, get_findings_from_audit, upsert_raw_finding, get_audits
from template_scan2report import TemplateScan2Report
from template_pwndoc import get_fid_from_raw_finding
from template_converters import Scan2ReportConvert, HTML_ATTRIBUTES
from template_converters import Scan2ReportConvert
import template_converters
from helpers.custom_logging import FlashLog
from helpers.rq_helper import REDIS_QUEUE, rq_job_finalized, rq_job_failed
import redis  # dependency via rq
@@ -442,7 +443,7 @@ def get_changed_after_combine_with_existing_findings(audit_id: str, possibly_new
        # if fid not in ["burp_5243008", "HTTP_generic_XSS"]:
        #     continue

        for attr_name in HTML_ATTRIBUTES:
        for attr_name in template_converters.HTML_ATTRIBUTES:
            single_raw_finding[attr_name] = MdAndHTMLConvertor.html_to_pwndoc_html(single_raw_finding.get(attr_name, ""))

        if fid not in fid_mapping:
+2 −2
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@ import functools
import logging

from loguru import logger
from config import LOGGER_INTERCEPT_STD_LOGGING
import config
from flask import flash, has_request_context

# todo: the following handler does NOT catch print statements (currently used inside scan2report)
@@ -53,6 +53,6 @@ class FlashLog(object):
        cls.flash_loguru_msg(msg, level)


if LOGGER_INTERCEPT_STD_LOGGING:
if config.LOGGER_INTERCEPT_STD_LOGGING:
    logging.basicConfig(handlers=[InterceptHandler()], level=0)
+1 −2
Original line number Diff line number Diff line
@@ -11,7 +11,6 @@ from typing import Any, Optional, Dict, List, Set

import config
from helpers.custom_logging import FlashLog
from config import ALLOWED_SCANNER_EXTENSIONS
from loguru import logger
from helpers.file_root import relative_path  # don't remove this import - other parts of code depend on it

@@ -22,7 +21,7 @@ def json_dump(obj: Any, filepath: str) -> None:


# Taken directly from Flask documentation
def is_whitelisted_extension(filename: str, whitelisted_extension: Set[str] = ALLOWED_SCANNER_EXTENSIONS):
def is_whitelisted_extension(filename: str, whitelisted_extension: Set[str] = config.ALLOWED_SCANNER_EXTENSIONS):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in whitelisted_extension

+17 −18
Original line number Diff line number Diff line
@@ -12,33 +12,32 @@ from typing import List, Tuple
from helpers.db_models import DbPwndocMapping
from template_pwndoc import TemplatePwndoc, MultilocaleTemplateWrapperPwndoc, TNSTemplatePwndoc, \
    to_html

from config import PWNDOC_URL, PWNDOC_USERNAME, PWNDOC_PASSWORD, PWNDOC_DISABLE_HTTPS_VERIFICATION
import config
from helpers.file_utils import *

session = requests.Session()
session.verify = not PWNDOC_DISABLE_HTTPS_VERIFICATION
session.verify = not config.PWNDOC_DISABLE_HTTPS_VERIFICATION
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # I'm writing one custom warning on config load, the default setting of urllib3 prints it on every request.

NO_FID = "NO_FID"


def login():
    assert not PWNDOC_URL.startswith(config.PWNDOC_NO_CONNECTION_PLACEHOLDER), 'Tried to perform login NO-CONNECTION-TEST domain. Aborting.'
    assert not config.PWNDOC_URL.startswith(config.PWNDOC_NO_CONNECTION_PLACEHOLDER), 'Tried to perform login NO-CONNECTION-TEST domain. Aborting.'
    data = {
        "username": PWNDOC_USERNAME,
        "password": PWNDOC_PASSWORD,
        "username": config.PWNDOC_USERNAME,
        "password": config.PWNDOC_PASSWORD,
        "totpToken": "",
    }

    resp = session.post(f"{PWNDOC_URL}/api/users/token", data=data)
    assert resp.status_code == 200, f"Login failed. Did you set PwnDoc username ({PWNDOC_USERNAME}) and password ({'*'*len(PWNDOC_PASSWORD)}) in ENV variables?"
    resp = session.post(f"{config.PWNDOC_URL}/api/users/token", data=data)
    assert resp.status_code == 200, f"Login failed. Did you set PwnDoc username ({config.PWNDOC_USERNAME}) and password ({'*'*len(config.PWNDOC_PASSWORD)}) in ENV variables?"


def refresh_token():
    # todo: maybe use requests-oauthlib instead?
    # https://requests-oauthlib.readthedocs.io/en/latest/examples/real_world_example_with_refresh.html
    resp = session.get(f"{PWNDOC_URL}/api/users/refreshtoken")
    resp = session.get(f"{config.PWNDOC_URL}/api/users/refreshtoken")
    assert resp.status_code == 200


@@ -51,10 +50,10 @@ def add_finding_to_audit(finding: 'TemplatePwndoc', audit_id: str):
def upsert_raw_finding(audit_id: str, data: dict):
    finding_id = data.get("_id")
    if finding_id is None:
        url = f"{PWNDOC_URL}/api/audits/{audit_id}/findings"
        url = f"{config.PWNDOC_URL}/api/audits/{audit_id}/findings"
        resp = session.post(url, json=data)
    else:
        url = f"{PWNDOC_URL}/api/audits/{audit_id}/findings/{finding_id}"
        url = f"{config.PWNDOC_URL}/api/audits/{audit_id}/findings/{finding_id}"
        resp = session.put(url, json=data)
    assert resp.status_code == 200, f"Upserting of finding failed; {resp.text}"

@@ -62,7 +61,7 @@ def upsert_raw_finding(audit_id: str, data: dict):
def add_templates_for_vulnerability(lang_mutations_of_template: Dict[str, 'TemplatePwndoc']):
    """ Don't use this directly, rather use PwndocTemplateManager.add_single_locale_template """

    url = f"{PWNDOC_URL}/api/vulnerabilities"
    url = f"{config.PWNDOC_URL}/api/vulnerabilities"

    data = MultilocaleTemplateWrapperPwndoc()

@@ -110,7 +109,7 @@ def get_and_save(url_path: str, filename: Optional[str] = None) -> dict:
    if filename is None:
        filename = filename_from_url_path(url_path)

    url = f"{PWNDOC_URL}{url_path}"
    url = f"{config.PWNDOC_URL}{url_path}"
    resp = session.get(url)
    if resp.status_code != 200:
        FlashLog.error(f"PwnDoc API failed {url_path} | {resp.status_code} | {resp.text}")
@@ -175,7 +174,7 @@ def load_audit_file(audit_id: str) -> Optional[dict]:

def delete_all_templates_in_pwndoc():
    login()
    url = f"{PWNDOC_URL}/api/vulnerabilities"
    url = f"{config.PWNDOC_URL}/api/vulnerabilities"
    resp = session.delete(url)
    assert resp.status_code == 200, f"Template removal failed: {resp.text}"

@@ -223,14 +222,14 @@ def download_and_return_finding(audit_id: str, finding_id: str):
def delete_finding(audit_id: str, finding_id: str):
    login()
    url_path = f"/api/audits/{audit_id}/findings/{finding_id}"
    resp = session.delete(f"{PWNDOC_URL}{url_path}")
    resp = session.delete(f"{config.PWNDOC_URL}{url_path}")
    assert resp.status_code == 200, f"Template removal failed: {resp.text}"


def update_scope(audit_id: str, scope_raw: dict):
    login()
    url_path = f"/api/audits/{audit_id}/network"
    resp = session.put(f"{PWNDOC_URL}{url_path}", json=scope_raw)
    resp = session.put(f"{config.PWNDOC_URL}{url_path}", json=scope_raw)

    json_dump(scope_raw, persistent_audit_scopes_file_path(audit_id))

@@ -267,7 +266,7 @@ def download_report(audit_id: str, report_filename: str):
    report_filepath = get_docx_filepath(report_filename)

    login()
    url = f"{PWNDOC_URL}/api/audits/{audit_id}/generate"
    url = f"{config.PWNDOC_URL}/api/audits/{audit_id}/generate"
    resp = session.get(url, timeout=config.RQ_JOB_TIMEOUT)
    assert resp.status_code == 200, f"Report download failed"

@@ -281,6 +280,6 @@ def create_audit(name_section: str = "", locale: str = "cs") -> str:
        "language": locale,
        "auditType": "Example audit type"
    }
    resp = session.post(f"{PWNDOC_URL}/api/audits", data=data)
    resp = session.post(f"{config.PWNDOC_URL}/api/audits", data=data)
    assert resp.status_code == 201
    return resp.json()["datas"]["audit"]["_id"]
+6 −7
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ import time
import json

import config
from config import PWNDOC_URL, PWNDOC_USERNAME, PWNDOC_PASSWORD
from helpers.file_utils import relative_path

from pwndoc_api import session, login
@@ -51,7 +50,7 @@ class InitialData:
    def _is_db_clean():
        for i in range(10):
            try:
                resp = session.get(f"{PWNDOC_URL}/api/users/init")
                resp = session.get(f"{config.PWNDOC_URL}/api/users/init")
                if resp.status_code == 200 and resp.json().get("datas") is True:
                    logger.info("Clean instance detected. Forcing import to DB.")
                    return True
@@ -59,14 +58,14 @@ class InitialData:
            except requests.exceptions.ConnectionError:
                if config.APP_INIT_PWNDOC_DISALLOW_WAITING:
                    break
                logger.warning(f"Connection to {PWNDOC_URL} failed. Waiting before retry.")
                logger.warning(f"Connection to {config.PWNDOC_URL} failed. Waiting before retry.")
                time.sleep(random.uniform(1, 2))

        logger.warning(f"Connection to {PWNDOC_URL} failed. Not retrying.")
        logger.warning(f"Connection to {config.PWNDOC_URL} failed. Not retrying.")
        return False

    @staticmethod
    def _add_pwndoc_user(username: str = PWNDOC_USERNAME, password: str = PWNDOC_PASSWORD, firstname: str = "Pwndoc",
    def _add_pwndoc_user(username: str = config.PWNDOC_USERNAME, password: str = config.PWNDOC_PASSWORD, firstname: str = "Pwndoc",
                         lastname: str = "Importer", first_user: bool = False):

        init_data = {
@@ -75,7 +74,7 @@ class InitialData:
            "firstname": firstname,
            "lastname": lastname
        }
        url = f"{PWNDOC_URL}/api/users"
        url = f"{config.PWNDOC_URL}/api/users"

        if first_user:
            url += "/init"
@@ -117,7 +116,7 @@ class InitialData:
    @staticmethod
    def _get_api_path_from_filename(filename: str) -> str:
        url_path = filename.replace("_", "/").rsplit(".", 1)[0]
        return f"{PWNDOC_URL}{url_path}"
        return f"{config.PWNDOC_URL}{url_path}"

    @staticmethod
    def _get_data_from_init_file(filename: str) -> dict: