This commit is contained in:
2025-06-02 14:53:57 +03:00
parent 91231b6a53
commit d7dc3e4ec4
29 changed files with 1252 additions and 3571 deletions

3
.idea/.gitignore generated vendored
View File

@ -1,3 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated
View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 virtualenv at ~/.cache/pypoetry/virtualenvs/attack-module-hzqUXjWu-py3.13" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated
View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/attack_module.iml" filepath="$PROJECT_DIR$/.idea/attack_module.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated
View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

8
Makefile Normal file
View File

@ -0,0 +1,8 @@
.PHONY: window
.ONESHELL:
# TODO: make clean command
window:
PYTHONPATH=./ poetry run python src/main.py

View File

File diff suppressed because it is too large Load Diff

45
dpg.ini Normal file
View File

@ -0,0 +1,45 @@
[Window][Debug##Default]
Pos=60,60
Size=400,400
Collapsed=0
[Window][###24]
Pos=342,254
Size=516,351
Collapsed=0
[Window][###31]
Pos=340,250
Size=600,300
Collapsed=0
[Window][###39]
Size=210,800
Collapsed=0
[Window][###38]
Size=222,103
Collapsed=0
[Window][###35]
Size=222,103
Collapsed=0
[Window][###47]
Pos=400,200
Size=316,120
Collapsed=0
[Window][###42]
Size=1274,794
Collapsed=0
[Window][###52]
Size=210,800
Collapsed=0
[Window][###108]
Pos=340,250
Size=600,300
Collapsed=0

BIN
fonts/Exo2.ttf Normal file

Binary file not shown.

View File

@ -13,7 +13,8 @@ dependencies = [
"pymodbus (>=3.9.2,<4.0.0)",
"loguru (>=0.7.3,<0.8.0)",
"dbus-python (>=1.4.0,<2.0.0)",
"dearpygui (>=2.0.0,<3.0.0)"
"dearpygui (>=2.0.0,<3.0.0)",
"pyqt5 (>=5.15.11,<6.0.0)"
]

View File

@ -9,7 +9,7 @@ def main():
# config = HpingTestConfig(
# target="192.168.1.55",
# test_type="tcp",
# interface="enp7s0f1",
# interface="eth0",
# spoof_ip="192.168.1.151", # Спуфинг источника
# source_port=randint(45000,65535), # Порт источника
# dest_port=502, # Порт назначения (Modbus)
@ -22,21 +22,42 @@ def main():
# verbose=True,
# flood=True
# )
# config = HpingTestConfig(
# target="192.168.1.151",
# test_type="tcp",
# interface="enp7s0f1",
# spoof_ip="192.168.1.55", # Спуфинг источника
# source_port=randint(45000,65535), # Порт источника
# dest_port=502, # Порт назначения (Modbus)
# dest_port_range=False, # Использовать диапазон портов назначения (++)
# packet_size=12, # Размер пакета
# interval="u10000", # Интервал 30 микросекунд
# flags="PA", # Флаги PUSH + ACK (правильно: "P", "A")
# raw_data="/home/lodqa/attack_module_data/2_modbus_response.raw", # Сырые данные
# count=10000,
# verbose=True,
# flood=False
# )
# config = HpingTestConfig(
# target="192.168.1.55",
# test_type="icmp",
# interface="eth0",
# spoof_ip="192.168.1.151", # Спуфинг источника
# verbose=False,
# flood=True
# )
config = HpingTestConfig(
target="192.168.1.151",
test_type="tcp",
interface="enp7s0f1",
spoof_ip="192.168.1.55", # Спуфинг источника
target="192.168.1.55",
test_type="udp",
interface="eth0",
spoof_ip="192.168.1.151", # Спуфинг источника
source_port=randint(45000,65535), # Порт источника
dest_port=502, # Порт назначения (Modbus)
dest_port_range=False, # Использовать диапазон портов назначения (++)
packet_size=12, # Размер пакета
interval="u10000", # Интервал 30 микросекунд
flags="PA", # Флаги PUSH + ACK (правильно: "P", "A")
raw_data="/home/lodqa/attack_module_data/2_modbus_response.raw", # Сырые данные
count=10000,
verbose=True,
flood=False
verbose=False,
flood=True
)
# Запуск теста

26
scripts/test_db.py Normal file
View File

@ -0,0 +1,26 @@
from src.core.database.database import Database
from src.core.database.managers.session_manager import SessionManager
def main():
db = Database()
session_manager = SessionManager(db)
# Создаем сессию с именем
session = session_manager.create_session(name="Test Session 1")
print(f"Created session: {session}")
# Получаем все сессии
sessions = session_manager.get_all_sessions()
print(f"All sessions: {sessions}")
# Обновляем имя сессии
session_manager.update_session_name(session.id, "Updated Test Session")
updated_session = session_manager.get_session(session.id)
print(f"Updated session: {updated_session}")
# Удаляем сессию
session_manager.delete_session(session.id)
print(f"Deleted session {session.id}")
if __name__ == "__main__":
main()

View File

@ -84,7 +84,7 @@ class NmapScanner:
if __name__ == "__main__":
try:
scanner = NmapScanner("192.168.1.0/24", "-p 1-10000")
scanner = NmapScanner("192.168.1.55", "-p 1-10000")
result = scanner.start_scan()
path = "/home/lodqa/attack_module_data/nmap.json"
logger.info(f"Сохранение результатов в файл: {path}")

View File

@ -177,7 +177,6 @@ class Hping3Tester:
if "packets tramitted" in line:
parts = line.split()
try:
print(parts)
self.stats['sent'] = int(parts[0])
self.stats['received'] = int(parts[3])
loss_percent = float(parts[6].replace('%', ''))

View File

@ -0,0 +1,66 @@
from datetime import datetime
from typing import List, Optional
from src.core.database.database import Database
from src.core.models.models import Session
from src.utils.logger import get_logger
logger = get_logger("session_dao")
class SessionDAO:
def __init__(self, db: Database):
self.db = db
def create(self, name: Optional[str] = None) -> Session:
"""Создание новой сессии с именем."""
with self.db.get_cursor() as cursor:
cursor.execute(
"INSERT INTO sessions (name, created_at) VALUES (?, CURRENT_TIMESTAMP) RETURNING *",
(name,)
)
row = cursor.fetchone()
return Session(
id=row["id"],
name=row["name"],
created_at=datetime.fromisoformat(row["created_at"])
)
def get_by_id(self, session_id: int) -> Optional[Session]:
"""Получение сессии по ID."""
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM sessions WHERE id = ?", (session_id,))
row = cursor.fetchone()
if row:
return Session(
id=row["id"],
name=row["name"],
created_at=datetime.fromisoformat(row["created_at"])
)
return None
def get_all(self) -> List[Session]:
"""Получение всех сессий."""
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM sessions ORDER BY created_at DESC")
rows = cursor.fetchall()
return [
Session(
id=row["id"],
name=row["name"],
created_at=datetime.fromisoformat(row["created_at"])
) for row in rows
]
def delete(self, session_id: int) -> bool:
"""Удаление сессии по ID."""
with self.db.get_cursor() as cursor:
cursor.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
return cursor.rowcount > 0
def update_name(self, session_id: int, name: str) -> bool:
"""Обновление имени сессии."""
with self.db.get_cursor() as cursor:
cursor.execute(
"UPDATE sessions SET name = ? WHERE id = ?",
(name, session_id)
)
return cursor.rowcount > 0

View File

@ -1,121 +1,136 @@
# src/core/database/db.py
import sqlite3
from typing import List, Dict, Any
from pathlib import Path
from typing import Optional
from contextlib import contextmanager
from src.utils.logger import get_logger
logger = get_logger("sqlite_db")
logger = get_logger("database")
class SQLiteDB:
def __init__(self, db_path: str):
"""Инициализация соединения с базой данных."""
self.db_path = db_path
self._create_tables()
logger.info(f"Инициализация базы данных SQLite: {db_path}")
class Database:
def __init__(self, db_path: str = "/home/lodqa/attack_module_data/security_scanner.db"):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn: Optional[sqlite3.Connection] = None
self.init_db()
def _create_tables(self):
"""Создание таблиц в базе данных."""
def connect(self):
"""Подключение к базе данных."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Таблица для хостов
cursor.execute("""
CREATE TABLE IF NOT EXISTS hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL UNIQUE,
state TEXT NOT NULL,
scan_timestamp TEXT NOT NULL
)
""")
# Таблица для портов
cursor.execute("""
CREATE TABLE IF NOT EXISTS ports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id INTEGER NOT NULL,
port INTEGER NOT NULL,
protocol TEXT,
state TEXT,
service TEXT,
version TEXT,
FOREIGN KEY (host_id) REFERENCES hosts(id) ON DELETE CASCADE
)
""")
conn.commit()
logger.info("Таблицы успешно созданы или уже существуют")
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
logger.info(f"Connected to database at {self.db_path}")
except sqlite3.Error as e:
logger.error(f"Ошибка при создании таблиц: {e}")
logger.error(f"Failed to connect to database: {e}")
raise
def save_scan_results(self, scan_results: Dict[str, Any]):
"""Сохранение результатов сканирования в базу данных."""
def init_db(self):
"""Инициализация базы данных и создание таблиц."""
self.connect()
cursor = self.conn.cursor()
# Создание таблиц
cursor.executescript("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS scans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER,
tool TEXT,
args TEXT,
summary TEXT,
duration INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS hosts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER,
ip TEXT,
mac TEXT,
os TEXT,
FOREIGN KEY (scan_id) REFERENCES scans(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS ports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id INTEGER,
protocol TEXT,
port_num INTEGER,
state TEXT,
FOREIGN KEY (host_id) REFERENCES hosts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS service (
id INTEGER PRIMARY KEY AUTOINCREMENT,
port_id INTEGER,
name TEXT,
product TEXT,
extrainfo TEXT,
ostype TEXT,
FOREIGN KEY (port_id) REFERENCES ports(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS cpe (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service_id INTEGER,
name TEXT,
FOREIGN KEY (service_id) REFERENCES service(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS modbus_scan_result (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scan_id INTEGER,
active_coils TEXT,
active_discrete_inputs TEXT,
active_holding_registers TEXT,
active_input_registers TEXT,
FOREIGN KEY (scan_id) REFERENCES scans(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS attacks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER,
tool TEXT,
args TEXT,
summary TEXT,
duration INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS attack_result (
id INTEGER PRIMARY KEY AUTOINCREMENT,
attack_id INTEGER,
summary TEXT,
FOREIGN KEY (attack_id) REFERENCES attacks(id) ON DELETE CASCADE
);
""")
self.conn.commit()
logger.info("Database tables initialized")
@contextmanager
def get_cursor(self):
"""Контекстный менеджер для работы с курсором."""
cursor = self.conn.cursor()
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Сохранение хостов
for ip, data in scan_results["hosts"].items():
state = data.get("state", {}).get("state", "unknown")
timestamp = data.get("scan_timestamp", "unknown")
cursor.execute(
"INSERT OR REPLACE INTO hosts (ip, state, scan_timestamp) VALUES (?, ?, ?)",
(ip, state, timestamp)
)
host_id = cursor.lastrowid if cursor.lastrowid else cursor.execute(
"SELECT id FROM hosts WHERE ip = ?", (ip,)
).fetchone()[0]
# Сохранение портов
ports = data.get("ports", [])
for port_data in ports:
cursor.execute(
"""
INSERT INTO ports (host_id, port, protocol, state, service, version)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
host_id,
port_data.get("portid"),
port_data.get("protocol"),
port_data.get("state"),
port_data.get("service", {}).get("name"),
port_data.get("service", {}).get("version")
)
)
conn.commit()
logger.info(f"Результаты сканирования сохранены для {len(scan_results['hosts'])} хостов")
except sqlite3.Error as e:
logger.error(f"Ошибка при сохранении результатов: {e}")
yield cursor
self.conn.commit()
except Exception as e:
self.conn.rollback()
logger.error(f"Database operation failed: {e}")
raise
finally:
cursor.close()
def get_all_hosts(self) -> List[Dict[str, Any]]:
"""Получение всех хостов из базы данных."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM hosts")
hosts = [{"id": row[0], "ip": row[1], "state": row[2], "scan_timestamp": row[3]} for row in cursor.fetchall()]
logger.info(f"Извлечено {len(hosts)} хостов из базы данных")
return hosts
except sqlite3.Error as e:
logger.error(f"Ошибка при получении хостов: {e}")
return []
def get_ports_by_host_id(self, host_id: int) -> List[Dict[str, Any]]:
"""Получение портов для указанного хоста."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM ports WHERE host_id = ?", (host_id,))
ports = [
{
"id": row[0],
"host_id": row[1],
"port": row[2],
"protocol": row[3],
"state": row[4],
"service": row[5],
"version": row[6]
}
for row in cursor.fetchall()
]
logger.debug(f"Извлечено {len(ports)} портов для хоста с ID {host_id}")
return ports
except sqlite3.Error as e:
logger.error(f"Ошибка при получении портов для хоста {host_id}: {e}")
return []
def close(self):
"""Закрытие соединения с базой данных."""
if self.conn:
self.conn.close()
logger.info("Database connection closed")

View File

@ -0,0 +1,157 @@
from src.core.models.models import Scan, Host, ModbusScanResult
from src.core.database.database import Database
from src.utils.logger import get_logger
from typing import List, Optional
logger = get_logger("scanner_service")
class ScannerManager:
def __init__(self, db: Database):
self.db = db
def save_scan(self, scan: Scan) -> Optional[Scan]:
"""Сохранение сканирования в базу данных"""
with self.db.get_cursor() as cursor:
cursor.execute(
"""
INSERT INTO scans (session_id, tool, args, summary, duration, created_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
RETURNING *
""",
(scan.session_id, scan.tool, scan.args, scan.summary, scan.duration)
)
row = cursor.fetchone()
if row:
return Scan(
id=row["id"],
session_id=row["session_id"],
tool=row["tool"],
args=row["args"],
summary=row["summary"],
duration=row["duration"],
created_at=row["created_at"]
)
return None
def update_scan(self, scan: Scan) -> bool:
"""Обновление информации о сканировании"""
with self.db.get_cursor() as cursor:
cursor.execute(
"""
UPDATE scans
SET summary = ?, duration = ?
WHERE id = ?
""",
(scan.summary, scan.duration, scan.id)
)
return cursor.rowcount > 0
def get_scan_by_id(self, scan_id: int) -> Optional[Scan]:
"""Получение сканирования по ID"""
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM scans WHERE id = ?", (scan_id,))
row = cursor.fetchone()
if row:
return Scan(
id=row["id"],
session_id=row["session_id"],
tool=row["tool"],
args=row["args"],
summary=row["summary"],
duration=row["duration"],
created_at=row["created_at"]
)
return None
def get_scans_by_session(self, session_id: int) -> List[Scan]:
"""Получение всех сканирований для сессии"""
with self.db.get_cursor() as cursor:
cursor.execute(
"SELECT * FROM scans WHERE session_id = ? ORDER BY created_at DESC",
(session_id,)
)
rows = cursor.fetchall()
return [
Scan(
id=row["id"],
session_id=row["session_id"],
tool=row["tool"],
args=row["args"],
summary=row["summary"],
duration=row["duration"],
created_at=row["created_at"]
) for row in rows
]
def save_host(self, scan_id: int, host: Host) -> Optional[Host]:
"""Сохранение хоста в базу данных"""
with self.db.get_cursor() as cursor:
cursor.execute(
"""
INSERT INTO hosts (scan_id, ip, mac, os)
VALUES (?, ?, ?, ?)
RETURNING *
""",
(scan_id, host.ip, host.mac, host.os)
)
row = cursor.fetchone()
if row:
return Host(
id=row["id"],
scan_id=row["scan_id"],
ip=row["ip"],
mac=row["mac"],
os=row["os"]
)
return None
def save_modbus_result(self, scan_id: int, result: dict) -> Optional[ModbusScanResult]:
"""Сохранение результатов сканирования Modbus"""
with self.db.get_cursor() as cursor:
cursor.execute(
"""
INSERT INTO modbus_scan_result (
scan_id, active_coils, active_discrete_inputs,
active_holding_registers, active_input_registers
)
VALUES (?, ?, ?, ?, ?)
RETURNING *
""",
(
scan_id,
result.get("active_coils"),
result.get("active_discrete_inputs"),
result.get("active_holding_registers"),
result.get("active_input_registers")
)
)
row = cursor.fetchone()
if row:
return ModbusScanResult(
id=row["id"],
scan_id=row["scan_id"],
active_coils=row["active_coils"],
active_discrete_inputs=row["active_discrete_inputs"],
active_holding_registers=row["active_holding_registers"],
active_input_registers=row["active_input_registers"]
)
return None
def get_modbus_result(self, scan_id: int) -> Optional[ModbusScanResult]:
"""Получение результатов сканирования Modbus по ID сканирования"""
with self.db.get_cursor() as cursor:
cursor.execute(
"SELECT * FROM modbus_scan_result WHERE scan_id = ?",
(scan_id,)
)
row = cursor.fetchone()
if row:
return ModbusScanResult(
id=row["id"],
scan_id=row["scan_id"],
active_coils=row["active_coils"],
active_discrete_inputs=row["active_discrete_inputs"],
active_holding_registers=row["active_holding_registers"],
active_input_registers=row["active_input_registers"]
)
return None

View File

@ -0,0 +1,51 @@
from typing import List, Optional
from src.core.database.database import Database
from src.core.database.dao.session_dao import SessionDAO
from src.core.models.models import Session
from src.utils.logger import get_logger
logger = get_logger("session_manager")
class SessionManager:
def __init__(self, db: Database):
self.db = db
self.session_dao = SessionDAO(db)
def create_session(self, name: Optional[str] = None) -> Session:
"""Создание новой сессии с именем."""
session = self.session_dao.create(name)
logger.info(f"Created session with ID {session.id} and name {session.name}")
return session
def get_session(self, session_id: int) -> Optional[Session]:
"""Получение сессии по ID."""
session = self.session_dao.get_by_id(session_id)
if session:
logger.info(f"Retrieved session with ID {session_id}, name {session.name}")
else:
logger.warning(f"Session with ID {session_id} not found")
return session
def get_all_sessions(self) -> List[Session]:
"""Получение всех сессий."""
sessions = self.session_dao.get_all()
logger.info(f"Retrieved {len(sessions)} sessions")
return sessions
def delete_session(self, session_id: int) -> bool:
"""Удаление сессии по ID."""
success = self.session_dao.delete(session_id)
if success:
logger.info(f"Deleted session with ID {session_id}")
else:
logger.warning(f"Session with ID {session_id} not found for deletion")
return success
def update_session_name(self, session_id: int, name: str) -> bool:
"""Обновление имени сессии."""
success = self.session_dao.update_name(session_id, name)
if success:
logger.info(f"Updated name for session with ID {session_id} to {name}")
else:
logger.warning(f"Session with ID {session_id} not found for name update")
return success

View File

@ -1,77 +0,0 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
@dataclass
class CPE:
cpe: str
@dataclass
class Service:
name: str
product: Optional[str] = None
version: Optional[str] = None
extrainfo: Optional[str] = None
ostype: Optional[str] = None
method: str
conf: str
@dataclass
class Port:
protocol: str
portid: str
state: str
reason: str
reason_ttl: str
service: Optional[Service] = None
cpe: List[CPE] = None
scripts: List[Any] = None
def __post_init__(self):
if self.cpe is None:
self.cpe = []
if self.scripts is None:
self.scripts = []
@dataclass
class HostState:
state: str
reason: str
reason_ttl: str
@dataclass
class HostInfo:
osmatch: Dict[str, Any]
ports: List[Port]
hostname: List[Any]
macaddress: Optional[str]
state: HostState
@dataclass
class TaskResult:
task: str
time: str
extrainfo: Optional[str] = None
@dataclass
class Runtime:
time: str
timestr: str
summary: str
elapsed: str
exit: str
@dataclass
class Stats:
scanner: str
args: str
start: str
startstr: str
version: str
xmloutputversion: str
@dataclass
class NmapReport:
hosts: Dict[str, HostInfo]
runtime: Runtime
stats: Stats
task_results: List[TaskResult]

76
src/core/models/models.py Normal file
View File

@ -0,0 +1,76 @@
# src/core/models/models.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Session:
id: Optional[int] = None
name: Optional[str] = None
created_at: Optional[datetime] = None
@dataclass
class Scan:
id: Optional[int] = None
session_id: Optional[int] = None
tool: Optional[str] = None
args: Optional[str] = None
summary: Optional[str] = None
duration: Optional[int] = None # in seconds
created_at: Optional[datetime] = None
@dataclass
class Host:
id: Optional[int] = None
scan_id: Optional[int] = None
ip: Optional[str] = None
mac: Optional[str] = None
os: Optional[str] = None
@dataclass
class Port:
id: Optional[int] = None
host_id: Optional[int] = None
protocol: Optional[str] = None
port_num: Optional[int] = None
state: Optional[str] = None
@dataclass
class Service:
id: Optional[int] = None
port_id: Optional[int] = None
name: Optional[str] = None
product: Optional[str] = None
extrainfo: Optional[str] = None
ostype: Optional[str] = None
@dataclass
class CPE:
id: Optional[int] = None
service_id: Optional[int] = None
name: Optional[str] = None
@dataclass
class ModbusScanResult:
id: Optional[int] = None
scan_id: Optional[int] = None
active_coils: Optional[str] = None
active_discrete_inputs: Optional[str] = None
active_holding_registers: Optional[str] = None
active_input_registers: Optional[str] = None
@dataclass
class Attack:
id: Optional[int] = None
session_id: Optional[int] = None
tool: Optional[str] = None
args: Optional[str] = None
summary: Optional[str] = None
duration: Optional[int] = None # in seconds
created_at: Optional[datetime] = None
@dataclass
class AttackResult:
id: Optional[int] = None
attack_id: Optional[int] = None
summary: Optional[str] = None

View File

@ -1,261 +0,0 @@
import dearpygui.dearpygui as dpg
from typing import Dict, Any, List, Callable
from src.core.api.nmap import NmapScanner
from src.core.database.database import SQLiteDB
from src.utils.logger import get_logger
import time
import os
logger = get_logger("gui")
class BaseTab:
"""Базовый класс для вкладок GUI."""
def __init__(self, parent: 'SecurityScannerGUI'):
self.parent = parent
def create_content(self):
"""Создание содержимого вкладки."""
raise NotImplementedError("Метод create_content должен быть переопределен в дочернем классе")
class ScanningTab(BaseTab):
"""Вкладка для сканирования (Nmap)."""
def create_content(self):
with dpg.tab(label="Сканирование"):
dpg.add_text("Настройки сканирования Nmap")
dpg.add_input_text(label="Целевой IP", default_value="192.168.1.0/24", tag="scan_ip")
dpg.add_input_text(label="Аргументы", default_value="-p 1-10000", tag="scan_args")
dpg.add_button(label="Запустить сканирование", callback=self.parent.start_scan)
class ResultsTab(BaseTab):
"""Вкладка для отображения результатов."""
def create_content(self):
with dpg.tab(label="Результаты"):
dpg.add_text("Результаты сканирования")
with dpg.group(horizontal=True):
dpg.add_text("Обновить результаты:")
dpg.add_button(label="Обновить", callback=self.parent.update_results)
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True,
tag="results_table"
):
dpg.add_table_column(label="IP")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Время сканирования")
self.parent.port_details_group = dpg.add_group(show=False)
class SettingsTab(BaseTab):
"""Вкладка для настроек."""
def create_content(self):
with dpg.tab(label="Настройки"):
dpg.add_text("Настройки приложения (в разработке)")
class ExploitationTab(BaseTab):
"""Вкладка для эксплуатации (заглушка)."""
def create_content(self):
with dpg.tab(label="Эксплуатация"):
dpg.add_text("Инструменты эксплуатации (в разработке: Metasploit, Scapy, Hping3)")
class GenerationTab(BaseTab):
"""Вкладка для генерации отчетов (заглушка)."""
def create_content(self):
with dpg.tab(label="Генерация"):
dpg.add_text("Генерация отчетов (PDF, JSON) в разработке")
class SecurityScannerGUI:
"""Главный класс GUI для управления интерфейсом."""
def __init__(self, db: SQLiteDB):
self.db = db
self.tabs = []
self.results_table = None
self.port_details_group = None
self.setup_dpg()
self.create_main_window()
def setup_dpg(self):
"""Настройка Dear PyGui с поддержкой кириллицы и адаптацией к размеру viewport'а."""
dpg.create_context()
# Создаём viewport с временными размерами
dpg.create_viewport(
title="Security Scanner",
width=800,
height=600,
x_pos=0,
y_pos=0,
resizable=True
)
# Загрузка шрифта с поддержкой кириллицы
font_path = os.path.join("/home/lodqa/attack_module_data/Exo2-VariableFont_wght.ttf")
if os.path.exists(font_path):
big_let_start = 0x00C0 # Capital "A" in cyrillic alphabet
big_let_end = 0x00DF # Capital "Я" in cyrillic alphabet
small_let_end = 0x00FF # small "я" in cyrillic alphabet
remap_big_let = 0x0410 # Starting number for remapped cyrillic alphabet
alph_len = big_let_end - big_let_start + 1 # adds the shift from big letters to small
alph_shift = remap_big_let - big_let_start # adds the shift from remapped to non-remapped
with dpg.font_registry():
with dpg.font(font_path, 18) as default_font:
dpg.add_font_range_hint(dpg.mvFontRangeHint_Default)
dpg.add_font_range_hint(dpg.mvFontRangeHint_Cyrillic)
biglet = remap_big_let
for i1 in range(big_let_start, big_let_end + 1):
dpg.add_char_remap(i1, biglet)
dpg.add_char_remap(i1 + alph_len, biglet + alph_len)
biglet += 1
dpg.bind_font(default_font)
else:
logger.warning(f"Шрифт {font_path} не найден. Используется шрифт по умолчанию.")
# Завершаем настройку Dear PyGui
dpg.setup_dearpygui()
dpg.show_viewport()
# Получаем текущие размеры viewport'а после инициализации
screen_width = dpg.get_viewport_client_width()
screen_height = dpg.get_viewport_client_height()
logger.debug(f"Размеры viewport'а: {screen_width}x{screen_height}")
# Устанавливаем размеры окна на основе доступного пространства
dpg.configure_viewport(
"Security Scanner",
width=screen_width,
height=screen_height,
x_pos=0,
y_pos=0
)
# Добавляем горячую клавишу для выхода (Ctrl+Q)
# with dpg.handler_registry():
# dpg.add_key_press_handler(dpg.mvKey_Q, modifier=dpg.mvKeyMod_Control, callback=lambda: dpg.stop_dearpygui())
def create_main_window(self):
"""Создание главного окна."""
with dpg.window(
label="Security Scanner",
width=dpg.get_viewport_client_width(),
height=dpg.get_viewport_client_height(),
pos=[0, 0],
no_title_bar=False,
no_resize=False,
no_move=False,
no_close=False
):
with dpg.tab_bar(tag="main_tab_bar"):
# Инициализация вкладок
self.tabs = [
ScanningTab(self),
ResultsTab(self),
SettingsTab(self),
ExploitationTab(self),
GenerationTab(self)
]
for tab in self.tabs:
tab.create_content()
with dpg.group():
dpg.add_text("Лог операций:")
self.log_text = dpg.add_text("", wrap=0)
def add_new_tab(self, tab_name: str, content_callback: Callable):
"""Добавление новой вкладки."""
with dpg.tab(label=tab_name, parent="main_tab_bar"):
content_callback()
def log(self, message: str, level: str = "info"):
"""Логирование сообщения в интерфейсе и в логгер."""
current_log = dpg.get_value(self.log_text)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
new_log = f"{current_log}[{timestamp}] {message}\n"
dpg.set_value(self.log_text, new_log)
getattr(logger, level)(message)
def start_scan(self, sender, app_data, user_data):
"""Запуск сканирования Nmap."""
ip = dpg.get_value("scan_ip")
args = dpg.get_value("scan_args")
self.log(f"Запуск сканирования для IP: {ip} с аргументами: {args}")
try:
scanner = NmapScanner(ip, args)
results = scanner.start_scan()
# Добавляем временную метку
for host in results["hosts"].values():
host["scan_timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
self.db.save_scan_results(results)
self.log(f"Сканирование завершено. Найдено хостов: {len(results['hosts'])}", "success")
self.update_results()
except Exception as e:
self.log(f"Ошибка при сканировании: {e}", "error")
def update_results(self, sender=None, app_data=None, user_data=None):
"""Обновление таблицы результатов."""
# Удаляем старую таблицу и создаем новую
if dpg.does_item_exist("results_table"):
dpg.delete_item("results_table")
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True,
tag="results_table"
):
dpg.add_table_column(label="IP")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Время сканирования")
hosts = self.db.get_all_hosts()
for host in hosts:
with dpg.table_row():
dpg.add_text(host["ip"])
dpg.add_text(host["state"])
dpg.add_text(host["scan_timestamp"])
dpg.add_table_cell().bind_item(
dpg.add_button(label="Детали", callback=self.show_port_details, user_data=host["id"])
)
def show_port_details(self, sender, app_data, user_data):
"""Отображение деталей портов для выбранного хоста."""
host_id = user_data
if self.port_details_group and dpg.does_item_exist(self.port_details_group):
dpg.delete_item(self.port_details_group, children_only=True)
else:
self.port_details_group = dpg.add_group(show=False)
with dpg.group(parent=self.port_details_group):
dpg.add_text(f"Порты для хоста (ID: {host_id})")
with dpg.table(
header_row=True,
resizable=True,
policy=dpg.mvTable_SizingStretchProp,
borders_innerH=True,
borders_outerH=True,
borders_innerV=True,
borders_outerV=True
):
dpg.add_table_column(label="Порт")
dpg.add_table_column(label="Протокол")
dpg.add_table_column(label="Состояние")
dpg.add_table_column(label="Сервис")
dpg.add_table_column(label="Версия")
ports = self.db.get_ports_by_host_id(host_id)
for port in ports:
with dpg.table_row():
dpg.add_text(str(port["port"]))
dpg.add_text(port["protocol"])
dpg.add_text(port["state"])
dpg.add_text(port["service"] or "N/A")
dpg.add_text(port["version"] or "N/A")
def run(self):
"""Запуск интерфейса."""
dpg.start_dearpygui()
dpg.destroy_context()

54
src/main.py Normal file → Executable file
View File

@ -1,12 +1,52 @@
from src.core.services.gui import SecurityScannerGUI
from src.core.database.database import SQLiteDB
import dearpygui.dearpygui as dpg
from src.ui.main_window import create_main_window
from src.core.database.database import Database
from src.utils.logger import get_logger
logger = get_logger("main")
def main():
# Инициализация базы данных
db = SQLiteDB("/home/lodqa/attack_module_data/database.db")
# Инициализация интерфейса
gui = SecurityScannerGUI(db)
gui.run()
db = Database()
dpg.create_context()
dpg.configure_app(init_file="dpg.ini")
# Увеличен размер шрифта с 16 до 24
big_let_start = 0x00C0 # Capital "A" in cyrillic alphabet
big_let_end = 0x00DF # Capital "Я" in cyrillic alphabet
small_let_end = 0x00FF # small "я" in cyrillic alphabet
remap_big_let = 0x0410 # Starting number for remapped cyrillic alphabet
alph_len = big_let_end - big_let_start + 1 # adds the shift from big letters to small
alph_shift = remap_big_let - big_let_start # adds the shift from remapped to non-remapped
with dpg.font_registry():
with dpg.font("fonts/Exo2.ttf", 24) as default_font:
dpg.add_font_range_hint(dpg.mvFontRangeHint_Default)
dpg.add_font_range_hint(dpg.mvFontRangeHint_Cyrillic)
biglet = remap_big_let # Starting number for remapped cyrillic alphabet
for i1 in range(big_let_start, big_let_end + 1): # Cycle through big letters in cyrillic alphabet
dpg.add_char_remap(i1, biglet) # Remap the big cyrillic letter
dpg.add_char_remap(i1 + alph_len, biglet + alph_len) # Remap the small cyrillic letter
biglet += 1 # choose next letter
dpg.bind_font(default_font)
create_main_window(db)
dpg.bind_font(default_font)
# Увеличены минимальные размеры окна
dpg.create_viewport(
title="Security Scanner",
min_width=1600,
min_height=900,
)
dpg.setup_dearpygui()
dpg.show_viewport()
# Растягиваем главное окно на весь viewport
dpg.set_primary_window("main_window", True)
dpg.start_dearpygui()
dpg.destroy_context()
logger.info("Application closed")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,92 @@
import dearpygui.dearpygui as dpg
from src.ui.scanner_window import create_scanner_window
from src.core.models.models import Session
from src.utils.logger import get_logger
logger = get_logger("config_window")
def create_configuration_window(db, session: Session):
"""Окно конфигурации сессии"""
with dpg.window(
tag="config_window",
label=f"Session: {session.name}",
no_collapse=True,
no_resize=True,
no_close=True,
no_title_bar=True,
no_scrollbar=True,
no_move=True,
no_scroll_with_mouse=True
):
# Главная горизонтальная группа для разделения на панели
with dpg.group(horizontal=True):
# Боковая панель навигации
with dpg.child_window(
tag="sidebar",
width=270,
height=-1,
border=False
):
dpg.add_text("Меню", color=(200, 200, 200))
dpg.add_separator()
dpg.add_button(
label="Сканирование",
callback=lambda: show_scanner_window(db, session),
width=-1,
height=50
)
dpg.add_spacer(height=10)
dpg.add_button(
label="Эксплуатация",
callback=lambda: logger.info("Attacks selected"),
width=-1,
height=50
)
dpg.add_spacer(height=10)
dpg.add_button(
label="Генерация отчета",
callback=lambda: logger.info("Reporting selected"),
width=-1,
height=50
)
dpg.add_spacer(height=10)
dpg.add_button(
label="Лог приложения",
callback=lambda: logger.info("Log selected"),
width=-1,
height=50
)
dpg.add_spacer(height=30)
dpg.add_text(f"Сессия: {session.name}", color=(150, 150, 150))
created_at = session.created_at.strftime(
"""%Y-%m-%d
%H:%M"""
) if session.created_at else "Unknown"
dpg.add_text(f"Дата создания: {created_at}", color=(150, 150, 150))
# Основная область контента (контейнер)
with dpg.child_window(
tag="content_area",
width=-1,
height=-1,
border=False
):
# Начальное содержимое
dpg.add_text("Выберите подмодуль", tag="initial_content")
# Установить окно конфигурации как основное
dpg.set_primary_window("config_window", True)
def show_scanner_window(db, session):
"""Показать окно сканирования"""
# Очистить контентную область
if dpg.does_item_exist("content_area"):
dpg.delete_item("content_area", children_only=True)
# Создать окно сканирования внутри контентной области
create_scanner_window(db, session, parent="content_area")

75
src/ui/main_window.py Normal file
View File

@ -0,0 +1,75 @@
import dearpygui.dearpygui as dpg
from src.ui.session_window import create_session_window
from src.core.database.managers.session_manager import SessionManager
from src.utils.logger import get_logger
logger = get_logger("main_window")
def create_main_window(db):
"""Создание главного окна приложения"""
with dpg.window(
tag="main_window",
label="Security Scanner",
no_collapse=True,
no_resize=True,
no_close=True,
no_title_bar=True,
no_move=True,
): # Убраны фиксированные размеры
# Заголовок с увеличенным шрифтом
dpg.add_text("Модуль тестирования безопасности", color=(255, 255, 255), pos=[600,0])
dpg.add_spacing(count=10)
dpg.add_separator()
# Увеличенные размеры кнопки
dpg.add_button(
label="Создать новую сессию",
callback=lambda: create_session_window(db),
width=300, # Увеличено с 200
height=50 # Увеличено с 40
)
dpg.add_spacer(height=30) # Увеличен отступ
# Список сессий с увеличенными размерами
dpg.add_text("Существующие сессии:")
session_list = dpg.add_listbox(
tag="session_list",
width=1600, # Растягиваем на всю ширину окна
num_items=15, # Увеличено количество отображаемых элементов
# height=500 # Увеличена высота
callback=lambda s: open_session(s, db)
)
load_sessions(db, session_list)
def load_sessions(db, session_list):
"""Загрузка списка сессий"""
session_manager = SessionManager(db)
sessions = session_manager.get_all_sessions()
session_items = []
for session in sessions:
date_str = session.created_at.strftime("%Y-%m-%d %H:%M") if session.created_at else "Unknown"
session_items.append(f"ID:{session.id} - {session.name or 'Unnamed Session'} - Created at:{date_str}")
dpg.configure_item(session_list, items=session_items)
logger.info(f"Loaded {len(sessions)} sessions")
def open_session(sender, db):
"""Открытие выбранной сессии"""
session_manager = SessionManager(db)
sessions = session_manager.get_all_sessions()
if dpg.get_value(sender) and sessions:
session_id = int(dpg.get_value(sender).split()[0][3:])
print(session_id)
print(len(sessions))
if session_id <= len(sessions):
session = sessions[session_id-1]
logger.info(f"Opening session: {session.name} (ID: {session.id})")
# Закрыть текущее окно и открыть окно конфигурации
dpg.delete_item("main_window")
from src.ui.configuration_window import create_configuration_window
create_configuration_window(db, session)

333
src/ui/scanner_window.py Normal file
View File

@ -0,0 +1,333 @@
import dearpygui.dearpygui as dpg
from src.core.models.models import Session, Scan, Host
from src.core.database.managers.scanner_manager import ScannerManager
from src.utils.logger import get_logger
import threading
import time
logger = get_logger("scanner_window")
def create_scanner_window(db, session: Session, parent=None):
"""Окно сканирования (с поддержкой родительского контейнера)"""
scanner_service = ScannerManager(db)
# Если указан родитель, создаем внутри него
if parent:
print(parent)
with dpg.child_window(
parent=parent,
tag="scanner_window",
width=-1,
height=-1
):
build_scanner_content(scanner_service, session)
else:
with dpg.child_window(
tag="scanner_window",
width=-1,
height=-1
):
build_scanner_content(scanner_service, session)
def build_scanner_content(scanner_service, session):
"""Построение содержимого окна сканирования"""
# Заголовок
dpg.add_text("Сканирование", color=(255, 255, 255))
dpg.add_separator()
dpg.add_spacer(height=10)
# Вкладки
with dpg.tab_bar(tag="scan_tabs"):
# Вкладка Nmap
with dpg.tab(tag="nmap_tab", label="Nmap Scanning"):
create_nmap_tab(scanner_service, session)
# Вкладка OpenVAS
with dpg.tab(tag="openvas_tab", label="OpenVAS Scanning"):
dpg.add_text("OpenVAS scanning will be implemented here")
# Вкладка Modbus
with dpg.tab(tag="modbus_tab", label="Modbus Scanning"):
create_modbus_tab(scanner_service, session)
# Вкладка результатов
with dpg.tab(tag="results_tab", label="Scan Results"):
create_results_tab(scanner_service, session)
def create_nmap_tab(scanner_service, session):
"""Вкладка Nmap сканирования"""
with dpg.group():
# Конфигурация сканирования
with dpg.group(tag="nmap_config_group"):
dpg.add_text("Scan Configuration")
dpg.add_text("Target:")
target_input = dpg.add_input_text(tag="nmap_target", width=300)
dpg.add_text("Scan Type:")
scan_type = dpg.add_combo(
tag="nmap_scan_type",
items=["Quick Scan", "Intensive Scan", "Full Scan", "Custom Scan"],
default_value="Quick Scan",
width=200
)
dpg.add_text("Custom Arguments:")
custom_args = dpg.add_input_text(tag="nmap_custom_args", width=300)
# Кнопка запуска
dpg.add_button(
label="Start Nmap Scan",
callback=lambda: start_nmap_scan(
scanner_service,
session,
dpg.get_value(target_input),
dpg.get_value(scan_type),
dpg.get_value(custom_args)
),
width=200,
height=40 # Увеличена высота
)
dpg.add_spacer(height=10)
# Область вывода
dpg.add_text("Output:")
output_area = dpg.add_input_text(
tag="nmap_output",
multiline=True,
height=300,
width=-1, # Растянуть на всю ширину
readonly=True
)
def create_modbus_tab(scanner_service, session):
"""Вкладка Modbus сканирования"""
with dpg.group():
# Конфигурация сканирования
with dpg.group(tag="modbus_config_group"):
dpg.add_text("Modbus Scan Configuration")
dpg.add_text("Target IP:")
target_input = dpg.add_input_text(tag="modbus_target", width=300)
dpg.add_text("Port:")
port_input = dpg.add_input_text(tag="modbus_port", default_value="502", width=100)
# Кнопка запуска
dpg.add_button(
label="Start Modbus Scan",
callback=lambda: start_modbus_scan(
scanner_service,
session,
dpg.get_value(target_input),
dpg.get_value(port_input)
),
width=200,
height=40 # Увеличена высота
)
dpg.add_spacer(height=10)
# Область вывода
dpg.add_text("Output:")
output_area = dpg.add_input_text(
tag="modbus_output",
multiline=True,
height=300,
width=-1, # Растянуть на всю ширину
readonly=True
)
def create_results_tab(scanner_service, session):
"""Вкладка результатов сканирования"""
with dpg.group():
# Список сканирований
dpg.add_text("Scan History:")
scan_list = dpg.add_listbox(
tag="scan_list",
width=-1, # Растянуть на всю ширину
num_items=10,
callback=lambda s: show_scan_details(scanner_service, dpg.get_value(s))
)
# Загрузка истории
load_scan_history(scanner_service, session, scan_list)
dpg.add_spacer(height=10)
# Детали сканирования
dpg.add_text("Scan Details:")
scan_details = dpg.add_input_text(
tag="scan_details",
multiline=True,
height=300,
width=-1, # Растянуть на всю ширину
readonly=True
)
def start_nmap_scan(scanner_service, session, target, scan_type, custom_args):
"""Запуск Nmap сканирования"""
if not target:
logger.warning("Nmap scan started without target")
return
# Обновить вывод
dpg.set_value("nmap_output", f"Starting {scan_type} scan on {target}...\n")
# Создать объект сканирования
scan = Scan(
session_id=session.id,
tool="nmap",
args=f"Type: {scan_type}, Args: {custom_args}",
summary="In progress"
)
# Сохранить в базу
saved_scan = scanner_service.save_scan(scan)
if saved_scan:
dpg.set_value("nmap_output", dpg.get_value("nmap_output") + f"Scan ID: {saved_scan.id}\n")
# Запуск в отдельном потоке
thread = threading.Thread(
target=run_nmap_scan,
args=(scanner_service, session, target, saved_scan.id)
)
thread.start()
def run_nmap_scan(scanner_service, session, target, scan_id):
"""Выполнение Nmap сканирования (в потоке)"""
try:
# Имитация сканирования
output = dpg.get_value("nmap_output") + "Scanning network...\n"
dpg.set_value("nmap_output", output)
time.sleep(2)
# Имитация результатов
hosts = [
{"ip": "192.168.1.100", "mac": "00:11:22:33:44:55", "os": "Linux"},
{"ip": "192.168.1.101", "mac": "AA:BB:CC:DD:EE:FF", "os": "Windows"}
]
# Сохранить результаты
for host in hosts:
host_obj = Host(
scan_id=scan_id,
ip=host["ip"],
mac=host["mac"],
os=host["os"]
)
scanner_service.save_host(scan_id, host_obj)
output = dpg.get_value("nmap_output") + f"Found host: {host['ip']} ({host['os']})\n"
dpg.set_value("nmap_output", output)
# Обновить статус
scan = Scan(
id=scan_id,
summary=f"Found {len(hosts)} hosts",
duration=2
)
scanner_service.update_scan(scan)
output = dpg.get_value("nmap_output") + "Scan completed!\n"
dpg.set_value("nmap_output", output)
# Обновить список сканирований
if dpg.does_item_exist("scan_list"):
load_scan_history(scanner_service, session, "scan_list")
except Exception as e:
output = dpg.get_value("nmap_output") + f"Error: {str(e)}\n"
dpg.set_value("nmap_output", output)
logger.error(f"Nmap scan failed: {e}")
def start_modbus_scan(scanner_service, session, target, port):
"""Запуск Modbus сканирования"""
if not target:
logger.warning("Modbus scan started without target")
return
# Обновить вывод
dpg.set_value("modbus_output", f"Starting Modbus scan on {target}:{port}...\n")
# Создать объект сканирования
scan = Scan(
session_id=session.id,
tool="modbus",
args=f"Target: {target}, Port: {port}",
summary="In progress"
)
# Сохранить в базу
saved_scan = scanner_service.save_scan(scan)
if saved_scan:
dpg.set_value("modbus_output", dpg.get_value("modbus_output") + f"Scan ID: {saved_scan.id}\n")
# Запуск в отдельном потоке
thread = threading.Thread(
target=run_modbus_scan,
args=(scanner_service, session, target, int(port), saved_scan.id)
)
thread.start()
def run_modbus_scan(scanner_service, session, target, port, scan_id):
"""Выполнение Modbus сканирования (в потоке)"""
try:
# Имитация сканирования
output = dpg.get_value("modbus_output") + "Connecting to device...\n"
dpg.set_value("modbus_output", output)
time.sleep(1)
# Имитация результатов
results = {
"active_coils": "0, 1, 5, 7",
"active_discrete_inputs": "2, 3",
"active_holding_registers": "100, 101, 105",
"active_input_registers": "200, 201"
}
# Сохранить результаты
scanner_service.save_modbus_result(scan_id, results)
# Обновить статус
scan = Scan(
id=scan_id,
summary="Modbus scan completed",
duration=1
)
scanner_service.update_scan(scan)
output = dpg.get_value("modbus_output") + "Scan completed!\n"
output += f"Active coils: {results['active_coils']}\n"
output += f"Active holding registers: {results['active_holding_registers']}\n"
dpg.set_value("modbus_output", output)
# Обновить список сканирований
if dpg.does_item_exist("scan_list"):
load_scan_history(scanner_service, session, "scan_list")
except Exception as e:
output = dpg.get_value("modbus_output") + f"Error: {str(e)}\n"
dpg.set_value("modbus_output", output)
logger.error(f"Modbus scan failed: {e}")
def load_scan_history(scanner_service, session, scan_list):
"""Загрузка истории сканирований"""
scans = scanner_service.get_scans_by_session(session.id)
scan_items = []
for scan in scans:
item_text = f"{scan.tool} scan - {scan.created_at.strftime('%Y-%m-%d %H:%M')}"
if scan.summary:
item_text += f" ({scan.summary})"
scan_items.append(item_text)
dpg.configure_item(scan_list, items=scan_items)
def show_scan_details(scanner_service, scan_index):
"""Отображение деталей сканирования"""
if scan_index is None:
return
# Получить все сканирования
# (В реальности нужно хранить текущую сессию или передавать ее)
# Для демо: просто покажем заглушку
details = f"Scan details for index {scan_index}\n"
dpg.set_value("scan_details", details)

74
src/ui/session_window.py Normal file
View File

@ -0,0 +1,74 @@
import dearpygui.dearpygui as dpg
from src.core.database.managers.session_manager import SessionManager
from src.utils.logger import get_logger
logger = get_logger("session_window")
def create_session_window(db):
"""Окно создания новой сессии"""
with dpg.window(
tag="session_window",
label="New Session",
no_collapse=True,
no_resize=True,
no_close=True,
no_title_bar=True,
width=600, # Увеличена ширина окна
height=300, # Увеличена высота окна
modal=True, # Добавлен модальный режим
):
dpg.add_text("Создание новое сессии")
dpg.add_separator()
dpg.add_text("Имя сессии:")
# Увеличен размер поля ввода
session_name = dpg.add_input_text(tag="session_name", width=500)
dpg.add_spacer(height=20)
# Увеличены размеры кнопок
with dpg.group(horizontal=True):
dpg.add_button(
label="Создать",
callback=lambda: create_session(db, session_name),
width=150, # Увеличено с 100
height=40 # Увеличена высота
)
dpg.add_spacer(width=20)
dpg.add_button(
label="Отмена",
callback=lambda: dpg.delete_item("session_window"),
width=150, # Увеличено с 100
height=40 # Увеличена высота
)
# Центрирование окна после создания
dpg.split_frame()
viewport_width = dpg.get_viewport_width()
viewport_height = dpg.get_viewport_height()
win_width = dpg.get_item_width("session_window")
win_height = dpg.get_item_height("session_window")
dpg.set_item_pos(
"session_window",
[(viewport_width - win_width) // 2, (viewport_height - win_height) // 2]
)
def create_session(db, session_name):
"""Создание новой сессии"""
name = dpg.get_value(session_name)
if not name:
logger.warning("Attempt to create session without name")
return
session_manager = SessionManager(db)
session = session_manager.create_session(name)
if session:
logger.info(f"Created new session: {name}")
dpg.delete_item("session_window")
# Обновить список сессий в главном окне
if dpg.does_item_exist("session_list"):
from src.ui.main_window import load_sessions
load_sessions(db, "session_list")

42
src/utils/window_utils.py Normal file
View File

@ -0,0 +1,42 @@
# src/utils/window_utils.py
from PyQt5.QtWidgets import QDialog, QMessageBox, QInputDialog
from PyQt5.QtCore import Qt
def show_floating_dialog(parent, title, message, icon=QMessageBox.Information):
"""Показывает диалоговое окно как плавающее поверх всех окон"""
dialog = QMessageBox(parent)
dialog.setWindowTitle(title)
dialog.setText(message)
dialog.setIcon(icon)
# Устанавливаем флаги для плавающего окна
dialog.setWindowFlags(
Qt.Window |
Qt.CustomizeWindowHint |
Qt.WindowTitleHint |
Qt.WindowStaysOnTopHint
)
# Отключаем тайлинг для этого окна
dialog.setProperty("hyprland_floating", True)
return dialog.exec_()
def create_floating_input(parent, title, label):
"""Создает плавающее окно ввода"""
dialog = QInputDialog(parent)
dialog.setWindowTitle(title)
dialog.setLabelText(label)
# Устанавливаем флаги для плавающего окна
dialog.setWindowFlags(
Qt.Window |
Qt.CustomizeWindowHint |
Qt.WindowTitleHint |
Qt.WindowStaysOnTopHint
)
# Отключаем тайлинг для этого окна
dialog.setProperty("hyprland_floating", True)
return dialog