pax_global_header 0000666 0000000 0000000 00000000064 14313124554 0014514 g ustar 00root root 0000000 0000000 52 comment=b65c17abcc7dbbbdac817061daccc29c448d7fa0
pwndocimportautomator-release-2022-09-22/ 0000775 0000000 0000000 00000000000 14313124554 0020211 5 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/.dockerignore 0000664 0000000 0000000 00000000200 14313124554 0022655 0 ustar 00root root 0000000 0000000 .git/
.idea/
.venv/
venv-windows/
.vagrant/
__pycache__/
pwndoc/
user_data/
backups/
debug_tmp/
.env
docker.env
pwndocimportautomator-release-2022-09-22/.env.dist 0000664 0000000 0000000 00000000616 14313124554 0021747 0 ustar 00root root 0000000 0000000 # Copy this to .env for local deployment or docker.env for docker usage.
# Fill in the copied file.
PWNDOC_USERNAME=FILL_ME_IN
PWNDOC_PASSWORD=FILL_ME_IN
PWNDOC_URL=https://FILL_ME_IN/
PWNDOC_DISABLE_HTTPS_VERIFICATION=False
# For docker deployment uncomment the following two lines.
# PWNDOC_URL=https://pwndoc-backend:4242/
# PWNDOC_DISABLE_HTTPS_VERIFICATION=True
FLASK_SECRET_KEY=FILL_ME_IN
pwndocimportautomator-release-2022-09-22/.gitignore 0000664 0000000 0000000 00000000660 14313124554 0022203 0 ustar 00root root 0000000 0000000 *.json
debug_tmp/*
unified_templates/*
!/**/.gitkeep
.env
docker.env
.venv/
venv-windows
*.pyc
.idea/
.vagrant/
backups/*/*
!user_data/pwndoc-init/*
!user_data/pwndoc-config/*
user_data/mongo-data
user_data/pwndoc-docx-templates/*
user_data/importer-logs/*.log
user_data/importer-logs/*.log.gz
user_data/pwndoc-logs/*.log
user_data/worker-logs/*
test.docx
user_data/pwndoc-init/new_id_mapping.json
user_data/importer-db/main.db
pwndocimportautomator-release-2022-09-22/.gitmodules 0000664 0000000 0000000 00000000354 14313124554 0022370 0 ustar 00root root 0000000 0000000 [submodule "scan2report"]
path = scan2report
url = git@gitlab.fi.muni.cz:cybersec/tns/scan2report.git
branch = master
[submodule "pwndoc"]
path = pwndoc
url = https://github.com/BorysekOndrej/pwndoc.git
branch = customization-poc
pwndocimportautomator-release-2022-09-22/Dockerfile 0000664 0000000 0000000 00000001431 14313124554 0022202 0 ustar 00root root 0000000 0000000 FROM python:3.8
WORKDIR /app
RUN apt-get clean && apt-get update && apt-get install -y locales locales-all
RUN locale-gen cs_CZ && locale-gen cs_CZ.UTF-8 && update-locale cs_CZ.UTF-8
ENV LANG="cs_CZ.UTF-8" LC_ALL="cs_CZ.UTF-8" LC_CTYPE="cs_CZ.UTF-8"
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
COPY . .
# todo: lower the timeout back to 30 seconds when the scan2report processing is done asynchronously
# 90 seconds is timeout in Firefox
ENV FLASK_APP app.py
CMD [ "gunicorn", \
"--bind", "0.0.0.0:5000", \
"-w", "4", \
"--worker-class", "gevent", \
"--timeout", "85", \
"--access-logfile", "user_data/importer-logs/gunicorn_access.log", \
"--error-logfile", "user_data/importer-logs/gunicorn_error.log", \
"app:create_app()"]
pwndocimportautomator-release-2022-09-22/README.md 0000664 0000000 0000000 00000002124 14313124554 0021467 0 ustar 00root root 0000000 0000000 # pwndocImportAutomator
To clone repo with submodules:
```sh
git clone --recurse-submodules git@gitlab.fi.muni.cz:cybersec/tns/pwndocimportautomator.git
cd pwndocimportautomator
```
## Deployment - Docker-compose
The recommended deployment process is using Docker-compose.
```sh
# MANUAL STEP: Fill in docker.env file (use .env.dist as a template)
docker-compose up --build -d
```
How to stop the containers:
```
docker-compose down --remove-orphans
# note: flag -v would also remove the docker volume with the database (DANGEROUS)
```
Ports:
- 5001: ImportAutomator (HTTP)
- 8443: Pwndoc (HTTPS)
## First run setup steps
- Import Scan2Report templates using the web interface (they are no longer autoimported from ./scan2report/plugins/)
## How to backup
Before first run:
`sudo apt update && sudo apt install zip`
To run backup:
`./backup.sh`
## How to update
```sh
./backup.sh
git pull --recurse-submodules -X ours
./backup.sh
docker-compose up --build -d
```
## How to export scan2report plugins folder
HTTP request to `:5001/templates/export_as_scan2report_native` returns it as zip
pwndocimportautomator-release-2022-09-22/Vagrantfile 0000664 0000000 0000000 00000001557 14313124554 0022406 0 ustar 00root root 0000000 0000000 Vagrant.configure("2") do |config|
# config.vm.box = "ubuntu/xenial64"
config.vm.box = "munikypo/xubuntu-18.04"
config.vm.box_version = "0.2.0"
#map ports from local PC, to VM ... which intern will be mapped to container ports in docker-compose.yml
config.vm.network "forwarded_port",
guest: 5001, host: 9001
config.vm.network "forwarded_port",
guest: 8443, host: 9443
#allow access on windows to VM
config.vm.network "public_network", bridge: "Intel(R) Dual Band Wireless-AC 7265"
# Mount this folder as RO in the guest, since it contains secure stuff
config.vm.synced_folder ".", "/vagrant" #, :mount_options => ["ro"]
#To install, rebuild and run docker-compose on vagrant up
config.vm.provision :docker
config.vm.provision :docker_compose, yml: "/vagrant/docker-compose.yml", rebuild: true, run: "always"
end
pwndocimportautomator-release-2022-09-22/api_debug.py 0000664 0000000 0000000 00000021342 14313124554 0022504 0 ustar 00root root 0000000 0000000 import datetime
import os
from typing import List, Tuple
import time
from tqdm import tqdm
from flask import Blueprint, render_template, Response, send_file, url_for
from helpers.pwndoc_log_beautify import log_to_beautified_errors
from helpers.template_grouping import TemplateAliasing, TemplateGrouping
from helpers.custom_logging import FlashLog
import pwndoc_db_init
from helpers.file_utils import zip_multiple_folders_non_recursive_to_virtual, relative_path
from template_manager import PwndocTemplateManager
import template_pwndoc
from pwndoc_api import login, refresh_examples, delete_all_templates_in_pwndoc, load_audit_file, \
get_findings_from_audit, download_and_return_finding, delete_finding, get_audits
from helpers.db_models import DbTemplate
from template_pwndoc import TNSTemplatePwndoc, get_fid_from_raw_finding
from template_scan2report import TemplateScan2Report
from template_converters import PwndocConverter
bp = Blueprint('api_debug', __name__, template_folder='templates')
@bp.route("/", methods=["GET"])
def show_debug_actions_list():
audits_dict = get_audits()
return render_template('debug_actions.html', audits_dict=audits_dict)
@bp.route("/download_api", methods=["GET"])
def download_api_examples():
login()
refresh_examples()
return "API examples downloaded"
@bp.route("/force_upload_init_data", methods=["GET"])
def force_upload_init_data():
pwndoc_db_init.InitialData(force=True)
return "API inited"
@bp.route("/refresh_api_examples", methods=["GET"])
def debug_refresh_api_examples():
login()
refresh_examples()
return f"OK"
@bp.route("/download_logs/", methods=["GET"])
def debug_download_logs(log_name: str = "main"):
log_folder_paths = {
'importer': 'user_data/importer-logs/',
'pwndoc': 'user_data/pwndoc-logs/',
'worker': 'user_data/worker-logs/',
}
if log_name in ["main", "gunicorn_error", "gunicorn_access", "everything"]:
folder_path = log_folder_paths['importer']
elif log_name in ["pwndoc"]:
folder_path = log_folder_paths['pwndoc']
else:
return "This log doesn't exist or it's download is not permitted.", 400
# todo: everything currently does not include Pwndoc
if log_name != "everything":
with open(f"{folder_path}{log_name}.log", encoding="utf8") as f:
return Response(f.read(), mimetype='application/json')
beautified_log_path = os.path.join(log_folder_paths['importer'], f'pwndoc_beautified_{int(time.time())}.log')
with open(beautified_log_path, 'w', encoding='utf8') as f:
beautified_text = generate_pwndoc_human_readable_log()
f.write(beautified_text)
memory_file = zip_multiple_folders_non_recursive_to_virtual(
list(log_folder_paths.values())
)
return send_file(
memory_file,
download_name=f"pwndoc_importer_logs_{time.time()}.zip",
as_attachment=True
)
def generate_pwndoc_human_readable_log():
folder_path = "user_data/pwndoc-logs/"
log_path = relative_path(f"{folder_path}/pwndoc.log")
with open(log_path, encoding="utf8") as f:
log_lines = f.readlines()
beautified_errors = log_to_beautified_errors(log_lines)
text_answer = "This log is persistent - it shows all errors since the logs creation. That means some of the errors might have already been resolved."
text_answer += "\nThe newest logs are on the top. Only errors which could be beautified are displayed here."
text_answer += f"\nGenerated {str(datetime.datetime.now())} \n\n==================\n\n"
text_answer += "\n\n================== NEXT ERROR ==================\n\n".join(beautified_errors)
return text_answer
@bp.route("/pwndoc_human_readable_log", methods=["GET"])
def make_pwndoc_human_readable_log():
text_answer = generate_pwndoc_human_readable_log()
raw_response = Response(text_answer)
raw_response.mimetype = 'text/plain'
return raw_response
@bp.route("/test_db", methods=["GET"])
def debug_test_db():
template1 = template_pwndoc.TNSTemplatePwndoc(fid="fid1", locale="cs")
template2 = template_pwndoc.TNSTemplatePwndoc(fid="fid1", locale="og")
DbTemplate.delete_template("fid")
assert DbTemplate.get_template("fid", "og") is None
res2, _ = DbTemplate.add_template("fid", "og", template1)
assert DbTemplate.get_template("fid", "og") == res2
DbTemplate.delete_template("fid")
assert DbTemplate.get_template("fid", "og") is None
res5, _ = DbTemplate.add_template("fid", "og", template1)
res6, _ = DbTemplate.add_template("fid", "og", template2)
assert res6.get_previous_revision() == res5
DbTemplate.delete_template("fid")
assert DbTemplate.get_template("fid", "og", include_deleted=True) == res6
return "OK"
@bp.route("/get_all_templates_from_db")
def debug_get_all_templates():
current_templates: List[DbTemplate] = DbTemplate.get_all_current_templates()
return f"{len(current_templates)} templates"
@bp.route("/delete_vulnerability_templates")
def delete_vulnerability_templates():
dict_of_all_templates = PwndocTemplateManager.get_all_templates_as_dict_of_locales()
for fid in dict_of_all_templates.keys():
DbTemplate.delete_template(fid)
delete_all_templates_in_pwndoc()
TemplateScan2Report.delete_template_files()
return "Deleted"
@bp.route("/force_unescape_old_templates", methods=["GET"])
def force_unescape_old_templates():
PwndocTemplateManager.update_template_db_from_pwndoc()
all_templates: List[TNSTemplatePwndoc] = PwndocTemplateManager.get_all_templates()
new_templates: List[TNSTemplatePwndoc] = []
for single_template in tqdm(all_templates):
tmp = PwndocConverter.html_conversion(single_template, html_to_md=False, html_force_unescape_basic_tags=True)
# print(single_template.description)
# print(tmp.description)
new_templates.append(tmp)
n_templates_changed: int = 0
for single_template in tqdm(new_templates):
new = PwndocTemplateManager.add_single_locale_template(single_template)
if new:
n_templates_changed += 1
return f"{len(new_templates)} templates processed, {n_templates_changed} templates were updated."
@bp.route("/flash_msg", methods=["GET"])
def test_flash_msg():
FlashLog.info("test info")
FlashLog.warning("test warning")
FlashLog.error("test error")
return render_template("base.html")
def _list_newly_grouped_or_aliased_findings(audit_id: str) -> List[Tuple[str, str, str]]:
login()
refresh_examples(audit_id)
TemplateAliasing.refresh()
TemplateGrouping.refresh()
audit_data = load_audit_file(audit_id)
newly_grouped_or_aliased_findings: List[Tuple[str, str, str]] = []
findings = audit_data.get("datas", {}).get("findings", [])
for single_finding in findings:
fid = get_fid_from_raw_finding(single_finding)
pwndoc_id = single_finding.get("_id", "")
if fid is None:
FlashLog.warning(f"Pwndoc finding {pwndoc_id} doesn't have fid. Skipping.")
continue
for qry_function in [TemplateAliasing.get_alias, TemplateGrouping.get_gid]:
# noinspection PyArgumentList
alias_or_gid = qry_function(fid)
if alias_or_gid:
newly_grouped_or_aliased_findings.append((fid, alias_or_gid, pwndoc_id))
return newly_grouped_or_aliased_findings
@bp.route("/list_newly_grouped_or_aliased_findings/", methods=["GET"])
def show_newly_grouped_or_aliased_findings(audit_id: str):
newly_grouped_or_aliased_findings = _list_newly_grouped_or_aliased_findings(audit_id)
return "The following findings from the audit are currently part of alias or group: " + \
"FID - alias or GID - PwnDoC ID " + \
" ".join([" - ".join(x) for x in newly_grouped_or_aliased_findings]) + \
f'
Delete all these findings (DANGEROUS; no confirmation dialog) '
@bp.route("/delete_newly_grouped_or_aliased_findings/", methods=["GET"])
def delete_newly_grouped_or_aliased_findings(audit_id: str):
newly_grouped_or_aliased_findings = _list_newly_grouped_or_aliased_findings(audit_id)
for _, _, pwndoc_id in newly_grouped_or_aliased_findings:
delete_finding(audit_id, pwndoc_id)
return "Deleted the following findings: " + \
"FID - alias or GID - PwnDoC ID " + \
" ".join([" - ".join(x) for x in newly_grouped_or_aliased_findings])
@bp.route("/download_audit/", methods=["GET"])
def debug_download_audit(audit_id: str):
findings = get_findings_from_audit(audit_id)
for single_finding in findings:
download_and_return_finding(audit_id, single_finding.get("_id"))
return "OK"
pwndocimportautomator-release-2022-09-22/api_examples/ 0000775 0000000 0000000 00000000000 14313124554 0022660 5 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/api_examples/.gitkeep 0000664 0000000 0000000 00000000000 14313124554 0024277 0 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/api_process_findings.py 0000664 0000000 0000000 00000057564 14313124554 0024774 0 ustar 00root root 0000000 0000000 import html
import io
from typing import Optional, Set, Dict, List, Tuple
from dataclasses import dataclass, field
from tqdm import tqdm
from dataclasses_json import dataclass_json
import json
import os
import pathlib
import shlex
from loguru import logger
from enum import Enum
from flask import Blueprint, render_template, request, redirect, url_for, current_app, send_file
from werkzeug.utils import secure_filename
import config
from helpers.time_helper import current_timestamp
from helpers.md_and_html_convertor import MdAndHTMLConvertor
import helpers.table_of_services
from helpers.file_utils import relative_path, extract_fid_from_filename, json_safe_load, generate_random_folder_name, \
persistent_audit_scopes_file_path, folder_in_upload_path
from template_manager import PwndocTemplateManager
from helpers.flask_multifile_upload import save_multiple_uploaded_files
import template_pwndoc
from scan2report import scan2report
from api_templates import download_templates_from_pwndoc_to_scan2report
from pwndoc_api import login, get_findings_from_audit, upsert_raw_finding, get_audits
from template_scan2report import TemplateScan2Report
from template_pwndoc import get_fid_from_raw_finding
from template_converters import Scan2ReportConvert, HTML_ATTRIBUTES
from helpers.custom_logging import FlashLog
from helpers.rq_helper import REDIS_QUEUE, rq_job_finalized
import redis # dependency via rq
INFO_FILE = "_pwndoc_importer_processing.json"
class OutputFormat(Enum):
PWNDOC = "pwndoc"
JSON = "json"
CSV = "csv"
DOCX = "docx"
class ProcessingSteps(Enum):
UPLOAD_VULN_SCANS = "Upload scan results from browser to server"
PREPARATIONS_FOR_SCAN2REPORT = "Download templates from PwnDoc"
SCAN2REPORT_FIRST_PASS = "Process scan findings using Scan2Report"
UPLOAD_OF_NEW_TEMPLATES = "Upload newly found templates to PwnDoc"
SCAN2REPORT_SECOND_PASS = "Rerun Scan2Report with new templates (if any)"
PWNDOC_FINDINGS_UPLOAD = "Upload findings to PwnDoc audit"
class ProcessingStatus(Enum):
WAITING = '⏳',
RUNNING = '🚀',
DONE = '✔',
FAILED = '❌',
SKIPPED = '⏩'
SEVERITIES = { # todo: synchronize this with scan2report
'0': 'Informative',
'1': 'Low',
'2': 'Medium',
'3': 'High',
'4': 'Critical'
}
# TODO: There is race condition. When two people are adding findings - the scan2report template synchronization might break the other scan2report run.
bp = Blueprint('api_process_findings', __name__, template_folder='templates')
def _get_audits() -> Dict[str, str]:
audits_dict = {
"Export to JSON": OutputFormat.JSON.value,
"Export to CSV": OutputFormat.CSV.value,
"Export to DOCX": OutputFormat.DOCX.value,
}
for audit_id, audit_name in get_audits():
nice_name = f'{audit_name} (id: {audit_id})'
audits_dict[nice_name] = audit_id
return audits_dict
def _list_profiles() -> List[str]:
with open(relative_path("scan2report/plugins/profiles.json"), encoding="utf8") as f:
data: dict = json.load(f)
return list(data.keys())
@bp.route("/upload_scanner_result/", methods=["GET"])
@bp.route("/upload_scanner_result", methods=["GET"])
def upload_scanner_result_get(audit_id: str = ""): # todo: maybe add additional arguments for setting persistence across error
audits_dict = _get_audits()
if len(audit_id) and audit_id not in list(audits_dict.values()):
FlashLog.warning("Audit ID from URL doesn't exist.")
audit_id = OutputFormat.DOCX.value
languages = list(filter(lambda x: x != "og", config.LOCALES))
profiles = _list_profiles()
return render_template('upload_scanner_result_get.html', audit_dict=audits_dict, audit_id=audit_id, lang_list=languages, severities=SEVERITIES, profiles=profiles, allowed_scanner_extensions=config.ALLOWED_SCANNER_EXTENSIONS)
def _go_to_start(audit_id: str = ""):
return redirect(url_for('api_process_findings.upload_scanner_result_get', audit_id=audit_id))
def _go_to_meta(folder_name: str):
return redirect(url_for("api_process_findings.show_meta_output", folder_name=folder_name))
def _flash_msg_and_go_to_start(msg: str, level: str = "warning", audit_id: str = ""):
FlashLog.log_and_flash_with_level(msg, level, depth=2)
# TODO: ps.user_msgs
return _go_to_start(audit_id)
@dataclass_json
@dataclass
class ProcessingSettings:
audit_id: str = OutputFormat.DOCX.value
upload_to_pwndoc: bool = False
scan2report_args: str = ''
__folder_name: str = field(default_factory=lambda: generate_random_folder_name("findings_import"))
stored_files: Set[str] = field(default_factory=set)
locale: str = 'cs'
steps_progress: Dict[str, Tuple[str, int, int]] = field(default_factory=lambda:
{e.name: (ProcessingStatus.WAITING.name, 0, 0) for e in ProcessingSteps}
)
user_msgs: List[Tuple[str, str]] = field(default_factory=list)
finished: bool = False
missing_templates: List[str] = field(default_factory=list) # todo: missing templates might not include grouped findings which don't have their own fid in the resulting findings.json
output_unspecified: dict = field(default_factory=dict)
@staticmethod
def __get_folder_path(folder_name: str) -> str:
assert folder_name == secure_filename(folder_name)
return folder_in_upload_path(folder_name)
def get_folder_path(self) -> str:
return self.__get_folder_path(self.__folder_name)
def get_folder_name(self) -> str:
return self.__folder_name
@classmethod
def __get_info_file_path(cls, folder_name: str) -> str:
return os.path.join(cls.__get_folder_path(folder_name), INFO_FILE)
@classmethod
def load_from_folder_name(cls, folder_name: str) -> Optional['ProcessingSettings']:
filepath = cls.__get_info_file_path(folder_name)
if not pathlib.Path(filepath).is_file():
return None
with open(filepath, encoding="utf-8") as f:
file_data = json.load(f)
return ProcessingSettings.from_dict(file_data)
def save(self):
pathlib.Path(self.get_folder_path()).mkdir(exist_ok=True)
with open(self.__get_info_file_path(self.__folder_name), "w", encoding="utf-8") as f:
json.dump(self.to_dict(), f, indent=4)
def cleanup_uploaded_files(self):
files = [os.path.join(self.get_folder_path(), f) for f in self.stored_files]
for file in files:
if pathlib.Path(file).is_file():
os.remove(file)
def get_result_path(self) -> str:
if self.audit_id == OutputFormat.DOCX.value:
return self._get_result_docx_path()
if self.audit_id == OutputFormat.CSV.value:
return self._get_result_csv_path()
return self._get_result_json_path()
def _get_result_docx_path(self):
return os.path.join(self.get_folder_path(), 'scan2report_output.docx')
def _get_result_json_path(self):
return os.path.join(self.get_folder_path(), 'scan2report_output.json')
def _get_result_csv_path(self):
return os.path.join(self.get_folder_path(), 'scan2report_output.csv')
def _meta_file_path(self) -> str:
return f"{self.get_result_path()}.meta.json"
def load_meta_file(self) -> dict:
meta_data = json_safe_load(self._meta_file_path())
return meta_data if meta_data else {}
def set_step_status(self, step: ProcessingSteps, new_status: ProcessingStatus):
# todo: make sure all calls to this function happen even when exceptions occur
step_name = step.name
old_status_name, start_timestamp, end_timestamp = self.steps_progress[step_name]
if old_status_name == ProcessingStatus.WAITING.name:
start_timestamp = current_timestamp()
if new_status in [ProcessingStatus.DONE, ProcessingStatus.FAILED, ProcessingStatus.SKIPPED]:
end_timestamp = current_timestamp()
if new_status == ProcessingStatus.FAILED:
self.finished = True
self.steps_progress[step_name] = (new_status.name, start_timestamp, end_timestamp)
self.save()
def get_statuses_for_display(self) -> List[Tuple[str, str, str]]:
"""Step display text | Step status text | Step duration in minutes and seconds"""
answer: List[Tuple[str, str, str]] = []
for step_enum in ProcessingSteps:
step_info = self.steps_progress.get(step_enum.name, None)
if step_info is None:
continue
old_status_name, start_timestamp, end_timestamp = step_info
if end_timestamp >= start_timestamp:
duration = end_timestamp - start_timestamp
elif start_timestamp != 0:
duration = current_timestamp() - start_timestamp
else:
duration = 0
step_text = step_enum.value
step_status = ProcessingStatus[step_info[0]]
step_status_text = step_status.value[0] if isinstance(step_status.value, tuple) else step_status.value # todo: inspect why python sees some emojis as tuples
step_duration_text = ""
if step_status not in [ProcessingStatus.WAITING, ProcessingStatus.SKIPPED]:
step_duration_text = f"{str(duration//60).zfill(2)}:{str(duration%60).zfill(2)}"
answer.append((step_text, step_status_text, step_duration_text))
return answer
def rq_is_still_in_enqueued(self) -> bool:
return rq_job_finalized(self.get_folder_path()) is False
@bp.route("/upload_scanner_result", methods=["POST"])
def upload_scanner_result_post():
audit_id = request.form.get("audit")
locale = request.form.get("locale")
profile = request.form.get("profile")
min_severity = request.form.get("min-severity")
def _flash_error_msg(msg: str, level: str = "warning"):
return _flash_msg_and_go_to_start(msg, level, audit_id) # note: audit_id is taken from outer scope
if len(audit_id) and audit_id not in list(_get_audits().values()):
return _flash_error_msg("This audit ID doesn't exist.")
if locale not in config.LOCALES:
return _flash_error_msg("This locale does not exist.")
if profile not in _list_profiles():
return _flash_error_msg("This profile does not exist.")
if min_severity not in SEVERITIES.keys():
return _flash_error_msg("This severity doesn't exist")
scan2report_args = request.form.get("scan2report_args")
scan2report_args += f' --min-severity {min_severity} --profile {profile}' # todo: make sure that I'm not overwriting explicitly set parameters
audit_info = list(filter(lambda x: x[0] == audit_id, get_audits()))
upload_to_pwndoc = len(audit_info) > 0
ps = ProcessingSettings(
audit_id=audit_id,
scan2report_args=scan2report_args,
locale=locale,
upload_to_pwndoc=upload_to_pwndoc,
)
ps.set_step_status(ProcessingSteps.UPLOAD_VULN_SCANS, ProcessingStatus.RUNNING)
if not upload_to_pwndoc:
ps.set_step_status(ProcessingSteps.PWNDOC_FINDINGS_UPLOAD, ProcessingStatus.SKIPPED)
error_msg, saved_filenames = save_multiple_uploaded_files(output_folder=ps.get_folder_path(), reserved_filenames=set(INFO_FILE), allowed_extensions=config.ALLOWED_SCANNER_EXTENSIONS)
ps.stored_files = ps.stored_files.union(saved_filenames)
ps.save()
if len(error_msg) > 0:
ps.set_step_status(ProcessingSteps.UPLOAD_VULN_SCANS, ProcessingStatus.FAILED)
ps.cleanup_uploaded_files()
return _flash_msg_and_go_to_start(error_msg, "error", audit_id)
ps.set_step_status(ProcessingSteps.UPLOAD_VULN_SCANS, ProcessingStatus.DONE)
if config.RQ_DISABLED:
full_processing_native(ps.get_folder_name())
else:
try:
REDIS_QUEUE.enqueue(
full_processing_background_wrapper, args=(ps.get_folder_name(),),
job_timeout='15m', ttl='1d', result_ttl=24*60*60, job_id=ps.get_folder_name()
)
except redis.exceptions.ConnectionError as e:
FlashLog.log_and_flash_with_level(f"Failed to connect to Redis - is the container running? ({e})", "error")
return _go_to_meta(ps.get_folder_name())
def _upload_missing_templates(ps: ProcessingSettings):
PwndocTemplateManager.update_template_db_from_pwndoc() # Update DB so that we can do automatic template merging.
for single_missing_string in tqdm(ps.missing_templates):
filename, severity, title = single_missing_string.strip().split("\t")
filename = secure_filename(filename)
fid = extract_fid_from_filename(filename)
path = os.path.join(TemplateScan2Report.native_folder_name_for_locale(ps.locale, original=True), filename)
with open(path, encoding="utf8") as f:
s2r_template_dict = json.load(f)
og_template: TemplateScan2Report = TemplateScan2Report.parse_dict_custom(fid=fid, locale="og", template_dict=s2r_template_dict, original=True)
if og_template is None:
logger.info(f"Empty og template for fid {fid}?")
continue
og_template_tns = Scan2ReportConvert.convert_single_finding_to_tns_pwndoc(og_template, "og")
# todo: fix this - I shouldn't need to hack it here
og_template_tns.set_is_template(True)
if og_template_tns.hosts == "":
og_template_tns.hosts = []
PwndocTemplateManager.add_single_locale_template(og_template_tns)
TemplateScan2Report.delete_template_files(["og"])
PwndocTemplateManager.update_template_db_from_pwndoc() # Update DB to have a current state.
def process_scanner_result_using_scan2report(folder_name: str) -> bool:
ps: Optional[ProcessingSettings] = ProcessingSettings.load_from_folder_name(folder_name)
if ps is None:
logger.warning(f"Scan folder doesn't exist: {folder_name}")
return False
ps.set_step_status(ProcessingSteps.PREPARATIONS_FOR_SCAN2REPORT, ProcessingStatus.RUNNING)
TemplateScan2Report.delete_template_files([ps.locale, "og"])
download_templates_from_pwndoc_to_scan2report(
"scan2report_native" if ps.audit_id == OutputFormat.DOCX.value else "scan2report_with_pwndoc_formatting",
locale=ps.locale
)
files = [os.path.join(ps.get_folder_path(), f) for f in ps.stored_files]
args = shlex.split(ps.scan2report_args)
if "--output" in args:
ps.user_msgs.append(('error', "Don't specify --output as a parameter, use selector of audit ID instead."))
ps.save()
return False
output_path = ps.get_result_path()
combined_args = ['--output', output_path]
if not ("-l" in args or "--lang" in args):
combined_args += ['--lang', ps.locale]
combined_args += args + files
logger.debug(f"Running scan2report with: {combined_args}")
# todo: the http connection might time out - do as async?
count_of_known_templates_before = len(TemplateScan2Report.list_files_in_locale_folder(ps.locale, original=False))
ps.set_step_status(ProcessingSteps.PREPARATIONS_FOR_SCAN2REPORT, ProcessingStatus.DONE)
ps.set_step_status(ProcessingSteps.SCAN2REPORT_FIRST_PASS, ProcessingStatus.RUNNING)
scan2report.main(combined_args) # First run to generates OG templates
# Metadata from the first load - extract the missing templates.
meta_data = ps.load_meta_file()
ps.missing_templates = meta_data["missing"] # _upload_missing_templates
ps.output_unspecified["missing_n_templates"] = len(ps.missing_templates)
ps.save()
ps.set_step_status(ProcessingSteps.SCAN2REPORT_FIRST_PASS, ProcessingStatus.DONE)
ps.set_step_status(ProcessingSteps.UPLOAD_OF_NEW_TEMPLATES, ProcessingStatus.RUNNING)
_upload_missing_templates(ps)
download_templates_from_pwndoc_to_scan2report(
output_format="scan2report_native" if ps.audit_id == OutputFormat.DOCX.value else "scan2report_with_pwndoc_formatting",
locale='og',
force_locale_folder=ps.locale,
prefix_title_with_locale=True
)
ps.set_step_status(ProcessingSteps.UPLOAD_OF_NEW_TEMPLATES, ProcessingStatus.DONE)
count_of_known_templates_after = len(TemplateScan2Report.list_files_in_locale_folder(ps.locale, original=False))
if count_of_known_templates_before != count_of_known_templates_after:
ps.set_step_status(ProcessingSteps.SCAN2REPORT_SECOND_PASS, ProcessingStatus.RUNNING)
scan2report.main(combined_args) # Second run to regenerate the findings if during the first run we found OG templates
ps.set_step_status(ProcessingSteps.SCAN2REPORT_SECOND_PASS, ProcessingStatus.DONE)
else:
ps.set_step_status(ProcessingSteps.SCAN2REPORT_SECOND_PASS, ProcessingStatus.SKIPPED)
ps.cleanup_uploaded_files()
return True
def get_changed_after_combine_with_existing_findings(audit_id: str, possibly_new_findings: List[dict]) -> List[dict]:
existing_raw_findings = get_findings_from_audit(audit_id)
fid_mapping = {}
for single_raw_finding in existing_raw_findings:
fid = get_fid_from_raw_finding(single_raw_finding)
if fid in fid_mapping:
FlashLog.warning(f"Multiple existing findings with {fid} in single audit. Overwriting.")
fid_mapping[fid] = single_raw_finding
single_raw_finding["_changed"] = False
for single_raw_finding in possibly_new_findings:
fid = get_fid_from_raw_finding(single_raw_finding)
# Interesting tests for the HTML parsing
# if fid not in ["burp_5243008", "HTTP_generic_XSS"]:
# continue
for attr_name in HTML_ATTRIBUTES:
single_raw_finding[attr_name] = MdAndHTMLConvertor.html_to_pwndoc_html(single_raw_finding.get(attr_name, ""))
if fid not in fid_mapping:
fid_mapping[fid] = single_raw_finding
single_raw_finding["_changed"] = True
continue
original_scope = fid_mapping[fid].get("scope", "").split(" ")
new_scope = single_raw_finding.get("scope", "").split(" ")
merged_scope = original_scope + new_scope
merged_scope_deduplicated = list(dict.fromkeys(merged_scope))
fid_mapping[fid]["scope"] = " ".join(merged_scope_deduplicated)
if len(merged_scope_deduplicated) != len(new_scope):
fid_mapping[fid]["poc"] = fid_mapping[fid].get("poc", "") + "
=====
" + single_raw_finding.get("poc", "")
single_raw_finding["_changed"] = True
new_or_changed = list(filter(lambda x: x.get("_changed", True), fid_mapping.values()))
logger.info(f"Import contained {len(list(fid_mapping.values()))} unique FIDs which resulted in upsert of {len(new_or_changed)} findings.")
return new_or_changed
def _extend_audit_services(audit_id: str, new_services: dict) -> helpers.table_of_services.Scope:
answer = helpers.table_of_services.get_tns_table_of_services(audit_id)
persistent_scopes_file_path = persistent_audit_scopes_file_path(audit_id)
local_services_copy = json_safe_load(persistent_scopes_file_path)
if local_services_copy is not None:
for host in local_services_copy.get('scope', {}).get('hosts', []):
for service in host.get('services', []):
answer.add_entry(host['ip'], service['port'], service['protocol'], service['name'])
for host_ip, port_list in new_services.items():
for port_number, l3_protocol_list in port_list.items():
for l3_protocol, app_protocol in l3_protocol_list.items():
answer.add_entry(host_ip, port_number, l3_protocol, app_protocol)
return answer
@bp.route("/force_upload_services/", methods=["GET"])
def force_upload_services(audit_id: str):
services_as_pwndoc_scope = _extend_audit_services(audit_id, {})
helpers.table_of_services.set_tns_table_of_services(audit_id, services_as_pwndoc_scope)
return f"OK; {len(services_as_pwndoc_scope.hosts)} hosts are in the table of services"
def upload_scan_findings_to_pwndoc(folder_name: str) -> bool:
    """Upload the scan2report results of a processed scan folder into PwnDoc.

    Parses the result JSON, converts each finding into the TNS PwnDoc format,
    upserts only findings that are new or changed, and refreshes the audit's
    table of services.

    :param folder_name: name of a previously processed scan folder.
    :return: False when the folder is unknown, True on completion.
    """
    ps: Optional[ProcessingSettings] = ProcessingSettings.load_from_folder_name(folder_name)
    if ps is None:
        logger.warning(f"Scan folder doesn't exist: {folder_name}")
        return False
    ps.set_step_status(ProcessingSteps.PWNDOC_FINDINGS_UPLOAD, ProcessingStatus.RUNNING)
    path = ps.get_result_path()
    with open(path, encoding="utf8") as f:
        file_data = json.load(f)
    findings: List[TemplateScan2Report] = []
    locale = ps.locale
    for single_finding in file_data:
        finding_parsed = TemplateScan2Report.parse_dict_custom(single_finding["fid"], locale, single_finding)
        if finding_parsed is None:
            logger.warning(f"Empty finding in audit report?")
        else:
            findings.append(finding_parsed)
    # Authenticate against the PwnDoc API before any upload calls.
    login()
    new_raw_findings = []
    for single_finding in findings:
        single_finding_pwndoc = Scan2ReportConvert.convert_single_finding_to_tns_pwndoc(single_finding, locale)
        # Preserve the scanner's own description verbatim in a custom field.
        single_finding_pwndoc.set_custom_field(
            "original_description_from_scan",
            MdAndHTMLConvertor.html_to_pwndoc_html(template_pwndoc.to_html(single_finding.description))
        )
        single_finding_pwndoc_dict = single_finding_pwndoc.to_pwndoc_dict()
        new_raw_findings.append(single_finding_pwndoc_dict)
    # Merge with findings already present in the audit; only upsert the delta.
    changed_raw_findings = get_changed_after_combine_with_existing_findings(ps.audit_id, new_raw_findings)
    for single_raw_finding in tqdm(changed_raw_findings, desc="Upserting findings"):
        upsert_raw_finding(ps.audit_id, single_raw_finding)
    # Extend the audit's table of services with services found by this scan.
    meta_data = ps.load_meta_file()
    services_as_pwndoc_scope = _extend_audit_services(ps.audit_id, meta_data.get("services", {}))
    helpers.table_of_services.set_tns_table_of_services(ps.audit_id, services_as_pwndoc_scope)
    ps.set_step_status(ProcessingSteps.PWNDOC_FINDINGS_UPLOAD, ProcessingStatus.DONE)
    ps.user_msgs.append(('info', f'OK, added or updated {len(changed_raw_findings)} findings - you can now find them in PwnDoc.'))
    ps.save()
    return True
@bp.route("/meta_output/<folder_name>", methods=["GET"])
def show_meta_output(folder_name: str):
    """Render the processing summary page for a scan folder.

    Fix: restored the <folder_name> URL converter missing from the route,
    without which Flask could not bind the parameter.
    """
    ps: Optional[ProcessingSettings] = ProcessingSettings.load_from_folder_name(folder_name)
    if ps is None:
        return "Folder doesn't exist", 400
    # Missing templates are stored tab-separated; split for tabular display.
    missing_templates_splitted = [x.split("\t") for x in ps.missing_templates]
    # Replay user messages accumulated during background processing.
    for level, msg in ps.user_msgs:
        FlashLog.flash_loguru_msg(msg, loguru_level=level)
    return render_template("findings_meta_output.html",
                           ps=ps,
                           missing_templates=missing_templates_splitted,
                           steps=ps.get_statuses_for_display(),
                           # import_id=folder_name,
                           )
@bp.route("/file_output/<folder_name>", methods=["GET"])
def show_file_output(folder_name: str):
    """Send the processing result file for download (docx as-is, text re-escaped).

    Fix: restored the <folder_name> URL converter missing from the route.
    """
    ps: Optional[ProcessingSettings] = ProcessingSettings.load_from_folder_name(folder_name)
    if ps is None:
        return "Folder doesn't exist", 400
    if ps.get_result_path().endswith(".docx"):  # todo: don't match on extension
        return send_file(ps.get_result_path(), as_attachment=True)
    with open(ps.get_result_path(), "r", encoding="utf8") as f:
        text = f.read()
    # Escape for safe display, then undo the importer's own custom escaping.
    text = html.escape(text, quote=False)
    text = MdAndHTMLConvertor.reverse_custom_escaping_of_html(text)
    filename = os.path.basename(ps.get_result_path())
    return send_file(io.BytesIO(text.encode("utf8")), as_attachment=True, download_name=filename)
def full_processing_background_wrapper(folder_name: str):
    """Entry point for background workers: run the pipeline inside a Flask app context."""
    from app import create_app  # local import avoids a circular dependency
    flask_app = create_app()
    with flask_app.app_context():
        full_processing_native(folder_name)
def full_processing_native(folder_name: str):
    """Run the full pipeline for a scan folder: scan2report conversion, optional
    PwnDoc upload, then mark the run finished and any still-waiting steps skipped.
    """
    if not process_scanner_result_using_scan2report(folder_name):
        return
    if ProcessingSettings.load_from_folder_name(folder_name).upload_to_pwndoc:
        if not upload_scan_findings_to_pwndoc(folder_name):
            return
    ps = ProcessingSettings.load_from_folder_name(folder_name)
    ps.finished = True
    for step_name, step_value in ps.steps_progress.items():
        if step_value[0] == ProcessingStatus.WAITING.name:
            # Bug fix: the enum must be indexed with the loop variable, not the
            # literal string "step_name" (which always raised KeyError here).
            ps.set_step_status(ProcessingSteps[step_name], ProcessingStatus.SKIPPED)
    ps.save()
pwndocimportautomator-release-2022-09-22/api_pwndoc_audit.py 0000664 0000000 0000000 00000006111 14313124554 0024073 0 ustar 00root root 0000000 0000000 from pathlib import Path
import os
from flask import redirect, url_for, Blueprint, send_file, render_template
from werkzeug.utils import secure_filename
import redis # dependency via rq
import uuid
import config
from helpers.rq_helper import REDIS_QUEUE, rq_job_failed, rq_job_finalized
from helpers.custom_logging import FlashLog
from helpers.file_root import relative_path
from pwndoc_api import download_report, get_docx_filepath
bp = Blueprint('api_pwndoc_audit', __name__, template_folder='templates')
@bp.route("/report_download/<audit_id>", methods=["GET"])
def report_download(audit_id: str):
    """Start generation of a docx report for the audit and redirect to its progress page.

    Fix: restored the <audit_id> URL converter missing from the route.
    The random filename doubles as the RQ job id so progress can be polled.
    """
    docx_filename = f"report_{audit_id}_{uuid.uuid4()}.docx"
    if config.RQ_DISABLED:
        # Synchronous fallback when Redis Queue is disabled.
        download_report(audit_id, docx_filename)
        return redirect(url_for("api_pwndoc_audit.report_progress", docx_filename=docx_filename))
    try:
        REDIS_QUEUE.enqueue(
            download_report, args=(audit_id, docx_filename,),
            job_timeout='15m', ttl='1d', result_ttl=24*60*60, job_id=docx_filename
        )
    except redis.exceptions.ConnectionError as e:
        FlashLog.log_and_flash_with_level(f"Failed to connect to Redis - is the container running? ({e})", "error")
    # Redirect to the progress page either way; on Redis failure the flashed
    # error is shown there.
    return redirect(url_for("api_pwndoc_audit.report_progress", docx_filename=docx_filename))
@bp.route("/report_progress/<docx_filename>", methods=["GET"])
def report_progress(docx_filename: str):
    """Show progress of a report-generation job and link the file once it exists.

    Fixes: restored the <docx_filename> URL converter, restored the download
    link markup in the success message, and removed the double negative in
    the waiting message ("doesn't not exist").
    """
    docx_filename = secure_filename(docx_filename)
    result_filepath = get_docx_filepath(docx_filename)

    def _success_response():
        # Link to the report_file endpoint which streams the finished docx.
        download_url = url_for("api_pwndoc_audit.report_file", docx_filename=docx_filename)
        return render_template('basic_msg.html', contentHTML=f'Report file is ready. <a href="{download_url}">Download docx report</a>')

    does_file_exist = Path(result_filepath).is_file()
    if config.RQ_DISABLED:
        # Synchronous mode: by now the file either exists or the download failed.
        if does_file_exist:
            return _success_response()
        FlashLog.error("The download of report failed.")
        return render_template("basic_msg.html", contentHTML="")
    if rq_job_finalized(docx_filename) is False:
        return render_template('basic_msg.html', contentHTML="""
            This pwndoc docx result doesn't exist yet. This page will autorefresh until it does.
        """)
    if rq_job_failed(docx_filename):
        FlashLog.error("The download of report in background failed.")
        return render_template("basic_msg.html", contentHTML="")
    if not does_file_exist:
        FlashLog.flash_loguru_msg("The download of report in background is no longer running, but no result file was produced. This likely indicates error.", "danger")
        return render_template('basic_msg.html', contentHTML="")
    return _success_response()
@bp.route("/report_file/<docx_filename>", methods=["GET"])
def report_file(docx_filename: str):
    """Send a previously generated docx report as an attachment.

    Fix: restored the <docx_filename> URL converter missing from the route.
    """
    result_filepath = get_docx_filepath(secure_filename(docx_filename))
    # todo: maybe somehow delete the file after download?
    return send_file(result_filepath, download_name=os.path.basename(result_filepath), as_attachment=True)
pwndocimportautomator-release-2022-09-22/api_root.py 0000664 0000000 0000000 00000003110 14313124554 0022372 0 ustar 00root root 0000000 0000000 from flask import render_template, Response, request, Blueprint
from helpers.disk_file_versioning import DiskFileVersioning
from template_pwndoc import TNSTemplatePwndoc
from template_manager import PwndocTemplateManager
from helpers.template_grouping import TemplateGrouping
from typing import List
bp = Blueprint('api_root', __name__, template_folder='templates')
@bp.route("/")
def homepage():
    """Serve the application landing page (base layout only)."""
    landing_template = 'base.html'
    return render_template(landing_template)
@bp.route("/json_editor", methods=["GET"])
def json_editor():
    """Render the JSON editor page with the versioned files and all templates."""
    # Refresh the local template DB so the editor reflects PwnDoc's state.
    PwndocTemplateManager.update_template_db_from_pwndoc()
    available_files = DiskFileVersioning().list_available_files()
    files_selector = [(name, name) for name in available_files]
    all_templates: List[TNSTemplatePwndoc] = sorted(
        PwndocTemplateManager.get_all_templates(), key=lambda tpl: tpl.fid
    )
    return render_template("json_editor.html", files=files_selector, templates=all_templates)
@bp.route("/versioned_file/<filename>", methods=["GET"])
def get_versioned_file(filename: str):
    """Serve one of the known versioned config files (groups/profiles/aliases).

    Fix: restored the <filename> URL converter missing from the route.
    """
    file_content = DiskFileVersioning().get_file_from_disk(filename)
    if file_content is None:
        return "Filename doesn't match existing mapping", 404
    return Response(file_content, mimetype='application/json')  # todo: this might not have to be JSON
@bp.route("/versioned_file/<filename>", methods=["POST"])
def upload_versioned_file(filename: str):
    """Overwrite a known versioned config file with the raw request body.

    Fix: restored the <filename> URL converter missing from the route.
    """
    data = request.data
    file_content = DiskFileVersioning().save_file_to_disk(filename, data.decode())
    if file_content is None:
        return "Filename doesn't match existing mapping", 404
    if filename == 'groups':
        # New group definitions may introduce GIDs that must be registered.
        TemplateGrouping.add_new_gids()
    return "OK"
pwndocimportautomator-release-2022-09-22/api_templates.py 0000664 0000000 0000000 00000013167 14313124554 0023422 0 ustar 00root root 0000000 0000000 import json
import os
from pathlib import Path
import time
from collections import defaultdict
from typing import Dict, List, Optional
from flask import request, redirect, url_for, render_template, flash, Blueprint, abort, send_file
from tqdm import tqdm
from loguru import logger
import config
from template_manager import PwndocTemplateManager
from template_scan2report import TemplateScan2Report
from helpers.flask_multifile_upload import save_multiple_uploaded_files
from helpers.file_utils import json_dump, relative_path, extract_fid_from_filename, zip_folder_recursively, generate_random_folder_path
from template_converters import Scan2ReportConvert, PwndocConverter
from template_pwndoc import TNSTemplatePwndoc
from helpers.custom_logging import FlashLog
bp = Blueprint('api_templates', __name__, template_folder='templates')
@bp.route("/download_from_pwndoc")
@bp.route("/download_from_pwndoc/<output_format>")
@bp.route("/download_from_pwndoc/<output_format>/<locale>")
def download_templates_from_pwndoc_to_scan2report(
        output_format: str = "scan2report_native",
        locale: str = config.LOCALES[0],
        force_locale_folder: Optional[str] = None,
        prefix_title_with_locale: bool = False
):
    """Export PwnDoc templates to scan2report JSON files on disk.

    Fix: restored the <output_format>/<locale> URL converters missing from the
    routes, without which the optional parameters could never be supplied.

    :param output_format: "scan2report_native" (HTML stripped) or
        "scan2report_with_pwndoc_formatting" (HTML kept).
    :param locale: which template locale to export.
    :param force_locale_folder: override the on-disk locale folder name.
    :param prefix_title_with_locale: prefix template names with "[LOCALE] ".
    """
    # Value: True => strip HTML tags, False => keep pwndoc formatting.
    OUTPUT_FORMAT_MAPPING = {"scan2report_native": True, "scan2report_with_pwndoc_formatting": False}
    PwndocTemplateManager.update_template_db_from_pwndoc()
    TemplateScan2Report.recreate_template_folders_if_necessary()
    if force_locale_folder is None:
        force_locale_folder = locale
    if output_format not in OUTPUT_FORMAT_MAPPING.keys():
        abort(400, description="The selected output format is not supported.")
        return  # this return is unreachable, but included to silence PyCharm warning
    s2r_dicts: List[dict] = PwndocConverter.convert_to_list_of_scan2report_combined_dict(
        output_without_html_tags=OUTPUT_FORMAT_MAPPING[output_format],
        locale=locale
    )
    output_folder = TemplateScan2Report.native_folder_name_for_locale(force_locale_folder)
    for single_template_dict in s2r_dicts:
        output_file_path = os.path.join(output_folder, f"{single_template_dict.get('fid')}.json")
        if Path(output_file_path).is_file():
            # Never clobber a template that already exists on disk.
            logger.warning(f"Template for {single_template_dict.get('fid')} with locale {force_locale_folder} already exists on disk. Skipping to avoid overwriting it.")
            continue
        if prefix_title_with_locale:
            single_template_dict['name'] = f"[{locale.upper()}] {single_template_dict.get('name', '')}"
        json_dump(single_template_dict, output_file_path)
    return f"OK; {len(s2r_dicts)} templates downloaded; Converted to format {output_format} for locale {locale}"
@bp.route("/import_scan2report_to_pwndoc", methods=["GET", "POST"])
def import_scan2report_templates_to_pwndoc():
    """Upload scan2report JSON template files and import them into PwnDoc.

    GET renders the upload form; POST saves the uploaded files, parses each of
    them for the selected locale (plus the "og" original where present) and
    pushes the resulting templates to PwnDoc, bundled per FID.
    """
    if request.method == "GET":
        return render_template("import_templates.html", locales=config.LOCALES)
    locale_to_import = request.form.get("template_locale")
    if locale_to_import not in config.LOCALES:
        FlashLog.warning("This locale doesn't exist.")
        return redirect(url_for('api_templates.import_scan2report_templates_to_pwndoc'))
    output_folder = generate_random_folder_path(f"template_import_{locale_to_import}")
    error_msg, saved_filenames = save_multiple_uploaded_files(output_folder, reserved_filenames=set(), allowed_extensions={"json"})
    if len(error_msg) > 0:
        FlashLog.error(error_msg)
        return redirect(url_for('api_templates.import_scan2report_templates_to_pwndoc'))
    PwndocTemplateManager.update_template_db_from_pwndoc()  # necessary for mapping new uploads to existing templates
    # fid -> {locale -> template}; templates are uploaded to PwnDoc in per-FID bundles.
    bundled_templates: Dict[str, Dict[str, TNSTemplatePwndoc]] = defaultdict(dict)
    for single_filename in tqdm(saved_filenames):
        # By convention the filename (minus extension) is the FID.
        fid = extract_fid_from_filename(single_filename)
        with open(os.path.join(output_folder, single_filename), encoding="utf8") as f:
            template_json = json.load(f)
        lang_and_templates: Dict[str, TemplateScan2Report] = TemplateScan2Report.parse_dict_for_locale_and_og(fid, locale_to_import, template_json)
        # I don't need to do DB refresh (for pwndoc ID) after every addition because I'm processing only single locale (or locale and og at the same time).
        for locale, single_template in lang_and_templates.items():
            single_tns_template: TNSTemplatePwndoc = Scan2ReportConvert.convert_single_finding_to_tns_pwndoc(single_template, locale)
            assert fid == single_tns_template.fid
            assert locale == single_tns_template.locale
            bundled_templates[fid][locale] = single_tns_template
    # Guard against accidental reuse of the loop variables below.
    del single_filename, fid, single_tns_template, locale, template_json, lang_and_templates, f, output_folder
    for fid in tqdm(bundled_templates):
        dict_of_locales: Dict[str, TNSTemplatePwndoc] = bundled_templates[fid]
        PwndocTemplateManager.add_locales_for_fid(list(dict_of_locales.values()))
    PwndocTemplateManager.update_template_db_from_pwndoc()  # todo: this is workaround for not using PwndocTemplateManager.add_single_locale_template
    FlashLog.info(f"OK; {len(bundled_templates)} templates uploaded")
    return redirect(url_for('api_templates.import_scan2report_templates_to_pwndoc'))
@bp.route("/export_as_scan2report_native", methods=["GET"])
def export_as_scan2report_native():
    """Regenerate native scan2report templates for every locale, zip them, send the zip."""
    for single_locale in config.LOCALES:
        download_templates_from_pwndoc_to_scan2report("scan2report_native", single_locale)
    archive_path = zip_folder_recursively(config.TEMPLATE_FOLDER_SCAN2REPORT)
    archive_name = f"scan2report_templates_{int(time.time())}.zip"
    return send_file(
        archive_path,
        download_name=archive_name,
        as_attachment=True
    )
pwndocimportautomator-release-2022-09-22/app.py 0000664 0000000 0000000 00000005671 14313124554 0021354 0 ustar 00root root 0000000 0000000 from flask import flash
from loguru import logger
import helpers.custom_logging
from flask import Flask, request, render_template, Response
from flask_migrate import Migrate
import config
from helpers.file_utils import *
def create_app():
    """Application factory: build and configure the Flask app.

    Wires up file logging, SQLAlchemy + Flask-Migrate, upload limits and all
    blueprints; registers the rq-dashboard only when Redis queuing is enabled.

    Fix: the 404 error handler reused the name ``internal_error``, shadowing
    the 500 handler's function object; it now has its own name.
    """
    # Local imports: these modules pull in config/DB state and would create
    # import cycles or side effects if imported at module level.
    import pwndoc_db_init
    from helpers.db_models import db
    from helpers.template_grouping import TemplateGrouping
    from api_process_findings import bp as process_findings_bp
    from api_templates import bp as templates_bp
    from api_debug import bp as debug_bp
    from api_root import bp as root_bp
    from api_pwndoc_audit import bp as pwndoc_bp
    import rq_dashboard

    logger.add("user_data/importer-logs/main.log", rotation="1 week", retention="60 days", compression="gz", encoding="utf8")
    pwndoc_db_init.InitialData()

    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///user_data/importer-db/main.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    migrate = Migrate()
    # https://github.com/miguelgrinberg/Flask-Migrate/issues/61#issuecomment-208131722
    with app.app_context():
        if db.engine.url.drivername == 'sqlite':
            # SQLite cannot ALTER columns; batch mode recreates tables instead.
            migrate.init_app(app, db, render_as_batch=True, compare_type=True)
        else:
            migrate.init_app(app, db)
    with app.app_context():
        db.create_all()

    app.config['UPLOAD_FOLDER'] = config.FLASK_UPLOAD_FOLDER
    app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000  # 100 MB upload cap
    app.config["JSONIFY_PRETTYPRINT_REGULAR"] = True
    app.config["SECRET_KEY"] = config.SECRET_KEY
    Path(app.config['UPLOAD_FOLDER']).mkdir(parents=True, exist_ok=True)

    app.register_blueprint(process_findings_bp, url_prefix='/findings')
    app.register_blueprint(templates_bp, url_prefix='/templates')
    app.register_blueprint(pwndoc_bp, url_prefix='/pwndoc_audit')
    app.register_blueprint(root_bp, url_prefix='/')

    if not config.RQ_DISABLED:
        app.config.from_object(rq_dashboard.default_settings)
        app.config["RQ_DASHBOARD_REDIS_URL"] = f'redis://{config.REDIS_HOST}:{config.REDIS_PORT}'
        app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
    else:
        @app.route("/rq")
        @app.route("/rq/")
        def rq_disabled():
            flash("Redis scheduling is disabled.", "info")
            return render_template('base.html')

    # Todo: possibly disable this or require additional password
    app.register_blueprint(debug_bp, url_prefix='/debug')
    TemplateGrouping.add_new_gids()

    @app.errorhandler(500)
    def internal_error(error):
        flash(f"Internal server error (more information is in server logs)", "danger")
        return render_template('base.html'), 500

    @app.errorhandler(404)
    def page_not_found(error):
        flash(f"The requested URL doesn't exist.", "danger")
        return render_template('base.html'), 404

    return app
if __name__ == "__main__":
    # Development entry point; containers start the app via a WSGI server instead.
    create_app().run()  # todo: maybe change?
pwndocimportautomator-release-2022-09-22/backup.sh 0000775 0000000 0000000 00000000617 14313124554 0022021 0 ustar 00root root 0000000 0000000 #!/bin/bash
set -e

# Resolve the directory this script lives in so it can be run from anywhere.
parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )
echo "$parent_path"
cd "$parent_path"

# Timestamped backup folder, e.g. backups/2022-09-22T12-00-00.
folderName=backups/$(date +"%Y-%m-%dT%H-%M-%S")
echo "$folderName"
# Fixes: variables are now quoted against word splitting, and -p creates the
# backups/ parent if it is missing.
mkdir -p "$folderName"

# Dump the whole Mongo DB from inside the container to a single archive file.
docker exec pwndoc-mongo sh -c 'exec mongodump --archive' > "$folderName/mongo-db.backup"
zip -r "$folderName/user_data.zip" user_data/
zip -r "$folderName/scan2report_plugins.zip" scan2report/plugins/
pwndocimportautomator-release-2022-09-22/backups/ 0000775 0000000 0000000 00000000000 14313124554 0021641 5 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/backups/.gitkeep 0000664 0000000 0000000 00000000000 14313124554 0023260 0 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/config.py 0000664 0000000 0000000 00000002322 14313124554 0022027 0 ustar 00root root 0000000 0000000 from dotenv import load_dotenv
import os
from loguru import logger
from helpers.file_root import relative_path

# Where scan2report keeps its plugin/template JSON files (relative to repo root).
TEMPLATE_FOLDER_SCAN2REPORT = "scan2report/plugins/"
# Supported template locales; "og" is the original (untranslated) variant.
LOCALES = ["cs", "en", "og"]
assert LOCALES[-1] == "og", "inner logic of scan2report parsing depends on 'og' being the last locale"

# Pull configuration from .env / docker.env into the process environment.
load_dotenv()
SECRET_KEY = os.environ.get('FLASK_SECRET_KEY')
ALLOWED_SCANNER_EXTENSIONS = {'dat', 'xml', 'nessus', 'json'}
FLASK_UPLOAD_FOLDER = relative_path('debug_tmp')
# Trailing slash is stripped so URLs can be composed with a single "/".
PWNDOC_URL = os.getenv('PWNDOC_URL').rstrip("/")
PWNDOC_USERNAME = os.getenv('PWNDOC_USERNAME')
PWNDOC_PASSWORD = os.getenv('PWNDOC_PASSWORD')
# Env vars are strings; anything other than the exact string "True" is False.
PWNDOC_DISABLE_HTTPS_VERIFICATION = os.getenv("PWNDOC_DISABLE_HTTPS_VERIFICATION", False) == "True"
if PWNDOC_DISABLE_HTTPS_VERIFICATION:
    logger.warning(f"Security warning: HTTPS certificate validation for requests is DISABLED.")
LOGGER_INTERCEPT_STD_LOGGING = os.getenv('LOGGER_INTERCEPT_STD_LOGGING', False) == "True"
TNS_DESCRIPTION_PROOF_DELIMITER = "========================================"
IN_DOCKER = os.getenv("IN_DOCKER", False) == "True"
RQ_DISABLED = os.getenv("RQ_DISABLED", False) == "True"
# Inside docker-compose the redis service name resolves directly; locally the
# container's port is published on 5002 (see docker-compose.yml).
REDIS_HOST = 'importer-redis' if IN_DOCKER else 'localhost'
REDIS_PORT = 6379 if IN_DOCKER else 5002
pwndocimportautomator-release-2022-09-22/debug_tmp/ 0000775 0000000 0000000 00000000000 14313124554 0022157 5 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/debug_tmp/.gitkeep 0000664 0000000 0000000 00000000000 14313124554 0023576 0 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/docker-compose.yml 0000664 0000000 0000000 00000007064 14313124554 0023655 0 ustar 00root root 0000000 0000000 version: '3'
# Note: Some user content is in the user_data folder and some is in the Docker volumes.
services:
import-automator:
container_name: pwndoc-import-automator
build: .
restart: unless-stopped
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- 5001:5000
environment:
- IN_DOCKER=True
env_file:
- docker.env
volumes:
- ./user_data/pwndoc-init:/app/user_data/pwndoc-init
- ./user_data/importer-logs:/app/user_data/importer-logs
- ./user_data/importer-db:/app/user_data/importer-db
- ./user_data/pwndoc-logs:/app/user_data/pwndoc-logs:ro # Flask web interface allows download of Pwndoc Backend log
- ./user_data/worker-logs:/app/user_data/worker-logs:ro
- ./scan2report:/app/scan2report # preserves modified docx templates and plugin configurations; scan2report templates are preserved in pwndoc
- ./debug_tmp:/app/debug_tmp
depends_on:
- pwndoc-backend
- importer-redis
links:
- pwndoc-backend
- importer-redis
networks:
- backend
import-automator-worker:
container_name: pwndoc-import-automator-worker
build: .
restart: unless-stopped
environment:
- IN_DOCKER=True
env_file:
- docker.env
volumes:
- ./user_data/pwndoc-init:/app/user_data/pwndoc-init
- ./user_data/worker-logs:/app/user_data/importer-logs
- ./user_data/importer-db:/app/user_data/importer-db
- ./scan2report:/app/scan2report # preserves modified docx templates and plugin configurations; scan2report templates are preserved in pwndoc
- ./debug_tmp:/app/debug_tmp
depends_on:
- pwndoc-backend
- importer-redis
links:
- pwndoc-backend
- importer-redis
networks:
- backend
entrypoint: ["/bin/sh", "-c", "/usr/local/bin/rq worker --path /app --url redis://importer-redis 2>&1 | tee -a /app/user_data/importer-logs/worker-full.log"]
pwndoc-mongo:
image: mongo:4.2.15
container_name: pwndoc-mongo # note: This name is the reverse of original name (pwndoc-mongo vs mongo-pwndoc)
volumes:
- mongo-data:/data/db # on Windows you can't use mapping to folder - there Mongo needs a proper volume
restart: unless-stopped
# ports:
# - 127.0.0.1:27017:27017
environment:
- MONGO_DB=pwndoc
networks:
- backend
pwndoc-backend:
build: ./pwndoc/backend
image: yeln4ts/pwndoc:backend
container_name: pwndoc-backend
volumes:
- ./user_data/pwndoc-docx-templates:/app/report-templates
- ./user_data/pwndoc-config:/app/src/config
- ./user_data/pwndoc-logs:/app/custom_logs
depends_on:
- pwndoc-mongo
restart: unless-stopped
# ports:
# - 4242:4242
links:
- pwndoc-mongo
networks:
- backend
# entrypoint: ["npm", "start"] # this is default, but I want a copy of the logs to a file
entrypoint: ["/bin/sh", "-c", "npm start 2>&1| tee -a /app/custom_logs/pwndoc.log"]
pwndoc-frontend:
build: ./pwndoc/frontend
image: yeln4ts/pwndoc:frontend
container_name: pwndoc-frontend
restart: unless-stopped
ports:
- 8443:8443
depends_on:
- pwndoc-backend
networks:
- backend
importer-redis:
container_name: importer-redis
image: redis
restart: unless-stopped
ports: # todo: don't have this enabled in production, only in dev
- 5002:6379
networks:
- backend
volumes:
mongo-data:
# networks:
# default:
# external:
# name: dockerInternal
networks:
backend:
driver: bridge
pwndocimportautomator-release-2022-09-22/helpers/ 0000775 0000000 0000000 00000000000 14313124554 0021653 5 ustar 00root root 0000000 0000000 pwndocimportautomator-release-2022-09-22/helpers/custom_logging.py 0000664 0000000 0000000 00000003576 14313124554 0025260 0 ustar 00root root 0000000 0000000 import functools
import logging
from loguru import logger
from config import LOGGER_INTERCEPT_STD_LOGGING
from flask import flash, has_request_context
# todo: the following handler does NOT catch print statements (currently used inside scan2report)
# https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
class InterceptHandler(logging.Handler):
    """Forward records emitted via the standard `logging` module to loguru."""

    def emit(self, record):
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            # Unknown custom level name: fall back to the numeric level.
            level = record.levelno

        # Find caller from where originated the logged message
        # (walk up past the logging module's own frames so loguru reports
        # the real call site, not logging internals).
        frame, depth = logging.currentframe(), 2
        while frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
class FlashLog(object):
    """Log a message through loguru and, when inside a request, also flash it
    to the web user with the matching Bootstrap category."""

    @classmethod
    def info(cls, msg: str):
        """Log and flash *msg* at info level."""
        cls.log_and_flash_with_level(msg, "info")

    @classmethod
    def warning(cls, msg: str):
        """Log and flash *msg* at warning level."""
        cls.log_and_flash_with_level(msg, "warning")

    @classmethod
    def error(cls, msg: str):
        """Log and flash *msg* at error level."""
        cls.log_and_flash_with_level(msg, "error")

    @staticmethod
    def flash_loguru_msg(msg: str, loguru_level: str = "info"):
        """Flash *msg* using the Flask category that matches a loguru level name."""
        level_map = {'info': 'info', 'warning': 'warning', 'error': 'danger'}
        category = level_map.get(loguru_level.lower(), "warning")
        # Flashing outside a request context would raise; silently skip instead.
        if has_request_context():
            flash(msg, category)

    @classmethod
    def log_and_flash_with_level(cls, msg: str, level: str = "info", depth: int = 2):
        """Levels: info, warning, error"""
        # depth=2 makes loguru attribute the record to the original caller.
        logger.opt(depth=depth).log(level.upper(), msg)
        cls.flash_loguru_msg(msg, level)
# Opt-in (via env var): route all std-logging records through loguru.
if LOGGER_INTERCEPT_STD_LOGGING:
    logging.basicConfig(handlers=[InterceptHandler()], level=0)
pwndocimportautomator-release-2022-09-22/helpers/db_models.py 0000664 0000000 0000000 00000012263 14313124554 0024161 0 ustar 00root root 0000000 0000000 from typing import Optional, List, Tuple
import sqlalchemy
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import or_
from helpers.time_helper import current_timestamp
from template_pwndoc import TNSTemplatePwndoc
db = SQLAlchemy()
class DbPwndocMapping(db.Model):
    """One-to-one mapping between a template FID and its PwnDoc vulnerability id."""
    # todo: should this even be in DB? It might be better to just store it in memory and refresh it automatically to avoid synchronization issues
    __tablename__ = 'pwndoc_mapping'
    id = db.Column(db.Integer, primary_key=True)
    fid = db.Column(db.String(80), unique=True, nullable=False, index=True)
    pwndoc_id = db.Column(db.String(80), unique=True, nullable=False, index=True)

    @classmethod
    def get_pwndoc_id_from_fid(cls, fid: str) -> Optional[str]:
        """Return the PwnDoc id mapped to *fid*, or None when unmapped."""
        # if DbTemplate.query.filter_by(fid=fid).filter_by(deleted=False).first() is None:  # todo: do this properly
        #     return None
        res = cls.query.filter_by(fid=fid).first()
        return res.pwndoc_id if res else None

    @classmethod
    def get_fid_from_pwndoc_id(cls, pwndoc_id: str) -> Optional[str]:
        """Return the FID mapped to *pwndoc_id*, or None when unmapped."""
        res = cls.query.filter_by(pwndoc_id=pwndoc_id).first()
        return res.fid if res else None

    @classmethod
    def save_fid_and_pwndoc_id(cls, fid: str, pwndoc_id: str):
        """Upsert the (fid, pwndoc_id) pair, removing any conflicting rows first."""
        current_pwndoc_id: Optional[str] = cls.get_pwndoc_id_from_fid(fid)
        if current_pwndoc_id and current_pwndoc_id == pwndoc_id:
            # Mapping already up to date; nothing to write.
            return
        if current_pwndoc_id:
            # Both columns are unique: delete any row that would collide.
            qry = sqlalchemy.delete(cls).where(or_(cls.fid == fid, cls.pwndoc_id == pwndoc_id))
            db.session.execute(qry)
        new_mapping = DbPwndocMapping(fid=fid, pwndoc_id=pwndoc_id)
        db.session.add(new_mapping)
        db.session.commit()
class DbTemplate(db.Model):
    """Append-only revision store for templates.

    Every save inserts a new row; the newest non-deleted row per
    (fid, locale) pair is the current revision. Deletion is soft.
    """
    __tablename__ = 'templates'
    id = db.Column(db.Integer, primary_key=True)
    fid = db.Column(db.String(80), unique=False, nullable=False, index=True)
    locale = db.Column(db.String(10), unique=False, nullable=False, default="og")
    content = db.Column(db.Text, nullable=False)  # This is effectively read-only
    timestamp = db.Column(db.Integer, default=current_timestamp)
    deleted = db.Column(db.Boolean, default=False)

    @classmethod
    def get_template(cls, fid: str, locale: str, revisions_back: int = 0, include_deleted: bool = False) -> Optional['DbTemplate']:
        """Return the revision *revisions_back* steps before current, or None."""
        qry = cls.query.\
            filter_by(fid=fid).\
            filter_by(locale=locale).\
            filter_by(deleted=include_deleted).\
            order_by(cls.id.desc())
        res = qry.limit(revisions_back+1).all()
        return res[revisions_back] if revisions_back < len(res) else None

    @classmethod
    def add_template(cls, fid: str, locale: str, template: 'TNSTemplatePwndoc') -> Tuple['DbTemplate', bool]:
        """Store a new revision; returns (existing_row, False) when content is unchanged."""
        previous_template = cls.get_template(fid, locale)
        new_template = DbTemplate(fid=fid, locale=locale)
        new_template._encode_content(template)
        if previous_template:
            if new_template.content == previous_template.content:
                return previous_template, False  # not new
        db.session.add(new_template)
        db.session.commit()
        return new_template, True  # new

    @classmethod
    def delete_template(cls, fid: str, locale: Optional[str] = None):
        """Soft-delete all revisions for *fid* (optionally one locale only);
        drops the FID->pwndoc mapping once no live locale remains."""
        qry = sqlalchemy.update(cls).where(cls.fid == fid)
        if locale:
            qry = qry.where(cls.locale == locale)
        qry = qry.values(deleted=True)
        db.session.execute(qry)
        if len(cls.get_current_templates_for_fid(fid)) == 0:
            mapping_res = DbPwndocMapping.query.filter_by(fid=fid).first()
            if mapping_res:
                db.session.delete(mapping_res)
        db.session.commit()

    def get_previous_revision(self) -> Optional['DbTemplate']:
        """Return the revision immediately preceding this one, if any."""
        return self.get_template(self.fid, self.locale, revisions_back=1)

    def decode_content(self) -> Optional['TNSTemplatePwndoc']:
        """Deserialize the stored JSON content back into a template object."""
        return TNSTemplatePwndoc.from_json(self.content)  # todo: handle gracefully if the content is invalid or missing?

    def _encode_content(self, template: 'TNSTemplatePwndoc'):
        # note: there is no commit here, this is only a helper function so that encoding and decoding are right next to each other
        self.content = template.to_json()  # todo: I am presuming the order of attributes is consistent

    @classmethod
    def _get_current_templates(cls, fid: Optional[str] = None) -> List['DbTemplate']:
        """Return the newest non-deleted revision for every (fid, locale) pair."""
        # Subquery: highest row id per (fid, locale) among non-deleted rows.
        qry_newest_template_ids = db.session.\
            query(sqlalchemy.func.max(cls.id).label('id')).\
            filter_by(deleted=False)
        if fid:
            qry_newest_template_ids = qry_newest_template_ids.filter_by(fid=fid)
        qry_newest_template_ids = qry_newest_template_ids.\
            group_by(cls.fid, cls.locale). \
            subquery()
        # Warning: never merge it to one query. In one query the order_by before group_by doesn't work as expected.
        res = cls.query.filter(cls.id.in_(qry_newest_template_ids)).all()
        return res

    @classmethod
    def get_all_current_templates(cls) -> List['DbTemplate']:
        """Current revisions of all templates across all FIDs and locales."""
        return cls._get_current_templates()

    @classmethod
    def get_current_templates_for_fid(cls, fid: str) -> List['DbTemplate']:
        """Current revisions (all locales) for a single FID."""
        return cls._get_current_templates(fid)
pwndocimportautomator-release-2022-09-22/helpers/disk_file_versioning.py 0000664 0000000 0000000 00000002116 14313124554 0026421 0 ustar 00root root 0000000 0000000 from typing import Optional, List
class DiskFileVersioning:
    """Maps short logical names to scan2report JSON config files on disk and
    provides read/write access to them.

    Note: despite the name, no versioning is implemented yet (see the todos);
    files are read and overwritten in place.
    """

    def __init__(self):
        # Logical file name -> path on disk. Only these names are served.
        self._mappings = {
            'groups': 'scan2report/plugins/groups.json',
            'profiles': 'scan2report/plugins/profiles.json',
            'aliases': 'scan2report/plugins/aliases.json',
        }

    def _file_name_to_path(self, filename: str) -> Optional[str]:
        """Resolve a logical name to its disk path, or None when unknown."""
        return self._mappings.get(filename)

    def list_available_files(self) -> List[str]:
        """Return all logical names that can be read or written."""
        return [*self._mappings]

    def get_file_from_disk(self, filename: str) -> Optional[str]:
        """Return the file's content, or None for an unknown logical name."""
        filepath = self._file_name_to_path(filename)
        if filepath is None:
            return None
        # todo: use versioning in DB
        with open(filepath, encoding="utf8") as handle:
            return handle.read()

    def save_file_to_disk(self, filename: str, content: str):
        """Overwrite the file; returns None for an unknown logical name."""
        filepath = self._file_name_to_path(filename)
        if filepath is None:
            return None
        # todo: use versioning in DB
        with open(filepath, "w", encoding="utf8") as handle:
            return handle.write(content)
pwndocimportautomator-release-2022-09-22/helpers/file_root.py 0000664 0000000 0000000 00000000340 14313124554 0024204 0 ustar 00root root 0000000 0000000 import os
def relative_path(relative_filepath: str) -> str:
    """Resolve *relative_filepath* against the repository root.

    The root is assumed to be one level above this helpers/ package, giving
    callers stable paths regardless of the current working directory.
    """
    repo_root = os.path.join(os.path.dirname(__file__), "..")
    return os.path.normpath(os.path.join(repo_root, relative_filepath))
pwndocimportautomator-release-2022-09-22/helpers/file_utils.py 0000664 0000000 0000000 00000007535 14313124554 0024376 0 ustar 00root root 0000000 0000000 import datetime
import glob
import io
import json
import os
import shutil
import uuid
import zipfile
from pathlib import Path
from typing import Any, Optional, Dict, List, Set
import config
from helpers.custom_logging import FlashLog
from config import ALLOWED_SCANNER_EXTENSIONS
from loguru import logger
from helpers.file_root import relative_path # don't remove this import - other parts of code depend on it
def json_dump(obj: Any, filepath: str) -> None:
    """Serialize *obj* as pretty-printed UTF-8 JSON to *filepath*.

    Keys are sorted and non-ASCII characters are written verbatim, so the
    output is stable and diff-friendly.
    """
    serialized = json.dumps(obj, indent=4, ensure_ascii=False, sort_keys=True)
    with open(filepath, "w", encoding="utf8") as handle:
        handle.write(serialized)
# Taken directly from Flask documentation
def is_whitelisted_extension(filename: str, whitelisted_extension: Set[str] = ALLOWED_SCANNER_EXTENSIONS):
    """Return True when *filename* has an extension from *whitelisted_extension*."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in whitelisted_extension
def json_safe_load(filepath: str) -> Optional[Dict[str, Any]]:
    """Load JSON from *filepath*; return None when the file is missing or invalid.

    A parse failure is reported to the log/user instead of raising.
    """
    if not os.path.isfile(filepath):
        # logger.debug(f"File '{filepath}' does not exist.")
        return None
    with open(filepath, encoding="utf8") as handle:
        try:
            return json.load(handle)
        except json.decoder.JSONDecodeError as e:
            FlashLog.warning(f"Parsing of JSON file '{filepath}' failed. Error: {e}")
            return None
def extract_fid_from_filename(filename: str) -> str:
    """Return *filename* without its final extension; by convention that is the FID."""
    base, _extension = os.path.splitext(filename)
    return base
def list_fids_in_paths(wildcard_path: List[str]) -> List[str]:
    """Collect the unique FIDs of files matching any of the glob patterns.

    The FID is the matched file's basename without extension. Order of the
    returned list is unspecified (set semantics).
    """
    fids = set()
    for pattern in wildcard_path:
        for match in glob.glob(pattern):
            # basename minus extension == FID (inlined extract_fid_from_filename).
            fids.add(os.path.splitext(os.path.basename(match))[0])
    return list(fids)
def zip_folder_non_recursive_to_virtual(folder_path: str) -> io.BytesIO:
    """Zip a single folder into an in-memory file (delegates to the multi-folder variant)."""
    single_folder = [folder_path]
    return zip_multiple_folders_non_recursive_to_virtual(single_folder)
def zip_multiple_folders_non_recursive_to_virtual(folder_paths: List[str]) -> io.BytesIO:
    """Zip the files of the given folders into one in-memory ZIP archive.

    Each entry is stored under a single directory-name component inside the archive.
    NOTE(review): despite the comment below, os.walk DOES descend into subdirectories.
    NOTE(review): folder_name is taken from os.path.dirname(root), which equals the
    folder's own name only when the supplied path carries a trailing separator;
    otherwise entries land under the PARENT folder's name — verify against callers.
    """
    # note: this is not recursive
    # based on https://python.tutorialink.com/how-do-i-zip-an-entire-folder-with-subfolders-and-serve-it-through-flask-without-saving-anything-to-disk/
    memory_file = io.BytesIO()
    with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for folder_path in folder_paths:
            for root, _, files in os.walk(folder_path):
                folder_name = os.path.basename(os.path.dirname(root))
                for file in files:
                    zipf.write(os.path.join(root, file), os.path.join(folder_name, file))
    # Rewind so callers can read/stream the archive from the start.
    memory_file.seek(0)
    return memory_file
def zip_folder_recursively(folder_path: str) -> str:
    """Recursively zip *folder_path*; return the path of the .zip created inside a fresh random folder."""
    archive_base = os.path.join(generate_random_folder_path("zip"), "output")
    # shutil.make_archive appends the ".zip" extension itself and returns the final path.
    created_archive = shutil.make_archive(archive_base, "zip", folder_path)
    return created_archive
def zip_folder_recursively_to_virtual(folder_path: str) -> io.BytesIO:
    """Recursively zip *folder_path* and return the archive contents as an in-memory file."""
    archive_path = zip_folder_recursively(folder_path)
    return io.BytesIO(Path(archive_path).read_bytes())
def generate_random_folder_name(label: str = "") -> str:
    """Build a unique folder name: '<timestamp>_<label>_<8-hex-chars>'.

    The timestamp is the current local time in ISO 8601 with ':' swapped for '-'
    (filesystem-safe); the suffix is the first 8 hex digits of a random UUID.
    """
    timestamp = datetime.datetime.now().replace(microsecond=0).isoformat().replace(":", "-")
    random_suffix = uuid.uuid4().hex[:8]
    return "_".join((timestamp, label, random_suffix))
def generate_random_folder_path(label: str = "") -> str:
    """Create a brand-new, uniquely named folder inside the upload folder and return its path."""
    fresh_name = generate_random_folder_name(label)
    return folder_in_upload_path(fresh_name, exists_ok=False)
def folder_in_upload_path(folder_name: str = "", exists_ok: bool = True) -> str:
    """Join *folder_name* onto the configured upload folder, ensure the directory exists, return the path.

    With exists_ok=False a FileExistsError is raised when the folder already exists.
    """
    target_path = os.path.join(config.FLASK_UPLOAD_FOLDER, folder_name)
    os.makedirs(target_path, exist_ok=exists_ok)
    return target_path
def persistent_audit_scopes_file_path(audit_id: str) -> str:
    """Return the path of the persisted 'services.json' scopes file for the given audit (folder is created)."""
    return os.path.join(folder_in_upload_path(f"audit_{audit_id}"), "services.json")
pwndocimportautomator-release-2022-09-22/helpers/flask_multifile_upload.py 0000664 0000000 0000000 00000002375 14313124554 0026752 0 ustar 00root root 0000000 0000000 import pathlib
from werkzeug.utils import secure_filename
from flask import request
from helpers.file_utils import is_whitelisted_extension
from typing import Optional, Union, Set, Tuple
import os
def save_multiple_uploaded_files(output_folder: str, reserved_filenames: Set[str], allowed_extensions: Set[str]) -> Tuple[str, Set[str]]:
    """Save every file uploaded under the form field "file[]" into *output_folder*.

    Filenames are sanitized with werkzeug's secure_filename and rejected when their
    extension is not in *allowed_extensions*, when they collide with
    *reserved_filenames*, or when they are duplicated within the upload batch.

    Returns a tuple (error_message, saved_filenames); error_message is "" on success.
    NOTE: files saved before a later validation error are NOT rolled back.
    """
    files = request.files.getlist("file[]")
    output_filenames: Set[str] = set()
    if len(files) == 0:
        return "No files provided?", output_filenames
    pathlib.Path(output_folder).mkdir(exist_ok=True, parents=True)
    for file in files:
        if not file or file.filename == '':
            return "No filename provided?", output_filenames
        filename = secure_filename(file.filename)
        # Bug fix: these f-strings previously contained no placeholders (ruff F541),
        # so the offending filename was reported as the literal text "(unknown)".
        if not is_whitelisted_extension(filename, allowed_extensions):
            return f"Filetype of file {filename} not allowed", output_filenames
        if filename in reserved_filenames:
            return f"Name {filename} is reserved.", output_filenames
        if filename in output_filenames:
            return f"Name {filename} is duplicate.", output_filenames
        output_filenames.add(filename)
        output_path = os.path.join(output_folder, filename)
        file.save(output_path)
    return "", output_filenames
pwndocimportautomator-release-2022-09-22/helpers/md_and_html_convertor.py 0000664 0000000 0000000 00000027342 14313124554 0026604 0 ustar 00root root 0000000 0000000 import functools
from typing import Callable
from bs4 import BeautifulSoup
from loguru import logger
import html
class MdAndHTMLConvertor:
    """Best-effort, two-way convertor between a Markdown subset and (PwnDoc-flavoured) HTML.

    NOTE(review): many string literals in the tables below appear to have had their
    HTML-tag content stripped by extraction (duplicate "" keys, empty tag pairs) —
    verify every table against the original repository before relying on it.
    """
    # HTML special characters mapped to private-use Unicode code points, so they can be
    # hidden from later HTML processing and restored afterwards.
    _HTML_TO_RESERVED_CHARS = {
        '<': '\ue020',
        '>': '\ue021',
        '&': '\ue022',
        '\'': '\ue023',
        '"': '\ue024',
    }
    # Tag-spelling normalization table. NOTE(review): keys/values look emptied by
    # extraction; as shown the table holds a single ""->"" entry — confirm originals.
    _HTML_TAG_NORMALIZATION = {
        "": "",
        "": "",
        "": "",
        "": "",
    }
    # Markdown pair tokens and their (opening, closing) HTML tag counterparts.
    # NOTE(review): the HTML tag strings look emptied by extraction — confirm originals.
    _MD_TO_HTML_PAIR_TAGS = {
        "**": ("", ""),
        "__": ("", ""),
    }
    # HTML tags that raw-MD output cannot express, with the plain-text/MD fragment
    # substituted for them when stripping for raw MD.
    _HTML_TAGS_TO_STRIP_FOR_RAW_MD = {
        's': '', # ~~strike-through~~
        'u': '', # underlined
        'code': '',
        'p': '\n',
        'ul': '\n',
        'li': '\n-', # todo: this works for opening tag, not so much for the closing tag
    }
    # missing: div and it's style and classes
    # missing: span and it's style and classes
    # note: and (un)escaping is handled separately
    @staticmethod
    def _get_basic_html_tags():
        """Return the opening and closing string for each tag in a fixed list of simple HTML tags.

        NOTE(review): the closing form f'{tag}>' looks like extraction dropped '</'
        (presumably f'</{tag}>') — confirm against the original source.
        """
        simple_tags = ['b', 'i', 'br', 'p', 'ul', 'li', 'strong', 's', 'u', 'p', 'code', 'strong', 'em', 'div', 'span']
        answer = []
        for tag in simple_tags:
            answer.append(f'<{tag}>')
            answer.append(f'{tag}>')
        return answer
    # BEGIN: MD -> HTML
    @staticmethod
    def __is_md_separator(char: str) -> bool:
        # True when *char* may delimit an MD emphasis run: whitespace, digits,
        # or one of the listed punctuation characters.
        # warning: This is not up to spec, but slightly more involved than scan2report version.
        # correct spec would be: https://spec.commonmark.org/0.30/#emphasis-and-strong-emphasis
        return char.isspace() or char.isnumeric() or char in '''!"#$%&'()+, -./:;<=>?@[]^`{|}~'''
    @classmethod
    def __is_valid_md_state_change(cls, current_state: bool, char_before: str, char_after: str):
        # Decide whether an emphasis token at this position may toggle open/closed state.
        # The stricter boundary check below is deliberately disabled: every candidate
        # toggle is currently accepted.
        # warning: This is not up to spec.
        # if (not current_state and cls.__is_md_separator(char_before) and not cls.__is_md_separator(char_after)) or \
        # (current_state and not cls.__is_md_separator(char_before) and cls.__is_md_separator(char_after)):
        return True
    @classmethod
    def __replace_simple_tag_with_pair_tag(cls, text: str, original_tag: str, opening_tag: str, closing_tag: str) -> str:
        """Replace occurrences of a symmetric MD token (e.g. '**') with alternating
        opening/closing HTML tags, scanning *text* left to right.

        current_state is False outside a pair and True inside one; each accepted
        occurrence toggles it. Ending in the "inside" state means a likely unmatched
        tag, which is logged as a warning.
        """
        answer = ""
        remaining_string = text
        current_state = False  # False: next accepted token opens a pair; True: it closes one
        while True:
            pre, sep, post = remaining_string.partition(original_tag)
            answer += pre
            if len(sep) == 0:
                # No further occurrence of the token; the remainder was appended above.
                break
            # Pad with spaces so a boundary character exists even at string start/end.
            char_before = (" " + pre)[-1]
            char_after = (post + " ")[0]
            if cls.__is_valid_md_state_change(current_state, char_before, char_after):
                current_state = not current_state
                answer += opening_tag if current_state else closing_tag
            else:
                # Rejected toggle: keep the token verbatim.
                answer += sep
            remaining_string = post
        if current_state:
            logger.warning(f"Conversion found possible unmatched tag {original_tag}")
        return answer
    @classmethod
    def _replace_md_pair_tags(cls, text: str) -> str:
        """Convert every configured MD pair token in *text* into its HTML tag pair."""
        for md_tag, html_tags in cls._MD_TO_HTML_PAIR_TAGS.items():
            text = cls.__replace_simple_tag_with_pair_tag(text, md_tag, html_tags[0], html_tags[1])
        return text
    @classmethod
    def reverse_custom_escaping_of_html(cls, text: str) -> str:
        """Turn the private-use placeholder characters back into the real HTML special chars.

        Also replaces each placeholder's ASCII escape spelling (the literal text
        '\\ue020' etc.), which appears when placeholders round-trip through JSON.
        """
        for html_char, custom_escaped_char in cls._HTML_TO_RESERVED_CHARS.items():
            text = text.replace(custom_escaped_char, html_char)
            unicode_repr = ascii(custom_escaped_char).strip("'")
            text = text.replace(unicode_repr, html_char)  # Handles unicode repr (e.g. \ue020) which happens for example when outputting to JSON.
        return text
    # BEGIN: HTML -> MD
    @classmethod
    def _normalize_html_tags(cls, text: str) -> str:
        """Normalize (partially) the HTML tags (i.e. -> )"""
        # NOTE(review): the mapping's literals look emptied by extraction; as shown
        # this loop replaces nothing meaningful — confirm against the original source.
        for original_tag, new_tag in cls._HTML_TAG_NORMALIZATION.items():
            text = text.replace(original_tag, new_tag)
        return text
    @classmethod
    def _replace_html_pair_tags_with_md_equivalents(cls, text: str) -> str:
        """ Convert HTML formatting to MD (i.e. -> **) """
        # Both the opening and the closing HTML tag map onto the same symmetric MD token.
        for md_tag, html_tags in cls._MD_TO_HTML_PAIR_TAGS.items():
            for html_tag in html_tags:
                text = text.replace(html_tag, md_tag)
        return text
    @classmethod
    def _strip_unsupported_html_tags(cls, text: str) -> str:
        """Replace HTML tags that raw MD cannot express with their configured MD fragment.

        NOTE(review): the closing-tag replacement builds f"{html_tag}>" — extraction
        likely dropped '</' (presumably f"</{html_tag}>"); confirm against the original.
        """
        for html_tag, md_tag in cls._HTML_TAGS_TO_STRIP_FOR_RAW_MD.items():
            text = text.replace(f"<{html_tag}>", md_tag)
            text = text.replace(f"{html_tag}>", md_tag)
        return text
    @classmethod
    def _custom_escape_html(cls, text: str) -> str:
        """ Escapes HTML special chars so that it's possible to distinguish them from any HTML chars added later in the process by scan2report. """
        for html_char, custom_escaped_char in cls._HTML_TO_RESERVED_CHARS.items():
            text = text.replace(html_char, custom_escaped_char)
        return text
    # BEGIN: Public interface
    @classmethod
    def md_to_html(cls, md_text: str) -> str:
        """Convert (a subset of) Markdown to HTML.

        Steps: HTML-escape the raw text, turn MD pair tokens into HTML tags, undo the
        custom placeholder escaping, then flatten newlines.
        NOTE(review): the final replace("\\n", " ") may originally have inserted a
        "<br>"-style tag that extraction stripped — confirm against the original.
        """
        answer = md_text
        answer = html.escape(answer)
        answer = cls._replace_md_pair_tags(answer)
        answer = cls.reverse_custom_escaping_of_html(answer)
        answer = answer.replace("\n", " ")
        return answer
    @classmethod
    def html_to_md(cls, html_text: str, output_without_html_tags: bool = False) -> str:
        """Convert HTML to MD.

        With output_without_html_tags=True the result is raw MD text: pair tags become
        MD tokens, unsupported tags are stripped and HTML entities are unescaped.
        Otherwise the HTML is kept, but its special characters are protected via the
        custom placeholder escaping.
        NOTE(review): the replace(" ", "\\n") below may originally have matched a
        "<br>"-style tag that extraction stripped — confirm against the original.
        """
        answer = html_text
        answer = cls._normalize_html_tags(answer)
        answer = cls.html_to_pwndoc_html(answer)
        if output_without_html_tags:
            answer = answer.replace(" ", "\n")
            answer = cls._replace_html_pair_tags_with_md_equivalents(answer)
            answer = cls._strip_unsupported_html_tags(answer)
            answer = html.unescape(answer)
        else:
            answer = cls._custom_escape_html(answer)
        return answer
    @classmethod
    def force_unescape_basic_tags(cls, text: str) -> str:
        """Turn the HTML-escaped spelling of each basic tag back into the literal tag."""
        simple_tags = cls._get_basic_html_tags()
        for full_tag_html in simple_tags:
            full_tag_escaped = html.escape(full_tag_html)
            text = text.replace(full_tag_escaped, full_tag_html)
        # note: this only covers first level encoded, i.e. <p> but not <p>
        # it also does not cover custom encoding, however neither of those should occur
        return text
@staticmethod
def html_to_pwndoc_html(text: str) -> str:
text = html.unescape(text)
# PwnDoc is extremely strict in regards to HTML standard.
text = text.replace("
{% if ps.upload_to_pwndoc %}
Audit ID: {{ ps.audit_id }}
{% else %}
{% if ps.finished %}
Result file
{% else %}
Output file is not ready yet.
{% endif %}
{% endif %}
Import ID: {{ ps.get_folder_name() }}
Scan2Report arguments: {{ ps.scan2report_args }}
Locale: {{ ps.locale }}
Files used for import:
{% for filename in ps.stored_files %}
{{ filename }}
{% endfor %}
{% include 'job_status.html' %}
{% if not ps.finished %}
{% if config.RQ_DISABLED == False and ps.rq_is_still_in_enqueued() == False %}
⚠ The processing is not marked as finished, but the background job is no longer running. It might have failed silently.
{% endif %}
{% endif %}
{% endblock %} pwndocimportautomator-release-2022-09-22/templates/flash.html 0000664 0000000 0000000 00000000376 14313124554 0024200 0 ustar 00root root 0000000 0000000 {% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}