add some functional

This commit is contained in:
2025-07-17 22:39:52 +03:00
parent 6c397a937a
commit f0250b8a74
12 changed files with 648 additions and 60 deletions

BIN
attack_module.7z Normal file

Binary file not shown.

67
dpg.ini
View File

@ -39,7 +39,7 @@ Size=210,800
Collapsed=0
[Window][###108]
Pos=340,250
Pos=492,806
Size=600,300
Collapsed=0
@ -83,3 +83,68 @@ Pos=340,250
Size=600,300
Collapsed=0
[Window][###126]
Pos=532,210
Size=400,200
Collapsed=0
[Window][###298]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###204]
Pos=247,200
Size=700,400
Collapsed=0
[Window][###226]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###288]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###310]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###332]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###354]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###376]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###398]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###305]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###327]
Pos=300,200
Size=700,400
Collapsed=0
[Window][###426]
Pos=300,200
Size=700,400
Collapsed=0

View File

@ -15,7 +15,9 @@ dependencies = [
"dbus-python (>=1.4.0,<2.0.0)",
"dearpygui (>=2.0.0,<3.0.0)",
"pyqt5 (>=5.15.11,<6.0.0)",
"python-gvm (>=26.2.0,<27.0.0)"
"python-gvm (>=26.2.0,<27.0.0)",
"reportlab (>=4.4.1,<5.0.0)",
"pyserial (>=3.5,<4.0)"
]

18
scripts/modbus_test.py Normal file
View File

@ -0,0 +1,18 @@
from pymodbus.client import ModbusSerialClient
client = ModbusSerialClient(port="/dev/ttyUSB0", baudrate=9600)
result = client.read_input_registers(address=1, count=2, slave=1)
print(f"Temp {result.registers[0] / 10}, Humidity {result.registers[1] / 10}")
slave_id = client.read_holding_registers(address=257, count=4, slave=1)
print(f"""Slave id {slave_id.registers[0]} Baudrate {slave_id.registers[1]}
Temp correction {slave_id.registers[2]} Humidity correction {slave_id.registers[3]}""")
print(slave_id)
# change_slave_id = client.write_register(address=257, value=1, slave=101)
# change_baudrate = client.write_register(address=258, value=1, slave=101)
# print(change_slave_id)
# print(change_baudrate)

View File

@ -1,4 +1,4 @@
from gvm.connections import UnixSocketConnection
from gvm.connections import TLSConnection
from gvm.protocols.gmp import Gmp
from gvm.errors import GvmError
from src.utils.logger import get_logger
@ -6,13 +6,15 @@ from src.utils.logger import get_logger
logger = get_logger("openvas_scanner")
class OpenvasScanner:
def __init__(self, socket_path='/var/lib/docker/volumes/greenbone-community-edition_gvmd_socket_vol/_data/gvmd.sock'):
def __init__(self, host='127.0.0.1'):
"""
Инициализация сканера OpenVAS.
:param socket_path: Путь к сокету GVM (по умолчанию '/var/run/gvmd.sock')
"""
self.socket_path = socket_path
self.host = host
self.port = 9392
self.password = None
self.connection = None
self.gmp = None
@ -24,7 +26,7 @@ class OpenvasScanner:
:param password: Пароль для аутентификации
"""
try:
self.connection = UnixSocketConnection(path=self.socket_path)
self.connection = TLSConnection(hostname=self.host, port=self.port)
with Gmp(connection=self.connection) as gmp:
gmp.authenticate(username, password)
logger.info(f"Успешно подключено к GVM через {self.socket_path}")

View File

@ -40,7 +40,7 @@ class SessionDAO:
def get_all(self) -> List[Session]:
"""Получение всех сессий."""
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM sessions ORDER BY created_at DESC")
cursor.execute("SELECT * FROM sessions ORDER BY created_at ASC")
rows = cursor.fetchall()
return [
Session(

View File

@ -1,18 +1,21 @@
# src/core/database/db.py
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Optional
from contextlib import contextmanager
from src.utils.logger import get_logger
logger = get_logger("database")
class Database:
def __init__(self, db_path: str = "/home/lodqa/attack_module_data/security_scanner.db"):
def __init__(
self, db_path: str = "/home/lodqa/attack_module_data/security_scanner.db"
):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn: Optional[sqlite3.Connection] = None
self.conn: Optional[sqlite3.Connection]
self.init_db()
def connect(self):
@ -133,4 +136,5 @@ class Database:
"""Закрытие соединения с базой данных."""
if self.conn:
self.conn.close()
logger.info("Database connection closed")
logger.info("Database connection closed")

View File

@ -0,0 +1,425 @@
import json
import os
from pathlib import Path
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from src.core.database.managers.scanner_manager import ScannerManager
from src.utils.logger import get_logger
logger = get_logger("reports")
# Определяем базовый путь проекта
BASE_DIR = Path(__file__).resolve().parent.parent.parent.parent
# Регистрируем шрифт Exo 2
def register_exo2_font():
"""Регистрация шрифта Exo 2 для поддержки кириллицы"""
try:
font_path = BASE_DIR / "fonts" / "Exo2.ttf"
# Регистрируем обычное начертание
pdfmetrics.registerFont(TTFont('Exo2', str(font_path)))
# Регистрируем жирное начертание (используем тот же файл, но с другим именем)
pdfmetrics.registerFont(TTFont('Exo2-Bold', str(font_path)))
# Создаем псевдо-курсивное начертание (если нужно)
pdfmetrics.registerFont(TTFont('Exo2-Italic', str(font_path)))
# Регистрируем семейство шрифтов
pdfmetrics.registerFontFamily('Exo2', normal='Exo2', bold='Exo2-Bold', italic='Exo2-Italic')
return True
except Exception as e:
logger.error(f"Ошибка регистрации шрифта Exo 2: {e}")
# Возвращаем False, если шрифт не удалось зарегистрировать
return False
# Функция generate_json_report остается без изменений
def generate_pdf_report(scanner_service: ScannerManager, session_id: int, output_path: str):
"""Генерация PDF отчета для указанной сессии с использованием шрифта Exo 2"""
try:
# Регистрируем шрифт
font_registered = register_exo2_font()
base_font = 'Exo2' if font_registered else 'Helvetica'
session = scanner_service.get_session_by_id(session_id)
if not session:
logger.warning(f"Сессия с ID {session_id} не найдена")
return
scans = scanner_service.get_scans_by_session(session_id)
attacks = scanner_service.get_attacks_by_session(session_id)
# Создаем PDF документ
doc = SimpleDocTemplate(output_path, pagesize=letter)
# Создаем кастомные стили с использованием Exo 2
styles = getSampleStyleSheet()
# Переопределяем стили для поддержки Exo 2
title_style = ParagraphStyle(
'Title',
parent=styles['Title'],
fontName=f'{base_font}-Bold',
fontSize=18,
alignment=1,
spaceAfter=12
)
heading1_style = ParagraphStyle(
'Heading1',
parent=styles['Heading1'],
fontName=f'{base_font}-Bold',
fontSize=14,
spaceAfter=6
)
heading2_style = ParagraphStyle(
'Heading2',
parent=styles['Heading2'],
fontName=f'{base_font}-Bold',
fontSize=12,
spaceAfter=6
)
heading3_style = ParagraphStyle(
'Heading3',
parent=styles['Heading3'],
fontName=f'{base_font}-Bold',
fontSize=11,
spaceAfter=6
)
heading4_style = ParagraphStyle(
'Heading4',
parent=styles['Heading4'],
fontName=base_font,
fontSize=10,
spaceAfter=4
)
normal_style = ParagraphStyle(
'Normal',
parent=styles['Normal'],
fontName=base_font,
fontSize=10,
spaceAfter=6
)
table_header_style = ParagraphStyle(
'TableHeader',
parent=styles['Normal'],
fontName=f'{base_font}-Bold',
fontSize=10,
alignment=1,
textColor=colors.black
)
table_cell_style = ParagraphStyle(
'TableCell',
parent=styles['Normal'],
fontName=base_font,
fontSize=9,
alignment=1
)
elements = []
# Заголовок отчета
elements.append(Paragraph(f"Отчет по тестированию безопасности", title_style))
elements.append(Paragraph(f"Сессия: {session.name}", heading2_style))
elements.append(Spacer(1, 12))
# Информация о сессии
elements.append(Paragraph("Информация о сессии", heading2_style))
session_data = [
[Paragraph("ID", table_header_style), Paragraph(str(session.id), table_cell_style)],
[Paragraph("Название", table_header_style), Paragraph(session.name, table_cell_style)],
[Paragraph("Дата создания", table_header_style), Paragraph(str(session.created_at), table_cell_style)]
]
session_table = Table(session_data, colWidths=[100, 400])
session_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#4A6F9E")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#E8EDF4")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
]))
elements.append(session_table)
elements.append(Spacer(1, 24))
# Сканирования
if scans:
elements.append(PageBreak())
elements.append(Paragraph("Результаты сканирования", heading1_style))
for scan in scans:
elements.append(Paragraph(f"Сканирование: {scan.tool}", heading2_style))
scan_data = [
[Paragraph("Параметр", table_header_style), Paragraph("Значение", table_header_style)],
[Paragraph("ID", table_header_style), Paragraph(str(scan.id), table_cell_style)],
[Paragraph("Аргументы", table_header_style), Paragraph(scan.args or "Нет", table_cell_style)],
[Paragraph("Сводка", table_header_style), Paragraph(scan.summary or "Нет", table_cell_style)],
[Paragraph("Длительность", table_header_style), Paragraph(f"{scan.duration} секунд", table_cell_style)],
[Paragraph("Дата создания", table_header_style), Paragraph(str(scan.created_at), table_cell_style)]
]
scan_table = Table(scan_data, colWidths=[150, 350])
scan_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#4A6F9E")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#E8EDF4")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
]))
elements.append(scan_table)
elements.append(Spacer(1, 12))
# Хосты
hosts = scanner_service.get_hosts_by_scan(scan.id)
if hosts:
elements.append(Paragraph("Обнаруженные хосты", heading3_style))
host_data = [[
Paragraph("IP адрес", table_header_style),
Paragraph("MAC адрес", table_header_style),
Paragraph("Операционная система", table_header_style)
]]
for host in hosts:
host_data.append([
Paragraph(host.ip, table_cell_style),
Paragraph(host.mac or "Нет", table_cell_style),
Paragraph(host.os or "Нет", table_cell_style)
])
host_table = Table(host_data, colWidths=[150, 150, 200])
host_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#4A6F9E")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#E8EDF4")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
]))
elements.append(host_table)
elements.append(Spacer(1, 12))
# Порты и сервисы
for host in hosts:
ports = scanner_service.get_ports_by_host(host.id)
if ports:
elements.append(Paragraph(f"Порты и сервисы для хоста: {host.ip}", heading4_style))
for port in ports:
port_info = [
[Paragraph("Порт", table_header_style), Paragraph("Протокол", table_header_style), Paragraph("Состояние", table_header_style)],
[
Paragraph(str(port.port_num), table_cell_style),
Paragraph(port.protocol, table_cell_style),
Paragraph(port.state, table_cell_style)
]
]
port_table = Table(port_info, colWidths=[80, 80, 80])
port_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#5D7CA6")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#F0F5FC")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
]))
elements.append(port_table)
services = scanner_service.get_services_by_port(port.id)
if services:
elements.append(Paragraph("Сервисы:", heading4_style))
service_data = [[
Paragraph("Название", table_header_style),
Paragraph("Продукт", table_header_style),
Paragraph("Версия", table_header_style),
Paragraph("Доп. информация", table_header_style)
]]
for service in services:
service_data.append([
Paragraph(service.name or "Нет", table_cell_style),
Paragraph(service.product or "Нет", table_cell_style),
Paragraph(service.version or "Нет", table_cell_style),
Paragraph(service.extrainfo or "Нет", table_cell_style)
])
service_table = Table(service_data, colWidths=[100, 100, 80, 120])
service_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#6D8CB8")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#F5F8FD")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
]))
elements.append(service_table)
elements.append(Spacer(1, 8))
# Результаты Modbus
modbus_result = scanner_service.get_modbus_result(scan.id)
if modbus_result:
elements.append(Paragraph("Результаты Modbus сканирования", heading3_style))
modbus_data = [
[Paragraph("Параметр", table_header_style), Paragraph("Значение", table_header_style)],
[Paragraph("Активные Coils", table_header_style), Paragraph(modbus_result.active_coils or "Нет", table_cell_style)],
[Paragraph("Активные Discrete Inputs", table_header_style), Paragraph(modbus_result.active_discrete_inputs or "Нет", table_cell_style)],
[Paragraph("Активные Holding Registers", table_header_style), Paragraph(modbus_result.active_holding_registers or "Нет", table_cell_style)],
[Paragraph("Активные Input Registers", table_header_style), Paragraph(modbus_result.active_input_registers or "Нет", table_cell_style)]
]
modbus_table = Table(modbus_data, colWidths=[200, 300])
modbus_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#4A6F9E")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#E8EDF4")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
]))
elements.append(modbus_table)
elements.append(Spacer(1, 24))
# Атаки
if attacks:
elements.append(PageBreak())
elements.append(Paragraph("Результаты атак", heading1_style))
for attack in attacks:
elements.append(Paragraph(f"Атака: {attack.tool}", heading2_style))
attack_data = [
[Paragraph("Параметр", table_header_style), Paragraph("Значение", table_header_style)],
[Paragraph("ID", table_header_style), Paragraph(str(attack.id), table_cell_style)],
[Paragraph("Аргументы", table_header_style), Paragraph(attack.args or "Нет", table_cell_style)],
[Paragraph("Сводка", table_header_style), Paragraph(attack.summary or "Нет", table_cell_style)],
[Paragraph("Длительность", table_header_style), Paragraph(f"{attack.duration} секунд", table_cell_style)],
[Paragraph("Дата создания", table_header_style), Paragraph(str(attack.created_at), table_cell_style)]
]
attack_table = Table(attack_data, colWidths=[150, 350])
attack_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#4A6F9E")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#E8EDF4")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
]))
elements.append(attack_table)
# Результаты атак
results = scanner_service.get_attack_results_by_attack(attack.id)
if results:
elements.append(Paragraph("Результаты:", heading3_style))
result_data = [[
Paragraph("ID", table_header_style),
Paragraph("Сводка", table_header_style)
]]
for result in results:
result_data.append([
Paragraph(str(result.id), table_cell_style),
Paragraph(result.summary or "Нет", table_cell_style)
])
result_table = Table(result_data, colWidths=[50, 450])
result_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#5D7CA6")),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor("#F0F5FC")),
('GRID', (0, 0), (-1, -1), 1, colors.black),
]))
elements.append(result_table)
elements.append(Spacer(1, 24))
# Создание PDF
doc.build(elements)
logger.info(f"PDF отчет успешно создан: {output_path}")
except Exception as e:
logger.error(f"Ошибка при создании PDF отчета: {e}")
raise
def generate_json_report(scanner_service: ScannerManager, session_id: int) -> str:
"""Генерация JSON отчета для указанной сессии"""
try:
session = scanner_service.get_session_by_id(session_id)
if not session:
logger.warning(f"Сессия с ID {session_id} не найдена")
return json.dumps({"error": "Сессия не найдена"})
scans = scanner_service.get_scans_by_session(session_id)
scans_data = []
for scan in scans:
hosts = scanner_service.get_hosts_by_scan(scan.id)
hosts_data = []
for host in hosts:
ports = scanner_service.get_ports_by_host(host.id)
ports_data = []
for port in ports:
services = scanner_service.get_services_by_port(port.id)
services_data = []
for service in services:
cpes = scanner_service.get_cpes_by_service(service.id)
cpes_data = [{"id": cpe.id, "name": cpe.name} for cpe in cpes]
services_data.append({
"id": service.id,
"name": service.name,
"product": service.product,
"extrainfo": service.extrainfo,
"ostype": service.ostype,
"cpes": cpes_data
})
ports_data.append({
"id": port.id,
"protocol": port.protocol,
"port_num": port.port_num,
"state": port.state,
"services": services_data
})
hosts_data.append({
"id": host.id,
"ip": host.ip,
"mac": host.mac,
"os": host.os,
"ports": ports_data
})
modbus_result = scanner_service.get_modbus_result(scan.id)
modbus_data = {
"active_coils": modbus_result.active_coils if modbus_result else "",
"active_discrete_inputs": modbus_result.active_discrete_inputs if modbus_result else "",
"active_holding_registers": modbus_result.active_holding_registers if modbus_result else "",
"active_input_registers": modbus_result.active_input_registers if modbus_result else ""
} if modbus_result else None
scans_data.append({
"id": scan.id,
"tool": scan.tool,
"args": scan.args,
"summary": scan.summary,
"duration": scan.duration,
"created_at": scan.created_at,
"hosts": hosts_data,
"modbus_result": modbus_data
})
attacks = scanner_service.get_attacks_by_session(session_id)
attacks_data = []
for attack in attacks:
results = scanner_service.get_attack_results_by_attack(attack.id)
results_data = [{"id": result.id, "summary": result.summary} for result in results]
attacks_data.append({
"id": attack.id,
"tool": attack.tool,
"args": attack.args,
"summary": attack.summary,
"duration": attack.duration,
"created_at": attack.created_at,
"results": results_data
})
report_data = {
"session": {
"id": session.id,
"name": session.name,
"created_at": session.created_at
},
"scans": scans_data,
"attacks": attacks_data
}
logger.info(f"JSON отчет успешно сгенерирован для сессии {session_id}")
return json.dumps(report_data, indent=4, ensure_ascii=False)
except Exception as e:
logger.error(f"Ошибка при генерации JSON отчета: {e}")
return json.dumps({"error": str(e)})

View File

@ -49,7 +49,7 @@ def create_configuration_window(db, session: Session):
dpg.add_button(
label="Генерация отчета",
callback=lambda: logger.info("Reporting selected"),
callback=lambda: show_report_window(db, session),
width=-1,
height=50
)
@ -110,4 +110,15 @@ def show_attack_window(db, session):
# Создать новую контентную область
# with dpg.child_window(parent="config_window", tag="content_area", width=-1, height=-1):
from src.ui.attack_window import create_attack_window
create_attack_window(db, session, parent="content_area")
create_attack_window(db, session, parent="content_area")
def show_report_window(db, session):
"""Показать окно эксплуатации"""
# Очистить контентную область
if dpg.does_item_exist("content_area"):
dpg.delete_item("content_area", children_only=True)
# Создать новую контентную область
# with dpg.child_window(parent="config_window", tag="content_area", width=-1, height=-1):
from src.ui.report_window import create_report_window
create_report_window(db, session, parent="content_area")

103
src/ui/report_window.py Normal file
View File

@ -0,0 +1,103 @@
import dearpygui.dearpygui as dpg
from src.core.models.models import Session
from src.core.database.managers.scanner_manager import ScannerManager
from src.core.services.reports import generate_json_report, generate_pdf_report
from src.utils.logger import get_logger
logger = get_logger("report_window")
def create_report_window(db, session: Session, parent=None):
"""Окно генерации отчета (с поддержкой родительского контейнера)"""
if parent:
with dpg.child_window(
parent=parent,
tag="report_window",
width=-1,
height=-1
):
build_report_content(db, session)
else:
with dpg.child_window(
tag="report_window",
width=-1,
height=-1
):
build_report_content(db, session)
def build_report_content(db, session):
"""Окно генерации отчетов для выбранной сессии"""
scanner_service = ScannerManager(db)
dpg.add_text(f"Сессия: {session.name}")
dpg.add_spacer(height=10)
dpg.add_button(
label="Генерировать JSON отчет",
callback=lambda: generate_json_report_callback(scanner_service, session.id),
width=400,
height=80
)
dpg.add_spacer(height=10)
dpg.add_button(
label="Генерировать PDF отчет",
callback=lambda: generate_pdf_report_callback(scanner_service, session.id),
width=400,
height=80
)
dpg.add_spacer(height=10)
dpg.add_text("", tag="report_status")
def generate_json_report_callback(scanner_service, session_id):
"""Обработчик для генерации JSON отчета"""
# Создаем диалог выбора файла
dpg.add_file_dialog(
directory_selector=False,
default_path="/home/lodqa/",
width = 800,
height = 400,
modal=True,
callback=lambda s, a: save_json_report(scanner_service, session_id, a['file_path_name']),
cancel_callback=lambda: dpg.set_value("report_status", "Выбор пути отменен"),
tag="json_file_dialog",
show=True
)
def save_json_report(scanner_service, session_id, file_path):
"""Сохранение JSON отчета"""
try:
report = generate_json_report(scanner_service, session_id)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(report)
logger.info(f"JSON отчет сохранен в {file_path}")
dpg.set_value("report_status", f"JSON отчет сохранен в {file_path}")
dpg.delete_item("json_file_dialog") # Удаляем диалог после использования
except Exception as e:
logger.error(f"Ошибка при сохранении JSON отчета: {e}")
dpg.set_value("report_status", f"Ошибка: {str(e)}")
dpg.delete_item("json_file_dialog") # Удаляем диалог при ошибке
def generate_pdf_report_callback(scanner_service, session_id):
"""Обработчик для генерации PDF отчета"""
# Создаем диалог выбора файла
dpg.add_file_dialog(
directory_selector=False,
default_path="/home/lodqa/",
width = 800,
height = 400,
modal=True,
callback=lambda s, a: save_pdf_report(scanner_service, session_id, a['file_path_name']),
cancel_callback=lambda: dpg.set_value("report_status", "Выбор пути отменен"),
tag="pdf_file_dialog",
show=True
)
def save_pdf_report(scanner_service, session_id, file_path):
"""Сохранение PDF отчета"""
try:
generate_pdf_report(scanner_service, session_id, file_path)
logger.info(f"PDF отчет сохранен в {file_path}")
dpg.set_value("report_status", f"PDF отчет сохранен в {file_path}")
dpg.delete_item("pdf_file_dialog") # Удаляем диалог после использования
except Exception as e:
logger.error(f"Ошибка при сохранении PDF отчета: {e}")
dpg.set_value("report_status", f"Ошибка: {str(e)}")
dpg.delete_item("pdf_file_dialog") # Удаляем диалог при ошибке

View File

@ -244,8 +244,8 @@ def create_openvas_tab(scanner_service, session):
# Группа для настроек подключения
with dpg.group(tag="openvas_connection_group"):
dpg.add_text("Настройки подключения к OpenVAS")
dpg.add_text("Сокет (например, /var/run/gvmd.sock):")
socket_input = dpg.add_input_text(tag="openvas_socket", default_value="/var/run/gvmd.sock", width=300)
dpg.add_text("IP-адрес Greenbone:")
host_input = dpg.add_input_text(tag="openvas_socket", default_value="127.0.0.1", width=300)
dpg.add_text("Имя пользователя:")
username_input = dpg.add_input_text(tag="openvas_username", width=300)
dpg.add_text("Пароль:")
@ -253,7 +253,7 @@ def create_openvas_tab(scanner_service, session):
dpg.add_button(
label="Подключиться",
callback=lambda: connect_to_openvas(
dpg.get_value(socket_input),
dpg.get_value(host_input),
dpg.get_value(username_input),
dpg.get_value(password_input)
),
@ -295,11 +295,11 @@ def create_openvas_tab(scanner_service, session):
readonly=True
)
def connect_to_openvas(socket_path, username, password):
def connect_to_openvas(host, username, password):
"""Подключение к OpenVAS"""
global openvas_scanner
try:
openvas_scanner = OpenvasScanner(socket_path=socket_path)
openvas_scanner = OpenvasScanner(host=host)
openvas_scanner.connect(username, password)
configs = openvas_scanner.get_configurations()
dpg.configure_item("openvas_config", items=configs)

View File

@ -1,42 +0,0 @@
# src/utils/window_utils.py
from PyQt5.QtWidgets import QDialog, QMessageBox, QInputDialog
from PyQt5.QtCore import Qt
def show_floating_dialog(parent, title, message, icon=QMessageBox.Information):
"""Показывает диалоговое окно как плавающее поверх всех окон"""
dialog = QMessageBox(parent)
dialog.setWindowTitle(title)
dialog.setText(message)
dialog.setIcon(icon)
# Устанавливаем флаги для плавающего окна
dialog.setWindowFlags(
Qt.Window |
Qt.CustomizeWindowHint |
Qt.WindowTitleHint |
Qt.WindowStaysOnTopHint
)
# Отключаем тайлинг для этого окна
dialog.setProperty("hyprland_floating", True)
return dialog.exec_()
def create_floating_input(parent, title, label):
"""Создает плавающее окно ввода"""
dialog = QInputDialog(parent)
dialog.setWindowTitle(title)
dialog.setLabelText(label)
# Устанавливаем флаги для плавающего окна
dialog.setWindowFlags(
Qt.Window |
Qt.CustomizeWindowHint |
Qt.WindowTitleHint |
Qt.WindowStaysOnTopHint
)
# Отключаем тайлинг для этого окна
dialog.setProperty("hyprland_floating", True)
return dialog