1091 lines
49 KiB
Python
1091 lines
49 KiB
Python
"""
|
|
Database models and operations for DockMon
|
|
Uses SQLite for persistent storage of configuration and settings
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Dict, Any
|
|
from sqlalchemy import create_engine, Column, String, Integer, Boolean, DateTime, JSON, ForeignKey, Text, UniqueConstraint, text
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session, relationship
|
|
from sqlalchemy.pool import StaticPool
|
|
import json
|
|
import os
|
|
import logging
|
|
import secrets
|
|
import bcrypt
|
|
|
|
# Module-level logger, named after this module, used by the database layer below.
logger = logging.getLogger(__name__)
|
|
|
|
# Declarative base class; every ORM model defined in this module subclasses it.
Base = declarative_base()
|
|
|
|
class User(Base):
    """A login account together with that user's stored UI preferences."""

    __tablename__ = "users"

    # --- identity / credentials ---
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False, unique=True)
    password_hash = Column(String, nullable=False)

    # --- login state flags ---
    is_first_login = Column(Boolean, default=True)
    must_change_password = Column(Boolean, default=False)

    # --- per-user UI preferences ---
    # GridStack layout, stored as a JSON string.
    dashboard_layout = Column(Text, nullable=True)
    # 'desc' (newest first) or 'asc' (oldest first).
    event_sort_order = Column(String, default='desc')
    # Sort preference for containers on the dashboard.
    container_sort_order = Column(String, default='name-asc')
    # Modal size/position preferences, stored as a JSON string.
    modal_preferences = Column(Text, nullable=True)

    # --- bookkeeping timestamps (naive local time via datetime.now) ---
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    last_login = Column(DateTime, nullable=True)
|
|
|
|
class DockerHostDB(Base):
    """Connection settings for one monitored Docker host."""

    __tablename__ = "docker_hosts"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False, unique=True)
    url = Column(String, nullable=False)

    # Optional TLS material for the connection.
    tls_cert = Column(Text, nullable=True)
    tls_key = Column(Text, nullable=True)
    tls_ca = Column(Text, nullable=True)

    # One of 'secure', 'insecure', 'unknown'.
    security_status = Column(String, nullable=True)
    is_active = Column(Boolean, default=True)

    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Auto-restart configs belong to the host: they are deleted with it and
    # when removed from this collection (delete-orphan cascade).
    auto_restart_configs = relationship(
        "AutoRestartConfig",
        back_populates="host",
        cascade="all, delete-orphan",
    )
|
|
|
|
class AutoRestartConfig(Base):
    """Per-container auto-restart settings and restart bookkeeping."""

    __tablename__ = "auto_restart_configs"

    id = Column(Integer, primary_key=True, autoincrement=True)

    # Which container on which host this config applies to.
    host_id = Column(String, ForeignKey("docker_hosts.id"))
    container_id = Column(String, nullable=False)
    container_name = Column(String, nullable=False)

    # Restart policy.
    enabled = Column(Boolean, default=True)
    max_retries = Column(Integer, default=3)
    retry_delay = Column(Integer, default=30)

    # Restart bookkeeping.
    restart_count = Column(Integer, default=0)
    last_restart = Column(DateTime, nullable=True)

    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Back-reference to the owning host (pairs with
    # DockerHostDB.auto_restart_configs).
    host = relationship("DockerHostDB", back_populates="auto_restart_configs")
|
|
|
|
|
|
class GlobalSettings(Base):
    """Application-wide settings row (primary key defaults to 1)."""

    __tablename__ = "global_settings"

    id = Column(Integer, primary_key=True, default=1)

    # Restart / polling behaviour.
    max_retries = Column(Integer, default=3)
    retry_delay = Column(Integer, default=30)
    default_auto_restart = Column(Boolean, default=False)
    polling_interval = Column(Integer, default=2)
    connection_timeout = Column(Integer, default=10)

    # Retention.
    log_retention_days = Column(Integer, default=7)
    # Events are kept for 30 days by default.
    event_retention_days = Column(Integer, default=30)

    enable_notifications = Column(Boolean, default=True)
    # Whether old events are cleaned up automatically.
    auto_cleanup_events = Column(Boolean, default=True)
    # Global notification template.
    alert_template = Column(Text, nullable=True)
    # Array of blackout time windows.
    blackout_windows = Column(JSON, nullable=True)

    # One-shot flags.
    # Set once first-run setup has completed.
    first_run_complete = Column(Boolean, default=False)
    # Set once polling_interval has been migrated to 2s.
    polling_interval_migrated = Column(Boolean, default=False)

    # Offset from UTC, in minutes.
    timezone_offset = Column(Integer, default=0)

    # Dashboard display toggles.
    # Show host statistics graphs on the dashboard.
    show_host_stats = Column(Boolean, default=True)
    # Show container statistics on the dashboard.
    show_container_stats = Column(Boolean, default=True)

    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
|
|
|
|
class NotificationChannel(Base):
    """A configured destination that notifications can be sent to."""

    __tablename__ = "notification_channels"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False, unique=True)
    # Channel kind: telegram, discord, slack, pushover.
    type = Column(String, nullable=False)
    # Channel-specific configuration, stored as JSON.
    config = Column(JSON, nullable=False)
    enabled = Column(Boolean, default=True)

    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
|
|
|
|
class AlertRuleDB(Base):
    """An alert rule that fires on container events or state changes."""

    __tablename__ = "alert_rules"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)

    # What triggers the rule.
    # List of Docker events that trigger the alert.
    trigger_events = Column(JSON, nullable=True)
    # List of container states that trigger the alert.
    trigger_states = Column(JSON, nullable=True)

    # Where the alert goes: list of notification channel IDs.
    notification_channels = Column(JSON, nullable=False)

    # Cooldown between firings, to prevent spam.
    cooldown_minutes = Column(Integer, default=15)
    enabled = Column(Boolean, default=True)
    last_triggered = Column(DateTime, nullable=True)

    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Container+host pairs this rule covers; owned by the rule
    # (delete-orphan cascade).
    containers = relationship(
        "AlertRuleContainer",
        back_populates="alert_rule",
        cascade="all, delete-orphan",
    )
|
|
|
|
class AlertRuleContainer(Base):
    """Link row: one (host, container name) pair covered by an alert rule."""

    __tablename__ = "alert_rule_containers"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_rule_id = Column(
        String,
        ForeignKey("alert_rules.id", ondelete="CASCADE"),
        nullable=False,
    )
    host_id = Column(
        String,
        ForeignKey("docker_hosts.id", ondelete="CASCADE"),
        nullable=False,
    )
    container_name = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.now)

    # Navigation to the owning rule and to the referenced host.
    alert_rule = relationship("AlertRuleDB", back_populates="containers")
    host = relationship("DockerHostDB")

    # A given (rule, host, container name) triple may appear only once.
    __table_args__ = (
        UniqueConstraint(
            'alert_rule_id', 'host_id', 'container_name',
            name='_alert_container_uc',
        ),
    )
|
|
|
|
class EventLog(Base):
    """One row per logged DockMon event (container, host, system, alert, ...)."""

    __tablename__ = "event_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Shared by events that belong together, so they can be linked.
    correlation_id = Column(String, nullable=True)

    # --- categorisation ---
    # container, host, system, alert, notification
    category = Column(String, nullable=False)
    # state_change, action_taken, error, etc.
    event_type = Column(String, nullable=False)
    # debug, info, warning, error, critical
    severity = Column(String, nullable=False, default='info')

    # --- what the event is about ---
    host_id = Column(String, nullable=True)
    host_name = Column(String, nullable=True)
    container_id = Column(String, nullable=True)
    container_name = Column(String, nullable=True)

    # --- what happened ---
    # Short description.
    title = Column(String, nullable=False)
    # Detailed description.
    message = Column(Text, nullable=True)
    old_state = Column(String, nullable=True)
    new_state = Column(String, nullable=True)
    # user, system, auto_restart, alert
    triggered_by = Column(String, nullable=True)

    # --- extra payload ---
    # Structured additional data.
    details = Column(JSON, nullable=True)
    # Duration in milliseconds, for performance tracking.
    duration_ms = Column(Integer, nullable=True)

    # --- when ---
    timestamp = Column(DateTime, default=datetime.now, nullable=False)

    # Table options: ask SQLite for AUTOINCREMENT on the primary key.
    # (No explicit index is declared here.)
    __table_args__ = (
        {"sqlite_autoincrement": True},
    )
|
|
|
|
class DatabaseManager:
|
|
"""Database management and operations"""
|
|
|
|
def __init__(self, db_path: str = "data/dockmon.db"):
    """Open the SQLite database at ``db_path``, creating it if needed.

    Steps, in order: create and lock down the containing directory, build
    the engine and session factory, create any missing tables, run the
    migrations, restrict permissions on the database file, and make sure
    the default settings row exists.

    Args:
        db_path: Filesystem path of the SQLite database file. May be a
            bare filename, in which case no directory is created or
            chmod-ed and the file lives in the current working directory.
    """
    self.db_path = db_path

    # A bare filename has an empty dirname; os.makedirs("") would raise
    # FileNotFoundError, so only touch a directory when one is named.
    data_dir = os.path.dirname(db_path)
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)

        # Set secure permissions on data directory (rwx for owner only).
        # Best effort: a failure is logged, not raised.
        try:
            os.chmod(data_dir, 0o700)
            logger.info(f"Set secure permissions (700) on data directory: {data_dir}")
        except OSError as e:
            logger.warning(f"Could not set permissions on data directory {data_dir}: {e}")

    # StaticPool with check_same_thread=False, as in the original setup.
    self.engine = create_engine(
        f"sqlite:///{db_path}",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
        echo=False
    )

    # Session factory used by get_session().
    self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)

    # Create tables if they don't exist.
    Base.metadata.create_all(bind=self.engine)

    # Bring older databases up to the current schema.
    self._run_migrations()

    # Restrict the database file to rw for owner only.
    self._secure_database_file()

    # Make sure the default settings row exists.
    self._initialize_defaults()
|
|
|
|
def _run_migrations(self):
    """Apply in-place schema and data migrations to an existing database.

    Each step is guarded by a check so it only acts when needed. Any
    exception is logged as a warning and swallowed so that a migration
    problem never prevents startup; the remaining steps are skipped in
    that case.
    """
    try:
        with self.get_session() as session:

            def column_names(table):
                """Return the column names currently present on ``table``."""
                connection = session.connection()
                columns = connection.engine.dialect.get_columns(connection, table)
                return [col['name'] for col in columns if 'name' in col]

            def add_column(table, column, ddl):
                """Add ``column`` to ``table`` via raw DDL, commit, and log it."""
                session.execute(text(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}"))
                session.commit()
                logger.info(f"Added {column} column to {table} table")

            # Migration: populate security_status for existing hosts.
            pending_hosts = session.query(DockerHostDB).filter(
                DockerHostDB.security_status.is_(None)
            ).all()

            for host in pending_hosts:
                # Unix socket connections don't need a security status.
                if host.url and not host.url.startswith('unix://'):
                    if host.tls_cert and host.tls_key:
                        host.security_status = 'secure'
                    else:
                        host.security_status = 'insecure'

            if pending_hosts:
                session.commit()
                logger.info(f"Migrated {len(pending_hosts)} hosts with security status")

            # Migration: add per-user preference columns to users.
            # The column list is read once, before any column is added.
            user_columns = column_names('users')
            if 'event_sort_order' not in user_columns:
                add_column('users', 'event_sort_order', "VARCHAR DEFAULT 'desc'")
            if 'container_sort_order' not in user_columns:
                add_column('users', 'container_sort_order', "VARCHAR DEFAULT 'name-asc'")
            if 'modal_preferences' not in user_columns:
                add_column('users', 'modal_preferences', "TEXT")

            # Migration: add dashboard stats toggles to global_settings.
            settings_columns = column_names('global_settings')
            if 'show_host_stats' not in settings_columns:
                add_column('global_settings', 'show_host_stats', "BOOLEAN DEFAULT 1")
            if 'show_container_stats' not in settings_columns:
                add_column('global_settings', 'show_container_stats', "BOOLEAN DEFAULT 1")

            # Migration: drop deprecated container_history table
            # (replaced by the EventLog table).
            connection = session.connection()
            table_names = connection.engine.dialect.get_table_names(connection)
            if 'container_history' in table_names:
                session.execute(text("DROP TABLE container_history"))
                session.commit()
                logger.info("Dropped deprecated container_history table (replaced by EventLog)")

            # Migration: add polling_interval_migrated if it doesn't exist.
            if 'polling_interval_migrated' not in settings_columns:
                add_column('global_settings', 'polling_interval_migrated', "BOOLEAN DEFAULT 0")

            # Migration: move polling_interval to 2 seconds, once only.
            settings = session.query(GlobalSettings).first()
            if settings and not settings.polling_interval_migrated:
                interval = settings.polling_interval
                # Only overwrite values still at an old default (>= 5).
                # A NULL interval is treated the same way so the comparison
                # cannot raise and abort the migration on every startup.
                if interval is None or interval >= 5:
                    settings.polling_interval = 2
                    settings.polling_interval_migrated = True
                    session.commit()
                    logger.info("Migrated polling_interval to 2 seconds (from previous default)")
                else:
                    # Already customised to something < 5: just mark as migrated.
                    settings.polling_interval_migrated = True
                    session.commit()

    except Exception as e:
        # Don't fail startup on migration errors.
        logger.warning(f"Migration warning: {e}")
|
|
|
|
def _secure_database_file(self):
    """Restrict the SQLite file to owner read/write (mode 600).

    If the file is not on disk yet, defer to _schedule_file_permissions(),
    which opens a connection first. An OSError is logged as a warning and
    not raised.
    """
    try:
        if not os.path.exists(self.db_path):
            # File doesn't exist yet - will be created by SQLAlchemy.
            self._schedule_file_permissions()
        else:
            os.chmod(self.db_path, 0o600)
            logger.info(f"Set secure permissions (600) on database file: {self.db_path}")
    except OSError as e:
        logger.warning(f"Could not set permissions on database file {self.db_path}: {e}")
|
|
|
|
def _schedule_file_permissions(self):
    """Open a connection so the database file exists, then chmod it to 600.

    A chmod failure is logged as a warning and not raised.
    """
    # Opening (and immediately closing) a connection ensures the file exists.
    with self.engine.connect():
        pass

    try:
        if not os.path.exists(self.db_path):
            return
        os.chmod(self.db_path, 0o600)
        logger.info(f"Set secure permissions (600) on newly created database file: {self.db_path}")
    except OSError as e:
        logger.warning(f"Could not set permissions on newly created database file {self.db_path}: {e}")
|
|
|
|
def _initialize_defaults(self):
    """Insert a GlobalSettings row with column defaults if none exists."""
    with self.get_session() as db:
        existing = db.query(GlobalSettings).first()
        if existing:
            return
        db.add(GlobalSettings())
        db.commit()
|
|
|
|
def get_session(self) -> Session:
    """Return a new session created by this manager's session factory."""
    make_session = self.SessionLocal
    return make_session()
|
|
|
|
# Docker Host Operations
|
|
def add_host(self, host_data: dict) -> DockerHostDB:
    """Insert a Docker host built from ``host_data`` and return it.

    ``host_data`` is passed as keyword arguments to DockerHostDB. Any
    failure is logged and re-raised.
    """
    with self.get_session() as db:
        try:
            new_host = DockerHostDB(**host_data)
            db.add(new_host)
            db.commit()
            # Reload attributes so they are populated on the returned object.
            db.refresh(new_host)
            logger.info(f"Added host {new_host.name} ({new_host.id[:8]}) to database")
            return new_host
        except Exception as e:
            logger.error(f"Failed to add host to database: {e}")
            raise
|
|
|
|
def get_hosts(self, active_only: bool = True) -> List[DockerHostDB]:
    """Return Docker hosts ordered by creation time, oldest first.

    With ``active_only`` (the default) only hosts whose ``is_active`` flag
    is true are returned.
    """
    with self.get_session() as db:
        hosts_query = db.query(DockerHostDB)
        if active_only:
            hosts_query = hosts_query.filter(DockerHostDB.is_active == True)
        # Ordering by created_at gives a consistent result order.
        return hosts_query.order_by(DockerHostDB.created_at).all()
|
|
|
|
def get_host(self, host_id: str) -> Optional[DockerHostDB]:
    """Return the Docker host with the given ID, or None if there is none."""
    with self.get_session() as db:
        by_id = db.query(DockerHostDB).filter(DockerHostDB.id == host_id)
        return by_id.first()
|
|
|
|
def update_host(self, host_id: str, updates: dict) -> Optional[DockerHostDB]:
    """Apply ``updates`` to a Docker host and return the refreshed row.

    Every key in ``updates`` is set as an attribute on the host. Returns
    None when no host has ``host_id``. Failures are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            target = db.query(DockerHostDB).filter(DockerHostDB.id == host_id).first()
            if not target:
                return None
            for field_name, new_value in updates.items():
                setattr(target, field_name, new_value)
            target.updated_at = datetime.now()
            db.commit()
            db.refresh(target)
            logger.info(f"Updated host {target.name} ({host_id[:8]}) in database")
            return target
        except Exception as e:
            logger.error(f"Failed to update host {host_id[:8]} in database: {e}")
            raise
|
|
|
|
def delete_host(self, host_id: str) -> bool:
    """Delete a Docker host and prune alert rules that referenced it.

    For every alert rule that has container pairs: if all of its pairs
    are on this host the rule is deleted; if only some are, those pairs
    are removed and the rule is kept. Rules with no pairs are untouched.

    Returns:
        True if the host was deleted, False if no such host exists.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    with self.get_session() as db:
        try:
            target = db.query(DockerHostDB).filter(DockerHostDB.id == host_id).first()
            if not target:
                logger.warning(f"Attempted to delete non-existent host {host_id[:8]}")
                return False

            # Capture the name now; it is used in the log after deletion.
            host_name = target.name

            for rule in db.query(AlertRuleDB).all():
                current_pairs = rule.containers
                if not current_pairs:
                    continue

                # Keep only the AlertRuleContainer pairs on other hosts.
                kept_pairs = [pair for pair in current_pairs if pair.host_id != host_id]

                if not kept_pairs:
                    # Nothing left for this rule: remove it entirely.
                    db.delete(rule)
                    logger.info(f"Deleted alert rule '{rule.name}' (all containers were on deleted host {host_id})")
                elif len(kept_pairs) < len(current_pairs):
                    # Some pairs remain: replace the collection with them.
                    rule.containers = kept_pairs
                    rule.updated_at = datetime.now()
                    logger.info(f"Updated alert rule '{rule.name}' (removed containers from host {host_id})")

            db.delete(target)
            db.commit()
            logger.info(f"Deleted host {host_name} ({host_id[:8]}) from database")
            return True
        except Exception as e:
            logger.error(f"Failed to delete host {host_id[:8]} from database: {e}")
            raise
|
|
|
|
# Auto-Restart Configuration
|
|
def get_auto_restart_config(self, host_id: str, container_id: str) -> Optional[AutoRestartConfig]:
    """Return the auto-restart config for a host/container pair, or None."""
    with self.get_session() as db:
        matching = db.query(AutoRestartConfig).filter(
            AutoRestartConfig.host_id == host_id,
            AutoRestartConfig.container_id == container_id,
        )
        return matching.first()
|
|
|
|
def set_auto_restart(self, host_id: str, container_id: str, container_name: str, enabled: bool):
    """Enable or disable auto-restart for a container.

    Updates the existing config row if there is one, otherwise creates it.
    Disabling an existing config also resets its restart counter.
    Failures are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            existing = db.query(AutoRestartConfig).filter(
                AutoRestartConfig.host_id == host_id,
                AutoRestartConfig.container_id == container_id,
            ).first()

            if not existing:
                db.add(AutoRestartConfig(
                    host_id=host_id,
                    container_id=container_id,
                    container_name=container_name,
                    enabled=enabled,
                ))
                logger.info(f"Created auto-restart config for {container_name} ({container_id[:12]}): enabled={enabled}")
            else:
                existing.enabled = enabled
                existing.updated_at = datetime.now()
                if not enabled:
                    existing.restart_count = 0
                logger.info(f"Updated auto-restart for {container_name} ({container_id[:12]}): enabled={enabled}")

            db.commit()
        except Exception as e:
            logger.error(f"Failed to set auto-restart for {container_id[:12]}: {e}")
            raise
|
|
|
|
def increment_restart_count(self, host_id: str, container_id: str):
    """Add one to a container's restart counter and stamp ``last_restart``.

    Does nothing if the container has no auto-restart config. Failures
    are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            entry = db.query(AutoRestartConfig).filter(
                AutoRestartConfig.host_id == host_id,
                AutoRestartConfig.container_id == container_id,
            ).first()
            if not entry:
                return
            entry.restart_count = entry.restart_count + 1
            entry.last_restart = datetime.now()
            db.commit()
            logger.debug(f"Incremented restart count for {container_id[:12]} to {entry.restart_count}")
        except Exception as e:
            logger.error(f"Failed to increment restart count for {container_id[:12]}: {e}")
            raise
|
|
|
|
def reset_restart_count(self, host_id: str, container_id: str):
    """Set a container's restart counter back to zero.

    Does nothing if the container has no auto-restart config. Failures
    are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            entry = db.query(AutoRestartConfig).filter(
                AutoRestartConfig.host_id == host_id,
                AutoRestartConfig.container_id == container_id,
            ).first()
            if not entry:
                return
            entry.restart_count = 0
            db.commit()
            logger.debug(f"Reset restart count for {container_id[:12]}")
        except Exception as e:
            logger.error(f"Failed to reset restart count for {container_id[:12]}: {e}")
            raise
|
|
|
|
# Global Settings
|
|
def get_settings(self) -> GlobalSettings:
    """Return the first GlobalSettings row."""
    with self.get_session() as db:
        settings_row = db.query(GlobalSettings).first()
        return settings_row
|
|
|
|
def update_settings(self, updates: dict) -> GlobalSettings:
    """Apply ``updates`` to the global settings row and return it refreshed.

    Keys that are not attributes of the settings object are ignored.
    Failures are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            current = db.query(GlobalSettings).first()
            for field_name, new_value in updates.items():
                if not hasattr(current, field_name):
                    continue
                setattr(current, field_name, new_value)
            current.updated_at = datetime.now()
            db.commit()
            db.refresh(current)
            logger.info("Changed global settings")
            return current
        except Exception as e:
            logger.error(f"Failed to update global settings: {e}")
            raise
|
|
|
|
# Notification Channels
|
|
def add_notification_channel(self, channel_data: dict) -> NotificationChannel:
    """Insert a notification channel built from ``channel_data`` and return it.

    ``channel_data`` is passed as keyword arguments to NotificationChannel.
    Failures are logged and re-raised.
    """
    with self.get_session() as db:
        try:
            new_channel = NotificationChannel(**channel_data)
            db.add(new_channel)
            db.commit()
            db.refresh(new_channel)
            logger.info(f"Added notification channel: {new_channel.name} (type: {new_channel.type})")
            return new_channel
        except Exception as e:
            logger.error(f"Failed to add notification channel: {e}")
            raise
|
|
|
|
def get_notification_channels(self, enabled_only: bool = True) -> List[NotificationChannel]:
    """Return notification channels; by default only the enabled ones."""
    with self.get_session() as db:
        channels_query = db.query(NotificationChannel)
        if not enabled_only:
            return channels_query.all()
        return channels_query.filter(NotificationChannel.enabled == True).all()
|
|
|
|
def get_notification_channels_by_ids(self, channel_ids: List[int]) -> List[NotificationChannel]:
    """Return the enabled notification channels whose ID is in ``channel_ids``.

    The results are expunged from the session before being returned.
    """
    with self.get_session() as db:
        wanted = db.query(NotificationChannel).filter(
            NotificationChannel.id.in_(channel_ids),
            NotificationChannel.enabled == True,
        )
        found = wanted.all()

        # Detach from session to avoid lazy loading issues.
        db.expunge_all()
        return found
|
|
|
|
def update_notification_channel(self, channel_id: int, updates: dict) -> Optional[NotificationChannel]:
    """Apply ``updates`` to a notification channel and return it refreshed.

    Every key in ``updates`` is set as an attribute on the channel.
    Returns None when no channel has ``channel_id``. Failures are logged
    and re-raised.
    """
    with self.get_session() as db:
        try:
            target = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
            if not target:
                return None
            for field_name, new_value in updates.items():
                setattr(target, field_name, new_value)
            target.updated_at = datetime.now()
            db.commit()
            db.refresh(target)
            logger.info(f"Updated notification channel: {target.name} (ID: {channel_id})")
            return target
        except Exception as e:
            logger.error(f"Failed to update notification channel {channel_id}: {e}")
            raise
|
|
|
|
def delete_notification_channel(self, channel_id: int) -> bool:
    """Delete a notification channel.

    Returns:
        True if the channel was deleted, False if no such channel exists.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    with self.get_session() as db:
        try:
            target = db.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
            if not target:
                logger.warning(f"Attempted to delete non-existent notification channel {channel_id}")
                return False
            # Capture the name now; it is used in the log after deletion.
            channel_name = target.name
            db.delete(target)
            db.commit()
            logger.info(f"Deleted notification channel: {channel_name} (ID: {channel_id})")
            return True
        except Exception as e:
            logger.error(f"Failed to delete notification channel {channel_id}: {e}")
            raise
|
|
|
|
# Alert Rules
|
|
def add_alert_rule(self, rule_data: dict) -> AlertRuleDB:
    """Add an alert rule together with its container+host pairs.

    Args:
        rule_data: Keyword arguments for AlertRuleDB. An optional
            ``'containers'`` entry is a list of dicts with ``'host_id'``
            and ``'container_name'`` keys; one AlertRuleContainer row is
            created per entry. The caller's dict is not modified.

    Returns:
        A new AlertRuleDB instance that is not attached to any session,
        carrying the stored rule's column values. Its ``containers``
        list is empty.

    Raises:
        Exception: any failure while inserting is logged and re-raised.
    """
    with self.get_session() as session:
        try:
            # Work on a shallow copy so the caller's dict keeps its
            # 'containers' entry.
            rule_fields = dict(rule_data)
            containers_data = rule_fields.pop('containers', None)

            rule = AlertRuleDB(**rule_fields)
            session.add(rule)
            session.flush()  # Flush to get the ID without committing

            # Add container+host pairs if provided.
            if containers_data:
                for container in containers_data:
                    session.add(AlertRuleContainer(
                        alert_rule_id=rule.id,
                        host_id=container['host_id'],
                        container_name=container['container_name']
                    ))

            session.commit()
            logger.info(f"Added alert rule: {rule.name} (ID: {rule.id})")
        except Exception as e:
            logger.error(f"Failed to add alert rule: {e}")
            raise

        # Copy the column values while the session is still open.
        rule_dict = {
            'id': rule.id,
            'name': rule.name,
            'trigger_events': rule.trigger_events,
            'trigger_states': rule.trigger_states,
            'notification_channels': rule.notification_channels,
            'cooldown_minutes': rule.cooldown_minutes,
            'enabled': rule.enabled,
            'last_triggered': rule.last_triggered,
            'created_at': rule.created_at,
            'updated_at': rule.updated_at
        }

        # Return a new instance that's not attached to the session.
        detached_rule = AlertRuleDB(**rule_dict)
        detached_rule.containers = []  # Initialize empty containers list

        return detached_rule
|
|
|
|
def get_alert_rule(self, rule_id: str) -> Optional[AlertRuleDB]:
    """Return one alert rule by ID with its containers loaded, or None.

    A found rule is expunged from the session before being returned.
    """
    from sqlalchemy.orm import joinedload

    with self.get_session() as db:
        with_containers = db.query(AlertRuleDB).options(joinedload(AlertRuleDB.containers))
        found = with_containers.filter(AlertRuleDB.id == rule_id).first()
        if found:
            db.expunge(found)
        return found
|
|
|
|
def get_alert_rules(self, enabled_only: bool = True) -> List[AlertRuleDB]:
    """Return alert rules with their containers loaded.

    By default only enabled rules are returned. Each rule is expunged
    from the session before the list is returned.
    """
    from sqlalchemy.orm import joinedload

    with self.get_session() as db:
        rules_query = db.query(AlertRuleDB).options(joinedload(AlertRuleDB.containers))
        if enabled_only:
            rules_query = rules_query.filter(AlertRuleDB.enabled == True)
        found = rules_query.all()
        # Detach from session to avoid lazy loading issues.
        for item in found:
            db.expunge(item)
        return found
|
|
|
|
def update_alert_rule(self, rule_id: str, updates: dict) -> Optional[AlertRuleDB]:
    """Update an alert rule and, optionally, its container+host pairs.

    Args:
        rule_id: ID of the rule to update.
        updates: Attribute names mapped to new values. If the key
            ``'containers'`` is present, the rule's existing pairs are
            replaced: a list of dicts with ``'host_id'`` and
            ``'container_name'`` creates those pairs, while None or an
            empty list leaves the rule with no pairs. If the key is
            absent the pairs are left alone. The caller's dict is not
            modified.

    Returns:
        The updated rule with its containers loaded and expunged from the
        session, or None when no rule has ``rule_id``.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    with self.get_session() as session:
        try:
            from sqlalchemy.orm import joinedload
            rule = session.query(AlertRuleDB).options(joinedload(AlertRuleDB.containers)).filter(AlertRuleDB.id == rule_id).first()
            if rule:
                # Work on a shallow copy so the caller's dict keeps its
                # 'containers' entry.
                fields = dict(updates)

                # Record presence before extracting: an explicit None
                # must be distinguishable from "key not supplied".
                has_containers_update = 'containers' in fields
                containers_data = fields.pop('containers', None)

                # Update rule fields.
                for key, value in fields.items():
                    setattr(rule, key, value)
                rule.updated_at = datetime.now()

                if has_containers_update:
                    # Delete existing container pairs.
                    session.query(AlertRuleContainer).filter(
                        AlertRuleContainer.alert_rule_id == rule_id
                    ).delete()

                    # Add new pairs (none when containers_data is None or empty).
                    if containers_data:
                        for container in containers_data:
                            session.add(AlertRuleContainer(
                                alert_rule_id=rule_id,
                                host_id=container['host_id'],
                                container_name=container['container_name']
                            ))

                session.commit()
                session.refresh(rule)
                # Touch the relationship so it is loaded before detaching.
                _ = rule.containers
                session.expunge(rule)
                logger.info(f"Updated alert rule: {rule.name} (ID: {rule_id})")
            return rule
        except Exception as e:
            logger.error(f"Failed to update alert rule {rule_id}: {e}")
            raise
|
|
|
|
def delete_alert_rule(self, rule_id: str) -> bool:
    """Delete an alert rule.

    Returns:
        True if the rule was deleted, False if no such rule exists.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    with self.get_session() as db:
        try:
            target = db.query(AlertRuleDB).filter(AlertRuleDB.id == rule_id).first()
            if not target:
                logger.warning(f"Attempted to delete non-existent alert rule {rule_id}")
                return False
            # Capture the name now; it is used in the log after deletion.
            rule_name = target.name
            db.delete(target)
            db.commit()
            logger.info(f"Deleted alert rule: {rule_name} (ID: {rule_id})")
            return True
        except Exception as e:
            logger.error(f"Failed to delete alert rule {rule_id}: {e}")
            raise
|
|
|
|
def get_alerts_dependent_on_channel(self, channel_id: int) -> List[dict]:
    """List alert rules whose only notification channel is ``channel_id``.

    These are the rules that would be left without any channel if the
    channel were deleted. Each entry is a dict with the rule's ``'id'``
    and ``'name'``.
    """
    with self.get_session() as db:
        dependents = []
        for rule in db.query(AlertRuleDB).all():
            configured = rule.notification_channels
            # A non-list value is treated as "no channels".
            if not isinstance(configured, list):
                continue
            # Only rules where this is the ONLY channel count.
            if len(configured) == 1 and channel_id in configured:
                dependents.append({'id': rule.id, 'name': rule.name})
        return dependents
|
|
|
|
# Event Logging Operations
|
|
def add_event(self, event_data: dict) -> EventLog:
    """Insert an event-log row built from ``event_data`` and return it.

    ``event_data`` is passed as keyword arguments to EventLog.
    """
    with self.get_session() as db:
        entry = EventLog(**event_data)
        db.add(entry)
        db.commit()
        # Reload attributes so they are populated on the returned object.
        db.refresh(entry)
        return entry
|
|
|
|
def get_events(self,
               category: Optional[List[str]] = None,
               event_type: Optional[str] = None,
               severity: Optional[List[str]] = None,
               host_id: Optional[List[str]] = None,
               container_id: Optional[str] = None,
               container_name: Optional[str] = None,
               start_date: Optional[datetime] = None,
               end_date: Optional[datetime] = None,
               correlation_id: Optional[str] = None,
               search: Optional[str] = None,
               limit: int = 100,
               offset: int = 0) -> tuple[List[EventLog], int]:
    """Query the event log with optional filters and pagination.

    Every filter is optional; a falsy value means "do not filter on this".

    * ``category``, ``severity`` and ``host_id`` are multi-select: a list is
      matched with ``IN`` (OR semantics), a plain string with equality.
    * ``event_type``, ``container_id`` and ``correlation_id`` are exact matches.
    * ``container_name`` is a substring (``LIKE``) match.
    * ``start_date`` / ``end_date`` bound ``EventLog.timestamp`` inclusively.
    * ``search`` is a substring match against title, message or container name.

    Returns:
        ``(events, total_count)``: ``events`` is one page ordered by timestamp
        descending, and ``total_count`` is the number of rows matching the
        filters before ``offset``/``limit`` are applied.
    """
    def _match_one_or_many(q, column, wanted):
        # list -> IN (...), single string -> equality, any other type -> no filter
        if isinstance(wanted, list):
            return q.filter(column.in_(wanted))
        if isinstance(wanted, str):
            return q.filter(column == wanted)
        return q

    with self.get_session() as session:
        query = session.query(EventLog)

        if category:
            query = _match_one_or_many(query, EventLog.category, category)
        if event_type:
            query = query.filter(EventLog.event_type == event_type)
        if severity:
            query = _match_one_or_many(query, EventLog.severity, severity)
        if host_id:
            query = _match_one_or_many(query, EventLog.host_id, host_id)
        if container_id:
            query = query.filter(EventLog.container_id == container_id)
        if container_name:
            query = query.filter(EventLog.container_name.like(f'%{container_name}%'))
        if start_date:
            query = query.filter(EventLog.timestamp >= start_date)
        if end_date:
            query = query.filter(EventLog.timestamp <= end_date)
        if correlation_id:
            query = query.filter(EventLog.correlation_id == correlation_id)
        if search:
            pattern = f'%{search}%'
            matches_text = (
                EventLog.title.like(pattern)
                | EventLog.message.like(pattern)
                | EventLog.container_name.like(pattern)
            )
            query = query.filter(matches_text)

        # Count before paginating so callers can compute the number of pages.
        total_count = query.count()

        page = (
            query.order_by(EventLog.timestamp.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )
        return page, total_count
|
|
|
|
def get_event_by_id(self, event_id: int) -> Optional[EventLog]:
    """Fetch the event whose ``id`` equals ``event_id``, or None if there is none."""
    with self.get_session() as session:
        lookup = session.query(EventLog).filter(EventLog.id == event_id)
        return lookup.first()
|
|
|
|
def get_events_by_correlation(self, correlation_id: str) -> List[EventLog]:
    """Return every event sharing ``correlation_id``, oldest first by timestamp."""
    with self.get_session() as session:
        related = session.query(EventLog).filter(EventLog.correlation_id == correlation_id)
        chronological = related.order_by(EventLog.timestamp.asc())
        return chronological.all()
|
|
|
|
def cleanup_old_events(self, days: int = 30):
    """Delete event-log rows older than ``days`` days.

    The cutoff is ``datetime.now() - timedelta(days=days)``; rows whose
    timestamp is strictly earlier are removed. Returns the value reported by
    the bulk delete (the number of rows removed).
    """
    with self.get_session() as session:
        threshold = datetime.now() - timedelta(days=days)
        stale_events = session.query(EventLog).filter(EventLog.timestamp < threshold)
        removed = stale_events.delete()
        session.commit()
        return removed
|
|
|
|
def get_event_statistics(self,
                         start_date: Optional[datetime] = None,
                         end_date: Optional[datetime] = None) -> Dict[str, Any]:
    """Get event statistics for dashboard.

    Args:
        start_date: if given, only events with ``timestamp >= start_date`` count.
        end_date: if given, only events with ``timestamp <= end_date`` count.

    Returns:
        A dict with:
          * ``total_events``: number of events in the period,
          * ``category_counts``: ``{category: count}`` for the period,
          * ``severity_counts``: ``{severity: count}`` for the period,
          * ``period_start`` / ``period_end``: ISO-8601 strings of the bounds,
            or None for a bound that was not supplied.
    """
    # Imported locally: ``func`` is not among the names imported from
    # sqlalchemy at the top of this file, and a Session has no ``func``
    # attribute to reach it through.
    from sqlalchemy import func

    with self.get_session() as session:
        def _in_period(query):
            # Apply the same date window to every query so the per-category
            # and per-severity breakdowns agree with total_events.
            if start_date:
                query = query.filter(EventLog.timestamp >= start_date)
            if end_date:
                query = query.filter(EventLog.timestamp <= end_date)
            return query

        def _counts_by(column):
            grouped = _in_period(session.query(column, func.count(EventLog.id)))
            return {value: count for value, count in grouped.group_by(column).all()}

        total_events = _in_period(session.query(EventLog)).count()

        return {
            'total_events': total_events,
            'category_counts': _counts_by(EventLog.category),
            'severity_counts': _counts_by(EventLog.severity),
            'period_start': start_date.isoformat() if start_date else None,
            'period_end': end_date.isoformat() if end_date else None
        }
|
|
|
|
|
|
# User management methods
|
|
def _hash_password(self, password: str) -> str:
    """Return a bcrypt hash of ``password`` as a UTF-8 decoded string.

    A new salt is generated on every call with a cost of 12 rounds.
    """
    password_bytes = password.encode('utf-8')
    # 12 rounds is a good balance of security/speed
    digest = bcrypt.hashpw(password_bytes, bcrypt.gensalt(rounds=12))
    return digest.decode('utf-8')
|
|
|
|
def _verify_password(self, password: str, hashed: str) -> bool:
    """Check ``password`` against the bcrypt hash string ``hashed``.

    Both values are UTF-8 encoded and handed to ``bcrypt.checkpw``, whose
    result is returned.
    """
    candidate = password.encode('utf-8')
    stored = hashed.encode('utf-8')
    return bcrypt.checkpw(candidate, stored)
|
|
|
|
def get_or_create_default_user(self) -> None:
    """Seed a default ``admin`` account when the users table is empty.

    Nothing happens if at least one user of any name already exists. The
    seeded account is flagged as first-login and must change its password.
    """
    with self.get_session() as session:
        # Any existing account (not just 'admin') means seeding is skipped.
        if session.query(User).count() != 0:
            return

        default_admin = User(
            username="admin",
            password_hash=self._hash_password("dockmon123"),  # Default password
            is_first_login=True,
            must_change_password=True
        )
        session.add(default_admin)
        session.commit()
        logger.info("Created default admin user")
|
|
|
|
def verify_user_credentials(self, username: str, password: str) -> Optional[Dict[str, Any]]:
    """Authenticate ``username`` with ``password``.

    Returns:
        On success, a dict with ``username``, ``is_first_login`` and
        ``must_change_password``; the user's ``last_login`` is updated and
        committed first. None when the user is unknown or the password does
        not match.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()

        if not account:
            # Prevent timing attack: run a dummy bcrypt check so an unknown
            # username costs about the same as a wrong password.
            placeholder_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewY5GyYFj.N/wx9S"
            self._verify_password(password, placeholder_hash)
            return None

        if not self._verify_password(password, account.password_hash):
            return None

        # Successful login: record when it happened.
        account.last_login = datetime.now()
        session.commit()
        return {
            "username": account.username,
            "is_first_login": account.is_first_login,
            "must_change_password": account.must_change_password
        }
|
|
|
|
def change_user_password(self, username: str, new_password: str) -> bool:
    """Set a new password for ``username``.

    The new password is hashed, and the first-login and must-change-password
    flags are both cleared. Returns True on success, False when the user
    does not exist.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        if not account:
            return False

        account.password_hash = self._hash_password(new_password)
        # The user has now chosen a password, so both onboarding flags are cleared.
        account.is_first_login = False
        account.must_change_password = False
        account.updated_at = datetime.now()
        session.commit()
        logger.info(f"Password changed for user: {username}")
        return True
|
|
|
|
def username_exists(self, username: str) -> bool:
    """Report whether a user row with exactly this ``username`` exists."""
    with self.get_session() as session:
        by_name = session.query(User).filter(User.username == username)
        return by_name.first() is not None
|
|
|
|
def change_username(self, old_username: str, new_username: str) -> bool:
    """Change user's username.

    Args:
        old_username: current username of the account to rename.
        new_username: desired username.

    Returns:
        True when the account was renamed. False when no account named
        ``old_username`` exists, or when ``new_username`` is already used by
        a different account.
    """
    with self.get_session() as session:
        user = session.query(User).filter(User.username == old_username).first()
        if not user:
            return False

        if new_username != old_username:
            # users.username is declared unique, so committing a duplicate
            # would fail at the database; detect the clash up front instead.
            clash = session.query(User).filter(User.username == new_username).first()
            if clash is not None:
                logger.warning(
                    f"Cannot change username from {old_username} to {new_username}: name already in use"
                )
                return False

        user.username = new_username
        user.updated_at = datetime.now()
        session.commit()
        logger.info(f"Username changed from {old_username} to {new_username}")
        return True
|
|
|
|
def reset_user_password(self, username: str, new_password: Optional[str] = None) -> Optional[str]:
    """Reset user password (for CLI tool).

    Args:
        username: account whose password is reset.
        new_password: password to set; when omitted or empty, a random one is
            generated with ``secrets.token_urlsafe(12)``.

    Returns:
        The plaintext password that was set (supplied or generated), or None
        when no user named ``username`` exists. The account is flagged so the
        password must be changed.
    """
    with self.get_session() as session:
        user = session.query(User).filter(User.username == username).first()
        if not user:
            return None

        # Generate new password if not provided (None or empty string).
        if not new_password:
            new_password = secrets.token_urlsafe(12)

        user.password_hash = self._hash_password(new_password)
        user.must_change_password = True
        user.updated_at = datetime.now()
        session.commit()
        logger.info(f"Password reset for user: {username}")
        return new_password
|
|
|
|
def list_users(self) -> List[str]:
    """Return the username of every user row."""
    with self.get_session() as session:
        # Each result row holds a single column, the username.
        return [name for (name,) in session.query(User.username).all()]
|
|
|
|
def get_dashboard_layout(self, username: str) -> Optional[str]:
    """Return the stored dashboard layout for ``username``.

    Returns None when the user does not exist; otherwise whatever is stored
    in the user's ``dashboard_layout`` column.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        return account.dashboard_layout if account else None
|
|
|
|
def save_dashboard_layout(self, username: str, layout: str) -> bool:
    """Store ``layout`` as the dashboard layout of ``username``.

    Returns True when saved, False when the user does not exist.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        if not account:
            return False

        account.dashboard_layout = layout
        account.updated_at = datetime.now()
        session.commit()
        return True
|
|
|
|
def get_modal_preferences(self, username: str) -> Optional[str]:
    """Return the stored modal preferences for ``username``.

    Returns None when the user does not exist; otherwise whatever is stored
    in the user's ``modal_preferences`` column.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        return account.modal_preferences if account else None
|
|
|
|
def save_modal_preferences(self, username: str, preferences: str) -> bool:
    """Store ``preferences`` as the modal preferences of ``username``.

    Returns True when saved, False when the user does not exist.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        if not account:
            return False

        account.modal_preferences = preferences
        account.updated_at = datetime.now()
        session.commit()
        return True
|
|
|
|
def get_event_sort_order(self, username: str) -> str:
    """Return the event sort order preference of ``username``.

    Falls back to ``'desc'`` (newest first) when the user does not exist or
    has no value stored.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        stored = account.event_sort_order if account else None
        return stored or 'desc'  # Default to newest first
|
|
|
|
def save_event_sort_order(self, username: str, sort_order: str) -> bool:
    """Store the event sort order preference of ``username``.

    Only ``'asc'`` and ``'desc'`` are accepted. Returns True when saved,
    False when the user does not exist or ``sort_order`` is not accepted.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        if not account:
            return False

        # Reject anything other than the two supported orders.
        if sort_order not in ('asc', 'desc'):
            return False

        account.event_sort_order = sort_order
        account.updated_at = datetime.now()
        session.commit()
        return True
|
|
|
|
def get_container_sort_order(self, username: str) -> str:
    """Return the container sort order preference of ``username``.

    Falls back to ``'name-asc'`` (name A-Z) when the user does not exist or
    has no value stored.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        stored = account.container_sort_order if account else None
        return stored or 'name-asc'  # Default to name A-Z
|
|
|
|
def save_container_sort_order(self, username: str, sort_order: str) -> bool:
    """Store the container sort order preference of ``username``.

    Accepted values are ``name-asc``, ``name-desc``, ``status``,
    ``memory-desc``, ``memory-asc``, ``cpu-desc`` and ``cpu-asc``. Returns
    True when saved, False when the user does not exist or ``sort_order`` is
    not one of the accepted values.
    """
    with self.get_session() as session:
        account = session.query(User).filter(User.username == username).first()
        if not account:
            return False

        # Reject anything outside the supported sort keys.
        accepted = ('name-asc', 'name-desc', 'status', 'memory-desc', 'memory-asc', 'cpu-desc', 'cpu-asc')
        if sort_order not in accepted:
            return False

        account.container_sort_order = sort_order
        account.updated_at = datetime.now()
        session.commit()
        return True