#!/usr/bin/env python3 """ DockMon Backend - Docker Container Monitoring System Supports multiple Docker hosts with auto-restart and alerts """ import asyncio import json import logging import time import uuid from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from contextlib import asynccontextmanager import docker from docker import DockerClient from docker.errors import DockerException, APIError from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request, Depends, status, Cookie, Response, Query from fastapi.middleware.cors import CORSMiddleware # Session-based auth - no longer need HTTPBearer from fastapi.responses import FileResponse from database import DatabaseManager from realtime import RealtimeMonitor from notifications import NotificationService, AlertProcessor from event_logger import EventLogger, EventContext, EventCategory, EventType, EventSeverity, PerformanceTimer # Import extracted modules from config.settings import AppConfig, get_cors_origins, setup_logging from models.docker_models import DockerHostConfig, DockerHost from models.settings_models import GlobalSettings, AlertRule from models.request_models import ( AutoRestartRequest, AlertRuleCreate, AlertRuleUpdate, NotificationChannelCreate, NotificationChannelUpdate, EventLogFilter ) from security.audit import security_audit from security.rate_limiting import rate_limiter, rate_limit_auth, rate_limit_hosts, rate_limit_containers, rate_limit_notifications, rate_limit_default from auth.routes import router as auth_router, verify_frontend_session verify_session_auth = verify_frontend_session from websocket.connection import ConnectionManager, DateTimeEncoder from websocket.rate_limiter import ws_rate_limiter from docker_monitor.monitor import DockerMonitor # Configure logging setup_logging() logger = logging.getLogger(__name__) # ==================== FastAPI Application ==================== # Create monitor instance monitor = DockerMonitor() # ==================== Authentication ==================== # Session-based authentication only - no API keys needed @asynccontextmanager async def lifespan(app: FastAPI): """Manage application lifecycle""" # Startup logger.info("Starting DockMon backend...") # Ensure default user exists monitor.db.get_or_create_default_user() await monitor.event_logger.start() monitor.event_logger.log_system_event("DockMon Backend Starting", "DockMon backend is initializing", EventSeverity.INFO, EventType.STARTUP) # Connect security audit logger to event logger security_audit.set_event_logger(monitor.event_logger) monitor.monitoring_task = asyncio.create_task(monitor.monitor_containers()) monitor.cleanup_task = asyncio.create_task(monitor.cleanup_old_data()) # Start blackout window monitoring with WebSocket support await monitor.notification_service.blackout_manager.start_monitoring( monitor.notification_service, monitor, # Pass DockerMonitor instance to avoid re-initialization monitor.manager # Pass ConnectionManager for WebSocket broadcasts ) yield # Shutdown logger.info("Shutting down DockMon backend...") monitor.event_logger.log_system_event("DockMon Backend Shutting Down", "DockMon backend is shutting down", EventSeverity.INFO, EventType.SHUTDOWN) if monitor.monitoring_task: monitor.monitoring_task.cancel() if monitor.cleanup_task: monitor.cleanup_task.cancel() # Stop blackout monitoring monitor.notification_service.blackout_manager.stop_monitoring() # Close stats client (HTTP session and WebSocket) from stats_client import get_stats_client await get_stats_client().close() # Close notification service await monitor.notification_service.close() # Stop event logger await monitor.event_logger.stop() # Dispose SQLAlchemy engine monitor.db.engine.dispose() logger.info("SQLAlchemy engine disposed") app = FastAPI( title="DockMon API", version="1.0.0", lifespan=lifespan ) # Configure CORS - Production ready with environment-based configuration app.add_middleware( CORSMiddleware, allow_origins=AppConfig.CORS_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Content-Type", "Authorization"], ) logger.info(f"CORS configured for origins: {AppConfig.CORS_ORIGINS}") # ==================== API Routes ==================== # Register authentication router app.include_router(auth_router) @app.get("/") async def root(authenticated: bool = Depends(verify_session_auth)): """Backend API root - frontend is served separately""" return {"message": "DockMon Backend API", "version": "1.0.0", "docs": "/docs"} @app.get("/health") async def health_check(): """Health check endpoint for Docker health checks - no authentication required""" return {"status": "healthy", "service": "dockmon-backend"} def _is_localhost_or_internal(ip: str) -> bool: """Check if IP is localhost or internal network (Docker networks, private networks)""" import ipaddress try: addr = ipaddress.ip_address(ip) # Allow localhost if addr.is_loopback: return True # Allow private networks (RFC 1918) - for Docker networks and internal deployments if addr.is_private: return True return False except ValueError: # Invalid IP format return False # ==================== Frontend Authentication ==================== async def verify_session_auth(request: Request): """Verify authentication via session cookie only""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager # Since backend only listens on 127.0.0.1, all requests must come through nginx # No need to check client IP - the backend binding ensures security # Check session authentication session_id = _get_session_from_cookie(request) if session_id and session_manager.validate_session(session_id, request): return True # No valid session found raise HTTPException( status_code=401, detail="Authentication required - please login" ) @app.get("/api/hosts") async def get_hosts(authenticated: bool = Depends(verify_session_auth)): """Get all configured Docker hosts""" return list(monitor.hosts.values()) @app.post("/api/hosts") async def add_host(config: DockerHostConfig, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts, request: Request = None): """Add a new Docker host""" try: host = monitor.add_host(config) # Security audit log - successful privileged action if request: security_audit.log_privileged_action( client_ip=request.client.host if hasattr(request, 'client') else "unknown", action="ADD_DOCKER_HOST", target=f"{config.name} ({config.url})", success=True, user_agent=request.headers.get('user-agent', 'unknown') ) # Broadcast host addition to WebSocket clients so they refresh await monitor.manager.broadcast({ "type": "host_added", "data": {"host_id": host.id, "host_name": host.name} }) return host except Exception as e: # Security audit log - failed privileged action if request: security_audit.log_privileged_action( client_ip=request.client.host if hasattr(request, 'client') else "unknown", action="ADD_DOCKER_HOST", target=f"{config.name} ({config.url})", success=False, user_agent=request.headers.get('user-agent', 'unknown') ) raise @app.put("/api/hosts/{host_id}") async def update_host(host_id: str, config: DockerHostConfig, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts): """Update an existing Docker host""" host = monitor.update_host(host_id, config) return host @app.delete("/api/hosts/{host_id}") async def remove_host(host_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts): """Remove a Docker host""" await monitor.remove_host(host_id) # Broadcast host removal to WebSocket clients so they refresh await monitor.manager.broadcast({ "type": "host_removed", "data": {"host_id": host_id} }) return {"status": "success", "message": f"Host {host_id} removed"} @app.get("/api/hosts/{host_id}/metrics") async def get_host_metrics(host_id: str, authenticated: bool = Depends(verify_session_auth)): """Get aggregated metrics for a Docker host (CPU, RAM, Network)""" try: host = monitor.hosts.get(host_id) if not host: raise HTTPException(status_code=404, detail="Host not found") client = monitor.clients.get(host_id) if not client: raise HTTPException(status_code=503, detail="Host client not available") # Get all running containers on this host containers = client.containers.list(filters={'status': 'running'}) total_cpu = 0.0 total_memory_used = 0 total_memory_limit = 0 total_net_rx = 0 total_net_tx = 0 container_count = 0 for container in containers: try: stats = container.stats(stream=False) # Calculate CPU percentage cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \ stats['precpu_stats']['cpu_usage']['total_usage'] system_delta = stats['cpu_stats']['system_cpu_usage'] - \ stats['precpu_stats']['system_cpu_usage'] if system_delta > 0: num_cpus = len(stats['cpu_stats']['cpu_usage'].get('percpu_usage', [1])) cpu_percent = (cpu_delta / system_delta) * num_cpus * 100.0 total_cpu += cpu_percent # Memory mem_usage = stats['memory_stats'].get('usage', 0) mem_limit = stats['memory_stats'].get('limit', 1) total_memory_used += mem_usage total_memory_limit += mem_limit # Network I/O networks = stats.get('networks', {}) for net_stats in networks.values(): total_net_rx += net_stats.get('rx_bytes', 0) total_net_tx += net_stats.get('tx_bytes', 0) container_count += 1 except Exception as e: logger.warning(f"Failed to get stats for container {container.id}: {e}") continue # Calculate percentages avg_cpu = round(total_cpu / container_count, 1) if container_count > 0 else 0.0 memory_percent = round((total_memory_used / total_memory_limit) * 100, 1) if total_memory_limit > 0 else 0.0 return { "cpu_percent": avg_cpu, "memory_percent": memory_percent, "memory_used_bytes": total_memory_used, "memory_limit_bytes": total_memory_limit, "network_rx_bytes": total_net_rx, "network_tx_bytes": total_net_tx, "container_count": container_count, "timestamp": int(time.time()) } except HTTPException: raise except Exception as e: logger.error(f"Error fetching metrics for host {host_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/containers") async def get_containers(host_id: Optional[str] = None, authenticated: bool = Depends(verify_session_auth)): """Get all containers""" return await monitor.get_containers(host_id) @app.post("/api/hosts/{host_id}/containers/{container_id}/restart") async def restart_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers): """Restart a container""" success = monitor.restart_container(host_id, container_id) return {"status": "success" if success else "failed"} @app.post("/api/hosts/{host_id}/containers/{container_id}/stop") async def stop_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers): """Stop a container""" success = monitor.stop_container(host_id, container_id) return {"status": "success" if success else "failed"} @app.post("/api/hosts/{host_id}/containers/{container_id}/start") async def start_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers): """Start a container""" success = monitor.start_container(host_id, container_id) return {"status": "success" if success else "failed"} @app.get("/api/hosts/{host_id}/containers/{container_id}/logs") async def get_container_logs( host_id: str, container_id: str, tail: int = 100, since: Optional[str] = None, # ISO timestamp for getting logs since a specific time authenticated: bool = Depends(verify_session_auth) # No rate limiting - authenticated users can poll logs freely ): """Get container logs - Portainer-style polling approach""" if host_id not in monitor.clients: raise HTTPException(status_code=404, detail="Host not found") try: client = monitor.clients[host_id] # Run blocking Docker calls in executor with timeout loop = asyncio.get_event_loop() # Get container with timeout try: container = await asyncio.wait_for( loop.run_in_executor(None, client.containers.get, container_id), timeout=5.0 ) except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="Timeout getting container") # Prepare log options log_kwargs = { 'timestamps': True, 'tail': tail } # Add since parameter if provided (for getting only new logs) if since: try: # Parse ISO timestamp and convert to Unix timestamp for Docker import dateutil.parser dt = dateutil.parser.parse(since) # Docker's 'since' expects Unix timestamp as float import time unix_ts = time.mktime(dt.timetuple()) log_kwargs['since'] = unix_ts log_kwargs['tail'] = 'all' # Get all logs since timestamp except Exception as e: logger.debug(f"Could not parse 'since' parameter: {e}") pass # Invalid timestamp, ignore # Fetch logs with timeout try: logs = await asyncio.wait_for( loop.run_in_executor( None, lambda: container.logs(**log_kwargs).decode('utf-8', errors='ignore') ), timeout=5.0 ) except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="Timeout fetching logs") # Parse logs and extract timestamps # Docker log format with timestamps: "2025-09-30T19:30:45.123456789Z actual log message" parsed_logs = [] for line in logs.split('\n'): if not line.strip(): continue # Try to extract timestamp (Docker format: ISO8601 with nanoseconds) try: # Find the space after timestamp space_idx = line.find(' ') if space_idx > 0: timestamp_str = line[:space_idx] log_text = line[space_idx + 1:] # Parse timestamp (remove nanoseconds for Python datetime) # Format: 2025-09-30T19:30:45.123456789Z -> 2025-09-30T19:30:45.123456Z if 'T' in timestamp_str and timestamp_str.endswith('Z'): # Truncate to microseconds (6 digits) if nanoseconds present parts = timestamp_str[:-1].split('.') if len(parts) == 2 and len(parts[1]) > 6: timestamp_str = f"{parts[0]}.{parts[1][:6]}Z" parsed_logs.append({ "timestamp": timestamp_str, "log": log_text }) else: # No valid timestamp, use current time parsed_logs.append({ "timestamp": datetime.utcnow().isoformat() + 'Z', "log": line }) else: # No space found, treat whole line as log parsed_logs.append({ "timestamp": datetime.utcnow().isoformat() + 'Z', "log": line }) except Exception: # If parsing fails, use current time parsed_logs.append({ "timestamp": datetime.utcnow().isoformat() + 'Z', "log": line }) return { "container_id": container_id, "logs": parsed_logs, "last_timestamp": datetime.utcnow().isoformat() + 'Z' # For next 'since' parameter } except HTTPException: raise except Exception as e: logger.error(f"Failed to get logs for {container_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) # Container exec endpoint removed for security reasons # Users should use direct SSH, Docker CLI, or other appropriate tools for container access # WebSocket log streaming removed in favor of HTTP polling (Portainer-style) # This is more reliable for remote Docker hosts @app.post("/api/containers/{container_id}/auto-restart") async def toggle_auto_restart(container_id: str, request: AutoRestartRequest, authenticated: bool = Depends(verify_session_auth)): """Toggle auto-restart for a container""" monitor.toggle_auto_restart(request.host_id, container_id, request.container_name, request.enabled) return {"container_id": container_id, "auto_restart": request.enabled} @app.get("/api/rate-limit/stats") async def get_rate_limit_stats(authenticated: bool = Depends(verify_session_auth)): """Get rate limiter statistics - admin only""" return rate_limiter.get_stats() @app.get("/api/security/audit") async def get_security_audit_stats(authenticated: bool = Depends(verify_session_auth), request: Request = None): """Get security audit statistics - admin only""" if request: security_audit.log_privileged_action( client_ip=request.client.host if hasattr(request, 'client') else "unknown", action="VIEW_SECURITY_AUDIT", target="security_audit_logs", success=True, user_agent=request.headers.get('user-agent', 'unknown') ) return security_audit.get_security_stats() @app.get("/api/settings") async def get_settings(authenticated: bool = Depends(verify_session_auth)): """Get global settings""" settings = monitor.db.get_settings() return { "max_retries": settings.max_retries, "retry_delay": settings.retry_delay, "default_auto_restart": settings.default_auto_restart, "polling_interval": settings.polling_interval, "connection_timeout": settings.connection_timeout, "log_retention_days": settings.log_retention_days, "enable_notifications": settings.enable_notifications, "alert_template": getattr(settings, 'alert_template', None), "blackout_windows": getattr(settings, 'blackout_windows', None), "timezone_offset": getattr(settings, 'timezone_offset', 0), "show_host_stats": getattr(settings, 'show_host_stats', True), "show_container_stats": getattr(settings, 'show_container_stats', True) } @app.post("/api/settings") async def update_settings(settings: GlobalSettings, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default): """Update global settings""" # Check if stats settings changed old_show_host_stats = monitor.settings.show_host_stats old_show_container_stats = monitor.settings.show_container_stats updated = monitor.db.update_settings(settings.dict()) monitor.settings = updated # Update in-memory settings # Log stats collection changes if old_show_host_stats != updated.show_host_stats: logger.info(f"Host stats collection {'enabled' if updated.show_host_stats else 'disabled'}") if old_show_container_stats != updated.show_container_stats: logger.info(f"Container stats collection {'enabled' if updated.show_container_stats else 'disabled'}") # Broadcast blackout status change to all clients is_blackout, window_name = monitor.notification_service.blackout_manager.is_in_blackout_window() await monitor.manager.broadcast({ 'type': 'blackout_status_changed', 'data': { 'is_blackout': is_blackout, 'window_name': window_name } }) return settings @app.get("/api/alerts") async def get_alert_rules(authenticated: bool = Depends(verify_session_auth)): """Get all alert rules""" rules = monitor.db.get_alert_rules(enabled_only=False) logger.info(f"Retrieved {len(rules)} alert rules from database") # Check for orphaned alerts orphaned = await monitor.check_orphaned_alerts() return [{ "id": rule.id, "name": rule.name, "containers": [{"host_id": c.host_id, "container_name": c.container_name} for c in rule.containers] if rule.containers else [], "trigger_events": rule.trigger_events, "trigger_states": rule.trigger_states, "notification_channels": rule.notification_channels, "cooldown_minutes": rule.cooldown_minutes, "enabled": rule.enabled, "last_triggered": rule.last_triggered.isoformat() if rule.last_triggered else None, "created_at": rule.created_at.isoformat(), "updated_at": rule.updated_at.isoformat(), "is_orphaned": rule.id in orphaned, "orphaned_containers": orphaned.get(rule.id, {}).get('orphaned_containers', []) if rule.id in orphaned else [] } for rule in rules] @app.post("/api/alerts") async def create_alert_rule(rule: AlertRuleCreate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default): """Create a new alert rule""" try: # Validate cooldown_minutes if rule.cooldown_minutes < 0 or rule.cooldown_minutes > 10080: # Max 1 week raise HTTPException(status_code=400, detail="Cooldown must be between 0 and 10080 minutes (1 week)") rule_id = str(uuid.uuid4()) # Convert ContainerHostPair objects to dicts for database containers_data = None if rule.containers: containers_data = [{"host_id": c.host_id, "container_name": c.container_name} for c in rule.containers] logger.info(f"Creating alert rule: {rule.name} with {len(containers_data) if containers_data else 0} container+host pairs") db_rule = monitor.db.add_alert_rule({ "id": rule_id, "name": rule.name, "containers": containers_data, "trigger_events": rule.trigger_events, "trigger_states": rule.trigger_states, "notification_channels": rule.notification_channels, "cooldown_minutes": rule.cooldown_minutes, "enabled": rule.enabled }) logger.info(f"Successfully created alert rule with ID: {db_rule.id}") # Log alert rule creation monitor.event_logger.log_alert_rule_created( rule_name=db_rule.name, rule_id=db_rule.id, container_count=len(db_rule.containers) if db_rule.containers else 0, channels=db_rule.notification_channels or [], triggered_by="user" ) return { "id": db_rule.id, "name": db_rule.name, "containers": [{"host_id": c.host_id, "container_name": c.container_name} for c in db_rule.containers] if db_rule.containers else [], "trigger_events": db_rule.trigger_events, "trigger_states": db_rule.trigger_states, "notification_channels": db_rule.notification_channels, "cooldown_minutes": db_rule.cooldown_minutes, "enabled": db_rule.enabled, "last_triggered": None, "created_at": db_rule.created_at.isoformat(), "updated_at": db_rule.updated_at.isoformat() } except Exception as e: logger.error(f"Failed to create alert rule: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.put("/api/alerts/{rule_id}") async def update_alert_rule(rule_id: str, updates: AlertRuleUpdate, authenticated: bool = Depends(verify_session_auth)): """Update an alert rule""" try: # Validate cooldown_minutes if provided if updates.cooldown_minutes is not None: if updates.cooldown_minutes < 0 or updates.cooldown_minutes > 10080: # Max 1 week raise HTTPException(status_code=400, detail="Cooldown must be between 0 and 10080 minutes (1 week)") # Include all fields that are explicitly set, even if empty # This allows clearing trigger_events or trigger_states update_data = {} for k, v in updates.dict().items(): if v is not None: # Convert empty lists to None for trigger fields if k in ['trigger_events', 'trigger_states'] and isinstance(v, list) and not v: update_data[k] = None # Handle containers field separately elif k == 'containers': # v is already a list of dicts after .dict() call # Include it even if None to clear the containers (set to "all containers") update_data[k] = v else: update_data[k] = v elif k == 'containers': # Explicitly handle containers=None to clear specific container selection update_data[k] = None # Validate that at least one trigger type remains after update if 'trigger_events' in update_data or 'trigger_states' in update_data: # Get current rule to check what will remain current_rule = monitor.db.get_alert_rule(rule_id) if current_rule: final_events = update_data.get('trigger_events', current_rule.trigger_events) final_states = update_data.get('trigger_states', current_rule.trigger_states) if not final_events and not final_states: raise HTTPException(status_code=400, detail="Alert rule must have at least one trigger event or state") db_rule = monitor.db.update_alert_rule(rule_id, update_data) if not db_rule: raise HTTPException(status_code=404, detail="Alert rule not found") # Refresh in-memory alert rules return { "id": db_rule.id, "name": db_rule.name, "containers": [{"host_id": c.host_id, "container_name": c.container_name} for c in db_rule.containers] if db_rule.containers else [], "trigger_events": db_rule.trigger_events, "trigger_states": db_rule.trigger_states, "notification_channels": db_rule.notification_channels, "cooldown_minutes": db_rule.cooldown_minutes, "enabled": db_rule.enabled, "last_triggered": db_rule.last_triggered.isoformat() if db_rule.last_triggered else None, "created_at": db_rule.created_at.isoformat(), "updated_at": db_rule.updated_at.isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Failed to update alert rule: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.delete("/api/alerts/{rule_id}") async def delete_alert_rule(rule_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default): """Delete an alert rule""" try: # Get rule info before deleting for logging rule = monitor.db.get_alert_rule(rule_id) if not rule: raise HTTPException(status_code=404, detail="Alert rule not found") rule_name = rule.name success = monitor.db.delete_alert_rule(rule_id) if not success: raise HTTPException(status_code=404, detail="Alert rule not found") # Refresh in-memory alert rules # Log alert rule deletion monitor.event_logger.log_alert_rule_deleted( rule_name=rule_name, rule_id=rule_id, triggered_by="user" ) return {"status": "success", "message": f"Alert rule {rule_id} deleted"} except HTTPException: raise except Exception as e: logger.error(f"Failed to delete alert rule: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/alerts/orphaned") async def get_orphaned_alerts(authenticated: bool = Depends(verify_session_auth)): """Get alert rules that reference non-existent containers""" try: orphaned = await monitor.check_orphaned_alerts() return { "count": len(orphaned), "orphaned_rules": orphaned } except Exception as e: logger.error(f"Failed to check orphaned alerts: {e}") raise HTTPException(status_code=500, detail=str(e)) # ==================== Blackout Window Routes ==================== @app.get("/api/blackout/status") async def get_blackout_status(authenticated: bool = Depends(verify_session_auth)): """Get current blackout window status""" try: is_blackout, window_name = monitor.notification_service.blackout_manager.is_in_blackout_window() return { "is_blackout": is_blackout, "current_window": window_name } except Exception as e: logger.error(f"Error getting blackout status: {e}") return {"is_blackout": False, "current_window": None} # ==================== Notification Channel Routes ==================== @app.get("/api/notifications/template-variables") async def get_template_variables(authenticated: bool = Depends(verify_session_auth)): """Get available template variables for notification messages""" return { "variables": [ {"name": "{CONTAINER_NAME}", "description": "Name of the container"}, {"name": "{CONTAINER_ID}", "description": "Short container ID (12 characters)"}, {"name": "{HOST_NAME}", "description": "Name of the Docker host"}, {"name": "{HOST_ID}", "description": "ID of the Docker host"}, {"name": "{OLD_STATE}", "description": "Previous state of the container"}, {"name": "{NEW_STATE}", "description": "New state of the container"}, {"name": "{IMAGE}", "description": "Docker image name"}, {"name": "{TIMESTAMP}", "description": "Full timestamp (YYYY-MM-DD HH:MM:SS)"}, {"name": "{TIME}", "description": "Time only (HH:MM:SS)"}, {"name": "{DATE}", "description": "Date only (YYYY-MM-DD)"}, {"name": "{RULE_NAME}", "description": "Name of the alert rule"}, {"name": "{RULE_ID}", "description": "ID of the alert rule"}, {"name": "{TRIGGERED_BY}", "description": "What triggered the alert"}, {"name": "{EVENT_TYPE}", "description": "Docker event type (if applicable)"}, {"name": "{EXIT_CODE}", "description": "Container exit code (if applicable)"} ], "default_template": """🚨 **DockMon Alert** **Container:** `{CONTAINER_NAME}` **Host:** {HOST_NAME} **State Change:** `{OLD_STATE}` → `{NEW_STATE}` **Image:** {IMAGE} **Time:** {TIMESTAMP} **Rule:** {RULE_NAME} ───────────────────────""", "examples": { "simple": "Alert: {CONTAINER_NAME} on {HOST_NAME} changed from {OLD_STATE} to {NEW_STATE}", "detailed": """🔴 Container Alert Container: {CONTAINER_NAME} ({CONTAINER_ID}) Host: {HOST_NAME} Status: {OLD_STATE} → {NEW_STATE} Image: {IMAGE} Time: {TIMESTAMP} Triggered by: {RULE_NAME}""", "minimal": "{CONTAINER_NAME}: {NEW_STATE} at {TIME}" } } @app.get("/api/notifications/channels") async def get_notification_channels(authenticated: bool = Depends(verify_session_auth)): """Get all notification channels""" channels = monitor.db.get_notification_channels(enabled_only=False) return [{ "id": ch.id, "name": ch.name, "type": ch.type, "config": ch.config, "enabled": ch.enabled, "created_at": ch.created_at.isoformat(), "updated_at": ch.updated_at.isoformat() } for ch in channels] @app.post("/api/notifications/channels") async def create_notification_channel(channel: NotificationChannelCreate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications): """Create a new notification channel""" try: db_channel = monitor.db.add_notification_channel({ "name": channel.name, "type": channel.type, "config": channel.config, "enabled": channel.enabled }) # Log notification channel creation monitor.event_logger.log_notification_channel_created( channel_name=db_channel.name, channel_type=db_channel.type, triggered_by="user" ) return { "id": db_channel.id, "name": db_channel.name, "type": db_channel.type, "config": db_channel.config, "enabled": db_channel.enabled, "created_at": db_channel.created_at.isoformat(), "updated_at": db_channel.updated_at.isoformat() } except Exception as e: logger.error(f"Failed to create notification channel: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.put("/api/notifications/channels/{channel_id}") async def update_notification_channel(channel_id: int, updates: NotificationChannelUpdate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications): """Update a notification channel""" try: update_data = {k: v for k, v in updates.dict().items() if v is not None} db_channel = monitor.db.update_notification_channel(channel_id, update_data) if not db_channel: raise HTTPException(status_code=404, detail="Channel not found") return { "id": db_channel.id, "name": db_channel.name, "type": db_channel.type, "config": db_channel.config, "enabled": db_channel.enabled, "created_at": db_channel.created_at.isoformat(), "updated_at": db_channel.updated_at.isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Failed to update notification channel: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.get("/api/notifications/channels/{channel_id}/dependent-alerts") async def get_dependent_alerts(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications): """Get alerts that would be orphaned if this channel is deleted""" try: dependent_alerts = monitor.db.get_alerts_dependent_on_channel(channel_id) return {"alerts": dependent_alerts} except Exception as e: logger.error(f"Failed to get dependent alerts: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.delete("/api/notifications/channels/{channel_id}") async def delete_notification_channel(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications): """Delete a notification channel and cascade delete alerts that would become orphaned""" try: # Find alerts that would be orphaned (only have this channel) affected_alerts = monitor.db.get_alerts_dependent_on_channel(channel_id) # Find all alerts that use this channel (for removal from multi-channel alerts) all_alerts = monitor.db.get_alert_rules() # Delete the channel success = monitor.db.delete_notification_channel(channel_id) if not success: raise HTTPException(status_code=404, detail="Channel not found") # Delete orphaned alerts deleted_alerts = [] for alert in affected_alerts: if monitor.db.delete_alert_rule(alert['id']): deleted_alerts.append(alert['name']) # Remove channel from multi-channel alerts updated_alerts = [] for alert in all_alerts: # Skip if already deleted if alert.id in [a['id'] for a in affected_alerts]: continue # Check if this alert uses the deleted channel channels = alert.notification_channels if isinstance(alert.notification_channels, list) else [] if channel_id in channels: # Remove the channel new_channels = [ch for ch in channels if ch != channel_id] monitor.db.update_alert_rule(alert.id, {'notification_channels': new_channels}) updated_alerts.append(alert.name) result = { "status": "success", "message": f"Channel {channel_id} deleted" } if deleted_alerts: result["deleted_alerts"] = deleted_alerts result["message"] += f" and {len(deleted_alerts)} orphaned alert(s) removed" if updated_alerts: result["updated_alerts"] = updated_alerts if "deleted_alerts" in result: result["message"] += f", {len(updated_alerts)} alert(s) updated" else: result["message"] += f" and {len(updated_alerts)} alert(s) updated" return result except HTTPException: raise except Exception as e: logger.error(f"Failed to delete notification channel: {e}") raise HTTPException(status_code=400, detail=str(e)) @app.post("/api/notifications/channels/{channel_id}/test") async def test_notification_channel(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications): """Test a notification channel""" try: if not hasattr(monitor, 'notification_service'): raise HTTPException(status_code=503, detail="Notification service not available") result = await monitor.notification_service.test_channel(channel_id) return result except HTTPException: raise except Exception as e: logger.error(f"Failed to test notification channel: {e}") raise HTTPException(status_code=500, detail=str(e)) # ==================== Event Log Routes ==================== @app.get("/api/events") async def get_events( category: Optional[List[str]] = Query(None), event_type: Optional[str] = None, severity: Optional[List[str]] = Query(None), host_id: Optional[List[str]] = Query(None), container_id: Optional[str] = None, container_name: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, correlation_id: Optional[str] = None, search: Optional[str] = None, limit: int = 100, offset: int = 0, hours: Optional[int] = None, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default ): """ Get events with filtering and pagination Query parameters: - category: Filter by category (container, host, system, alert, notification) - event_type: Filter by event type (state_change, action_taken, etc.) - severity: Filter by severity (debug, info, warning, error, critical) - host_id: Filter by specific host - container_id: Filter by specific container - container_name: Filter by container name (partial match) - start_date: Filter events after this date (ISO 8601 format) - end_date: Filter events before this date (ISO 8601 format) - hours: Shortcut to get events from last X hours (overrides start_date) - correlation_id: Get related events - search: Search in title, message, and container name - limit: Number of results per page (default 100, max 500) - offset: Pagination offset """ try: # Validate and parse dates start_datetime = None end_datetime = None # If hours parameter is provided, calculate start_date if hours is not None: start_datetime = datetime.now() - timedelta(hours=hours) elif start_date: try: start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00')) except ValueError: raise HTTPException(status_code=400, detail="Invalid start_date format. Use ISO 8601 format.") if end_date: try: end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00')) except ValueError: raise HTTPException(status_code=400, detail="Invalid end_date format. Use ISO 8601 format.") # Limit maximum results per page if limit > 500: limit = 500 # Query events from database events, total_count = monitor.db.get_events( category=category, event_type=event_type, severity=severity, host_id=host_id, container_id=container_id, container_name=container_name, start_date=start_datetime, end_date=end_datetime, correlation_id=correlation_id, search=search, limit=limit, offset=offset ) # Convert to JSON-serializable format events_json = [] for event in events: events_json.append({ "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() }) return { "events": events_json, "total_count": total_count, "limit": limit, "offset": offset, "has_more": (offset + limit) < total_count } except HTTPException: raise except Exception as e: logger.error(f"Failed to get events: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/{event_id}") async def get_event_by_id( event_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default ): """Get a specific event by ID""" try: event = monitor.db.get_event_by_id(event_id) if not event: raise HTTPException(status_code=404, detail="Event not found") return { "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Failed to get event: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/correlation/{correlation_id}") async def get_events_by_correlation( correlation_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default ): """Get all events with the same correlation ID (related events)""" try: events = monitor.db.get_events_by_correlation(correlation_id) events_json = [] for event in events: events_json.append({ "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() }) return {"events": events_json, "count": len(events_json)} except Exception as e: logger.error(f"Failed to get events by correlation: {e}") raise HTTPException(status_code=500, detail=str(e)) # ==================== User Dashboard Routes ==================== @app.get("/api/user/dashboard-layout") async def get_dashboard_layout(request: Request, authenticated: bool = Depends(verify_session_auth)): """Get dashboard layout for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) layout = monitor.db.get_dashboard_layout(username) return {"layout": layout} @app.post("/api/user/dashboard-layout") async def save_dashboard_layout(request: Request, authenticated: bool = Depends(verify_session_auth)): """Save dashboard layout for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) try: body = await request.json() layout_json = body.get('layout') if layout_json is None: raise HTTPException(status_code=400, detail="Layout is required") # Validate JSON structure if layout_json: try: parsed_layout = json.loads(layout_json) if isinstance(layout_json, str) else layout_json # Validate it's a list if not isinstance(parsed_layout, list): raise HTTPException(status_code=400, detail="Layout must be an array of widget positions") # Validate each widget has required fields required_fields = ['x', 'y', 'w', 'h'] for widget in parsed_layout: if not isinstance(widget, dict): raise HTTPException(status_code=400, detail="Each widget must be an object") for field in required_fields: if field not in widget: raise HTTPException(status_code=400, detail=f"Widget missing required field: {field}") if not isinstance(widget[field], (int, float)): raise HTTPException(status_code=400, detail=f"Widget field '{field}' must be a number") # Convert back to string for storage layout_json = json.dumps(parsed_layout) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON format for layout") success = monitor.db.save_dashboard_layout(username, layout_json) if not success: raise HTTPException(status_code=500, detail="Failed to save layout") return {"success": True} except HTTPException: raise except Exception as e: logger.error(f"Failed to save dashboard layout: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/user/event-sort-order") async def get_event_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)): """Get event sort order preference for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) sort_order = monitor.db.get_event_sort_order(username) return {"sort_order": sort_order} @app.post("/api/user/event-sort-order") async def save_event_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)): """Save event sort order preference for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) try: body = await request.json() sort_order = body.get('sort_order') if sort_order not in ['asc', 'desc']: raise HTTPException(status_code=400, detail="sort_order must be 'asc' or 'desc'") success = monitor.db.save_event_sort_order(username, sort_order) if not success: raise HTTPException(status_code=500, detail="Failed to save sort order") return {"success": True} except HTTPException: raise except Exception as e: logger.error(f"Failed to save event sort order: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/user/container-sort-order") async def get_container_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)): """Get container sort order preference for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) sort_order = monitor.db.get_container_sort_order(username) return {"sort_order": sort_order} @app.post("/api/user/container-sort-order") async def save_container_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)): """Save container sort order preference for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) try: body = await request.json() sort_order = body.get('sort_order') valid_sorts = ['name-asc', 'name-desc', 'status', 'memory-desc', 'memory-asc', 'cpu-desc', 'cpu-asc'] if sort_order not in valid_sorts: raise HTTPException(status_code=400, detail=f"sort_order must be one of: {', '.join(valid_sorts)}") success = monitor.db.save_container_sort_order(username, sort_order) if not success: raise HTTPException(status_code=500, detail="Failed to save sort order") return {"success": True} except HTTPException: raise except Exception as e: logger.error(f"Failed to save container sort order: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/user/modal-preferences") async def get_modal_preferences(request: Request, authenticated: bool = Depends(verify_session_auth)): """Get modal preferences for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) preferences = monitor.db.get_modal_preferences(username) return {"preferences": preferences} @app.post("/api/user/modal-preferences") async def save_modal_preferences(request: Request, authenticated: bool = Depends(verify_session_auth)): """Save modal preferences for current user""" from auth.routes import _get_session_from_cookie from auth.session_manager import session_manager session_id = _get_session_from_cookie(request) username = session_manager.get_session_username(session_id) try: body = await request.json() preferences = body.get('preferences') if preferences is None: raise HTTPException(status_code=400, detail="Preferences are required") success = monitor.db.save_modal_preferences(username, preferences) if not success: raise HTTPException(status_code=500, detail="Failed to save preferences") return {"success": True} except HTTPException: raise except Exception as e: logger.error(f"Failed to save modal preferences: {e}") raise HTTPException(status_code=500, detail=str(e)) # ==================== Event Log Routes ==================== # Note: Main /api/events endpoint is defined earlier with full feature set @app.get("/api/events/{event_id}") async def get_event(event_id: int, authenticated: bool = Depends(verify_session_auth)): """Get a specific event by ID""" try: event = monitor.db.get_event_by_id(event_id) if not event: raise HTTPException(status_code=404, detail="Event not found") return { "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Failed to get event {event_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/correlation/{correlation_id}") async def get_events_by_correlation(correlation_id: str, authenticated: bool = Depends(verify_session_auth)): """Get all events with the same correlation ID""" try: events = monitor.db.get_events_by_correlation(correlation_id) return { "correlation_id": correlation_id, "events": [{ "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() } for event in events] } except Exception as e: logger.error(f"Failed to get events by correlation {correlation_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/statistics") async def get_event_statistics(start_date: Optional[str] = None, end_date: Optional[str] = None, authenticated: bool = Depends(verify_session_auth)): """Get event statistics for dashboard""" try: # Parse dates parsed_start_date = None parsed_end_date = None if start_date: try: parsed_start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00')) except ValueError: raise HTTPException(status_code=400, detail="Invalid start_date format") if end_date: try: parsed_end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00')) except ValueError: raise HTTPException(status_code=400, detail="Invalid end_date format") stats = monitor.db.get_event_statistics( start_date=parsed_start_date, end_date=parsed_end_date ) return stats except HTTPException: raise except Exception as e: logger.error(f"Failed to get event statistics: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/container/{container_id}") async def get_container_events(container_id: str, limit: int = 50, authenticated: bool = Depends(verify_session_auth)): """Get events for a specific container""" try: events, total_count = monitor.db.get_events( container_id=container_id, limit=limit, offset=0 ) return { "container_id": container_id, "events": [{ "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() } for event in events], "total_count": total_count } except Exception as e: logger.error(f"Failed to get events for container {container_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/events/host/{host_id}") async def get_host_events(host_id: str, limit: int = 50, authenticated: bool = Depends(verify_session_auth)): """Get events for a specific host""" try: events, total_count = monitor.db.get_events( host_id=host_id, limit=limit, offset=0 ) return { "host_id": host_id, "events": [{ "id": event.id, "correlation_id": event.correlation_id, "category": event.category, "event_type": event.event_type, "severity": event.severity, "host_id": event.host_id, "host_name": event.host_name, "container_id": event.container_id, "container_name": event.container_name, "title": event.title, "message": event.message, "old_state": event.old_state, "new_state": event.new_state, "triggered_by": event.triggered_by, "details": event.details, "duration_ms": event.duration_ms, "timestamp": event.timestamp.isoformat() } for event in events], "total_count": total_count } except Exception as e: logger.error(f"Failed to get events for host {host_id}: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/events/cleanup") async def cleanup_old_events(days: int = 30, authenticated: bool = Depends(verify_session_auth)): """Clean up old events - DANGEROUS: Can delete audit logs""" try: if days < 1: raise HTTPException(status_code=400, detail="Days must be at least 1") deleted_count = monitor.db.cleanup_old_events(days) monitor.event_logger.log_system_event( "Event Cleanup Completed", f"Cleaned up {deleted_count} events older than {days} days", EventSeverity.INFO, EventType.STARTUP ) return { "status": "success", "message": f"Cleaned up {deleted_count} events older than {days} days", "deleted_count": deleted_count } except HTTPException: raise except Exception as e: logger.error(f"Failed to cleanup events: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time updates""" # Generate a unique connection ID for rate limiting connection_id = f"ws_{id(websocket)}_{time.time()}" await monitor.manager.connect(websocket) await monitor.realtime.subscribe_to_events(websocket) # Send initial state settings_dict = { "max_retries": monitor.settings.max_retries, "retry_delay": monitor.settings.retry_delay, "default_auto_restart": monitor.settings.default_auto_restart, "polling_interval": monitor.settings.polling_interval, "connection_timeout": monitor.settings.connection_timeout, "log_retention_days": monitor.settings.log_retention_days, "enable_notifications": monitor.settings.enable_notifications, "alert_template": getattr(monitor.settings, 'alert_template', None), "blackout_windows": getattr(monitor.settings, 'blackout_windows', None), "timezone_offset": getattr(monitor.settings, 'timezone_offset', 0), "show_host_stats": getattr(monitor.settings, 'show_host_stats', True), "show_container_stats": getattr(monitor.settings, 'show_container_stats', True) } initial_state = { "type": "initial_state", "data": { "hosts": [h.dict() for h in monitor.hosts.values()], "containers": [c.dict() for c in await monitor.get_containers()], "settings": settings_dict } } await websocket.send_text(json.dumps(initial_state, cls=DateTimeEncoder)) try: while True: # Keep connection alive and handle incoming messages message = await websocket.receive_json() # Check rate limit for incoming messages allowed, reason = ws_rate_limiter.check_rate_limit(connection_id) if not allowed: # Send rate limit error to client await websocket.send_text(json.dumps({ "type": "error", "error": "rate_limit", "message": reason })) # Don't process the message continue # Handle different message types if message.get("type") == "subscribe_stats": container_id = message.get("container_id") if container_id: await monitor.realtime.subscribe_to_stats(websocket, container_id) # Find the host and start monitoring for host_id, client in monitor.clients.items(): try: client.containers.get(container_id) await monitor.realtime.start_container_stats_stream( client, container_id, interval=2 ) break except Exception as e: logger.debug(f"Container {container_id} not found on host {host_id[:8]}: {e}") continue elif message.get("type") == "unsubscribe_stats": container_id = message.get("container_id") if container_id: await monitor.realtime.unsubscribe_from_stats(websocket, container_id) elif message.get("type") == "modal_opened": # Track that a container modal is open - keep stats running for this container container_id = message.get("container_id") host_id = message.get("host_id") if container_id and host_id: # Verify container exists and user has access to it try: containers = await monitor.get_containers() # Must await async function container_exists = any( c.id == container_id and c.host_id == host_id for c in containers ) if container_exists: monitor.stats_manager.add_modal_container(container_id, host_id) else: logger.warning(f"User attempted to access stats for non-existent container: {container_id[:12]} on host {host_id[:8]}") except Exception as e: logger.error(f"Error validating container access: {e}") elif message.get("type") == "modal_closed": # Remove container from modal tracking container_id = message.get("container_id") host_id = message.get("host_id") if container_id and host_id: monitor.stats_manager.remove_modal_container(container_id, host_id) elif message.get("type") == "ping": await websocket.send_text(json.dumps({"type": "pong"}, cls=DateTimeEncoder)) except WebSocketDisconnect: await monitor.manager.disconnect(websocket) await monitor.realtime.unsubscribe_from_events(websocket) # Unsubscribe from all stats for container_id in list(monitor.realtime.stats_subscribers): await monitor.realtime.unsubscribe_from_stats(websocket, container_id) # Clear modal containers (user disconnected, modals are closed) monitor.stats_manager.clear_modal_containers() # Clean up rate limiter tracking ws_rate_limiter.cleanup_connection(connection_id)