# docker_dev/dockmon/backend/database.py
"""
Database models and operations for DockMon
Uses SQLite for persistent storage of configuration and settings
"""
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, Column, String, Integer, Boolean, DateTime, JSON, ForeignKey, Text, UniqueConstraint, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from sqlalchemy.pool import StaticPool
import json
import os
import logging
import secrets
import bcrypt
logger = logging.getLogger(__name__)
Base = declarative_base()


class User(Base):
    """User authentication and settings"""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String, nullable=False, unique=True)
    password_hash = Column(String, nullable=False)
    is_first_login = Column(Boolean, default=True)
    must_change_password = Column(Boolean, default=False)
    dashboard_layout = Column(Text, nullable=True)  # JSON string of GridStack layout
    event_sort_order = Column(String, default='desc')  # 'desc' (newest first) or 'asc' (oldest first)
    container_sort_order = Column(String, default='name-asc')  # Container sort preference on dashboard
    modal_preferences = Column(Text, nullable=True)  # JSON string of modal size/position preferences
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
    last_login = Column(DateTime, nullable=True)


class DockerHostDB(Base):
    """Docker host configuration"""
    __tablename__ = "docker_hosts"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False, unique=True)
    url = Column(String, nullable=False)
    tls_cert = Column(Text, nullable=True)
    tls_key = Column(Text, nullable=True)
    tls_ca = Column(Text, nullable=True)
    security_status = Column(String, nullable=True)  # 'secure', 'insecure', 'unknown'
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationships
    auto_restart_configs = relationship("AutoRestartConfig", back_populates="host", cascade="all, delete-orphan")


class AutoRestartConfig(Base):
    """Auto-restart configuration for containers"""
    __tablename__ = "auto_restart_configs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(String, ForeignKey("docker_hosts.id"))
    container_id = Column(String, nullable=False)
    container_name = Column(String, nullable=False)
    enabled = Column(Boolean, default=True)
    max_retries = Column(Integer, default=3)
    retry_delay = Column(Integer, default=30)
    restart_count = Column(Integer, default=0)
    last_restart = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationships
    host = relationship("DockerHostDB", back_populates="auto_restart_configs")


class GlobalSettings(Base):
    """Global application settings"""
    __tablename__ = "global_settings"

    id = Column(Integer, primary_key=True, default=1)
    max_retries = Column(Integer, default=3)
    retry_delay = Column(Integer, default=30)
    default_auto_restart = Column(Boolean, default=False)
    polling_interval = Column(Integer, default=2)
    connection_timeout = Column(Integer, default=10)
    log_retention_days = Column(Integer, default=7)
    event_retention_days = Column(Integer, default=30)  # Keep events for 30 days
    enable_notifications = Column(Boolean, default=True)
    auto_cleanup_events = Column(Boolean, default=True)  # Auto cleanup old events
    alert_template = Column(Text, nullable=True)  # Global notification template
    blackout_windows = Column(JSON, nullable=True)  # Array of blackout time windows
    first_run_complete = Column(Boolean, default=False)  # Track if first-run setup is complete
    polling_interval_migrated = Column(Boolean, default=False)  # Track if polling interval has been migrated to 2s
    timezone_offset = Column(Integer, default=0)  # Timezone offset in minutes from UTC
    show_host_stats = Column(Boolean, default=True)  # Show host statistics graphs on dashboard
    show_container_stats = Column(Boolean, default=True)  # Show container statistics on dashboard
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class NotificationChannel(Base):
    """Notification channel configuration"""
    __tablename__ = "notification_channels"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False, unique=True)
    type = Column(String, nullable=False)  # telegram, discord, slack, pushover
    config = Column(JSON, nullable=False)  # Channel-specific configuration
    enabled = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class AlertRuleDB(Base):
    """Alert rules for container state changes"""
    __tablename__ = "alert_rules"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    trigger_events = Column(JSON, nullable=True)  # List of Docker events that trigger the alert
    trigger_states = Column(JSON, nullable=True)  # List of states that trigger the alert
    notification_channels = Column(JSON, nullable=False)  # List of channel IDs
    cooldown_minutes = Column(Integer, default=15)  # Prevent spam
    enabled = Column(Boolean, default=True)
    last_triggered = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationships
    containers = relationship("AlertRuleContainer", back_populates="alert_rule", cascade="all, delete-orphan")


class AlertRuleContainer(Base):
    """Container+host pairs for alert rules"""
    __tablename__ = "alert_rule_containers"

    id = Column(Integer, primary_key=True, autoincrement=True)
    alert_rule_id = Column(String, ForeignKey("alert_rules.id", ondelete="CASCADE"), nullable=False)
    host_id = Column(String, ForeignKey("docker_hosts.id", ondelete="CASCADE"), nullable=False)
    container_name = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.now)

    # Relationships
    alert_rule = relationship("AlertRuleDB", back_populates="containers")
    host = relationship("DockerHostDB")

    # Unique constraint to prevent duplicates
    __table_args__ = (
        UniqueConstraint('alert_rule_id', 'host_id', 'container_name', name='_alert_container_uc'),
    )


class EventLog(Base):
    """Comprehensive event logging for all DockMon activities"""
    __tablename__ = "event_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    correlation_id = Column(String, nullable=True)  # For linking related events

    # Event categorization
    category = Column(String, nullable=False)  # container, host, system, alert, notification
    event_type = Column(String, nullable=False)  # state_change, action_taken, error, etc.
    severity = Column(String, nullable=False, default='info')  # debug, info, warning, error, critical

    # Target information
    host_id = Column(String, nullable=True)
    host_name = Column(String, nullable=True)
    container_id = Column(String, nullable=True)
    container_name = Column(String, nullable=True)

    # Event details
    title = Column(String, nullable=False)  # Short description
    message = Column(Text, nullable=True)  # Detailed description
    old_state = Column(String, nullable=True)
    new_state = Column(String, nullable=True)
    triggered_by = Column(String, nullable=True)  # user, system, auto_restart, alert

    # Additional data
    details = Column(JSON, nullable=True)  # Structured additional data
    duration_ms = Column(Integer, nullable=True)  # For performance tracking

    # Timestamps
    timestamp = Column(DateTime, default=datetime.now, nullable=False)

    # SQLite: use AUTOINCREMENT so event ids are never reused
    __table_args__ = (
        {"sqlite_autoincrement": True},
    )


class DatabaseManager:
    """Database management and operations"""

    def __init__(self, db_path: str = "data/dockmon.db"):
        """Initialize database connection"""
        self.db_path = db_path

        # Ensure data directory exists
        data_dir = os.path.dirname(db_path)
        os.makedirs(data_dir, exist_ok=True)

        # Set secure permissions on data directory (rwx for owner only)
        try:
            os.chmod(data_dir, 0o700)
            logger.info(f"Set secure permissions (700) on data directory: {data_dir}")
        except OSError as e:
            logger.warning(f"Could not set permissions on data directory {data_dir}: {e}")

        # Create engine with connection pooling
        self.engine = create_engine(
            f"sqlite:///{db_path}",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
            echo=False
        )

        # Create session factory
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)

        # Create tables if they don't exist
        Base.metadata.create_all(bind=self.engine)

        # Run database migrations
        self._run_migrations()

        # Set secure permissions on database file (rw for owner only)
        self._secure_database_file()

        # Initialize default settings if needed
        self._initialize_defaults()
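
    # Usage sketch (illustrative): the manager is typically created once at
    # application startup. The path below is this module's default; any other
    # location works as long as the process can create the parent directory.
    #
    #     db = DatabaseManager("data/dockmon.db")
    #     settings = db.get_settings()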

    def _run_migrations(self):
        """Run database migrations for schema updates"""
        try:
            with self.get_session() as session:
                # Migration: Populate security_status for existing hosts
                hosts_without_security_status = session.query(DockerHostDB).filter(
                    DockerHostDB.security_status.is_(None)
                ).all()

                for host in hosts_without_security_status:
                    # Determine security status based on existing data
                    if host.url and not host.url.startswith('unix://'):
                        if host.tls_cert and host.tls_key:
                            host.security_status = 'secure'
                        else:
                            host.security_status = 'insecure'
                    # Unix socket connections don't need a security status

                if hosts_without_security_status:
                    session.commit()
                    logger.info(f"Migrated {len(hosts_without_security_status)} hosts with security status")

                # Migration: Add event_sort_order column to users table if it doesn't exist
                user_columns = session.connection().engine.dialect.get_columns(session.connection(), 'users')
                column_names = [col.get('name', '') for col in user_columns if 'name' in col]
                if 'event_sort_order' not in column_names:
                    # Add the column using raw SQL
                    session.execute(text("ALTER TABLE users ADD COLUMN event_sort_order VARCHAR DEFAULT 'desc'"))
                    session.commit()
                    logger.info("Added event_sort_order column to users table")

                # Migration: Add container_sort_order column to users table if it doesn't exist
                if 'container_sort_order' not in column_names:
                    session.execute(text("ALTER TABLE users ADD COLUMN container_sort_order VARCHAR DEFAULT 'name-asc'"))
                    session.commit()
                    logger.info("Added container_sort_order column to users table")

                # Migration: Add modal_preferences column to users table if it doesn't exist
                if 'modal_preferences' not in column_names:
                    session.execute(text("ALTER TABLE users ADD COLUMN modal_preferences TEXT"))
                    session.commit()
                    logger.info("Added modal_preferences column to users table")

                # Migration: Add show_host_stats and show_container_stats columns to global_settings
                settings_columns = session.connection().engine.dialect.get_columns(session.connection(), 'global_settings')
                settings_column_names = [col['name'] for col in settings_columns]
                if 'show_host_stats' not in settings_column_names:
                    session.execute(text("ALTER TABLE global_settings ADD COLUMN show_host_stats BOOLEAN DEFAULT 1"))
                    session.commit()
                    logger.info("Added show_host_stats column to global_settings table")

                if 'show_container_stats' not in settings_column_names:
                    session.execute(text("ALTER TABLE global_settings ADD COLUMN show_container_stats BOOLEAN DEFAULT 1"))
                    session.commit()
                    logger.info("Added show_container_stats column to global_settings table")

                # Migration: Drop the deprecated container_history table,
                # which has been replaced by the EventLog table
                table_names = session.connection().engine.dialect.get_table_names(session.connection())
                if 'container_history' in table_names:
                    session.execute(text("DROP TABLE container_history"))
                    session.commit()
                    logger.info("Dropped deprecated container_history table (replaced by EventLog)")

                # Migration: Add polling_interval_migrated column if it doesn't exist
                if 'polling_interval_migrated' not in settings_column_names:
                    session.execute(text("ALTER TABLE global_settings ADD COLUMN polling_interval_migrated BOOLEAN DEFAULT 0"))
                    session.commit()
                    logger.info("Added polling_interval_migrated column to global_settings table")

                # Migration: Update polling_interval to 2 seconds (runs once,
                # on first startup after this update)
                settings = session.query(GlobalSettings).first()
                if settings and not settings.polling_interval_migrated:
                    if settings.polling_interval >= 5:
                        # Still at an old default (5 or 10) - move to the new default
                        settings.polling_interval = 2
                        settings.polling_interval_migrated = True
                        session.commit()
                        logger.info("Migrated polling_interval to 2 seconds (from previous default)")
                    else:
                        # User has already customized it to something below 5; just mark as migrated
                        settings.polling_interval_migrated = True
                        session.commit()
        except Exception as e:
            # Don't fail startup on migration errors
            logger.warning(f"Migration warning: {e}")

    def _secure_database_file(self):
        """Set secure file permissions on the SQLite database file"""
        try:
            if os.path.exists(self.db_path):
                # Set file permissions to 600 (read/write for owner only)
                os.chmod(self.db_path, 0o600)
                logger.info(f"Set secure permissions (600) on database file: {self.db_path}")
            else:
                # File doesn't exist yet - it will be created by SQLAlchemy
                # on first connection, so set permissions right after that.
                self._schedule_file_permissions()
        except OSError as e:
            logger.warning(f"Could not set permissions on database file {self.db_path}: {e}")

    def _schedule_file_permissions(self):
        """Force creation of the database file, then set its permissions"""
        # Open a connection to ensure the file exists
        with self.engine.connect():
            pass

        # Now set permissions
        try:
            if os.path.exists(self.db_path):
                os.chmod(self.db_path, 0o600)
                logger.info(f"Set secure permissions (600) on newly created database file: {self.db_path}")
        except OSError as e:
            logger.warning(f"Could not set permissions on newly created database file {self.db_path}: {e}")

    def _initialize_defaults(self):
        """Initialize default settings if they don't exist"""
        with self.get_session() as session:
            # Check if global settings exist
            settings = session.query(GlobalSettings).first()
            if not settings:
                settings = GlobalSettings()
                session.add(settings)
                session.commit()

    def get_session(self) -> Session:
        """Get a database session"""
        return self.SessionLocal()

    # Docker Host Operations

    def add_host(self, host_data: dict) -> DockerHostDB:
        """Add a new Docker host"""
        with self.get_session() as session:
            try:
                host = DockerHostDB(**host_data)
                session.add(host)
                session.commit()
                session.refresh(host)
                logger.info(f"Added host {host.name} ({host.id[:8]}) to database")
                return host
            except Exception as e:
                logger.error(f"Failed to add host to database: {e}")
                raise

    def get_hosts(self, active_only: bool = True) -> List[DockerHostDB]:
        """Get all Docker hosts ordered by creation time"""
        with self.get_session() as session:
            query = session.query(DockerHostDB)
            if active_only:
                query = query.filter(DockerHostDB.is_active == True)
            # Order by created_at to ensure consistent ordering (oldest first)
            return query.order_by(DockerHostDB.created_at).all()

    def get_host(self, host_id: str) -> Optional[DockerHostDB]:
        """Get a specific Docker host"""
        with self.get_session() as session:
            return session.query(DockerHostDB).filter(DockerHostDB.id == host_id).first()

    def update_host(self, host_id: str, updates: dict) -> Optional[DockerHostDB]:
        """Update a Docker host"""
        with self.get_session() as session:
            try:
                host = session.query(DockerHostDB).filter(DockerHostDB.id == host_id).first()
                if host:
                    for key, value in updates.items():
                        setattr(host, key, value)
                    host.updated_at = datetime.now()
                    session.commit()
                    session.refresh(host)
                    logger.info(f"Updated host {host.name} ({host_id[:8]}) in database")
                return host
            except Exception as e:
                logger.error(f"Failed to update host {host_id[:8]} in database: {e}")
                raise

    def delete_host(self, host_id: str) -> bool:
        """Delete a Docker host and clean up related alert rules"""
        with self.get_session() as session:
            try:
                host = session.query(DockerHostDB).filter(DockerHostDB.id == host_id).first()
                if not host:
                    logger.warning(f"Attempted to delete non-existent host {host_id[:8]}")
                    return False

                host_name = host.name

                # Process each alert rule to remove containers from the deleted host.
                # rule.containers is a list of AlertRuleContainer objects.
                for rule in session.query(AlertRuleDB).all():
                    if not rule.containers:
                        continue

                    # Filter out containers from the deleted host
                    remaining_containers = [
                        c for c in rule.containers
                        if c.host_id != host_id
                    ]

                    if not remaining_containers:
                        # No containers left, delete the entire alert
                        session.delete(rule)
                        logger.info(f"Deleted alert rule '{rule.name}' (all containers were on deleted host {host_id})")
                    elif len(remaining_containers) < len(rule.containers):
                        # Some containers remain, update the alert
                        rule.containers = remaining_containers
                        rule.updated_at = datetime.now()
                        logger.info(f"Updated alert rule '{rule.name}' (removed containers from host {host_id})")

                # Delete the host
                session.delete(host)
                session.commit()
                logger.info(f"Deleted host {host_name} ({host_id[:8]}) from database")
                return True
            except Exception as e:
                logger.error(f"Failed to delete host {host_id[:8]} from database: {e}")
                raise
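
    # Usage sketch (illustrative; the field values are hypothetical):
    #
    #     db.add_host({
    #         "id": "a1b2c3d4e5f6",             # caller-supplied host ID
    #         "name": "prod-node-1",
    #         "url": "tcp://10.0.0.5:2376",
    #         "security_status": "secure",
    #     })
    #     names = [h.name for h in db.get_hosts()]
    #     db.delete_host("a1b2c3d4e5f6")        # also prunes dependent alert rules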

    # Auto-Restart Configuration

    def get_auto_restart_config(self, host_id: str, container_id: str) -> Optional[AutoRestartConfig]:
        """Get auto-restart configuration for a container"""
        with self.get_session() as session:
            return session.query(AutoRestartConfig).filter(
                AutoRestartConfig.host_id == host_id,
                AutoRestartConfig.container_id == container_id
            ).first()

    def set_auto_restart(self, host_id: str, container_id: str, container_name: str, enabled: bool):
        """Set auto-restart configuration for a container"""
        with self.get_session() as session:
            try:
                config = session.query(AutoRestartConfig).filter(
                    AutoRestartConfig.host_id == host_id,
                    AutoRestartConfig.container_id == container_id
                ).first()
                if config:
                    config.enabled = enabled
                    config.updated_at = datetime.now()
                    if not enabled:
                        config.restart_count = 0
                    logger.info(f"Updated auto-restart for {container_name} ({container_id[:12]}): enabled={enabled}")
                else:
                    config = AutoRestartConfig(
                        host_id=host_id,
                        container_id=container_id,
                        container_name=container_name,
                        enabled=enabled
                    )
                    session.add(config)
                    logger.info(f"Created auto-restart config for {container_name} ({container_id[:12]}): enabled={enabled}")
                session.commit()
            except Exception as e:
                logger.error(f"Failed to set auto-restart for {container_id[:12]}: {e}")
                raise

    def increment_restart_count(self, host_id: str, container_id: str):
        """Increment restart count for a container"""
        with self.get_session() as session:
            try:
                config = session.query(AutoRestartConfig).filter(
                    AutoRestartConfig.host_id == host_id,
                    AutoRestartConfig.container_id == container_id
                ).first()
                if config:
                    config.restart_count += 1
                    config.last_restart = datetime.now()
                    session.commit()
                    logger.debug(f"Incremented restart count for {container_id[:12]} to {config.restart_count}")
            except Exception as e:
                logger.error(f"Failed to increment restart count for {container_id[:12]}: {e}")
                raise

    def reset_restart_count(self, host_id: str, container_id: str):
        """Reset restart count for a container"""
        with self.get_session() as session:
            try:
                config = session.query(AutoRestartConfig).filter(
                    AutoRestartConfig.host_id == host_id,
                    AutoRestartConfig.container_id == container_id
                ).first()
                if config:
                    config.restart_count = 0
                    session.commit()
                    logger.debug(f"Reset restart count for {container_id[:12]}")
            except Exception as e:
                logger.error(f"Failed to reset restart count for {container_id[:12]}: {e}")
                raise
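
    # Usage sketch (illustrative; IDs are hypothetical): a monitor loop might
    # enable auto-restart for a container, bump the counter on each restart
    # attempt, and clear it once the container is healthy again.
    #
    #     db.set_auto_restart("a1b2c3d4e5f6", "0123456789ab", "web-frontend", True)
    #     db.increment_restart_count("a1b2c3d4e5f6", "0123456789ab")
    #     db.reset_restart_count("a1b2c3d4e5f6", "0123456789ab")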

    # Global Settings

    def get_settings(self) -> GlobalSettings:
        """Get global settings"""
        with self.get_session() as session:
            return session.query(GlobalSettings).first()

    def update_settings(self, updates: dict) -> GlobalSettings:
        """Update global settings"""
        with self.get_session() as session:
            try:
                settings = session.query(GlobalSettings).first()
                for key, value in updates.items():
                    if hasattr(settings, key):
                        setattr(settings, key, value)
                settings.updated_at = datetime.now()
                session.commit()
                session.refresh(settings)
                logger.info("Changed global settings")
                return settings
            except Exception as e:
                logger.error(f"Failed to update global settings: {e}")
                raise
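
    # Usage sketch (illustrative): unknown keys are silently skipped by the
    # hasattr() guard above, so only real GlobalSettings columns are updated.
    #
    #     db.update_settings({"polling_interval": 5, "log_retention_days": 14})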

    # Notification Channels

    def add_notification_channel(self, channel_data: dict) -> NotificationChannel:
        """Add a notification channel"""
        with self.get_session() as session:
            try:
                channel = NotificationChannel(**channel_data)
                session.add(channel)
                session.commit()
                session.refresh(channel)
                logger.info(f"Added notification channel: {channel.name} (type: {channel.type})")
                return channel
            except Exception as e:
                logger.error(f"Failed to add notification channel: {e}")
                raise

    def get_notification_channels(self, enabled_only: bool = True) -> List[NotificationChannel]:
        """Get all notification channels"""
        with self.get_session() as session:
            query = session.query(NotificationChannel)
            if enabled_only:
                query = query.filter(NotificationChannel.enabled == True)
            return query.all()

    def get_notification_channels_by_ids(self, channel_ids: List[int]) -> List[NotificationChannel]:
        """Get notification channels by their IDs"""
        with self.get_session() as session:
            channels = session.query(NotificationChannel).filter(
                NotificationChannel.id.in_(channel_ids),
                NotificationChannel.enabled == True
            ).all()
            # Detach from session to avoid lazy loading issues
            session.expunge_all()
            return channels

    def update_notification_channel(self, channel_id: int, updates: dict) -> Optional[NotificationChannel]:
        """Update a notification channel"""
        with self.get_session() as session:
            try:
                channel = session.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
                if channel:
                    for key, value in updates.items():
                        setattr(channel, key, value)
                    channel.updated_at = datetime.now()
                    session.commit()
                    session.refresh(channel)
                    logger.info(f"Updated notification channel: {channel.name} (ID: {channel_id})")
                return channel
            except Exception as e:
                logger.error(f"Failed to update notification channel {channel_id}: {e}")
                raise

    def delete_notification_channel(self, channel_id: int) -> bool:
        """Delete a notification channel"""
        with self.get_session() as session:
            try:
                channel = session.query(NotificationChannel).filter(NotificationChannel.id == channel_id).first()
                if channel:
                    channel_name = channel.name
                    session.delete(channel)
                    session.commit()
                    logger.info(f"Deleted notification channel: {channel_name} (ID: {channel_id})")
                    return True
                logger.warning(f"Attempted to delete non-existent notification channel {channel_id}")
                return False
            except Exception as e:
                logger.error(f"Failed to delete notification channel {channel_id}: {e}")
                raise
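
    # Usage sketch (illustrative): the config payload is channel-specific and
    # stored as JSON; the token and chat ID below are made-up placeholders.
    #
    #     db.add_notification_channel({
    #         "name": "ops-telegram",
    #         "type": "telegram",
    #         "config": {"bot_token": "123456:EXAMPLE", "chat_id": "-100200300"},
    #     })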

    # Alert Rules

    def add_alert_rule(self, rule_data: dict) -> AlertRuleDB:
        """Add an alert rule with container+host pairs"""
        with self.get_session() as session:
            try:
                # Extract containers list if present
                containers_data = rule_data.pop('containers', None)

                rule = AlertRuleDB(**rule_data)
                session.add(rule)
                session.flush()  # Flush to get the ID without committing

                # Add container+host pairs if provided
                if containers_data:
                    for container in containers_data:
                        container_pair = AlertRuleContainer(
                            alert_rule_id=rule.id,
                            host_id=container['host_id'],
                            container_name=container['container_name']
                        )
                        session.add(container_pair)

                session.commit()
                logger.info(f"Added alert rule: {rule.name} (ID: {rule.id})")
            except Exception as e:
                logger.error(f"Failed to add alert rule: {e}")
                raise

            # Create a detached copy with all needed attributes
            rule_dict = {
                'id': rule.id,
                'name': rule.name,
                'trigger_events': rule.trigger_events,
                'trigger_states': rule.trigger_states,
                'notification_channels': rule.notification_channels,
                'cooldown_minutes': rule.cooldown_minutes,
                'enabled': rule.enabled,
                'last_triggered': rule.last_triggered,
                'created_at': rule.created_at,
                'updated_at': rule.updated_at
            }

            # Return a new instance that's not attached to the session
            detached_rule = AlertRuleDB(**rule_dict)
            detached_rule.containers = []  # Initialize empty containers list
            return detached_rule

    def get_alert_rule(self, rule_id: str) -> Optional[AlertRuleDB]:
        """Get a single alert rule by ID"""
        with self.get_session() as session:
            rule = session.query(AlertRuleDB).options(
                joinedload(AlertRuleDB.containers)
            ).filter(AlertRuleDB.id == rule_id).first()
            if rule:
                session.expunge(rule)
            return rule

    def get_alert_rules(self, enabled_only: bool = True) -> List[AlertRuleDB]:
        """Get all alert rules"""
        with self.get_session() as session:
            query = session.query(AlertRuleDB).options(joinedload(AlertRuleDB.containers))
            if enabled_only:
                query = query.filter(AlertRuleDB.enabled == True)
            rules = query.all()
            # Detach from session to avoid lazy loading issues
            for rule in rules:
                session.expunge(rule)
            return rules

    def update_alert_rule(self, rule_id: str, updates: dict) -> Optional[AlertRuleDB]:
        """Update an alert rule and its container+host pairs"""
        with self.get_session() as session:
            try:
                rule = session.query(AlertRuleDB).options(
                    joinedload(AlertRuleDB.containers)
                ).filter(AlertRuleDB.id == rule_id).first()
                if rule:
                    # Check if the containers field is present before extracting it
                    has_containers_update = 'containers' in updates
                    containers_data = updates.pop('containers', None)

                    # Update rule fields
                    for key, value in updates.items():
                        setattr(rule, key, value)
                    rule.updated_at = datetime.now()

                    # Update container+host pairs if the containers field was explicitly
                    # provided (could be None for "all containers", an empty list, or a
                    # list of specific containers)
                    if has_containers_update:
                        # Delete existing container pairs
                        session.query(AlertRuleContainer).filter(
                            AlertRuleContainer.alert_rule_id == rule_id
                        ).delete()

                        # Add new container pairs (if containers_data is None or empty, no new pairs are added)
                        if containers_data:
                            for container in containers_data:
                                container_pair = AlertRuleContainer(
                                    alert_rule_id=rule_id,
                                    host_id=container['host_id'],
                                    container_name=container['container_name']
                                )
                                session.add(container_pair)

                    session.commit()
                    session.refresh(rule)
                    # Load the containers relationship before detaching
                    _ = rule.containers
                    session.expunge(rule)
                    logger.info(f"Updated alert rule: {rule.name} (ID: {rule_id})")
                return rule
            except Exception as e:
                logger.error(f"Failed to update alert rule {rule_id}: {e}")
                raise

    def delete_alert_rule(self, rule_id: str) -> bool:
        """Delete an alert rule"""
        with self.get_session() as session:
            try:
                rule = session.query(AlertRuleDB).filter(AlertRuleDB.id == rule_id).first()
                if rule:
                    rule_name = rule.name
                    session.delete(rule)
                    session.commit()
                    logger.info(f"Deleted alert rule: {rule_name} (ID: {rule_id})")
                    return True
                logger.warning(f"Attempted to delete non-existent alert rule {rule_id}")
                return False
            except Exception as e:
                logger.error(f"Failed to delete alert rule {rule_id}: {e}")
                raise

    def get_alerts_dependent_on_channel(self, channel_id: int) -> List[dict]:
        """Find alerts that would be orphaned if this channel were deleted (they have only this one channel)"""
        with self.get_session() as session:
            dependent_alerts = []
            for rule in session.query(AlertRuleDB).all():
                # notification_channels is stored as a JSON list of channel IDs
                channels = rule.notification_channels if isinstance(rule.notification_channels, list) else []
                # Check if this is the ONLY channel for this alert
                if len(channels) == 1 and channel_id in channels:
                    dependent_alerts.append({
                        'id': rule.id,
                        'name': rule.name
                    })
            return dependent_alerts
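
    # Usage sketch (illustrative; IDs and names are hypothetical):
    #
    #     rule = db.add_alert_rule({
    #         "id": "rule-001",
    #         "name": "web down",
    #         "trigger_states": ["exited", "dead"],
    #         "notification_channels": [1],
    #         "containers": [{"host_id": "a1b2c3d4e5f6", "container_name": "web-frontend"}],
    #     })
    #     blocked = db.get_alerts_dependent_on_channel(1)  # -> [{'id': 'rule-001', ...}]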

    # Event Logging Operations

    def add_event(self, event_data: dict) -> EventLog:
        """Add an event to the event log"""
        with self.get_session() as session:
            event = EventLog(**event_data)
            session.add(event)
            session.commit()
            session.refresh(event)
            return event
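
    # Usage sketch (illustrative): only category, event_type and title are
    # required by the schema (severity defaults to 'info'); everything else
    # is optional context. The container name below is hypothetical.
    #
    #     db.add_event({
    #         "category": "container",
    #         "event_type": "state_change",
    #         "severity": "warning",
    #         "title": "Container stopped",
    #         "container_name": "web-frontend",
    #         "old_state": "running",
    #         "new_state": "exited",
    #     })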

    def get_events(self,
                   category: Optional[List[str]] = None,
                   event_type: Optional[str] = None,
                   severity: Optional[List[str]] = None,
                   host_id: Optional[List[str]] = None,
                   container_id: Optional[str] = None,
                   container_name: Optional[str] = None,
                   start_date: Optional[datetime] = None,
                   end_date: Optional[datetime] = None,
                   correlation_id: Optional[str] = None,
                   search: Optional[str] = None,
                   limit: int = 100,
                   offset: int = 0) -> tuple[List[EventLog], int]:
        """Get events with filtering and pagination - returns (events, total_count).

        Multi-select filters (category, severity, host_id) accept lists for OR filtering.
        """
        with self.get_session() as session:
            query = session.query(EventLog)

            # Apply filters - use an IN clause for lists, equality for single values
            if category:
                if isinstance(category, list):
                    query = query.filter(EventLog.category.in_(category))
                else:
                    query = query.filter(EventLog.category == category)
            if event_type:
                query = query.filter(EventLog.event_type == event_type)
            if severity:
                if isinstance(severity, list):
                    query = query.filter(EventLog.severity.in_(severity))
                else:
                    query = query.filter(EventLog.severity == severity)
            if host_id:
                if isinstance(host_id, list):
                    query = query.filter(EventLog.host_id.in_(host_id))
                else:
                    query = query.filter(EventLog.host_id == host_id)
            if container_id:
                query = query.filter(EventLog.container_id == container_id)
            if container_name:
                query = query.filter(EventLog.container_name.like(f'%{container_name}%'))
            if start_date:
                query = query.filter(EventLog.timestamp >= start_date)
            if end_date:
                query = query.filter(EventLog.timestamp <= end_date)
            if correlation_id:
                query = query.filter(EventLog.correlation_id == correlation_id)
            if search:
                search_term = f'%{search}%'
                query = query.filter(
                    (EventLog.title.like(search_term)) |
                    (EventLog.message.like(search_term)) |
                    (EventLog.container_name.like(search_term))
                )

            # Get total count for pagination
            total_count = query.count()

            # Apply ordering, limit and offset
            events = query.order_by(EventLog.timestamp.desc()).offset(offset).limit(limit).all()

            return events, total_count
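
    # Usage sketch (illustrative): page through the last day's error-level
    # events, 50 at a time.
    #
    #     since = datetime.now() - timedelta(days=1)
    #     events, total = db.get_events(severity=["error", "critical"],
    #                                   start_date=since, limit=50, offset=0)
    #     for ev in events:
    #         print(ev.timestamp, ev.title)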

    def get_event_by_id(self, event_id: int) -> Optional[EventLog]:
        """Get a specific event by ID"""
        with self.get_session() as session:
            return session.query(EventLog).filter(EventLog.id == event_id).first()

    def get_events_by_correlation(self, correlation_id: str) -> List[EventLog]:
        """Get all events with the same correlation ID"""
        with self.get_session() as session:
            return session.query(EventLog).filter(
                EventLog.correlation_id == correlation_id
            ).order_by(EventLog.timestamp.asc()).all()

    def cleanup_old_events(self, days: int = 30) -> int:
        """Delete event logs older than the given number of days; returns the number deleted"""
        with self.get_session() as session:
            cutoff_date = datetime.now() - timedelta(days=days)
            deleted_count = session.query(EventLog).filter(
                EventLog.timestamp < cutoff_date
            ).delete()
            session.commit()
            return deleted_count

    def get_event_statistics(self,
                             start_date: Optional[datetime] = None,
                             end_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Get event statistics for dashboard"""
        with self.get_session() as session:
            def apply_period(query):
                """Apply the optional date range to a query"""
                if start_date:
                    query = query.filter(EventLog.timestamp >= start_date)
                if end_date:
                    query = query.filter(EventLog.timestamp <= end_date)
                return query

            total_events = apply_period(session.query(EventLog)).count()

            # Count by category (within the same period)
            category_counts = dict(
                apply_period(session.query(EventLog.category, func.count(EventLog.id)))
                .group_by(EventLog.category).all()
            )

            # Count by severity (within the same period)
            severity_counts = dict(
                apply_period(session.query(EventLog.severity, func.count(EventLog.id)))
                .group_by(EventLog.severity).all()
            )

            return {
                'total_events': total_events,
                'category_counts': category_counts,
                'severity_counts': severity_counts,
                'period_start': start_date.isoformat() if start_date else None,
                'period_end': end_date.isoformat() if end_date else None
            }
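
    # Usage sketch (illustrative): severity breakdown for the last 7 days.
    #
    #     stats = db.get_event_statistics(start_date=datetime.now() - timedelta(days=7))
    #     print(stats["severity_counts"].get("error", 0), "errors this week")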

    # User management methods

    def _hash_password(self, password: str) -> str:
        """Hash a password using bcrypt with salt"""
        # Generate salt and hash password; 12 rounds is a good balance of security/speed
        salt = bcrypt.gensalt(rounds=12)
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')

    def _verify_password(self, password: str, hashed: str) -> bool:
        """Verify a password against a bcrypt hash"""
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

    def get_or_create_default_user(self) -> None:
        """Create the default admin user if no users exist"""
        with self.get_session() as session:
            # Check if ANY user exists (not just 'admin')
            user_count = session.query(User).count()
            if user_count == 0:
                # Only create the default admin user if no users exist at all
                user = User(
                    username="admin",
                    password_hash=self._hash_password("dockmon123"),  # Default password
                    is_first_login=True,
                    must_change_password=True
                )
                session.add(user)
                session.commit()
                logger.info("Created default admin user")

    def verify_user_credentials(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Verify user credentials and return user info if valid"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()

            # Prevent a timing attack: always run bcrypt even if the user doesn't exist
            if user:
                is_valid = self._verify_password(password, user.password_hash)
            else:
                # Run a dummy bcrypt check to keep response time roughly constant
                dummy_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewY5GyYFj.N/wx9S"
                self._verify_password(password, dummy_hash)
                is_valid = False

            if user and is_valid:
                # Update last login
                user.last_login = datetime.now()
                session.commit()
                return {
                    "username": user.username,
                    "is_first_login": user.is_first_login,
                    "must_change_password": user.must_change_password
                }
            return None
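
    # Usage sketch (illustrative): a typical first-login flow; the new
    # password string is a placeholder.
    #
    #     db.get_or_create_default_user()
    #     info = db.verify_user_credentials("admin", "dockmon123")
    #     if info and info["must_change_password"]:
    #         db.change_user_password("admin", "a-new-strong-password")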

    def change_user_password(self, username: str, new_password: str) -> bool:
        """Change user password"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if user:
                user.password_hash = self._hash_password(new_password)
                user.is_first_login = False
                user.must_change_password = False
                user.updated_at = datetime.now()
                session.commit()
                logger.info(f"Password changed for user: {username}")
                return True
            return False

    def username_exists(self, username: str) -> bool:
        """Check if a username already exists"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            return user is not None

    def change_username(self, old_username: str, new_username: str) -> bool:
        """Change a user's username"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == old_username).first()
            if user:
                user.username = new_username
                user.updated_at = datetime.now()
                session.commit()
                logger.info(f"Username changed from {old_username} to {new_username}")
                return True
            return False

    def reset_user_password(self, username: str, new_password: Optional[str] = None) -> Optional[str]:
        """Reset a user's password (for CLI tool); returns the new password, or None if the user doesn't exist"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if not user:
                return None

            # Generate a new password if not provided
            if not new_password:
                new_password = secrets.token_urlsafe(12)

            user.password_hash = self._hash_password(new_password)
            user.must_change_password = True
            user.updated_at = datetime.now()
            session.commit()
            logger.info(f"Password reset for user: {username}")
            return new_password

    def list_users(self) -> List[str]:
        """List all usernames"""
        with self.get_session() as session:
            users = session.query(User.username).all()
            return [u[0] for u in users]

    def get_dashboard_layout(self, username: str) -> Optional[str]:
        """Get dashboard layout for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            return user.dashboard_layout if user else None

    def save_dashboard_layout(self, username: str, layout: str) -> bool:
        """Save dashboard layout for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if user:
                user.dashboard_layout = layout
                user.updated_at = datetime.now()
                session.commit()
                return True
            return False

    def get_modal_preferences(self, username: str) -> Optional[str]:
        """Get modal preferences for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            return user.modal_preferences if user else None

    def save_modal_preferences(self, username: str, preferences: str) -> bool:
        """Save modal preferences for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if user:
                user.modal_preferences = preferences
                user.updated_at = datetime.now()
                session.commit()
                return True
            return False

    def get_event_sort_order(self, username: str) -> str:
        """Get event sort order preference for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if user and user.event_sort_order:
                return user.event_sort_order
            return 'desc'  # Default to newest first

    def save_event_sort_order(self, username: str, sort_order: str) -> bool:
        """Save event sort order preference for a user"""
        with self.get_session() as session:
            # Validate sort order
            if sort_order not in ['asc', 'desc']:
                return False
            user = session.query(User).filter(User.username == username).first()
            if user:
                user.event_sort_order = sort_order
                user.updated_at = datetime.now()
                session.commit()
                return True
            return False

    def get_container_sort_order(self, username: str) -> str:
        """Get container sort order preference for a user"""
        with self.get_session() as session:
            user = session.query(User).filter(User.username == username).first()
            if user and user.container_sort_order:
                return user.container_sort_order
            return 'name-asc'  # Default to name A-Z

    def save_container_sort_order(self, username: str, sort_order: str) -> bool:
        """Save container sort order preference for a user"""
        with self.get_session() as session:
            # Validate sort order
            valid_sorts = ['name-asc', 'name-desc', 'status', 'memory-desc', 'memory-asc', 'cpu-desc', 'cpu-asc']
            if sort_order not in valid_sorts:
                return False
            user = session.query(User).filter(User.username == username).first()
            if user:
                user.container_sort_order = sort_order
                user.updated_at = datetime.now()
                session.commit()
                return True
            return False
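
    # Usage sketch (illustrative): per-user UI preferences round-trip as plain
    # strings; invalid values are rejected.
    #
    #     db.save_container_sort_order("admin", "cpu-desc")   # -> True
    #     db.save_container_sort_order("admin", "bogus")      # -> False
    #     db.get_container_sort_order("admin")                # -> "cpu-desc"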