without reports

This commit is contained in:
2025-06-04 21:39:07 +03:00
parent e23d6a5fa8
commit 6c397a937a
6 changed files with 269 additions and 60 deletions

View File

@ -78,3 +78,8 @@ Pos=300,200
Size=700,400 Size=700,400
Collapsed=0 Collapsed=0
[Window][###118]
Pos=340,250
Size=600,300
Collapsed=0

View File

@ -14,7 +14,8 @@ dependencies = [
"loguru (>=0.7.3,<0.8.0)", "loguru (>=0.7.3,<0.8.0)",
"dbus-python (>=1.4.0,<2.0.0)", "dbus-python (>=1.4.0,<2.0.0)",
"dearpygui (>=2.0.0,<3.0.0)", "dearpygui (>=2.0.0,<3.0.0)",
"pyqt5 (>=5.15.11,<6.0.0)" "pyqt5 (>=5.15.11,<6.0.0)",
"python-gvm (>=26.2.0,<27.0.0)"
] ]

View File

@ -1,76 +1,126 @@
from gvm.connections import TLSConnection from gvm.connections import UnixSocketConnection
from gvm.protocols.gmp import GMP from gvm.protocols.gmp import Gmp
from gvm.errors import GvmError
from src.utils.logger import get_logger from src.utils.logger import get_logger
logger = get_logger("openvas_scanner") logger = get_logger("openvas_scanner")
class OpenvasScanner: class OpenvasScanner:
def __init__(self, hostname, port, username, password): def __init__(self, socket_path='/var/lib/docker/volumes/greenbone-community-edition_gvmd_socket_vol/_data/gvmd.sock'):
"""Инициализация сканера OpenVAS""" """
Инициализация сканера OpenVAS.
:param socket_path: Путь к сокету GVM (по умолчанию '/var/run/gvmd.sock')
"""
self.socket_path = socket_path
self.connection = None
self.gmp = None
def connect(self, username, password):
"""
Подключение к GVM.
:param username: Имя пользователя для аутентификации
:param password: Пароль для аутентификации
"""
try: try:
self.connection = TLSConnection(hostname=hostname, port=port) self.connection = UnixSocketConnection(path=self.socket_path)
self.gmp = GMP(self.connection) with Gmp(connection=self.connection) as gmp:
self.gmp.authenticate(username, password) gmp.authenticate(username, password)
self.configurations = {config.get_name(): config.get_id() for config in self.gmp.get_configurations().get_configurations()} logger.info(f"Успешно подключено к GVM через {self.socket_path}")
logger.info(f"Connected to GVM at {hostname}:{port}") except GvmError as e:
except Exception as e: logger.error(f"Ошибка подключения к GVM: {e}")
logger.error(f"Failed to connect to GVM: {e}")
raise raise
def get_configurations(self): def get_configurations(self):
"""Получить список конфигураций сканирования""" """
return list(self.configurations.keys()) Получение списка конфигураций сканирования.
def start_scan(self, scan_name, targets, scan_config_name): :return: Список названий конфигураций
"""Начать сканирование""" """
try:
configs = self.gmp.get_scan_configs()
return [config.get('name') for config in configs]
except GvmError as e:
logger.error(f"Ошибка при получении конфигураций: {e}")
raise
def create_target(self, name, hosts):
"""
Создание цели для сканирования.
:param name: Имя цели
:param hosts: Список хостов (IP-адресов или сетей)
:return: ID созданной цели
"""
try:
target = self.gmp.create_target(name=name, hosts=hosts)
return target.get('id')
except GvmError as e:
logger.error(f"Ошибка при создании цели: {e}")
raise
def create_task(self, name, config_name, target_id):
"""
Создание задачи сканирования.
:param name: Имя задачи
:param config_name: Название конфигурации сканирования
:param target_id: ID цели
:return: ID созданной задачи
"""
try:
configs = self.gmp.get_scan_configs(filter=f"name={config_name}")
if not configs:
raise ValueError(f"Конфигурация '{config_name}' не найдена")
config_id = configs[0].get('id')
task = self.gmp.create_task(name=name, config_id=config_id, target_id=target_id)
return task.get('id')
except GvmError as e:
logger.error(f"Ошибка при создании задачи: {e}")
raise
def start_task(self, task_id):
"""
Запуск задачи сканирования.
:param task_id: ID задачи
"""
try: try:
config_id = self.configurations.get(scan_config_name)
if not config_id:
raise ValueError(f"Конфигурация {scan_config_name} не найдена")
# Создать цель
target_response = self.gmp.create_target(
name=f"Target for {scan_name}",
hosts=targets.split(',')
)
target_id = target_response.get_id()
# Создать задачу
task_response = self.gmp.create_task(
name=scan_name,
config_id=config_id,
target_id=target_id
)
task_id = task_response.get_id()
# Запустить задачу
self.gmp.start_task(task_id) self.gmp.start_task(task_id)
logger.info(f"Started scan with task ID: {task_id}") logger.info(f"Задача {task_id} запущена")
return task_id except GvmError as e:
except Exception as e: logger.error(f"Ошибка при запуске задачи: {e}")
logger.error(f"Failed to start scan: {e}")
raise raise
def get_task_status(self, task_id): def get_task_status(self, task_id):
"""Получить статус задачи""" """
Получение статуса задачи.
:param task_id: ID задачи
:return: Статус задачи
"""
try: try:
tasks = self.gmp.get_tasks(filter_string=f"id={task_id}").get_tasks() task = self.gmp.get_task(task_id)
if tasks: return task.get('status')
return tasks[0].get_status() except GvmError as e:
logger.warning(f"Task {task_id} not found") logger.error(f"Ошибка при получении статуса задачи: {e}")
return None
except Exception as e:
logger.error(f"Failed to get task status: {e}")
raise raise
def get_report(self, task_id): def get_report(self, task_id):
"""Получить отчет по задаче""" """
Получение отчета по задаче.
:param task_id: ID задачи
:return: Отчет в формате XML
"""
try: try:
reports = self.gmp.get_reports(filter_string=f"task={task_id}") reports = self.gmp.get_reports(filter=f"task_id={task_id}")
if reports.get_reports(): if reports:
report_id = reports.get_reports()[0].get_id() report_id = reports[0].get('id')
report = self.gmp.get_report(report_id).get_report() report = self.gmp.get_report(report_id, format='xml')
logger.info(f"Retrieved report for task {task_id}")
return report return report
logger.warning(f"No report found for task {task_id}")
return None return None
except Exception as e: except GvmError as e:
logger.error(f"Failed to get report: {e}") logger.error(f"Ошибка при получении отчета: {e}")
raise raise

View File

@ -56,6 +56,9 @@ def build_attack_content(session):
with dpg.tab(tag="modbus_attack_tab", label="Атаки на Modbus"): with dpg.tab(tag="modbus_attack_tab", label="Атаки на Modbus"):
create_modbus_attack_tab() create_modbus_attack_tab()
with dpg.tab(tag="scapy_tab", label="Scapy"):
logger.info("Вкладка Scapy нажата")
def create_metasploit_tab(): def create_metasploit_tab():
"""Вкладка Metasploit (заглушка)""" """Вкладка Metasploit (заглушка)"""
with dpg.group(): with dpg.group():

View File

@ -17,7 +17,7 @@ def create_main_window(db):
no_move=True, no_move=True,
): # Убраны фиксированные размеры ): # Убраны фиксированные размеры
# Заголовок с увеличенным шрифтом # Заголовок с увеличенным шрифтом
dpg.add_text("Модуль тестирования безопасности", color=(255, 255, 255), pos=[600,0]) dpg.add_text("Модуль тестирования безопасности", color=(255, 255, 255), pos=[500,0])
dpg.add_spacing(count=10) dpg.add_spacing(count=10)
dpg.add_separator() dpg.add_separator()

View File

@ -3,6 +3,7 @@ from src.core.models.models import Session, Scan, Host, Port, Service
from src.core.database.managers.scanner_manager import ScannerManager from src.core.database.managers.scanner_manager import ScannerManager
from src.core.api.nmap import NmapScanner from src.core.api.nmap import NmapScanner
from src.core.attacks.modbus_scan import ModbusScanner from src.core.attacks.modbus_scan import ModbusScanner
from src.core.api.openvas import OpenvasScanner
from src.utils.logger import get_logger from src.utils.logger import get_logger
import threading import threading
import time import time
@ -13,6 +14,7 @@ logger = get_logger("scanner_window")
# Глобальные переменные для управления сканерами # Глобальные переменные для управления сканерами
nmap_scanner = None nmap_scanner = None
modbus_scanner = None modbus_scanner = None
openvas_scanner = None
def create_scanner_window(db, session: Session, parent=None): def create_scanner_window(db, session: Session, parent=None):
"""Окно сканирования (с поддержкой родительского контейнера)""" """Окно сканирования (с поддержкой родительского контейнера)"""
@ -45,7 +47,7 @@ def build_scanner_content(scanner_service, session):
create_nmap_tab(scanner_service, session) create_nmap_tab(scanner_service, session)
with dpg.tab(tag="openvas_tab", label="OpenVAS Scanning"): with dpg.tab(tag="openvas_tab", label="OpenVAS Scanning"):
dpg.add_text("OpenVAS scanning will be implemented here") create_openvas_tab(scanner_service, session)
with dpg.tab(tag="modbus_tab", label="Modbus Scanning"): with dpg.tab(tag="modbus_tab", label="Modbus Scanning"):
create_modbus_tab(scanner_service, session) create_modbus_tab(scanner_service, session)
@ -236,6 +238,154 @@ def run_nmap_scan(scanner_service, session, scan_id, scanner):
) )
scanner_service.update_scan(scan) scanner_service.update_scan(scan)
def create_openvas_tab(scanner_service, session):
"""Вкладка OpenVAS сканирования"""
with dpg.group():
# Группа для настроек подключения
with dpg.group(tag="openvas_connection_group"):
dpg.add_text("Настройки подключения к OpenVAS")
dpg.add_text("Сокет (например, /var/run/gvmd.sock):")
socket_input = dpg.add_input_text(tag="openvas_socket", default_value="/var/run/gvmd.sock", width=300)
dpg.add_text("Имя пользователя:")
username_input = dpg.add_input_text(tag="openvas_username", width=300)
dpg.add_text("Пароль:")
password_input = dpg.add_input_text(tag="openvas_password", password=True, width=300)
dpg.add_button(
label="Подключиться",
callback=lambda: connect_to_openvas(
dpg.get_value(socket_input),
dpg.get_value(username_input),
dpg.get_value(password_input)
),
width=200,
height=40
)
dpg.add_spacer(height=10)
# Группа для настроек сканирования (скрыта до подключения)
with dpg.group(tag="openvas_scan_group", show=False):
dpg.add_text("Настройки сканирования")
dpg.add_text("Имя сканирования:")
scan_name_input = dpg.add_input_text(tag="openvas_scan_name", width=300)
dpg.add_text("Цели (IP-адреса или сети, через запятую):")
targets_input = dpg.add_input_text(tag="openvas_targets", width=300)
dpg.add_text("Конфигурация сканирования:")
config_combo = dpg.add_combo(tag="openvas_config", items=[], width=300)
dpg.add_button(
label="Начать сканирование",
callback=lambda: start_openvas_scan(
scanner_service,
session,
dpg.get_value(scan_name_input),
dpg.get_value(targets_input),
dpg.get_value(config_combo)
),
width=200,
height=40
)
dpg.add_spacer(height=10)
# Область вывода
dpg.add_text("Вывод:")
dpg.add_input_text(
tag="openvas_output",
multiline=True,
height=300,
width=-1,
readonly=True
)
def connect_to_openvas(socket_path, username, password):
"""Подключение к OpenVAS"""
global openvas_scanner
try:
openvas_scanner = OpenvasScanner(socket_path=socket_path)
openvas_scanner.connect(username, password)
configs = openvas_scanner.get_configurations()
dpg.configure_item("openvas_config", items=configs)
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + "✅ Подключено к OpenVAS\n")
dpg.configure_item("openvas_scan_group", show=True)
except Exception as e:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"❌ Ошибка подключения: {str(e)}\n")
logger.error(f"OpenVAS connection failed: {e}")
def start_openvas_scan(scanner_service, session, scan_name, targets, config_name):
"""Запуск OpenVAS сканирования"""
global openvas_scanner
if openvas_scanner is None:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + "Не подключено к OpenVAS\n")
return
try:
# Создание цели
target_id = openvas_scanner.create_target(scan_name, targets.split(','))
# Создание задачи
task_id = openvas_scanner.create_task(scan_name, config_name, target_id)
# Запуск задачи
openvas_scanner.start_task(task_id)
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"🚀 Задача {task_id} запущена\n")
# Сохранение сканирования в базе данных
scan = Scan(
session_id=session.id,
tool="openvas",
args=f"Имя: {scan_name}, Цели: {targets}, Конфигурация: {config_name}",
summary="В процессе",
created_at=datetime.utcnow()
)
saved_scan = scanner_service.save_scan(scan)
if saved_scan:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"📝 ID сканирования: {saved_scan.id}\n")
# Запуск потока для отслеживания статуса задачи
thread = threading.Thread(
target=monitor_openvas_task,
args=(scanner_service, session, saved_scan.id, task_id)
)
thread.start()
except Exception as e:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"❌ Ошибка: {str(e)}\n")
logger.error(f"OpenVAS scan failed: {e}")
def monitor_openvas_task(scanner_service, session, scan_id, task_id):
"""Отслеживание статуса задачи OpenVAS"""
global openvas_scanner
try:
start_time = time.time()
while True:
status = openvas_scanner.get_task_status(task_id)
if status == 'Done':
report = openvas_scanner.get_report(task_id)
if report:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + "✅ Сканирование завершено\n")
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"Отчет:\n{report}\n")
else:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + "❌ Отчет не найден\n")
break
elif status in ['Running', 'Requested']:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"⏳ Статус: {status}\n")
else:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"❌ Неизвестный статус: {status}\n")
break
time.sleep(10)
# Обновление статуса сканирования в базе данных
scan = Scan(
id=scan_id,
summary="OpenVAS сканирование завершено",
duration=int(time.time() - start_time),
created_at=datetime.utcnow()
)
scanner_service.update_scan(scan)
if dpg.does_item_exist("scan_list"):
load_scan_history(scanner_service, session, "scan_list")
except Exception as e:
dpg.set_value("openvas_output", dpg.get_value("openvas_output") + f"❌ Ошибка: {str(e)}\n")
logger.error(f"OpenVAS task monitoring failed: {e}")
def create_modbus_tab(scanner_service, session): def create_modbus_tab(scanner_service, session):
"""Вкладка Modbus сканирования""" """Вкладка Modbus сканирования"""
with dpg.group(): with dpg.group():