Files
attack_module/src/ui/attack_window.py
2025-06-03 02:47:00 +03:00

497 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dearpygui.dearpygui as dpg
from src.core.models.models import Session
from src.core.models.hping_test import HpingTestConfig
from src.core.models.modbus_load_test import LoadTestConfig, WriteTask
from src.core.attacks.hping_test import Hping3Tester
from src.core.attacks.modbus_load_test import ModbusLoadTester
from src.utils.logger import get_logger
import threading
import time
import os
logger = get_logger("attack_window")
# Глобальные переменные для управления атаками
hping_tester = None
modbus_tester = None
modbus_tasks = [] # Список задач для Modbus атаки
def create_attack_window(db, session: Session, parent=None):
"""Окно эксплуатации (с поддержкой родительского контейнера)"""
global modbus_tasks
modbus_tasks = [] # Сброс задач при открытии окна
if parent:
with dpg.child_window(
parent=parent,
tag="attack_window",
width=-1,
height=-1
):
build_attack_content(session)
else:
with dpg.child_window(
tag="attack_window",
width=-1,
height=-1
):
build_attack_content(session)
def build_attack_content(session):
"""Построение содержимого окна эксплуатации"""
dpg.add_text("Эксплуатация уязвимостей", color=(255, 255, 255))
dpg.add_separator()
dpg.add_spacer(height=10)
with dpg.tab_bar(tag="attack_tabs"):
# Вкладка Metasploit (заглушка)
with dpg.tab(tag="metasploit_tab", label="Metasploit"):
create_metasploit_tab()
# Вкладка hping3 (нагрузочное тестирование)
with dpg.tab(tag="hping_tab", label="Нагрузочное тестирование (hping3)"):
create_hping_tab()
# Вкладка Modbus (атаки на Modbus)
with dpg.tab(tag="modbus_attack_tab", label="Атаки на Modbus"):
create_modbus_attack_tab()
def create_metasploit_tab():
"""Вкладка Metasploit (заглушка)"""
with dpg.group():
dpg.add_text("Интеграция с Metasploit будет реализована в будущих версиях", color=(200, 200, 200))
dpg.add_spacer(height=20)
dpg.add_text("Функционал позволит:")
dpg.add_text("- Поиск и использование эксплойтов")
dpg.add_text("- Управление сессиями Meterpreter")
dpg.add_text("- Автоматизация тестов на проникновение")
dpg.add_spacer(height=20)
dpg.add_button(
label="Проверить подключение к Metasploit",
callback=lambda: dpg.set_value("metasploit_output", "❌ Metasploit не подключен\n"),
width=300,
height=40
)
dpg.add_spacer(height=10)
dpg.add_text("Вывод:")
dpg.add_input_text(
tag="metasploit_output",
multiline=True,
height=300,
width=-1,
readonly=True
)
def create_hping_tab():
"""Вкладка hping3 (нагрузочное тестирование)"""
with dpg.group():
with dpg.group(tag="hping_config_group"):
dpg.add_text("Конфигурация нагрузочного теста")
# Основные параметры
with dpg.group(horizontal=True):
dpg.add_text("Цель:")
dpg.add_input_text(tag="hping_target", width=200)
dpg.add_text("Порт:")
dpg.add_input_text(tag="hping_port", default_value="80", width=150)
with dpg.group(horizontal=True):
dpg.add_text("Тип теста:")
dpg.add_combo(
tag="hping_test_type",
items=["TCP", "UDP", "ICMP"],
default_value="TCP",
width=150
)
dpg.add_text("Размер пакета:")
dpg.add_input_int(
tag="hping_packet_size",
default_value=64,
width=150
)
# Дополнительные параметры
with dpg.collapsing_header(label="Дополнительные параметры"):
with dpg.group(horizontal=True):
dpg.add_text("Количество пакетов:")
dpg.add_input_int(
tag="hping_packet_count",
default_value=1000,
width=150
)
dpg.add_text("Интервал (сек):")
dpg.add_input_float(
tag="hping_interval",
default_value=0.1,
width=150
)
with dpg.group(horizontal=True):
dpg.add_text("Подменный IP:")
dpg.add_input_text(tag="hping_spoof_ip", width=150)
dpg.add_text("Флаги TCP:")
dpg.add_input_text(tag="hping_flags", width=150)
dpg.add_checkbox(
tag="hping_flood_mode",
label="Режим флуда"
)
dpg.add_checkbox(
tag="hping_verbose",
label="Подробный вывод"
)
# Управление тестом
with dpg.group(horizontal=True):
dpg.add_button(
label="Начать тест",
callback=start_hping_test,
width=150,
height=40
)
dpg.add_button(
label="Остановить тест",
callback=stop_hping_test,
width=150,
height=40
)
dpg.add_spacer(height=10)
dpg.add_text("Вывод:")
dpg.add_input_text(
tag="hping_output",
multiline=True,
height=300,
width=-1,
readonly=True
)
def create_modbus_attack_tab():
"""Вкладка Modbus (атаки)"""
global modbus_tasks
with dpg.group():
with dpg.group(tag="modbus_attack_config_group"):
dpg.add_text("Конфигурация атаки на Modbus")
# Основные параметры
with dpg.group(horizontal=True):
dpg.add_text("Целевой хост:")
dpg.add_input_text(tag="modbus_target_host", default_value="192.168.1.100", width=200)
dpg.add_text("Порт:")
dpg.add_input_text(tag="modbus_port", default_value="502", width=150)
with dpg.group(horizontal=True):
dpg.add_text("Потоки:")
dpg.add_input_int(tag="modbus_threads", default_value=5, width=150)
dpg.add_text("Длительность (сек):")
dpg.add_input_int(tag="modbus_duration", default_value=60, width=150)
# Задачи записи
with dpg.collapsing_header(label="Задачи записи"):
dpg.add_text("Добавьте задачи записи в регистры:")
# Кнопка добавления задачи
dpg.add_button(
label="Добавить задачу",
callback=open_modbus_task_window,
width=200
)
dpg.add_spacer(height=10)
# Контейнер для отображения задач
dpg.add_child_window(
tag="modbus_tasks_container",
height=200,
width=-1,
border=True
)
update_modbus_tasks_display()
# Кнопки управления
with dpg.group(horizontal=True):
dpg.add_button(
label="Начать атаку",
callback=start_modbus_attack,
width=200,
height=40
)
dpg.add_button(
label="Остановить атаку",
callback=stop_modbus_attack,
width=200,
height=40
)
dpg.add_spacer(height=10)
dpg.add_text("Вывод:")
dpg.add_input_text(
tag="modbus_attack_output",
multiline=True,
height=300,
width=-1,
readonly=True
)
def open_modbus_task_window():
"""Открыть модальное окно для добавления задачи Modbus"""
if dpg.does_item_exist("modbus_task_window"):
dpg.delete_item("modbus_task_window")
# Создаем модальное окно
with dpg.window(
tag="modbus_task_window",
label="Добавить задачу Modbus",
modal=True,
no_close=True,
width=700,
height=400,
pos=[300, 200]
):
dpg.add_text("Тип данных:")
dpg.add_combo(
tag="task_data_type",
items=["coil", "holding_register"],
default_value="coil",
width=200
)
dpg.add_text("Адрес регистра:")
dpg.add_input_int(
tag="task_address",
default_value=0,
min_value=0,
min_clamped=True,
width=200
)
dpg.add_text("Значение:")
dpg.add_input_int(
tag="task_value",
default_value=0,
width=200
)
dpg.add_text("Интервал между запросами (сек):")
dpg.add_input_float(
tag="task_interval",
default_value=0.5,
min_value=0.01,
min_clamped=True,
width=200
)
dpg.add_text("Количество повторений (0 = бесконечно):")
dpg.add_input_int(
tag="task_count",
default_value=10,
min_value=0,
min_clamped=True,
width=200
)
dpg.add_spacer(height=20)
with dpg.group(horizontal=True):
dpg.add_button(
label="Добавить",
callback=save_modbus_task,
width=150,
height=30
)
dpg.add_button(
label="Отмена",
callback=lambda: dpg.delete_item("modbus_task_window"),
width=150,
height=30
)
def save_modbus_task():
"""Сохранить задачу и добавить в список"""
global modbus_tasks
# Собираем данные из формы
task = {
"data_type": dpg.get_value("task_data_type"),
"address": dpg.get_value("task_address"),
"value": dpg.get_value("task_value"),
"interval": dpg.get_value("task_interval"),
"count": dpg.get_value("task_count")
}
# Добавляем задачу в глобальный список
modbus_tasks.append(task)
# Обновляем отображение задач
update_modbus_tasks_display()
# Закрываем окно
dpg.delete_item("modbus_task_window")
dpg.set_value("modbus_attack_output", "✅ Задача добавлена\n")
def update_modbus_tasks_display():
"""Обновить отображение списка задач"""
if dpg.does_item_exist("modbus_tasks_container"):
dpg.delete_item("modbus_tasks_container", children_only=True)
else:
return
with dpg.child_window(parent="modbus_tasks_container", tag="tasks_list", height=200, width=-1):
if not modbus_tasks:
dpg.add_text("Задачи не добавлены", color=(150, 150, 150))
return
for i, task in enumerate(modbus_tasks):
with dpg.group(horizontal=True):
dpg.add_text(f"{i+1}. {task['data_type']} @{task['address']} = {task['value']}")
dpg.add_text(f"Интервал: {task['interval']}с, Повторов: {task['count']}")
dpg.add_button(
label="Удалить",
callback=lambda s, a, u: remove_modbus_task(u),
user_data=i,
width=150
)
def remove_modbus_task(index):
"""Удалить задачу из списка"""
global modbus_tasks
if 0 <= index < len(modbus_tasks):
modbus_tasks.pop(index)
update_modbus_tasks_display()
dpg.set_value("modbus_attack_output", f"🗑️ Задача {index+1} удалена\n")
def start_hping_test():
"""Запуск нагрузочного теста hping3"""
global hping_tester
# Получение параметров из UI
config = HpingTestConfig(
target=dpg.get_value("hping_target"),
dest_port=int(dpg.get_value("hping_port")),
test_type=dpg.get_value("hping_test_type").lower(),
packet_size=dpg.get_value("hping_packet_size"),
count=dpg.get_value("hping_packet_count"),
interval=str(dpg.get_value("hping_interval")),
spoof_ip=dpg.get_value("hping_spoof_ip"),
flags=dpg.get_value("hping_flags"),
flood=dpg.get_value("hping_flood_mode"),
verbose=dpg.get_value("hping_verbose")
)
# Проверка обязательных параметров
if not config.target:
dpg.set_value("hping_output", "❌ Ошибка: Не указана цель теста\n")
return
# Создание тестера
hping_tester = Hping3Tester(config)
# Запуск в отдельном потоке
thread = threading.Thread(
target=run_hping_test,
args=(hping_tester,)
)
thread.start()
dpg.set_value("hping_output", "🚀 Запуск нагрузочного теста...\n")
def run_hping_test(tester):
"""Выполнение нагрузочного теста hping3 (в потоке)"""
try:
result = tester.run()
# Формирование отчета
output = "📊 Результаты нагрузочного теста:\n"
output += f"Цель: {result['target']}\n"
output += f"Протокол: {result['protocol'].upper()}\n"
output += f"Отправлено пакетов: {result['sent']}\n"
output += f"Получено пакетов: {result['received']}\n"
output += f"Потеряно пакетов: {result['loss_percent']}%\n"
output += f"Время выполнения: {result['duration']:.2f} сек\n"
output += f"Скорость: {result['pps']:.2f} пакетов/сек\n"
dpg.set_value("hping_output", output)
except Exception as e:
dpg.set_value("hping_output", f"❌ Ошибка: {str(e)}\n")
def stop_hping_test():
"""Остановка нагрузочного теста hping3"""
global hping_tester
if hping_tester:
hping_tester.stop()
dpg.set_value("hping_output", dpg.get_value("hping_output") + "🛑 Тест остановлен пользователем\n")
def start_modbus_attack():
"""Запуск атаки на Modbus"""
global modbus_tester, modbus_tasks
# Получение параметров из UI
config = LoadTestConfig(
host=dpg.get_value("modbus_target_host"),
port=int(dpg.get_value("modbus_port")),
threads=dpg.get_value("modbus_threads"),
duration=dpg.get_value("modbus_duration")
)
# Создаем задачи из глобального списка
tasks = []
for task in modbus_tasks:
tasks.append(WriteTask(
data_type=task['data_type'],
address=task['address'],
value=task['value'],
interval=task['interval'],
count=task['count']
))
if not tasks:
dpg.set_value("modbus_attack_output", "❌ Ошибка: Не добавлены задачи записи\n")
return
config.tasks = tasks
# Создание тестера
modbus_tester = ModbusLoadTester(config)
# Запуск в отдельном потоке
thread = threading.Thread(
target=run_modbus_attack,
args=(modbus_tester,)
)
thread.start()
dpg.set_value("modbus_attack_output", "🚀 Запуск атаки на Modbus...\n")
def run_modbus_attack(tester):
"""Выполнение атаки на Modbus (в потоке)"""
try:
result = tester.run()
# Формирование отчета
output = "📊 Результаты атаки на Modbus:\n"
output += f"Цель: {tester.config.host}:{tester.config.port}\n"
output += f"Всего запросов: {result['total_requests']}\n"
output += f"Успешных запросов: {result['successful_requests']}\n"
output += f"Неудачных запросов: {result['failed_requests']}\n"
output += f"Ошибок потоков: {result['thread_errors']}\n"
output += f"Время выполнения: {result['duration']:.2f} сек\n"
output += f"Скорость: {result['requests_per_sec']:.2f} запросов/сек\n"
dpg.set_value("modbus_attack_output", output)
except Exception as e:
dpg.set_value("modbus_attack_output", f"❌ Ошибка: {str(e)}\n")
def stop_modbus_attack():
"""Остановка атаки на Modbus"""
global modbus_tester
if modbus_tester:
modbus_tester.stop()
dpg.set_value("modbus_attack_output", dpg.get_value("modbus_attack_output") + "🛑 Атака остановлена пользователем\n")