From e23d6a5fa82c59a32d36b31c6db2d863b43e90c9 Mon Sep 17 00:00:00 2001 From: Yuri Dmitriev Date: Tue, 3 Jun 2025 02:47:00 +0300 Subject: [PATCH] add some attacks --- Makefile | 2 +- dpg.ini | 35 ++ src/core/api/nmap.py | 255 ++++++--- src/core/api/openvas.py | 76 +++ src/core/database/managers/scanner_manager.py | 284 +++++++++- src/core/models/models.py | 2 +- src/ui/attack_window.py | 497 +++++++++++++++++ src/ui/configuration_window.py | 27 +- src/ui/logger_window.py | 79 +++ src/ui/scanner_window.py | 512 ++++++++++++------ 10 files changed, 1525 insertions(+), 244 deletions(-) create mode 100644 src/core/api/openvas.py create mode 100644 src/ui/attack_window.py create mode 100644 src/ui/logger_window.py diff --git a/Makefile b/Makefile index 07131b7..e986c4a 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,4 @@ # TODO: make clean command window: - PYTHONPATH=./ poetry run python src/main.py \ No newline at end of file + PYTHONPATH=./ poetry run python src/main.py diff --git a/dpg.ini b/dpg.ini index 5c5a1fc..616a576 100644 --- a/dpg.ini +++ b/dpg.ini @@ -43,3 +43,38 @@ Pos=340,250 Size=600,300 Collapsed=0 +[Window][###200] +Pos=300,300 +Size=700,400 +Collapsed=0 + +[Window][###265] +Pos=300,200 +Size=400,300 +Collapsed=0 + +[Window][###203] +Pos=300,200 +Size=700,400 +Collapsed=0 + +[Window][###225] +Pos=300,200 +Size=700,400 +Collapsed=0 + +[Window][###247] +Pos=300,200 +Size=700,400 +Collapsed=0 + +[Window][###269] +Pos=300,200 +Size=700,400 +Collapsed=0 + +[Window][###291] +Pos=300,200 +Size=700,400 +Collapsed=0 + diff --git a/src/core/api/nmap.py b/src/core/api/nmap.py index b1a4bbe..b72fd51 100644 --- a/src/core/api/nmap.py +++ b/src/core/api/nmap.py @@ -1,95 +1,214 @@ import re -import json -from typing import Any, Dict -import nmap3 +import subprocess +import shlex +import time +import os +import signal +from typing import Dict, Any, List from src.utils.logger import get_logger logger = get_logger("nmap_scanner") - class NmapScanner: - def __init__(self, ip: str, args: str = "-sV"): + def __init__(self, ip: str, scan_type: str = "Quick Scan", custom_args: str = ""): """ Инициализация сканера Nmap :param ip: целевой IP-адрес - :param args: аргументы для сканирования (по умолчанию: -sV) + :param scan_type: тип сканирования (Quick Scan, Intensive Scan, Full Scan, Custom Scan) + :param custom_args: пользовательские аргументы для сканирования """ self.ip = ip - self.args = args - self.nmap = nmap3.Nmap() - logger.info(f"Инициализация NmapScanner для IP: {ip}, аргументы: {args}") + self.scan_type = scan_type + self.custom_args = custom_args + self.process = None + self.start_time = 0 + self.output_file = "/home/lodqa/attack_module_data/nmap_output.xml" + self.stats = { + 'hosts_up': 0, + 'hosts_down': 0, + 'duration': 0.0, + 'start_time': 0, + 'end_time': 0, + 'hosts': {}, + 'service_info': {} + } + self.running = False + logger.info(f"Инициализация NmapScanner для IP: {ip}, тип: {scan_type}, аргументы: {custom_args}") + + def _get_nmap_args(self) -> List[str]: + """ + Определение аргументов Nmap на основе типа сканирования и пользовательских аргументов + + :return: список аргументов Nmap + """ + scan_args = { + "Quick Scan": "-T4 -F", + "Intensive Scan": "-T4 -A", + "Full Scan": "-T4 -p- -A", + "Custom Scan": self.custom_args + } + base_args = scan_args.get(self.scan_type, "-T4") + args = shlex.split(base_args) + args.extend(["-oX", self.output_file]) + args.append(self.ip) + return ["nmap"] + args def start_scan(self) -> Dict[str, Any]: """ - Запуск сканирования и возврат очищенных результатов + Запуск сканирования Nmap и возврат результатов - :return: словарь с разделенными результатами + :return: словарь с результатами сканирования """ - logger.info(f"🚀 Запуск сканирования Nmap для {self.ip} с аргументами: {self.args}") + logger.info(f"🚀 Запуск сканирования Nmap для {self.ip}") + self.running = True + self.start_time = time.time() + self.stats['start_time'] = self.start_time + + command = self._get_nmap_args() + logger.info(f"Команда Nmap: {' '.join(command)}") + try: - # Запуск сканирования с определением версий - scan_result = self.nmap.nmap_version_detection(self.ip, args=self.args) - logger.debug(f"Сырые результаты сканирования: {json.dumps(scan_result, indent=2)}") + # Запуск nmap в фоновом процессе + self.process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) - # Очистка и структурирование результатов - cleaned_result = self.return_clear_result(scan_result) - logger.success(f"✅ Сканирование завершено для {self.ip}. Найдено хостов: {len(cleaned_result['hosts'])}") - return cleaned_result + # Ожидание завершения процесса + while self.running and self.process.poll() is None: + time.sleep(0.5) - except Exception as e: - logger.error(f"Ошибка при выполнении сканирования Nmap: {e}") - return {"hosts": {}, "service_info": {}} + if self.running: + # Процесс завершился нормально + stdout, stderr = self.process.communicate() + self.stats['end_time'] = time.time() + self.stats['duration'] = self.stats['end_time'] - self.start_time - @staticmethod - def return_clear_result(raw_data: Dict[str, Any]) -> Dict[str, Any]: - """ - Очистка и структурирование сырых данных от Nmap - - :param raw_data: сырые данные от nmap_version_detection - :return: словарь с разделенными результатами - """ - logger.info("Начало обработки сырых данных Nmap") - try: - # Регулярное выражение для проверки IPv4 - ip_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$') - - hosts = {} - service_info = {} - - for key, value in raw_data.items(): - # Проверяем, является ли ключ IP-адресом - if ip_pattern.match(key): - # Проверяем, в каком состоянии находится хост - if value.get('state', {}).get('state') == "up": - hosts[key] = value - logger.debug(f"Обнаружен активный хост: {key}") - else: - logger.debug( - f"Хост {key} не активен (состояние: {value.get('state', {}).get('state', 'unknown')})") + # Чтение вывода из файла + if os.path.exists(self.output_file): + with open(self.output_file, 'r') as f: + xml_output = f.read() + os.remove(self.output_file) + self._parse_output(xml_output, stderr) else: - service_info[key] = value + self.stats['error'] = "Output file not generated" + logger.error("No output file generated by nmap") + else: + # Сканирование было остановлено + self.stats['error'] = "Scan stopped by user" - logger.info( - f"Обработка завершена. Активных хостов: {len(hosts)}, сервисная информация: {len(service_info)}") - return { - "hosts": hosts, - "service_info": service_info - } + self._log_summary() + return self.stats except Exception as e: - logger.error(f"Ошибка при обработке данных Nmap: {e}") - return {"hosts": {}, "service_info": {}} + logger.error(f"Ошибка при выполнении Nmap: {e}") + self.stats['error'] = str(e) + return self.stats + finally: + self.running = False + self.process = None + def stop(self): + """Остановка сканирования Nmap""" + if self.process and self.running: + logger.info("🛑 Stopping Nmap scan") + self.running = False + self.process.terminate() + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() -if __name__ == "__main__": - try: - scanner = NmapScanner("192.168.1.55", "-p 1-10000") - result = scanner.start_scan() - path = "/home/lodqa/attack_module_data/nmap.json" - logger.info(f"Сохранение результатов в файл: {path}") - with open(path, 'w', encoding='utf-8') as file: - json.dump(result, file, ensure_ascii=False, indent=4) - logger.success(f"Результаты успешно сохранены в {path}") - except Exception as e: - logger.error(f"Ошибка в главном блоке: {e}") \ No newline at end of file + def _parse_output(self, xml_output: str, stderr: str): + """ + Анализ XML-вывода Nmap + + :param xml_output: XML-вывод команды nmap + :param stderr: ошибки, если есть + """ + if stderr: + logger.error(f"Nmap errors:\n{stderr}") + + if not xml_output: + logger.warning("No XML output to parse.") + return + + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(xml_output) + + # Подсчет хостов + for host in root.findall(".//host"): + status = host.find("status").get("state") + if status == "up": + self.stats['hosts_up'] += 1 + else: + self.stats['hosts_down'] += 1 + + # Извлечение информации о хосте + ip = host.find("address[@addrtype='ipv4']").get("addr") + host_data = { + "ip": ip, + "state": status, + "ports": [], + "osmatch": [], + "macaddress": {} + } + + # Извлечение MAC-адреса + mac = host.find("address[@addrtype='mac']") + if mac is not None: + host_data["macaddress"] = {"addr": mac.get("addr"), "vendor": mac.get("vendor", "Unknown")} + + # Извлечение информации о портах + ports = host.find("ports") + if ports is not None: + for port in ports.findall("port"): + port_info = { + "portid": port.get("portid"), + "protocol": port.get("protocol"), + "state": port.find("state").get("state"), + "service": {} + } + service = port.find("service") + if service is not None: + port_info["service"] = { + "name": service.get("name", "Unknown"), + "product": service.get("product", ""), + "version": service.get("version", ""), + "extrainfo": service.get("extrainfo", "") + } + host_data["ports"].append(port_info) + + # Извлечение информации об ОС + os = host.find("os") + if os is not None: + for osmatch in os.findall("osmatch"): + host_data["osmatch"].append({ + "name": osmatch.get("name", "Unknown"), + "accuracy": osmatch.get("accuracy", "0") + }) + + self.stats['hosts'][ip] = host_data + + logger.debug(f"Parsed hosts: {self.stats['hosts']}") + + except Exception as e: + logger.error(f"Ошибка при разборе XML вывода Nmap: {e}") + self.stats['error'] = str(e) + + def _log_summary(self): + """Логирование результатов сканирования""" + if 'error' in self.stats: + logger.error(f"❌ Scan failed: {self.stats['error']}") + return + + logger.success("📊 Nmap scan summary:") + logger.success(f" Target: {self.ip}") + logger.success(f" Scan type: {self.scan_type}") + logger.success(f" Hosts up: {self.stats['hosts_up']}") + logger.success(f" Hosts down: {self.stats['hosts_down']}") + logger.success(f" Duration: {self.stats['duration']:.2f} seconds") \ No newline at end of file diff --git a/src/core/api/openvas.py b/src/core/api/openvas.py new file mode 100644 index 0000000..c1f6180 --- /dev/null +++ b/src/core/api/openvas.py @@ -0,0 +1,76 @@ +from gvm.connections import TLSConnection +from gvm.protocols.gmp import GMP +from src.utils.logger import get_logger + +logger = get_logger("openvas_scanner") + +class OpenvasScanner: + def __init__(self, hostname, port, username, password): + """Инициализация сканера OpenVAS""" + try: + self.connection = TLSConnection(hostname=hostname, port=port) + self.gmp = GMP(self.connection) + self.gmp.authenticate(username, password) + self.configurations = {config.get_name(): config.get_id() for config in self.gmp.get_configurations().get_configurations()} + logger.info(f"Connected to GVM at {hostname}:{port}") + except Exception as e: + logger.error(f"Failed to connect to GVM: {e}") + raise + + def get_configurations(self): + """Получить список конфигураций сканирования""" + return list(self.configurations.keys()) + + def start_scan(self, scan_name, targets, scan_config_name): + """Начать сканирование""" + try: + config_id = self.configurations.get(scan_config_name) + if not config_id: + raise ValueError(f"Конфигурация {scan_config_name} не найдена") + # Создать цель + target_response = self.gmp.create_target( + name=f"Target for {scan_name}", + hosts=targets.split(',') + ) + target_id = target_response.get_id() + # Создать задачу + task_response = self.gmp.create_task( + name=scan_name, + config_id=config_id, + target_id=target_id + ) + task_id = task_response.get_id() + # Запустить задачу + self.gmp.start_task(task_id) + logger.info(f"Started scan with task ID: {task_id}") + return task_id + except Exception as e: + logger.error(f"Failed to start scan: {e}") + raise + + def get_task_status(self, task_id): + """Получить статус задачи""" + try: + tasks = self.gmp.get_tasks(filter_string=f"id={task_id}").get_tasks() + if tasks: + return tasks[0].get_status() + logger.warning(f"Task {task_id} not found") + return None + except Exception as e: + logger.error(f"Failed to get task status: {e}") + raise + + def get_report(self, task_id): + """Получить отчет по задаче""" + try: + reports = self.gmp.get_reports(filter_string=f"task={task_id}") + if reports.get_reports(): + report_id = reports.get_reports()[0].get_id() + report = self.gmp.get_report(report_id).get_report() + logger.info(f"Retrieved report for task {task_id}") + return report + logger.warning(f"No report found for task {task_id}") + return None + except Exception as e: + logger.error(f"Failed to get report: {e}") + raise \ No newline at end of file diff --git a/src/core/database/managers/scanner_manager.py b/src/core/database/managers/scanner_manager.py index 9b014b0..2e9a344 100644 --- a/src/core/database/managers/scanner_manager.py +++ b/src/core/database/managers/scanner_manager.py @@ -1,4 +1,4 @@ -from src.core.models.models import Scan, Host, ModbusScanResult +from src.core.models.models import Scan, Host, Port, Service, CPE, ModbusScanResult, Attack, AttackResult, Session from src.core.database.database import Database from src.utils.logger import get_logger from typing import List, Optional @@ -9,8 +9,41 @@ class ScannerManager: def __init__(self, db: Database): self.db = db + def save_session(self, session: Session) -> Optional[Session]: + """Save a Session object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO sessions (name, created_at) + VALUES (?, CURRENT_TIMESTAMP) + RETURNING * + """, + (session.name,) + ) + row = cursor.fetchone() + if row: + return Session( + id=row["id"], + name=row["name"], + created_at=row["created_at"] + ) + return None + + def get_session_by_id(self, session_id: int) -> Optional[Session]: + """Retrieve a Session by ID.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM sessions WHERE id = ?", (session_id,)) + row = cursor.fetchone() + if row: + return Session( + id=row["id"], + name=row["name"], + created_at=row["created_at"] + ) + return None + def save_scan(self, scan: Scan) -> Optional[Scan]: - """Сохранение сканирования в базу данных""" + """Save a Scan object to the database.""" with self.db.get_cursor() as cursor: cursor.execute( """ @@ -34,7 +67,7 @@ class ScannerManager: return None def update_scan(self, scan: Scan) -> bool: - """Обновление информации о сканировании""" + """Update an existing Scan object.""" with self.db.get_cursor() as cursor: cursor.execute( """ @@ -47,7 +80,7 @@ class ScannerManager: return cursor.rowcount > 0 def get_scan_by_id(self, scan_id: int) -> Optional[Scan]: - """Получение сканирования по ID""" + """Retrieve a Scan by ID.""" with self.db.get_cursor() as cursor: cursor.execute("SELECT * FROM scans WHERE id = ?", (scan_id,)) row = cursor.fetchone() @@ -64,10 +97,10 @@ class ScannerManager: return None def get_scans_by_session(self, session_id: int) -> List[Scan]: - """Получение всех сканирований для сессии""" + """Retrieve all Scans for a session.""" with self.db.get_cursor() as cursor: cursor.execute( - "SELECT * FROM scans WHERE session_id = ? ORDER BY created_at DESC", + "SELECT * FROM scans WHERE session_id = ? ORDER BY created_at ASC", (session_id,) ) rows = cursor.fetchall() @@ -84,7 +117,7 @@ class ScannerManager: ] def save_host(self, scan_id: int, host: Host) -> Optional[Host]: - """Сохранение хоста в базу данных""" + """Save a Host object to the database.""" with self.db.get_cursor() as cursor: cursor.execute( """ @@ -105,8 +138,132 @@ class ScannerManager: ) return None + def get_hosts_by_scan(self, scan_id: int) -> List[Host]: + """Retrieve all Hosts for a scan.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM hosts WHERE scan_id = ?", (scan_id,)) + rows = cursor.fetchall() + return [ + Host( + id=row["id"], + scan_id=row["scan_id"], + ip=row["ip"], + mac=row["mac"], + os=row["os"] + ) for row in rows + ] + + def save_port(self, host_id: int, port: Port) -> Optional[Port]: + """Save a Port object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO ports (host_id, protocol, port_num, state) + VALUES (?, ?, ?, ?) + RETURNING * + """, + (host_id, port.protocol, port.port_num, port.state) + ) + row = cursor.fetchone() + if row: + return Port( + id=row["id"], + host_id=row["host_id"], + protocol=row["protocol"], + port_num=row["port_num"], + state=row["state"] + ) + return None + + def get_ports_by_host(self, host_id: int) -> List[Port]: + """Retrieve all Ports for a host.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM ports WHERE host_id = ?", (host_id,)) + rows = cursor.fetchall() + return [ + Port( + id=row["id"], + host_id=row["host_id"], + protocol=row["protocol"], + port_num=row["port_num"], + state=row["state"] + ) for row in rows + ] + + def save_service(self, port_id: int, service: Service) -> Optional[Service]: + """Save a Service object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO service (port_id, name, product, extrainfo, ostype) + VALUES (?, ?, ?, ?, ?) + RETURNING * + """, + (port_id, service.name, service.product, service.extrainfo, service.ostype) + ) + row = cursor.fetchone() + if row: + return Service( + id=row["id"], + port_id=row["port_id"], + name=row["name"], + product=row["product"], + extrainfo=row["extrainfo"], + ostype=row["ostype"] + ) + return None + + def get_services_by_port(self, port_id: int) -> List[Service]: + """Retrieve all Services for a port.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM service WHERE port_id = ?", (port_id,)) + rows = cursor.fetchall() + return [ + Service( + id=row["id"], + port_id=row["port_id"], + name=row["name"], + product=row["product"], + extrainfo=row["extrainfo"], + ostype=row["ostype"] + ) for row in rows + ] + + def save_cpe(self, service_id: int, cpe: CPE) -> Optional[CPE]: + """Save a CPE object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO cpe (service_id, name) + VALUES (?, ?) + RETURNING * + """, + (service_id, cpe.name) + ) + row = cursor.fetchone() + if row: + return CPE( + id=row["id"], + service_id=row["service_id"], + name=row["name"] + ) + return None + + def get_cpes_by_service(self, service_id: int) -> List[CPE]: + """Retrieve all CPEs for a service.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM cpe WHERE service_id = ?", (service_id,)) + rows = cursor.fetchall() + return [ + CPE( + id=row["id"], + service_id=row["service_id"], + name=row["name"] + ) for row in rows + ] + def save_modbus_result(self, scan_id: int, result: dict) -> Optional[ModbusScanResult]: - """Сохранение результатов сканирования Modbus""" + """Save Modbus scan results to the database.""" with self.db.get_cursor() as cursor: cursor.execute( """ @@ -138,7 +295,7 @@ class ScannerManager: return None def get_modbus_result(self, scan_id: int) -> Optional[ModbusScanResult]: - """Получение результатов сканирования Modbus по ID сканирования""" + """Retrieve Modbus scan results by scan ID.""" with self.db.get_cursor() as cursor: cursor.execute( "SELECT * FROM modbus_scan_result WHERE scan_id = ?", @@ -154,4 +311,111 @@ class ScannerManager: active_holding_registers=row["active_holding_registers"], active_input_registers=row["active_input_registers"] ) - return None \ No newline at end of file + return None + + def save_attack(self, attack: Attack) -> Optional[Attack]: + """Save an Attack object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO attacks (session_id, tool, args, summary, duration, created_at) + VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + RETURNING * + """, + (attack.session_id, attack.tool, attack.args, attack.summary, attack.duration) + ) + row = cursor.fetchone() + if row: + return Attack( + id=row["id"], + session_id=row["session_id"], + tool=row["tool"], + args=row["args"], + summary=row["summary"], + duration=row["duration"], + created_at=row["created_at"] + ) + return None + + def update_attack(self, attack: Attack) -> bool: + """Update an existing Attack object.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + UPDATE attacks + SET summary = ?, duration = ? + WHERE id = ? + """, + (attack.summary, attack.duration, attack.id) + ) + return cursor.rowcount > 0 + + def get_attack_by_id(self, attack_id: int) -> Optional[Attack]: + """Retrieve an Attack by ID.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM attacks WHERE id = ?", (attack_id,)) + row = cursor.fetchone() + if row: + return Attack( + id=row["id"], + session_id=row["session_id"], + tool=row["tool"], + args=row["args"], + summary=row["summary"], + duration=row["duration"], + created_at=row["created_at"] + ) + return None + + def get_attacks_by_session(self, session_id: int) -> List[Attack]: + """Retrieve all Attacks for a session.""" + with self.db.get_cursor() as cursor: + cursor.execute( + "SELECT * FROM attacks WHERE session_id = ? ORDER BY created_at DESC", + (session_id,) + ) + rows = cursor.fetchall() + return [ + Attack( + id=row["id"], + session_id=row["session_id"], + tool=row["tool"], + args=row["args"], + summary=row["summary"], + duration=row["duration"], + created_at=row["created_at"] + ) for row in rows + ] + + def save_attack_result(self, attack_id: int, attack_result: AttackResult) -> Optional[AttackResult]: + """Save an AttackResult object to the database.""" + with self.db.get_cursor() as cursor: + cursor.execute( + """ + INSERT INTO attack_result (attack_id, summary) + VALUES (?, ?) + RETURNING * + """, + (attack_id, attack_result.summary) + ) + row = cursor.fetchone() + if row: + return AttackResult( + id=row["id"], + attack_id=row["attack_id"], + summary=row["summary"] + ) + return None + + def get_attack_results_by_attack(self, attack_id: int) -> List[AttackResult]: + """Retrieve all AttackResults for an attack.""" + with self.db.get_cursor() as cursor: + cursor.execute("SELECT * FROM attack_result WHERE attack_id = ?", (attack_id,)) + rows = cursor.fetchall() + return [ + AttackResult( + id=row["id"], + attack_id=row["attack_id"], + summary=row["summary"] + ) for row in rows + ] \ No newline at end of file diff --git a/src/core/models/models.py b/src/core/models/models.py index 641f317..13e7dae 100644 --- a/src/core/models/models.py +++ b/src/core/models/models.py @@ -1,4 +1,3 @@ -# src/core/models/models.py from dataclasses import dataclass from datetime import datetime from typing import Optional @@ -41,6 +40,7 @@ class Service: port_id: Optional[int] = None name: Optional[str] = None product: Optional[str] = None + version: Optional[str] = None # Added version field extrainfo: Optional[str] = None ostype: Optional[str] = None diff --git a/src/ui/attack_window.py b/src/ui/attack_window.py new file mode 100644 index 0000000..ba834c0 --- /dev/null +++ b/src/ui/attack_window.py @@ -0,0 +1,497 @@ +import dearpygui.dearpygui as dpg +from src.core.models.models import Session +from src.core.models.hping_test import HpingTestConfig +from src.core.models.modbus_load_test import LoadTestConfig, WriteTask +from src.core.attacks.hping_test import Hping3Tester +from src.core.attacks.modbus_load_test import ModbusLoadTester +from src.utils.logger import get_logger +import threading +import time +import os + +logger = get_logger("attack_window") + +# Глобальные переменные для управления атаками +hping_tester = None +modbus_tester = None +modbus_tasks = [] # Список задач для Modbus атаки + +def create_attack_window(db, session: Session, parent=None): + """Окно эксплуатации (с поддержкой родительского контейнера)""" + global modbus_tasks + modbus_tasks = [] # Сброс задач при открытии окна + + if parent: + with dpg.child_window( + parent=parent, + tag="attack_window", + width=-1, + height=-1 + ): + build_attack_content(session) + else: + with dpg.child_window( + tag="attack_window", + width=-1, + height=-1 + ): + build_attack_content(session) + +def build_attack_content(session): + """Построение содержимого окна эксплуатации""" + dpg.add_text("Эксплуатация уязвимостей", color=(255, 255, 255)) + dpg.add_separator() + dpg.add_spacer(height=10) + + with dpg.tab_bar(tag="attack_tabs"): + # Вкладка Metasploit (заглушка) + with dpg.tab(tag="metasploit_tab", label="Metasploit"): + create_metasploit_tab() + + # Вкладка hping3 (нагрузочное тестирование) + with dpg.tab(tag="hping_tab", label="Нагрузочное тестирование (hping3)"): + create_hping_tab() + + # Вкладка Modbus (атаки на Modbus) + with dpg.tab(tag="modbus_attack_tab", label="Атаки на Modbus"): + create_modbus_attack_tab() + +def create_metasploit_tab(): + """Вкладка Metasploit (заглушка)""" + with dpg.group(): + dpg.add_text("Интеграция с Metasploit будет реализована в будущих версиях", color=(200, 200, 200)) + dpg.add_spacer(height=20) + dpg.add_text("Функционал позволит:") + dpg.add_text("- Поиск и использование эксплойтов") + dpg.add_text("- Управление сессиями Meterpreter") + dpg.add_text("- Автоматизация тестов на проникновение") + dpg.add_spacer(height=20) + dpg.add_button( + label="Проверить подключение к Metasploit", + callback=lambda: dpg.set_value("metasploit_output", "❌ Metasploit не подключен\n"), + width=300, + height=40 + ) + dpg.add_spacer(height=10) + dpg.add_text("Вывод:") + dpg.add_input_text( + tag="metasploit_output", + multiline=True, + height=300, + width=-1, + readonly=True + ) + +def create_hping_tab(): + """Вкладка hping3 (нагрузочное тестирование)""" + with dpg.group(): + with dpg.group(tag="hping_config_group"): + dpg.add_text("Конфигурация нагрузочного теста") + + # Основные параметры + with dpg.group(horizontal=True): + dpg.add_text("Цель:") + dpg.add_input_text(tag="hping_target", width=200) + + dpg.add_text("Порт:") + dpg.add_input_text(tag="hping_port", default_value="80", width=150) + + with dpg.group(horizontal=True): + dpg.add_text("Тип теста:") + dpg.add_combo( + tag="hping_test_type", + items=["TCP", "UDP", "ICMP"], + default_value="TCP", + width=150 + ) + + dpg.add_text("Размер пакета:") + dpg.add_input_int( + tag="hping_packet_size", + default_value=64, + width=150 + ) + + # Дополнительные параметры + with dpg.collapsing_header(label="Дополнительные параметры"): + with dpg.group(horizontal=True): + dpg.add_text("Количество пакетов:") + dpg.add_input_int( + tag="hping_packet_count", + default_value=1000, + width=150 + ) + + dpg.add_text("Интервал (сек):") + dpg.add_input_float( + tag="hping_interval", + default_value=0.1, + width=150 + ) + + with dpg.group(horizontal=True): + dpg.add_text("Подменный IP:") + dpg.add_input_text(tag="hping_spoof_ip", width=150) + + dpg.add_text("Флаги TCP:") + dpg.add_input_text(tag="hping_flags", width=150) + + dpg.add_checkbox( + tag="hping_flood_mode", + label="Режим флуда" + ) + dpg.add_checkbox( + tag="hping_verbose", + label="Подробный вывод" + ) + + # Управление тестом + with dpg.group(horizontal=True): + dpg.add_button( + label="Начать тест", + callback=start_hping_test, + width=150, + height=40 + ) + dpg.add_button( + label="Остановить тест", + callback=stop_hping_test, + width=150, + height=40 + ) + + dpg.add_spacer(height=10) + dpg.add_text("Вывод:") + dpg.add_input_text( + tag="hping_output", + multiline=True, + height=300, + width=-1, + readonly=True + ) + +def create_modbus_attack_tab(): + """Вкладка Modbus (атаки)""" + global modbus_tasks + + with dpg.group(): + with dpg.group(tag="modbus_attack_config_group"): + dpg.add_text("Конфигурация атаки на Modbus") + + # Основные параметры + with dpg.group(horizontal=True): + dpg.add_text("Целевой хост:") + dpg.add_input_text(tag="modbus_target_host", default_value="192.168.1.100", width=200) + + dpg.add_text("Порт:") + dpg.add_input_text(tag="modbus_port", default_value="502", width=150) + + with dpg.group(horizontal=True): + dpg.add_text("Потоки:") + dpg.add_input_int(tag="modbus_threads", default_value=5, width=150) + + dpg.add_text("Длительность (сек):") + dpg.add_input_int(tag="modbus_duration", default_value=60, width=150) + + # Задачи записи + with dpg.collapsing_header(label="Задачи записи"): + dpg.add_text("Добавьте задачи записи в регистры:") + + # Кнопка добавления задачи + dpg.add_button( + label="Добавить задачу", + callback=open_modbus_task_window, + width=200 + ) + dpg.add_spacer(height=10) + + # Контейнер для отображения задач + dpg.add_child_window( + tag="modbus_tasks_container", + height=200, + width=-1, + border=True + ) + update_modbus_tasks_display() + + # Кнопки управления + with dpg.group(horizontal=True): + dpg.add_button( + label="Начать атаку", + callback=start_modbus_attack, + width=200, + height=40 + ) + dpg.add_button( + label="Остановить атаку", + callback=stop_modbus_attack, + width=200, + height=40 + ) + + dpg.add_spacer(height=10) + dpg.add_text("Вывод:") + dpg.add_input_text( + tag="modbus_attack_output", + multiline=True, + height=300, + width=-1, + readonly=True + ) + +def open_modbus_task_window(): + """Открыть модальное окно для добавления задачи Modbus""" + if dpg.does_item_exist("modbus_task_window"): + dpg.delete_item("modbus_task_window") + + # Создаем модальное окно + with dpg.window( + tag="modbus_task_window", + label="Добавить задачу Modbus", + modal=True, + no_close=True, + width=700, + height=400, + pos=[300, 200] + ): + dpg.add_text("Тип данных:") + dpg.add_combo( + tag="task_data_type", + items=["coil", "holding_register"], + default_value="coil", + width=200 + ) + + dpg.add_text("Адрес регистра:") + dpg.add_input_int( + tag="task_address", + default_value=0, + min_value=0, + min_clamped=True, + width=200 + ) + + dpg.add_text("Значение:") + dpg.add_input_int( + tag="task_value", + default_value=0, + width=200 + ) + + dpg.add_text("Интервал между запросами (сек):") + dpg.add_input_float( + tag="task_interval", + default_value=0.5, + min_value=0.01, + min_clamped=True, + width=200 + ) + + dpg.add_text("Количество повторений (0 = бесконечно):") + dpg.add_input_int( + tag="task_count", + default_value=10, + min_value=0, + min_clamped=True, + width=200 + ) + + dpg.add_spacer(height=20) + + with dpg.group(horizontal=True): + dpg.add_button( + label="Добавить", + callback=save_modbus_task, + width=150, + height=30 + ) + dpg.add_button( + label="Отмена", + callback=lambda: dpg.delete_item("modbus_task_window"), + width=150, + height=30 + ) + +def save_modbus_task(): + """Сохранить задачу и добавить в список""" + global modbus_tasks + + # Собираем данные из формы + task = { + "data_type": dpg.get_value("task_data_type"), + "address": dpg.get_value("task_address"), + "value": dpg.get_value("task_value"), + "interval": dpg.get_value("task_interval"), + "count": dpg.get_value("task_count") + } + + # Добавляем задачу в глобальный список + modbus_tasks.append(task) + + # Обновляем отображение задач + update_modbus_tasks_display() + + # Закрываем окно + dpg.delete_item("modbus_task_window") + dpg.set_value("modbus_attack_output", "✅ Задача добавлена\n") + +def update_modbus_tasks_display(): + """Обновить отображение списка задач""" + if dpg.does_item_exist("modbus_tasks_container"): + dpg.delete_item("modbus_tasks_container", children_only=True) + else: + return + + with dpg.child_window(parent="modbus_tasks_container", tag="tasks_list", height=200, width=-1): + if not modbus_tasks: + dpg.add_text("Задачи не добавлены", color=(150, 150, 150)) + return + + for i, task in enumerate(modbus_tasks): + with dpg.group(horizontal=True): + dpg.add_text(f"{i+1}. {task['data_type']} @{task['address']} = {task['value']}") + dpg.add_text(f"Интервал: {task['interval']}с, Повторов: {task['count']}") + dpg.add_button( + label="Удалить", + callback=lambda s, a, u: remove_modbus_task(u), + user_data=i, + width=150 + ) + +def remove_modbus_task(index): + """Удалить задачу из списка""" + global modbus_tasks + if 0 <= index < len(modbus_tasks): + modbus_tasks.pop(index) + update_modbus_tasks_display() + dpg.set_value("modbus_attack_output", f"🗑️ Задача {index+1} удалена\n") + +def start_hping_test(): + """Запуск нагрузочного теста hping3""" + global hping_tester + + # Получение параметров из UI + config = HpingTestConfig( + target=dpg.get_value("hping_target"), + dest_port=int(dpg.get_value("hping_port")), + test_type=dpg.get_value("hping_test_type").lower(), + packet_size=dpg.get_value("hping_packet_size"), + count=dpg.get_value("hping_packet_count"), + interval=str(dpg.get_value("hping_interval")), + spoof_ip=dpg.get_value("hping_spoof_ip"), + flags=dpg.get_value("hping_flags"), + flood=dpg.get_value("hping_flood_mode"), + verbose=dpg.get_value("hping_verbose") + ) + + # Проверка обязательных параметров + if not config.target: + dpg.set_value("hping_output", "❌ Ошибка: Не указана цель теста\n") + return + + # Создание тестера + hping_tester = Hping3Tester(config) + + # Запуск в отдельном потоке + thread = threading.Thread( + target=run_hping_test, + args=(hping_tester,) + ) + thread.start() + + dpg.set_value("hping_output", "🚀 Запуск нагрузочного теста...\n") + +def run_hping_test(tester): + """Выполнение нагрузочного теста hping3 (в потоке)""" + try: + result = tester.run() + + # Формирование отчета + output = "📊 Результаты нагрузочного теста:\n" + output += f"Цель: {result['target']}\n" + output += f"Протокол: {result['protocol'].upper()}\n" + output += f"Отправлено пакетов: {result['sent']}\n" + output += f"Получено пакетов: {result['received']}\n" + output += f"Потеряно пакетов: {result['loss_percent']}%\n" + output += f"Время выполнения: {result['duration']:.2f} сек\n" + output += f"Скорость: {result['pps']:.2f} пакетов/сек\n" + + dpg.set_value("hping_output", output) + + except Exception as e: + dpg.set_value("hping_output", f"❌ Ошибка: {str(e)}\n") + +def stop_hping_test(): + """Остановка нагрузочного теста hping3""" + global hping_tester + if hping_tester: + hping_tester.stop() + dpg.set_value("hping_output", dpg.get_value("hping_output") + "🛑 Тест остановлен пользователем\n") + +def start_modbus_attack(): + """Запуск атаки на Modbus""" + global modbus_tester, modbus_tasks + + # Получение параметров из UI + config = LoadTestConfig( + host=dpg.get_value("modbus_target_host"), + port=int(dpg.get_value("modbus_port")), + threads=dpg.get_value("modbus_threads"), + duration=dpg.get_value("modbus_duration") + ) + + # Создаем задачи из глобального списка + tasks = [] + for task in modbus_tasks: + tasks.append(WriteTask( + data_type=task['data_type'], + address=task['address'], + value=task['value'], + interval=task['interval'], + count=task['count'] + )) + + if not tasks: + dpg.set_value("modbus_attack_output", "❌ Ошибка: Не добавлены задачи записи\n") + return + + config.tasks = tasks + + # Создание тестера + modbus_tester = ModbusLoadTester(config) + + # Запуск в отдельном потоке + thread = threading.Thread( + target=run_modbus_attack, + args=(modbus_tester,) + ) + thread.start() + + dpg.set_value("modbus_attack_output", "🚀 Запуск атаки на Modbus...\n") + +def run_modbus_attack(tester): + """Выполнение атаки на Modbus (в потоке)""" + try: + result = tester.run() + + # Формирование отчета + output = "📊 Результаты атаки на Modbus:\n" + output += f"Цель: {tester.config.host}:{tester.config.port}\n" + output += f"Всего запросов: {result['total_requests']}\n" + output += f"Успешных запросов: {result['successful_requests']}\n" + output += f"Неудачных запросов: {result['failed_requests']}\n" + output += f"Ошибок потоков: {result['thread_errors']}\n" + output += f"Время выполнения: {result['duration']:.2f} сек\n" + output += f"Скорость: {result['requests_per_sec']:.2f} запросов/сек\n" + + dpg.set_value("modbus_attack_output", output) + + except Exception as e: + dpg.set_value("modbus_attack_output", f"❌ Ошибка: {str(e)}\n") + +def stop_modbus_attack(): + """Остановка атаки на Modbus""" + global modbus_tester + if modbus_tester: + modbus_tester.stop() + dpg.set_value("modbus_attack_output", dpg.get_value("modbus_attack_output") + "🛑 Атака остановлена пользователем\n") \ No newline at end of file diff --git a/src/ui/configuration_window.py b/src/ui/configuration_window.py index fc14d4a..5bec901 100644 --- a/src/ui/configuration_window.py +++ b/src/ui/configuration_window.py @@ -1,5 +1,6 @@ import dearpygui.dearpygui as dpg from src.ui.scanner_window import create_scanner_window +from src.ui.logger_window import create_logger_window from src.core.models.models import Session from src.utils.logger import get_logger @@ -40,7 +41,7 @@ def create_configuration_window(db, session: Session): dpg.add_button( label="Эксплуатация", - callback=lambda: logger.info("Attacks selected"), + callback=lambda: show_attack_window(db, session), width=-1, height=50 ) @@ -56,7 +57,7 @@ def create_configuration_window(db, session: Session): dpg.add_button( label="Лог приложения", - callback=lambda: logger.info("Log selected"), + callback=lambda: show_logger_window(db, session), width=-1, height=50 ) @@ -89,4 +90,24 @@ def show_scanner_window(db, session): dpg.delete_item("content_area", children_only=True) # Создать окно сканирования внутри контентной области - create_scanner_window(db, session, parent="content_area") \ No newline at end of file + create_scanner_window(db, session, parent="content_area") + +def show_logger_window(db, session): + """Показать окно логов""" + # Очистить контентную область + if dpg.does_item_exist("content_area"): + dpg.delete_item("content_area", children_only=True) + + # Создать окно логов внутри контентной области + create_logger_window(db, session, parent="content_area") + +def show_attack_window(db, session): + """Показать окно эксплуатации""" + # Очистить контентную область + if dpg.does_item_exist("content_area"): + dpg.delete_item("content_area", children_only=True) + + # Создать новую контентную область + # with dpg.child_window(parent="config_window", tag="content_area", width=-1, height=-1): + from src.ui.attack_window import create_attack_window + create_attack_window(db, session, parent="content_area") \ No newline at end of file diff --git a/src/ui/logger_window.py b/src/ui/logger_window.py new file mode 100644 index 0000000..2c07f43 --- /dev/null +++ b/src/ui/logger_window.py @@ -0,0 +1,79 @@ +import dearpygui.dearpygui as dpg +from src.utils.logger import get_logger +import os + +logger = get_logger("logger_window") + +LOG_FILE_PATH = "/home/lodqa/attack_module_data/logs/security-scanner.log" + +def create_logger_window(db, session, parent=None): + """Окно просмотра логов приложения""" + try: + # Создаем дочернее окно внутри родительского контейнера (если указан) + with dpg.child_window( + parent=parent, + tag="logger_window", + width=-1, + height=-1, + border=False + ): + build_logger_content(db, session) + except Exception as e: + logger.error(f"Failed to create logger window: {e}") + dpg.add_text(f"Ошибка при создании окна логов: {str(e)}", color=(255, 100, 100)) + +def build_logger_content(db, session): + """Построение содержимого окна логов""" + dpg.add_text("Лог приложения", color=(255, 255, 255)) + dpg.add_separator() + dpg.add_spacer(height=10) + + # Кнопка для обновления логов + dpg.add_button( + label="Обновить лог", + callback=lambda: refresh_log(), + width=150, + height=40 + ) + dpg.add_spacer(height=10) + + # Область вывода логов + dpg.add_text("Содержимое лога:") + log_output = dpg.add_input_text( + tag="log_output", + multiline=True, + height=-1, + width=-1, + readonly=True + ) + + # Первоначальная загрузка логов + refresh_log() + +def refresh_log(): + """Чтение и обновление содержимого лог-файла""" + try: + if not os.path.exists(LOG_FILE_PATH): + logger.warning(f"Log file not found: {LOG_FILE_PATH}") + dpg.set_value("log_output", f"Ошибка: Файл лога не найден по пути {LOG_FILE_PATH}") + return + + with open(LOG_FILE_PATH, 'r', encoding='utf-8') as log_file: + log_content = log_file.read() + + if not log_content.strip(): + logger.info("Log file is empty") + dpg.set_value("log_output", "Лог-файл пуст") + else: + dpg.set_value("log_output", log_content) + logger.info("Log file loaded successfully") + + except PermissionError: + logger.error(f"Permission denied accessing log file: {LOG_FILE_PATH}") + dpg.set_value("log_output", f"Ошибка: Нет прав доступа к файлу {LOG_FILE_PATH}") + except UnicodeDecodeError: + logger.error(f"Failed to decode log file: {LOG_FILE_PATH}") + dpg.set_value("log_output", f"Ошибка: Не удалось декодировать файл лога {LOG_FILE_PATH}") + except Exception as e: + logger.error(f"Failed to read log file: {e}") + dpg.set_value("log_output", f"Ошибка при чтении лога: {str(e)}") \ No newline at end of file diff --git a/src/ui/scanner_window.py b/src/ui/scanner_window.py index 4654a65..63bacf2 100644 --- a/src/ui/scanner_window.py +++ b/src/ui/scanner_window.py @@ -1,19 +1,24 @@ import dearpygui.dearpygui as dpg -from src.core.models.models import Session, Scan, Host +from src.core.models.models import Session, Scan, Host, Port, Service from src.core.database.managers.scanner_manager import ScannerManager +from src.core.api.nmap import NmapScanner +from src.core.attacks.modbus_scan import ModbusScanner from src.utils.logger import get_logger import threading import time +from datetime import datetime logger = get_logger("scanner_window") +# Глобальные переменные для управления сканерами +nmap_scanner = None +modbus_scanner = None + def create_scanner_window(db, session: Session, parent=None): """Окно сканирования (с поддержкой родительского контейнера)""" scanner_service = ScannerManager(db) - # Если указан родитель, создаем внутри него if parent: - print(parent) with dpg.child_window( parent=parent, tag="scanner_window", @@ -31,53 +36,42 @@ def create_scanner_window(db, session: Session, parent=None): def build_scanner_content(scanner_service, session): """Построение содержимого окна сканирования""" - # Заголовок dpg.add_text("Сканирование", color=(255, 255, 255)) dpg.add_separator() dpg.add_spacer(height=10) - # Вкладки with dpg.tab_bar(tag="scan_tabs"): - # Вкладка Nmap with dpg.tab(tag="nmap_tab", label="Nmap Scanning"): create_nmap_tab(scanner_service, session) - # Вкладка OpenVAS with dpg.tab(tag="openvas_tab", label="OpenVAS Scanning"): dpg.add_text("OpenVAS scanning will be implemented here") - # Вкладка Modbus with dpg.tab(tag="modbus_tab", label="Modbus Scanning"): create_modbus_tab(scanner_service, session) - # Вкладка результатов with dpg.tab(tag="results_tab", label="Scan Results"): create_results_tab(scanner_service, session) def create_nmap_tab(scanner_service, session): """Вкладка Nmap сканирования""" with dpg.group(): - # Конфигурация сканирования with dpg.group(tag="nmap_config_group"): - dpg.add_text("Scan Configuration") - - dpg.add_text("Target:") + dpg.add_text("Конфигурация сканирования") + dpg.add_text("Цель:") target_input = dpg.add_input_text(tag="nmap_target", width=300) - - dpg.add_text("Scan Type:") + dpg.add_text("Тип сканирования:") scan_type = dpg.add_combo( tag="nmap_scan_type", items=["Quick Scan", "Intensive Scan", "Full Scan", "Custom Scan"], default_value="Quick Scan", width=200 ) - - dpg.add_text("Custom Arguments:") + dpg.add_text("Пользовательские аргументы:") custom_args = dpg.add_input_text(tag="nmap_custom_args", width=300) - # Кнопка запуска dpg.add_button( - label="Start Nmap Scan", + label="Начать сканирование Nmap", callback=lambda: start_nmap_scan( scanner_service, session, @@ -86,227 +80,367 @@ def create_nmap_tab(scanner_service, session): dpg.get_value(custom_args) ), width=200, - height=40 # Увеличена высота + height=40 ) dpg.add_spacer(height=10) - # Область вывода - dpg.add_text("Output:") - output_area = dpg.add_input_text( + dpg.add_button( + label="Остановить сканирование", + callback=lambda: stop_nmap_scan(), + width=200, + height=40 + ) + dpg.add_spacer(height=10) + + dpg.add_text("Вывод:") + dpg.add_input_text( tag="nmap_output", multiline=True, height=300, - width=-1, # Растянуть на всю ширину - readonly=True - ) - -def create_modbus_tab(scanner_service, session): - """Вкладка Modbus сканирования""" - with dpg.group(): - # Конфигурация сканирования - with dpg.group(tag="modbus_config_group"): - dpg.add_text("Modbus Scan Configuration") - - dpg.add_text("Target IP:") - target_input = dpg.add_input_text(tag="modbus_target", width=300) - - dpg.add_text("Port:") - port_input = dpg.add_input_text(tag="modbus_port", default_value="502", width=100) - - # Кнопка запуска - dpg.add_button( - label="Start Modbus Scan", - callback=lambda: start_modbus_scan( - scanner_service, - session, - dpg.get_value(target_input), - dpg.get_value(port_input) - ), - width=200, - height=40 # Увеличена высота - ) - dpg.add_spacer(height=10) - - # Область вывода - dpg.add_text("Output:") - output_area = dpg.add_input_text( - tag="modbus_output", - multiline=True, - height=300, - width=-1, # Растянуть на всю ширину - readonly=True - ) - -def create_results_tab(scanner_service, session): - """Вкладка результатов сканирования""" - with dpg.group(): - # Список сканирований - dpg.add_text("Scan History:") - scan_list = dpg.add_listbox( - tag="scan_list", - width=-1, # Растянуть на всю ширину - num_items=10, - callback=lambda s: show_scan_details(scanner_service, dpg.get_value(s)) - ) - - # Загрузка истории - load_scan_history(scanner_service, session, scan_list) - dpg.add_spacer(height=10) - - # Детали сканирования - dpg.add_text("Scan Details:") - scan_details = dpg.add_input_text( - tag="scan_details", - multiline=True, - height=300, - width=-1, # Растянуть на всю ширину + width=-1, readonly=True ) def start_nmap_scan(scanner_service, session, target, scan_type, custom_args): """Запуск Nmap сканирования""" + global nmap_scanner + if not target: logger.warning("Nmap scan started without target") + dpg.set_value("nmap_output", "Ошибка: Не указана цель сканирования\n") return - # Обновить вывод - dpg.set_value("nmap_output", f"Starting {scan_type} scan on {target}...\n") + dpg.set_value("nmap_output", f"🚀 Начало сканирования {scan_type} на {target}...\n") - # Создать объект сканирования scan = Scan( session_id=session.id, tool="nmap", - args=f"Type: {scan_type}, Args: {custom_args}", - summary="In progress" + args=f"Тип: {scan_type}, Аргументы: {custom_args}", + summary="В процессе", + created_at=datetime.utcnow() ) - # Сохранить в базу saved_scan = scanner_service.save_scan(scan) if saved_scan: - dpg.set_value("nmap_output", dpg.get_value("nmap_output") + f"Scan ID: {saved_scan.id}\n") + dpg.set_value("nmap_output", dpg.get_value("nmap_output") + f"📝 ID сканирования: {saved_scan.id}\n") + + nmap_scanner = NmapScanner( + ip=target, + scan_type=scan_type, + custom_args=custom_args + ) - # Запуск в отдельном потоке thread = threading.Thread( target=run_nmap_scan, - args=(scanner_service, session, target, saved_scan.id) + args=(scanner_service, session, saved_scan.id, nmap_scanner) ) thread.start() -def run_nmap_scan(scanner_service, session, target, scan_id): +def stop_nmap_scan(): + """Остановка Nmap сканирования""" + global nmap_scanner + if nmap_scanner: + nmap_scanner.stop() + dpg.set_value("nmap_output", dpg.get_value("nmap_output") + "🛑 Сканирование остановлено пользователем\n") + logger.info("Nmap scan stopped by user") + +def run_nmap_scan(scanner_service, session, scan_id, scanner): """Выполнение Nmap сканирования (в потоке)""" try: - # Имитация сканирования - output = dpg.get_value("nmap_output") + "Scanning network...\n" - dpg.set_value("nmap_output", output) - time.sleep(2) + dpg.set_value("nmap_output", dpg.get_value("nmap_output") + "🔍 Сканирование сети...\n") - # Имитация результатов - hosts = [ - {"ip": "192.168.1.100", "mac": "00:11:22:33:44:55", "os": "Linux"}, - {"ip": "192.168.1.101", "mac": "AA:BB:CC:DD:EE:FF", "os": "Windows"} - ] + result = scanner.start_scan() - # Сохранить результаты - for host in hosts: + if 'error' in result: + dpg.set_value("nmap_output", dpg.get_value("nmap_output") + f"❌ Ошибка: {result['error']}\n") + scan = Scan( + id=scan_id, + summary=f"Ошибка: {result['error']}", + duration=result.get('duration', 0), + created_at=datetime.utcnow() + ) + scanner_service.update_scan(scan) + return + + for ip, host_data in result.get("hosts", {}).items(): + os_name = "Неизвестно" + if host_data.get("osmatch"): + os_name = host_data["osmatch"][0].get("name", "Неизвестно") + host_obj = Host( scan_id=scan_id, - ip=host["ip"], - mac=host["mac"], - os=host["os"] + ip=ip, + mac=host_data.get("macaddress", {}).get("addr", "N/A"), + os=os_name ) - scanner_service.save_host(scan_id, host_obj) - output = dpg.get_value("nmap_output") + f"Found host: {host['ip']} ({host['os']})\n" + saved_host = scanner_service.save_host(scan_id, host_obj) + + output = dpg.get_value("nmap_output") + f"🔍 Найден хост: {ip} ({os_name})\n" + output += f"MAC: {host_obj.mac}\n" dpg.set_value("nmap_output", output) + + for port_info in host_data.get("ports", []): + port_id = port_info.get("portid", "N/A") + protocol = port_info.get("protocol", "N/A") + state = port_info.get("state", "unknown") + service = port_info.get("service", {}) + + port_obj = Port( + host_id=saved_host.id, + protocol=protocol, + port_num=int(port_id) if port_id.isdigit() else None, + state=state + ) + saved_port = scanner_service.save_port(saved_host.id, port_obj) + + service_obj = Service( + port_id=saved_port.id, + name=service.get("name", "Unknown"), + product=service.get("product", ""), + version=service.get("version", ""), + extrainfo=service.get("extrainfo", "") + ) + scanner_service.save_service(saved_port.id, service_obj) + + output = dpg.get_value("nmap_output") + output += f" Порт: {port_id}/{protocol} - " + output += f"{service_obj.name} {service_obj.product} {service_obj.version}\n" + dpg.set_value("nmap_output", output) - # Обновить статус scan = Scan( id=scan_id, - summary=f"Found {len(hosts)} hosts", - duration=2 + summary=f"Найдено {result.get('hosts_up', 0)} хостов", + duration=result.get('duration', 0), + created_at=datetime.utcnow() ) scanner_service.update_scan(scan) - output = dpg.get_value("nmap_output") + "Scan completed!\n" + output = dpg.get_value("nmap_output") + output += f"✅ Сканирование завершено за {result.get('duration', 0):.2f} секунд\n" + output += f"Найдено хостов: {result.get('hosts_up', 0)}\n" dpg.set_value("nmap_output", output) - # Обновить список сканирований if dpg.does_item_exist("scan_list"): load_scan_history(scanner_service, session, "scan_list") except Exception as e: - output = dpg.get_value("nmap_output") + f"Error: {str(e)}\n" + output = dpg.get_value("nmap_output") + f"❌ Критическая ошибка: {str(e)}\n" dpg.set_value("nmap_output", output) logger.error(f"Nmap scan failed: {e}") + + scan = Scan( + id=scan_id, + summary=f"Ошибка: {str(e)}", + duration=time.time() - scanner.start_time if hasattr(scanner, 'start_time') else 0, + created_at=datetime.utcnow() + ) + scanner_service.update_scan(scan) -def start_modbus_scan(scanner_service, session, target, port): +def create_modbus_tab(scanner_service, session): + """Вкладка Modbus сканирования""" + with dpg.group(): + with dpg.group(tag="modbus_config_group"): + dpg.add_text("Конфигурация Modbus сканирования") + dpg.add_text("IP-адрес:") + ip_input = dpg.add_input_text(tag="modbus_ip", width=300) + dpg.add_text("Порт:") + port_input = dpg.add_input_int(tag="modbus_port", default_value=502, width=150) + dpg.add_text("Coils (начало - конец):") + with dpg.group(horizontal=True): + coil_start = dpg.add_input_int(tag="modbus_coil_start", default_value=0, width=150) + coil_end = dpg.add_input_int(tag="modbus_coil_end", default_value=100, width=150) + dpg.add_text("Discrete Inputs (начало - конец):") + with dpg.group(horizontal=True): + discrete_start = dpg.add_input_int(tag="modbus_discrete_start", default_value=0, width=150) + discrete_end = dpg.add_input_int(tag="modbus_discrete_end", default_value=100, width=150) + dpg.add_text("Holding Registers (начало - конец):") + with dpg.group(horizontal=True): + holding_start = dpg.add_input_int(tag="modbus_holding_start", default_value=0, width=150) + holding_end = dpg.add_input_int(tag="modbus_holding_end", default_value=100, width=150) + dpg.add_text("Input Registers (начало - конец):") + with dpg.group(horizontal=True): + input_start = dpg.add_input_int(tag="modbus_input_start", default_value=0, width=150) + input_end = dpg.add_input_int(tag="modbus_input_end", default_value=100, width=150) + dpg.add_text("Длительность сканирования (сек):") + duration_input = dpg.add_input_int(tag="modbus_duration", default_value=10, width=150) + dpg.add_text("Размер блока:") + block_size_input = dpg.add_input_int(tag="modbus_block_size", default_value=50, width=150) + + dpg.add_button( + label="Начать Modbus сканирование", + callback=lambda: start_modbus_scan( + scanner_service, + session, + dpg.get_value(ip_input), + dpg.get_value(port_input), + (dpg.get_value(coil_start), dpg.get_value(coil_end)), + (dpg.get_value(discrete_start), dpg.get_value(discrete_end)), + (dpg.get_value(holding_start), dpg.get_value(holding_end)), + (dpg.get_value(input_start), dpg.get_value(input_end)), + dpg.get_value(duration_input), + dpg.get_value(block_size_input) + ), + width=200, + height=40 + ) + dpg.add_spacer(height=10) + + dpg.add_text("Вывод:") + dpg.add_input_text( + tag="modbus_output", + multiline=True, + height=300, + width=-1, + readonly=True + ) + +def start_modbus_scan(scanner_service, session, ip, port, coil_range, discrete_range, holding_range, input_range, duration, block_size): """Запуск Modbus сканирования""" - if not target: - logger.warning("Modbus scan started without target") + global modbus_scanner + + if not ip: + logger.warning("Modbus scan started without IP") + dpg.set_value("modbus_output", "Ошибка: Не указан IP-адрес\n") return - # Обновить вывод - dpg.set_value("modbus_output", f"Starting Modbus scan on {target}:{port}...\n") + try: + if not (0 <= port <= 65535): + raise ValueError("Порт должен быть в диапазоне 0-65535") + if not (coil_range[0] <= coil_range[1] and discrete_range[0] <= discrete_range[1] and + holding_range[0] <= holding_range[1] and input_range[0] <= input_range[1]): + raise ValueError("Начало диапазона должно быть меньше или равно концу") + if duration <= 0: + raise ValueError("Длительность сканирования должна быть положительной") + if block_size <= 0: + raise ValueError("Размер блока должен быть положительным") + except ValueError as e: + logger.warning(f"Invalid Modbus scan parameters: {e}") + dpg.set_value("modbus_output", f"Ошибка: {str(e)}\n") + return + + dpg.set_value("modbus_output", f"🚀 Начало Modbus сканирования на {ip}:{port}...\n") - # Создать объект сканирования scan = Scan( session_id=session.id, tool="modbus", - args=f"Target: {target}, Port: {port}", - summary="In progress" + args=f"IP: {ip}, Port: {port}, Coils: {coil_range}, Discrete: {discrete_range}, Holding: {holding_range}, Input: {input_range}, Duration: {duration}s, Block: {block_size}", + summary="В процессе", + created_at=datetime.utcnow() ) - # Сохранить в базу saved_scan = scanner_service.save_scan(scan) if saved_scan: - dpg.set_value("modbus_output", dpg.get_value("modbus_output") + f"Scan ID: {saved_scan.id}\n") + dpg.set_value("modbus_output", dpg.get_value("modbus_output") + f"📝 ID сканирования: {saved_scan.id}\n") + + modbus_scanner = ModbusScanner( + host=ip, + port=port, + coil_range=coil_range, + discrete_input_range=discrete_range, + holding_register_range=holding_range, + input_register_range=input_range, + scan_duration=duration, + block_size=block_size + ) - # Запуск в отдельном потоке thread = threading.Thread( target=run_modbus_scan, - args=(scanner_service, session, target, int(port), saved_scan.id) + args=(scanner_service, session, saved_scan.id, modbus_scanner) ) thread.start() -def run_modbus_scan(scanner_service, session, target, port, scan_id): +def run_modbus_scan(scanner_service, session, scan_id, scanner): """Выполнение Modbus сканирования (в потоке)""" try: - # Имитация сканирования - output = dpg.get_value("modbus_output") + "Connecting to device...\n" - dpg.set_value("modbus_output", output) - time.sleep(1) + dpg.set_value("modbus_output", dpg.get_value("modbus_output") + "🔍 Подключение к устройству...\n") - # Имитация результатов - results = { - "active_coils": "0, 1, 5, 7", - "active_discrete_inputs": "2, 3", - "active_holding_registers": "100, 101, 105", - "active_input_registers": "200, 201" + start_time = time.time() + results = scanner.scan_registers() + duration = int(time.time() - start_time) + + if results is None: + output = dpg.get_value("modbus_output") + "❌ Ошибка: Не удалось выполнить сканирование, проверьте подключение устройства\n" + dpg.set_value("modbus_output", output) + scan = Scan( + id=scan_id, + summary="Ошибка: Сканирование не выполнено", + duration=duration, + created_at=datetime.utcnow() + ) + scanner_service.update_scan(scan) + return + + # Convert results to strings of active addresses + active_coils = ",".join(str(addr) for addr in sorted(results.active_coils.keys())) if results.active_coils else "" + active_discrete = ",".join(str(addr) for addr in sorted(results.active_discrete_inputs.keys())) if results.active_discrete_inputs else "" + active_holding = ",".join(str(addr) for addr in sorted(results.active_holding_registers.keys())) if results.active_holding_registers else "" + active_input = ",".join(str(addr) for addr in sorted(results.active_input_registers.keys())) if results.active_input_registers else "" + + # Save results to database + modbus_results = { + "active_coils": active_coils, + "active_discrete_inputs": active_discrete, + "active_holding_registers": active_holding, + "active_input_registers": active_input } + scanner_service.save_modbus_result(scan_id, modbus_results) - # Сохранить результаты - scanner_service.save_modbus_result(scan_id, results) - - # Обновить статус + # Update scan status scan = Scan( id=scan_id, - summary="Modbus scan completed", - duration=1 + summary=f"Modbus сканирование завершено: {len(results.active_coils)} coils, {len(results.active_discrete_inputs)} discrete inputs, {len(results.active_holding_registers)} holding registers, {len(results.active_input_registers)} input registers", + duration=duration, + created_at=datetime.utcnow() ) scanner_service.update_scan(scan) - output = dpg.get_value("modbus_output") + "Scan completed!\n" - output += f"Active coils: {results['active_coils']}\n" - output += f"Active holding registers: {results['active_holding_registers']}\n" + # Display results + output = dpg.get_value("modbus_output") + f"✅ Сканирование завершено за {duration:.2f} секунд!\n" + output += f"Активные Coils: {active_coils or 'Нет'}\n" + output += f"Активные Discrete Inputs: {active_discrete or 'Нет'}\n" + output += f"Активные Holding Registers: {active_holding or 'Нет'}\n" + output += f"Активные Input Registers: {active_input or 'Нет'}\n" dpg.set_value("modbus_output", output) - # Обновить список сканирований + # Update scan history if dpg.does_item_exist("scan_list"): load_scan_history(scanner_service, session, "scan_list") except Exception as e: - output = dpg.get_value("modbus_output") + f"Error: {str(e)}\n" + output = dpg.get_value("modbus_output") + f"❌ Ошибка: {str(e)}\n" dpg.set_value("modbus_output", output) logger.error(f"Modbus scan failed: {e}") + + scan = Scan( + id=scan_id, + summary=f"Ошибка: {str(e)}", + duration=int(time.time() - start_time) if 'start_time' in locals() else 0, + created_at=datetime.utcnow() + ) + scanner_service.update_scan(scan) + +def create_results_tab(scanner_service, session): + """Вкладка результатов сканирования""" + with dpg.group(): + dpg.add_text("История сканирований:") + scan_list = dpg.add_listbox( + tag="scan_list", + width=-1, + num_items=10, + callback=lambda s: show_scan_details(scanner_service, session, dpg.get_value(s)) + ) + + load_scan_history(scanner_service, session, "scan_list") + dpg.add_spacer(height=10) + + dpg.add_text("Детали сканирования:") + dpg.add_input_text( + tag="scan_details", + multiline=True, + height=300, + width=-1, + readonly=True + ) def load_scan_history(scanner_service, session, scan_list): """Загрузка истории сканирований""" @@ -314,20 +448,76 @@ def load_scan_history(scanner_service, session, scan_list): scan_items = [] for scan in scans: - item_text = f"{scan.tool} scan - {scan.created_at.strftime('%Y-%m-%d %H:%M')}" + created_at = scan.created_at + if isinstance(created_at, str): + try: + created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + except ValueError: + logger.warning(f"Invalid created_at format for scan {scan.id}: {created_at}") + created_at = datetime.utcnow() + + item = f"ID:{scan.id} - {scan.tool} scan - {created_at.strftime('%Y-%m-%d %H:%M')}" if scan.summary: - item_text += f" ({scan.summary})" - scan_items.append(item_text) + item += f" ({scan.summary})" + scan_items.append(item) dpg.configure_item(scan_list, items=scan_items) -def show_scan_details(scanner_service, scan_index): +def show_scan_details(scanner_service, session, scan_index): """Отображение деталей сканирования""" if scan_index is None: return - # Получить все сканирования - # (В реальности нужно хранить текущую сессию или передавать ее) - # Для демо: просто покажем заглушку - details = f"Scan details for index {scan_index}\n" + scans = scanner_service.get_scans_by_session(session.id) + try: + # Extract scan ID from "ID:{id} - tool scan - ..." + scan_id = int(scan_index.split()[0].split(':')[1]) + # Find scan by ID + scan = next((s for s in scans if s.id == scan_id), None) + if scan is None: + logger.warning(f"Scan with ID {scan_id} not found") + return + except (ValueError, IndexError): + logger.warning(f"Failed to parse scan index: {scan_index}") + return + + created_at = scan.created_at + if isinstance(created_at, str): + try: + created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + except ValueError: + logger.warning(f"Invalid created_at format for scan {scan.id}: {created_at}") + created_at = datetime.utcnow() + + details = f"Scan ID: {scan.id}\n" + details += f"Tool: {scan.tool}\n" + details += f"Args: {scan.args}\n" + details += f"Summary: {scan.summary}\n" + details += f"Duration: {scan.duration} seconds\n" + details += f"Created: {created_at.strftime('%Y-%m-%d %H:%M')}\n" + + if scan.tool == "nmap": + hosts = scanner_service.get_hosts_by_scan(scan.id) + if hosts: + details += "\nHosts:\n" + for host in hosts: + details += f" IP: {host.ip}, MAC: {host.mac}, OS: {host.os}\n" + ports = scanner_service.get_ports_by_host(host.id) + if ports: + details += " Ports:\n" + for port in ports: + details += f" {port.port_num}/{port.protocol} ({port.state})\n" + services = scanner_service.get_services_by_port(port.id) + if services: + for service in services: + details += f" Service: {service.name} {service.product} {service.version} ({service.extrainfo})\n" + elif scan.tool == "modbus": + modbus_result = scanner_service.get_modbus_result(scan.id) + if modbus_result: + details += "\nModbus Results:\n" + details += f" Active Coils: {modbus_result.active_coils or 'Нет'}\n" + details += f" Active Discrete Inputs: {modbus_result.active_discrete_inputs or 'Нет'}\n" + details += f" Active Holding Registers: {modbus_result.active_holding_registers or 'Нет'}\n" + details += f" Active Input Registers: {modbus_result.active_input_registers or 'Нет'}\n" + dpg.set_value("scan_details", details) \ No newline at end of file