""" Docker Monitoring Core for DockMon Main monitoring class for Docker containers and hosts """ import asyncio import logging import os import shutil import time import uuid from datetime import datetime from typing import Dict, List, Optional import docker from docker import DockerClient from fastapi import HTTPException from config.paths import DATABASE_PATH, CERTS_DIR from database import DatabaseManager, AutoRestartConfig, GlobalSettings, DockerHostDB from models.docker_models import DockerHost, DockerHostConfig, Container from models.settings_models import AlertRule, NotificationSettings from websocket.connection import ConnectionManager from realtime import RealtimeMonitor from notifications import NotificationService, AlertProcessor from event_logger import EventLogger, EventSeverity, EventType from stats_client import get_stats_client from docker_monitor.stats_manager import StatsManager from auth.session_manager import session_manager logger = logging.getLogger(__name__) def _handle_task_exception(task: asyncio.Task) -> None: """Handle exceptions from fire-and-forget async tasks""" try: task.result() except asyncio.CancelledError: pass # Task was cancelled, this is normal except Exception as e: logger.error(f"Unhandled exception in background task: {e}", exc_info=True) def sanitize_host_id(host_id: str) -> str: """ Sanitize host ID to prevent path traversal attacks. Only allows valid UUID format or alphanumeric + dash characters. """ if not host_id: raise ValueError("Host ID cannot be empty") # Check for path traversal attempts if ".." in host_id or "/" in host_id or "\\" in host_id: raise ValueError(f"Invalid host ID format: {host_id}") # Try to validate as UUID first try: uuid.UUID(host_id) return host_id except ValueError: # If not a valid UUID, only allow alphanumeric and dashes import re if re.match(r'^[a-zA-Z0-9\-]+$', host_id): return host_id else: raise ValueError(f"Invalid host ID format: {host_id}") class DockerMonitor: """Main monitoring class for Docker containers""" def __init__(self): self.hosts: Dict[str, DockerHost] = {} self.clients: Dict[str, DockerClient] = {} self.db = DatabaseManager(DATABASE_PATH) # Initialize database with centralized path self.settings = self.db.get_settings() # Load settings from DB self.notification_settings = NotificationSettings() self.auto_restart_status: Dict[str, bool] = {} self.restart_attempts: Dict[str, int] = {} self.restarting_containers: Dict[str, bool] = {} # Track containers currently being restarted self.monitoring_task: Optional[asyncio.Task] = None # Reconnection tracking with exponential backoff self.reconnect_attempts: Dict[str, int] = {} # Track reconnect attempts per host self.last_reconnect_attempt: Dict[str, float] = {} # Track last attempt time per host self.manager = ConnectionManager() self.realtime = RealtimeMonitor() # Real-time monitoring self.event_logger = EventLogger(self.db, self.manager) # Event logging service with WebSocket support self.notification_service = NotificationService(self.db, self.event_logger) # Notification service self.alert_processor = AlertProcessor(self.notification_service) # Alert processor self._container_states: Dict[str, str] = {} # Track container states for change detection self._recent_user_actions: Dict[str, float] = {} # Track recent user actions: {container_key: timestamp} self.cleanup_task: Optional[asyncio.Task] = None # Background cleanup task # Locks for shared data structures to prevent race conditions self._state_lock = asyncio.Lock() self._actions_lock = asyncio.Lock() self._restart_lock = asyncio.Lock() # Stats collection manager self.stats_manager = StatsManager() self._load_persistent_config() # Load saved hosts and configs def add_host(self, config: DockerHostConfig, existing_id: str = None, skip_db_save: bool = False, suppress_event_loop_errors: bool = False) -> DockerHost: """Add a new Docker host to monitor""" client = None # Track client for cleanup on error try: # Validate certificates if provided (before trying to use them) if config.tls_cert or config.tls_key or config.tls_ca: self._validate_certificates(config) # Create Docker client if config.url.startswith("unix://"): client = docker.DockerClient(base_url=config.url) else: # For TCP connections tls_config = None if config.tls_cert and config.tls_key: # Create persistent certificate storage directory safe_id = sanitize_host_id(existing_id or str(uuid.uuid4())) cert_dir = os.path.join(CERTS_DIR, safe_id) # Create with secure permissions - handle TOCTOU race condition try: os.makedirs(cert_dir, mode=0o700, exist_ok=False) except FileExistsError: # Verify it's actually a directory and not a symlink/file import stat st = os.lstat(cert_dir) # Use lstat to not follow symlinks if not stat.S_ISDIR(st.st_mode): raise ValueError("Certificate path exists but is not a directory") # Write certificate files cert_file = os.path.join(cert_dir, 'client-cert.pem') key_file = os.path.join(cert_dir, 'client-key.pem') ca_file = os.path.join(cert_dir, 'ca.pem') if config.tls_ca else None with open(cert_file, 'w') as f: f.write(config.tls_cert) with open(key_file, 'w') as f: f.write(config.tls_key) if ca_file and config.tls_ca: with open(ca_file, 'w') as f: f.write(config.tls_ca) # Set secure permissions os.chmod(cert_file, 0o600) os.chmod(key_file, 0o600) if ca_file: os.chmod(ca_file, 0o600) tls_config = docker.tls.TLSConfig( client_cert=(cert_file, key_file), ca_cert=ca_file, verify=bool(config.tls_ca) ) client = docker.DockerClient( base_url=config.url, tls=tls_config, timeout=self.settings.connection_timeout ) # Test connection client.ping() # Validate TLS configuration for TCP connections security_status = self._validate_host_security(config) # Create host object with existing ID if provided (for persistence after restarts) # Sanitize the ID to prevent path traversal host_id = existing_id or str(uuid.uuid4()) try: host_id = sanitize_host_id(host_id) except ValueError as e: logger.error(f"Invalid host ID: {e}") raise HTTPException(status_code=400, detail=str(e)) host = DockerHost( id=host_id, name=config.name, url=config.url, status="online", security_status=security_status ) # Store client and host self.clients[host.id] = client self.hosts[host.id] = host # Save to database only if not reconnecting to an existing host if not skip_db_save: db_host = self.db.add_host({ 'id': host.id, 'name': config.name, 'url': config.url, 'tls_cert': config.tls_cert, 'tls_key': config.tls_key, 'tls_ca': config.tls_ca, 'security_status': security_status }) # Register host with stats and event services # Only register if we're adding a NEW host (not during startup/reconnect) # During startup, monitor_containers() handles all registrations if not skip_db_save: # New host being added by user try: import asyncio stats_client = get_stats_client() async def register_host(): try: await stats_client.add_docker_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key) logger.info(f"Registered {host.name} ({host.id[:8]}) with stats service") await stats_client.add_event_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key) logger.info(f"Registered {host.name} ({host.id[:8]}) with event service") except Exception as e: logger.error(f"Failed to register {host.name} with Go services: {e}") # Try to create task if event loop is running try: task = asyncio.create_task(register_host()) task.add_done_callback(_handle_task_exception) except RuntimeError: # No event loop running - will be registered by monitor_containers() logger.debug(f"No event loop yet - {host.name} will be registered when monitoring starts") except Exception as e: logger.warning(f"Could not register {host.name} with Go services: {e}") # Log host connection self.event_logger.log_host_connection( host_name=host.name, host_id=host.id, host_url=config.url, connected=True ) # Log host added (only for new hosts, not reconnects) if not skip_db_save: self.event_logger.log_host_added( host_name=host.name, host_id=host.id, host_url=config.url, triggered_by="user" ) logger.info(f"Added Docker host: {host.name} ({host.url})") return host except Exception as e: # Clean up client if it was created but not stored if client is not None: try: client.close() logger.debug(f"Closed orphaned Docker client for {config.name}") except Exception as close_error: logger.debug(f"Error closing Docker client: {close_error}") # Suppress event loop errors during first run startup if suppress_event_loop_errors and "no running event loop" in str(e): logger.debug(f"Event loop warning for {config.name} (expected during startup): {e}") # Re-raise so the caller knows host was added but with event loop issue raise else: logger.error(f"Failed to add host {config.name}: {e}") error_msg = self._get_user_friendly_error(str(e)) raise HTTPException(status_code=400, detail=error_msg) def _get_user_friendly_error(self, error: str) -> str: """Convert technical Docker errors to user-friendly messages""" error_lower = error.lower() # SSL/TLS certificate errors if 'ssl' in error_lower or 'tls' in error_lower: if 'pem lib' in error_lower or 'pem' in error_lower: return ( "SSL certificate error: The certificates provided appear to be invalid or don't match. " "Please verify:\n" "• The certificates are for the correct server (check hostname/IP)\n" "• The client certificate and private key are a matching pair\n" "• The CA certificate matches the server's certificate\n" "• The certificates haven't expired" ) elif 'certificate verify failed' in error_lower: return ( "SSL certificate verification failed: The server's certificate is not trusted by the CA certificate you provided. " "Make sure you're using the correct CA certificate that signed the server's certificate." ) elif 'ssleof' in error_lower or 'connection reset' in error_lower: return ( "SSL connection failed: The server closed the connection during SSL handshake. " "This usually means the server doesn't recognize the certificates. " "Verify you're using the correct certificates for this server." ) else: return f"SSL/TLS error: Unable to establish secure connection. {error}" # Connection errors elif 'connection refused' in error_lower: return ( "Connection refused: The Docker daemon is not accepting connections on this address. " "Make sure:\n" "• Docker is running on the remote host\n" "• The Docker daemon is configured to listen on the specified port\n" "• Firewall allows connections to the port" ) elif 'timeout' in error_lower or 'timed out' in error_lower: return ( "Connection timeout: Unable to reach the Docker daemon. " "Check that the host address is correct and the host is reachable on your network." ) elif 'no route to host' in error_lower or 'network unreachable' in error_lower: return ( "Network unreachable: Cannot reach the specified host. " "Verify the IP address/hostname is correct and the host is on your network." ) elif 'http request to an https server' in error_lower: return ( "Protocol mismatch: You're trying to connect without TLS to a server that requires TLS. " "The server expects HTTPS connections. Please provide TLS certificates or change the server configuration." ) # Return original error if we don't have a friendly version return error def _validate_certificates(self, config: DockerHostConfig): """Validate certificate format before attempting to use them""" def check_cert_format(cert_data: str, cert_type: str): """Check if certificate has proper PEM format markers""" if not cert_data or not cert_data.strip(): raise HTTPException( status_code=400, detail=f"{cert_type} is empty. Please paste the certificate content." ) cert_data = cert_data.strip() # Check for BEGIN marker if "-----BEGIN" not in cert_data: raise HTTPException( status_code=400, detail=f"{cert_type} is missing the '-----BEGIN' header. Make sure you copied the complete certificate including the BEGIN line." ) # Check for END marker if "-----END" not in cert_data: raise HTTPException( status_code=400, detail=f"{cert_type} is missing the '-----END' footer. Make sure you copied the complete certificate including the END line." ) # Check BEGIN comes before END begin_pos = cert_data.find("-----BEGIN") end_pos = cert_data.find("-----END") if begin_pos >= end_pos: raise HTTPException( status_code=400, detail=f"{cert_type} format is invalid. The '-----BEGIN' line should come before the '-----END' line." ) # Check for certificate data between markers cert_content = cert_data[begin_pos:end_pos + 50] # Include END marker lines = cert_content.split('\n') if len(lines) < 3: # Should have BEGIN, at least one data line, and END raise HTTPException( status_code=400, detail=f"{cert_type} appears to be incomplete. Make sure you copied all lines between BEGIN and END." ) # Validate each certificate type if config.tls_ca: check_cert_format(config.tls_ca, "CA Certificate") if config.tls_cert: check_cert_format(config.tls_cert, "Client Certificate") if config.tls_key: # Private keys can be PRIVATE KEY or RSA PRIVATE KEY key_data = config.tls_key.strip() if "-----BEGIN" not in key_data or "-----END" not in key_data: raise HTTPException( status_code=400, detail="Client Private Key is incomplete. Make sure you copied the complete key including both '-----BEGIN' and '-----END' lines." ) def _validate_host_security(self, config: DockerHostConfig) -> str: """Validate the security configuration of a Docker host""" if config.url.startswith("unix://"): return "secure" # Unix sockets are secure (local only) elif config.url.startswith("tcp://"): if config.tls_cert and config.tls_key and config.tls_ca: return "secure" # Has TLS certificates else: logger.warning(f"Host {config.name} configured without TLS - connection is insecure!") return "insecure" # TCP without TLS else: return "unknown" # Unknown protocol def _cleanup_host_certificates(self, host_id: str): """Clean up certificate files for a host""" safe_id = sanitize_host_id(host_id) cert_dir = os.path.join(CERTS_DIR, safe_id) # Defense in depth: verify path is within CERTS_DIR abs_cert_dir = os.path.abspath(cert_dir) abs_certs_dir = os.path.abspath(CERTS_DIR) if not abs_cert_dir.startswith(abs_certs_dir): logger.error(f"Path traversal attempt detected: {host_id}") raise ValueError("Invalid certificate path") if os.path.exists(cert_dir): try: shutil.rmtree(cert_dir) logger.info(f"Cleaned up certificate files for host {host_id}") except Exception as e: logger.warning(f"Failed to clean up certificates for host {host_id}: {e}") async def remove_host(self, host_id: str): """Remove a Docker host""" # Validate host_id to prevent path traversal try: host_id = sanitize_host_id(host_id) except ValueError as e: logger.error(f"Invalid host ID: {e}") raise HTTPException(status_code=400, detail=str(e)) if host_id in self.hosts: # Get host info before removing host = self.hosts[host_id] host_name = host.name del self.hosts[host_id] if host_id in self.clients: self.clients[host_id].close() del self.clients[host_id] # Remove from Go stats and event services (await to ensure cleanup completes before returning) try: stats_client = get_stats_client() try: # Remove from stats service (closes Docker client and stops all container streams) await stats_client.remove_docker_host(host_id) logger.info(f"Removed {host_name} ({host_id[:8]}) from stats service") # Remove from event service await stats_client.remove_event_host(host_id) logger.info(f"Removed {host_name} ({host_id[:8]}) from event service") except asyncio.TimeoutError: # Timeout during cleanup is expected - Go service closes connections immediately logger.debug(f"Timeout removing {host_name} from Go services (expected during cleanup)") except Exception as e: logger.error(f"Failed to remove {host_name} from Go services: {e}") except Exception as e: logger.warning(f"Failed to remove host {host_id} from Go services: {e}") # Clean up certificate files self._cleanup_host_certificates(host_id) # Remove from database self.db.delete_host(host_id) # Clean up container state tracking for this host async with self._state_lock: containers_to_remove = [key for key in self._container_states.keys() if key.startswith(f"{host_id}:")] for container_key in containers_to_remove: del self._container_states[container_key] # Clean up recent user actions for this host async with self._actions_lock: actions_to_remove = [key for key in self._recent_user_actions.keys() if key.startswith(f"{host_id}:")] for container_key in actions_to_remove: del self._recent_user_actions[container_key] # Clean up notification service's container state tracking for this host notification_states_to_remove = [key for key in self.notification_service._last_container_state.keys() if key.startswith(f"{host_id}:")] for container_key in notification_states_to_remove: del self.notification_service._last_container_state[container_key] # Clean up alert processor's container state tracking for this host alert_processor_states_to_remove = [key for key in self.alert_processor._container_states.keys() if key.startswith(f"{host_id}:")] for container_key in alert_processor_states_to_remove: del self.alert_processor._container_states[container_key] # Clean up notification service's alert cooldown tracking for this host alert_cooldowns_to_remove = [key for key in self.notification_service._last_alerts.keys() if key.startswith(f"{host_id}:")] for container_key in alert_cooldowns_to_remove: del self.notification_service._last_alerts[container_key] # Clean up reconnection tracking for this host if host_id in self.reconnect_attempts: del self.reconnect_attempts[host_id] if host_id in self.last_reconnect_attempt: del self.last_reconnect_attempt[host_id] # Clean up auto-restart tracking for this host async with self._restart_lock: auto_restart_to_remove = [key for key in self.auto_restart_status.keys() if key.startswith(f"{host_id}:")] for container_key in auto_restart_to_remove: del self.auto_restart_status[container_key] if container_key in self.restart_attempts: del self.restart_attempts[container_key] if container_key in self.restarting_containers: del self.restarting_containers[container_key] # Clean up stats manager's streaming containers for this host # Remove using the full composite key (format: "host_id:container_id") for container_key in containers_to_remove: self.stats_manager.streaming_containers.discard(container_key) if containers_to_remove: logger.debug(f"Cleaned up {len(containers_to_remove)} container state entries for removed host {host_id[:8]}") if notification_states_to_remove: logger.debug(f"Cleaned up {len(notification_states_to_remove)} notification state entries for removed host {host_id[:8]}") if alert_processor_states_to_remove: logger.debug(f"Cleaned up {len(alert_processor_states_to_remove)} alert processor state entries for removed host {host_id[:8]}") if alert_cooldowns_to_remove: logger.debug(f"Cleaned up {len(alert_cooldowns_to_remove)} alert cooldown entries for removed host {host_id[:8]}") if auto_restart_to_remove: logger.debug(f"Cleaned up {len(auto_restart_to_remove)} auto-restart entries for removed host {host_id[:8]}") # Log host removed self.event_logger.log_host_removed( host_name=host_name, host_id=host_id, triggered_by="user" ) logger.info(f"Removed host {host_id}") def update_host(self, host_id: str, config: DockerHostConfig): """Update an existing Docker host""" # Validate host_id to prevent path traversal try: host_id = sanitize_host_id(host_id) except ValueError as e: logger.error(f"Invalid host ID: {e}") raise HTTPException(status_code=400, detail=str(e)) client = None # Track client for cleanup on error try: # Get existing host from database to check if we need to preserve certificates existing_host = self.db.get_host(host_id) if not existing_host: raise HTTPException(status_code=404, detail=f"Host {host_id} not found") # If certificates are not provided in the update, use existing ones # This allows updating just the name without providing certificates again if not config.tls_cert and existing_host.tls_cert: config.tls_cert = existing_host.tls_cert if not config.tls_key and existing_host.tls_key: config.tls_key = existing_host.tls_key if not config.tls_ca and existing_host.tls_ca: config.tls_ca = existing_host.tls_ca # Only validate certificates if NEW ones are provided (not using existing) # Check if any NEW certificate data was actually sent in the request if (config.tls_cert and config.tls_cert != existing_host.tls_cert) or \ (config.tls_key and config.tls_key != existing_host.tls_key) or \ (config.tls_ca and config.tls_ca != existing_host.tls_ca): self._validate_certificates(config) # Remove the existing host from memory first if host_id in self.hosts: # Close existing client first (this should stop the monitoring task) if host_id in self.clients: logger.info(f"Closing Docker client for host {host_id}") self.clients[host_id].close() del self.clients[host_id] # Remove from memory del self.hosts[host_id] # Validate TLS configuration security_status = self._validate_host_security(config) # Update database updated_db_host = self.db.update_host(host_id, { 'name': config.name, 'url': config.url, 'tls_cert': config.tls_cert, 'tls_key': config.tls_key, 'tls_ca': config.tls_ca, 'security_status': security_status }) if not updated_db_host: raise Exception(f"Host {host_id} not found in database") # Create new Docker client with updated config if config.url.startswith("unix://"): client = docker.DockerClient(base_url=config.url) else: # For TCP connections tls_config = None if config.tls_cert and config.tls_key: # Create persistent certificate storage directory safe_id = sanitize_host_id(host_id) cert_dir = os.path.join(CERTS_DIR, safe_id) # Create with secure permissions to avoid TOCTOU race condition os.makedirs(cert_dir, mode=0o700, exist_ok=True) # Write certificate files cert_file = os.path.join(cert_dir, 'client-cert.pem') key_file = os.path.join(cert_dir, 'client-key.pem') ca_file = os.path.join(cert_dir, 'ca.pem') if config.tls_ca else None with open(cert_file, 'w') as f: f.write(config.tls_cert) with open(key_file, 'w') as f: f.write(config.tls_key) if ca_file and config.tls_ca: with open(ca_file, 'w') as f: f.write(config.tls_ca) # Set secure permissions os.chmod(cert_file, 0o600) os.chmod(key_file, 0o600) if ca_file: os.chmod(ca_file, 0o600) tls_config = docker.tls.TLSConfig( client_cert=(cert_file, key_file), ca_cert=ca_file, verify=bool(config.tls_ca) ) client = docker.DockerClient( base_url=config.url, tls=tls_config, timeout=self.settings.connection_timeout ) # Test connection client.ping() # Create host object with existing ID host = DockerHost( id=host_id, name=config.name, url=config.url, status="online", security_status=security_status ) # Store client and host self.clients[host.id] = client self.hosts[host.id] = host # Re-register host with stats and event services (in case URL changed) # Note: add_docker_host() automatically closes old client if it exists try: import asyncio stats_client = get_stats_client() async def reregister_host(): try: # Re-register with stats service (automatically closes old client) await stats_client.add_docker_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key) logger.info(f"Re-registered {host.name} ({host.id[:8]}) with stats service") # Remove and re-add event monitoring await stats_client.remove_event_host(host.id) await stats_client.add_event_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key) logger.info(f"Re-registered {host.name} ({host.id[:8]}) with event service") except Exception as e: logger.error(f"Failed to re-register {host.name} with Go services: {e}") # Create task to re-register (fire and forget) task = asyncio.create_task(reregister_host()) task.add_done_callback(_handle_task_exception) except Exception as e: logger.warning(f"Could not re-register {host.name} with Go services: {e}") # Log host update self.event_logger.log_host_connection( host_name=host.name, host_id=host.id, host_url=config.url, connected=True ) logger.info(f"Successfully updated host {host_id}: {host.name} ({host.url})") return host except Exception as e: # Clean up client if it was created but not stored if client and host_id not in self.clients: try: client.close() logger.debug(f"Closed orphaned Docker client for host {host_id[:8]}") except Exception as close_error: logger.debug(f"Error closing Docker client: {close_error}") logger.error(f"Failed to update host {host_id}: {e}") error_msg = self._get_user_friendly_error(str(e)) raise HTTPException(status_code=400, detail=error_msg) async def get_containers(self, host_id: Optional[str] = None) -> List[Container]: """Get containers from one or all hosts""" containers = [] hosts_to_check = [host_id] if host_id else list(self.hosts.keys()) for hid in hosts_to_check: host = self.hosts.get(hid) if not host: continue # Try to reconnect if host exists but has no client (offline) if hid not in self.clients: # Exponential backoff: 5s, 10s, 20s, 40s, 80s, max 5 minutes now = time.time() attempts = self.reconnect_attempts.get(hid, 0) last_attempt = self.last_reconnect_attempt.get(hid, 0) backoff_seconds = min(5 * (2 ** attempts), 300) # Skip reconnection if we're in backoff period if now - last_attempt < backoff_seconds: time_remaining = backoff_seconds - (now - last_attempt) logger.debug(f"Skipping reconnection for {host.name} - backoff active (attempt {attempts}, {time_remaining:.1f}s remaining)") host.status = "offline" continue # Record this reconnection attempt self.last_reconnect_attempt[hid] = now logger.info(f"Attempting to reconnect to offline host {host.name} (attempt {attempts + 1})") # Attempt to reconnect offline hosts try: # Fetch TLS certs from database for reconnection with self.db.get_session() as session: db_host = session.query(DockerHostDB).filter_by(id=hid).first() if host.url.startswith("unix://"): client = docker.DockerClient(base_url=host.url) elif db_host and db_host.tls_cert and db_host.tls_key and db_host.tls_ca: # Reconnect with TLS using certs from database logger.debug(f"Reconnecting to {host.name} with TLS") # Write certs to temporary files for TLS config cert_dir = os.path.join(CERTS_DIR, hid) os.makedirs(cert_dir, exist_ok=True) cert_file = os.path.join(cert_dir, 'cert.pem') key_file = os.path.join(cert_dir, 'key.pem') ca_file = os.path.join(cert_dir, 'ca.pem') if db_host.tls_ca else None with open(cert_file, 'w') as f: f.write(db_host.tls_cert) with open(key_file, 'w') as f: f.write(db_host.tls_key) if ca_file: with open(ca_file, 'w') as f: f.write(db_host.tls_ca) # Set secure permissions os.chmod(cert_file, 0o600) os.chmod(key_file, 0o600) if ca_file: os.chmod(ca_file, 0o600) tls_config = docker.tls.TLSConfig( client_cert=(cert_file, key_file), ca_cert=ca_file, verify=bool(db_host.tls_ca) ) client = docker.DockerClient( base_url=host.url, tls=tls_config, timeout=self.settings.connection_timeout ) else: # Reconnect without TLS client = docker.DockerClient( base_url=host.url, timeout=self.settings.connection_timeout ) # Test the connection client.ping() # Connection successful - add to clients self.clients[hid] = client # Reset reconnection attempts on success self.reconnect_attempts[hid] = 0 logger.info(f"Reconnected to offline host: {host.name}") # Re-register with stats and events service try: stats_client = get_stats_client() tls_ca = db_host.tls_ca if db_host else None tls_cert = db_host.tls_cert if db_host else None tls_key = db_host.tls_key if db_host else None await stats_client.add_docker_host(hid, host.url, tls_ca, tls_cert, tls_key) await stats_client.add_event_host(hid, host.url, tls_ca, tls_cert, tls_key) logger.info(f"Re-registered {host.name} ({hid[:8]}) with stats/events service after reconnection") except Exception as e: logger.warning(f"Failed to re-register {host.name} with Go services after reconnection: {e}") except Exception as e: # Increment reconnection attempts on failure self.reconnect_attempts[hid] = attempts + 1 # Still offline - update status and continue host.status = "offline" host.error = f"Connection failed: {str(e)}" host.last_checked = datetime.now() # Log with backoff info to help debugging next_attempt_in = min(5 * (2 ** (attempts + 1)), 300) logger.debug(f"Host {host.name} still offline (attempt {attempts + 1}). Next retry in {next_attempt_in}s") continue client = self.clients[hid] try: docker_containers = client.containers.list(all=True) host.status = "online" host.container_count = len(docker_containers) host.error = None for dc in docker_containers: try: container_id = dc.id[:12] # Try to get image info, but handle missing images gracefully # Access dc.image first to trigger any errors before accessing its properties try: container_image = dc.image image_name = container_image.tags[0] if container_image.tags else container_image.short_id except Exception as img_error: # Image may have been deleted - use image ID from container attrs # This is common when containers reference deleted images image_name = dc.attrs.get('Config', {}).get('Image', 'unknown') if image_name == 'unknown': # Try to get from ImageID in attrs image_id = dc.attrs.get('Image', '') if image_id.startswith('sha256:'): image_name = image_id[:19] # sha256: + first 12 chars else: image_name = image_id[:12] if image_id else 'unknown' container = Container( id=dc.id, short_id=container_id, name=dc.name, state=dc.status, status=dc.attrs['State']['Status'], host_id=hid, host_name=host.name, image=image_name, created=dc.attrs['Created'], auto_restart=self._get_auto_restart_status(hid, container_id), restart_attempts=self.restart_attempts.get(container_id, 0) ) containers.append(container) except Exception as container_error: # Log but don't fail the whole host for one bad container logger.warning(f"Skipping container {dc.name if hasattr(dc, 'name') else 'unknown'} on {host.name} due to error: {container_error}") continue except Exception as e: logger.error(f"Error getting containers from {host.name}: {e}") host.status = "offline" host.error = str(e) host.last_checked = datetime.now() # Fetch stats from Go stats service and populate container stats try: from stats_client import get_stats_client stats_client = get_stats_client() container_stats = await stats_client.get_container_stats() # Populate stats for each container using composite key (host_id:container_id) for container in containers: # Use composite key to support containers with duplicate IDs on different hosts composite_key = f"{container.host_id}:{container.id}" stats = container_stats.get(composite_key, {}) if stats: container.cpu_percent = stats.get('cpu_percent') container.memory_usage = stats.get('memory_usage') container.memory_limit = stats.get('memory_limit') container.memory_percent = stats.get('memory_percent') container.network_rx = stats.get('network_rx') container.network_tx = stats.get('network_tx') container.disk_read = stats.get('disk_read') container.disk_write = stats.get('disk_write') logger.debug(f"Populated stats for {container.name} ({container.short_id}) on {container.host_name}: CPU {container.cpu_percent}%") except Exception as e: logger.warning(f"Failed to fetch container stats from stats service: {e}") return containers def restart_container(self, host_id: str, container_id: str) -> bool: """Restart a specific container""" if host_id not in self.clients: raise HTTPException(status_code=404, detail="Host not found") host = self.hosts.get(host_id) host_name = host.name if host else 'Unknown Host' start_time = time.time() try: client = self.clients[host_id] container = client.containers.get(container_id) container_name = container.name container.restart(timeout=10) duration_ms = int((time.time() - start_time) * 1000) logger.info(f"Restarted container '{container_name}' on host '{host_name}'") # Log the successful restart self.event_logger.log_container_action( action="restart", container_name=container_name, container_id=container_id, host_name=host_name, host_id=host_id, success=True, triggered_by="user", duration_ms=duration_ms ) return True except Exception as e: duration_ms = int((time.time() - start_time) * 1000) logger.error(f"Failed to restart container '{container_name}' on host '{host_name}': {e}") # Log the failed restart self.event_logger.log_container_action( action="restart", container_name=container_id, # Use ID if name unavailable container_id=container_id, host_name=host_name, host_id=host_id, success=False, triggered_by="user", error_message=str(e), duration_ms=duration_ms ) raise HTTPException(status_code=500, detail=str(e)) def stop_container(self, host_id: str, container_id: str) -> bool: """Stop a specific container""" if host_id not in self.clients: raise HTTPException(status_code=404, detail="Host not found") host = self.hosts.get(host_id) host_name = host.name if host else 'Unknown Host' start_time = time.time() try: client = self.clients[host_id] container = client.containers.get(container_id) container_name = container.name container.stop(timeout=10) duration_ms = int((time.time() - start_time) * 1000) logger.info(f"Stopped container '{container_name}' on host '{host_name}'") # Track this user action to suppress critical severity on expected state change container_key = f"{host_id}:{container_id}" self._recent_user_actions[container_key] = time.time() logger.info(f"Tracked user stop action for {container_key}") # Log the successful stop self.event_logger.log_container_action( action="stop", container_name=container_name, container_id=container_id, host_name=host_name, host_id=host_id, success=True, triggered_by="user", duration_ms=duration_ms ) return True except Exception as e: duration_ms = int((time.time() - start_time) * 1000) logger.error(f"Failed to stop container '{container_name}' on host '{host_name}': {e}") # Log the failed stop self.event_logger.log_container_action( action="stop", container_name=container_id, container_id=container_id, host_name=host_name, host_id=host_id, success=False, triggered_by="user", error_message=str(e), duration_ms=duration_ms ) raise HTTPException(status_code=500, detail=str(e)) def start_container(self, host_id: str, container_id: str) -> bool: """Start a specific container""" if host_id not in self.clients: raise HTTPException(status_code=404, detail="Host not found") host = self.hosts.get(host_id) host_name = host.name if host else 'Unknown Host' start_time = time.time() try: client = self.clients[host_id] container = client.containers.get(container_id) container_name = container.name container.start() duration_ms = int((time.time() - start_time) * 1000) logger.info(f"Started container '{container_name}' on host '{host_name}'") # Track this user action to suppress critical severity on expected state change container_key = f"{host_id}:{container_id}" self._recent_user_actions[container_key] = time.time() logger.info(f"Tracked user start action for {container_key}") # Log the successful start self.event_logger.log_container_action( action="start", container_name=container_name, container_id=container_id, host_name=host_name, host_id=host_id, success=True, triggered_by="user", duration_ms=duration_ms ) return True except Exception as e: duration_ms = int((time.time() - start_time) * 1000) logger.error(f"Failed to start container '{container_name}' on host '{host_name}': {e}") # Log the failed start self.event_logger.log_container_action( action="start", container_name=container_id, container_id=container_id, host_name=host_name, host_id=host_id, success=False, triggered_by="user", error_message=str(e), duration_ms=duration_ms ) raise HTTPException(status_code=500, detail=str(e)) def toggle_auto_restart(self, host_id: str, container_id: str, container_name: str, enabled: bool): """Toggle auto-restart for a container""" # Get host name for logging host = self.hosts.get(host_id) host_name = host.name if host else 'Unknown Host' # Use host_id:container_id as key to prevent collisions between hosts container_key = f"{host_id}:{container_id}" self.auto_restart_status[container_key] = enabled if not enabled: self.restart_attempts[container_key] = 0 self.restarting_containers[container_key] = False # Save to database self.db.set_auto_restart(host_id, container_id, container_name, enabled) logger.info(f"Auto-restart {'enabled' if enabled else 'disabled'} for container '{container_name}' on host '{host_name}'") async def check_orphaned_alerts(self): """Check for alert rules that reference non-existent containers Returns dict mapping alert_rule_id to list of orphaned container entries""" orphaned = {} try: # Get all alert rules with self.db.get_session() as session: from database import AlertRuleDB, AlertRuleContainer alert_rules = session.query(AlertRuleDB).all() # Get all current containers (name + host_id pairs) current_containers = {} for container in await self.get_containers(): key = f"{container.host_id}:{container.name}" current_containers[key] = True # Check each alert rule's containers for rule in alert_rules: orphaned_containers = [] for alert_container in rule.containers: key = f"{alert_container.host_id}:{alert_container.container_name}" if key not in current_containers: # Container doesn't exist anymore orphaned_containers.append({ 'host_id': alert_container.host_id, 'host_name': alert_container.host.name if alert_container.host else 'Unknown', 'container_name': alert_container.container_name }) if orphaned_containers: orphaned[rule.id] = { 'rule_name': rule.name, 'orphaned_containers': orphaned_containers } if orphaned: logger.info(f"Found {len(orphaned)} alert rules with orphaned containers") return orphaned except Exception as e: logger.error(f"Error checking orphaned alerts: {e}") return {} async def _handle_docker_event(self, event: dict): """Handle Docker events from Go service""" try: action = event.get('action', '') container_id = event.get('container_id', '') container_name = event.get('container_name', '') host_id = event.get('host_id', '') attributes = event.get('attributes', {}) timestamp_str = event.get('timestamp', '') # Filter out noisy exec_* events (health checks, etc.) if action.startswith('exec_'): return # Only log important events important_events = ['create', 'start', 'stop', 'die', 'kill', 'destroy', 'pause', 'unpause', 'restart', 'oom', 'health_status'] if action in important_events: logger.info(f"Docker event: {action} - {container_name} ({container_id[:12]}) on host {host_id[:8]}") # Process event for notifications/alerts if self.notification_service and action in ['die', 'oom', 'kill', 'health_status', 'restart']: # Parse timestamp from datetime import datetime try: timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) except (ValueError, AttributeError, TypeError) as e: logger.warning(f"Failed to parse timestamp '{timestamp_str}': {e}, using current time") timestamp = datetime.now() # Get exit code for die events exit_code = None if action == 'die': exit_code_str = attributes.get('exitCode', '0') try: exit_code = int(exit_code_str) except (ValueError, TypeError): exit_code = None # Create alert event from notifications import DockerEventAlert alert_event = DockerEventAlert( container_id=container_id, container_name=container_name, host_id=host_id, event_type=action, timestamp=timestamp, attributes=attributes, exit_code=exit_code ) # Process in background to not block event monitoring task = asyncio.create_task(self.notification_service.process_docker_event(alert_event)) task.add_done_callback(_handle_task_exception) except Exception as e: logger.error(f"Error handling Docker event from Go service: {e}") async def monitor_containers(self): """Main monitoring loop""" logger.info("Starting container monitoring...") # Get stats client instance # Note: streaming_containers is now managed by self.stats_manager stats_client = get_stats_client() # Register all hosts with the stats and event services on startup for host_id, host in self.hosts.items(): try: # Get TLS certificates from database session = self.db.get_session() try: db_host = session.query(DockerHostDB).filter_by(id=host_id).first() tls_ca = db_host.tls_ca if db_host else None tls_cert = db_host.tls_cert if db_host else None tls_key = db_host.tls_key if db_host else None finally: session.close() # Register with stats service await stats_client.add_docker_host(host_id, host.url, tls_ca, tls_cert, tls_key) logger.info(f"Registered host {host.name} ({host_id[:8]}) with stats service") # Register with event service await stats_client.add_event_host(host_id, host.url, tls_ca, tls_cert, tls_key) logger.info(f"Registered host {host.name} ({host_id[:8]}) with event service") except Exception as e: logger.error(f"Failed to register host {host_id} with services: {e}") # Connect to event stream WebSocket try: await stats_client.connect_event_stream(self._handle_docker_event) logger.info("Connected to Go event stream") except Exception as e: logger.error(f"Failed to connect to event stream: {e}") while True: try: containers = await self.get_containers() # Centralized stats collection decision using StatsManager has_viewers = self.manager.has_active_connections() if has_viewers: # Determine which containers need stats (centralized logic) containers_needing_stats = self.stats_manager.determine_containers_needing_stats( containers, self.settings ) # Sync streams with what's needed (start new, stop old) await self.stats_manager.sync_container_streams( containers, containers_needing_stats, stats_client, _handle_task_exception ) else: # No active viewers - stop all streams await self.stats_manager.stop_all_streams(stats_client, _handle_task_exception) # Track container state changes and log them for container in containers: container_key = f"{container.host_id}:{container.id}" current_state = container.status # Hold lock during entire read-process-write to prevent race conditions async with self._state_lock: previous_state = self._container_states.get(container_key) # Log state changes if previous_state is not None and previous_state != current_state: # Check if this state change was expected (recent user action) async with self._actions_lock: last_user_action = self._recent_user_actions.get(container_key, 0) time_since_action = time.time() - last_user_action is_user_initiated = time_since_action < 30 # Within 30 seconds logger.info(f"State change for {container_key}: {previous_state} → {current_state}, " f"time_since_action={time_since_action:.1f}s, user_initiated={is_user_initiated}") # Clean up old tracking entries (5 minutes or older) if time_since_action >= 300: async with self._actions_lock: self._recent_user_actions.pop(container_key, None) self.event_logger.log_container_state_change( container_name=container.name, container_id=container.short_id, host_name=container.host_name, host_id=container.host_id, old_state=previous_state, new_state=current_state, triggered_by="user" if is_user_initiated else "system" ) # Update tracked state (still inside lock) self._container_states[container_key] = current_state # Check for containers that need auto-restart for container in containers: if (container.status == "exited" and self._get_auto_restart_status(container.host_id, container.short_id)): # Use host_id:container_id as key to prevent collisions between hosts container_key = f"{container.host_id}:{container.short_id}" attempts = self.restart_attempts.get(container_key, 0) is_restarting = self.restarting_containers.get(container_key, False) if attempts < self.settings.max_retries and not is_restarting: self.restarting_containers[container_key] = True task = asyncio.create_task( self.auto_restart_container(container) ) task.add_done_callback(_handle_task_exception) # Process alerts for container state changes await self.alert_processor.process_container_update(containers, self.hosts) # Only fetch and broadcast stats if there are active viewers if has_viewers: # Prepare broadcast data broadcast_data = { "containers": [c.dict() for c in containers], "hosts": [h.dict() for h in self.hosts.values()], "timestamp": datetime.now().isoformat() } # Only include host metrics if host stats are enabled if self.stats_manager.should_broadcast_host_metrics(self.settings): # Get host metrics from stats service (fast HTTP call) host_metrics = await stats_client.get_host_stats() logger.debug(f"Retrieved metrics for {len(host_metrics)} hosts from stats service") broadcast_data["host_metrics"] = host_metrics # Broadcast update to all connected clients await self.manager.broadcast({ "type": "containers_update", "data": broadcast_data }) except Exception as e: logger.error(f"Error in monitoring loop: {e}") await asyncio.sleep(self.settings.polling_interval) async def auto_restart_container(self, container: Container): """Attempt to auto-restart a container""" container_id = container.short_id # Use host_id:container_id as key to prevent collisions between hosts container_key = f"{container.host_id}:{container_id}" self.restart_attempts[container_key] = self.restart_attempts.get(container_key, 0) + 1 attempt = self.restart_attempts[container_key] correlation_id = self.event_logger.create_correlation_id() logger.info( f"Auto-restart attempt {attempt}/{self.settings.max_retries} " f"for container '{container.name}' on host '{container.host_name}'" ) # Wait before attempting restart await asyncio.sleep(self.settings.retry_delay) try: success = self.restart_container(container.host_id, container.id) if success: self.restart_attempts[container_key] = 0 # Log successful auto-restart self.event_logger.log_auto_restart_attempt( container_name=container.name, container_id=container_id, host_name=container.host_name, host_id=container.host_id, attempt=attempt, max_attempts=self.settings.max_retries, success=True, correlation_id=correlation_id ) await self.manager.broadcast({ "type": "auto_restart_success", "data": { "container_id": container_id, "container_name": container.name, "host": container.host_name } }) except Exception as e: logger.error(f"Auto-restart failed for {container.name}: {e}") # Log failed auto-restart self.event_logger.log_auto_restart_attempt( container_name=container.name, container_id=container_id, host_name=container.host_name, host_id=container.host_id, attempt=attempt, max_attempts=self.settings.max_retries, success=False, error_message=str(e), correlation_id=correlation_id ) if attempt >= self.settings.max_retries: self.auto_restart_status[container_key] = False await self.manager.broadcast({ "type": "auto_restart_failed", "data": { "container_id": container_id, "container_name": container.name, "attempts": attempt, "max_retries": self.settings.max_retries } }) finally: # Always clear the restarting flag when done (success or failure) self.restarting_containers[container_key] = False def _load_persistent_config(self): """Load saved configuration from database""" try: # Load saved hosts db_hosts = self.db.get_hosts(active_only=True) # Detect and warn about duplicate hosts (same URL) seen_urls = {} for host in db_hosts: if host.url in seen_urls: logger.warning( f"Duplicate host detected: '{host.name}' ({host.id}) and " f"'{seen_urls[host.url]['name']}' ({seen_urls[host.url]['id']}) " f"both use URL '{host.url}'. Consider removing duplicates." ) else: seen_urls[host.url] = {'name': host.name, 'id': host.id} # Check if this is first run with self.db.get_session() as session: settings = session.query(GlobalSettings).first() if not settings: # Create default settings settings = GlobalSettings() session.add(settings) session.commit() # Auto-add local Docker only on first run (outside session context) with self.db.get_session() as session: settings = session.query(GlobalSettings).first() if settings and not settings.first_run_complete and not db_hosts and os.path.exists('/var/run/docker.sock'): logger.info("First run detected - adding local Docker automatically") host_added = False try: config = DockerHostConfig( name="Local Docker", url="unix:///var/run/docker.sock", tls_cert=None, tls_key=None, tls_ca=None ) self.add_host(config, suppress_event_loop_errors=True) host_added = True logger.info("Successfully added local Docker host") except Exception as e: # Check if this is the benign "no running event loop" error during startup # The host is actually added successfully despite this error error_str = str(e) if "no running event loop" in error_str: host_added = True logger.debug(f"Event loop warning during first run (host added successfully): {e}") else: logger.error(f"Failed to add local Docker: {e}") session.rollback() # Mark first run as complete if host was added if host_added: settings.first_run_complete = True session.commit() logger.info("First run setup complete") for db_host in db_hosts: try: config = DockerHostConfig( name=db_host.name, url=db_host.url, tls_cert=db_host.tls_cert, tls_key=db_host.tls_key, tls_ca=db_host.tls_ca ) # Try to connect to the host with existing ID and preserve security status host = self.add_host(config, existing_id=db_host.id, skip_db_save=True, suppress_event_loop_errors=True) # Override with stored security status if hasattr(host, 'security_status') and db_host.security_status: host.security_status = db_host.security_status except Exception as e: # Suppress event loop errors during startup error_str = str(e) if "no running event loop" not in error_str: logger.error(f"Failed to reconnect to saved host {db_host.name}: {e}") # Add host to UI even if connection failed, mark as offline # This prevents "disappearing hosts" bug after restart host = DockerHost( id=db_host.id, name=db_host.name, url=db_host.url, status="offline", client=None ) host.security_status = db_host.security_status or "unknown" self.hosts[db_host.id] = host logger.info(f"Added host {db_host.name} in offline mode - connection will retry") # Load auto-restart configurations for host_id in self.hosts: configs = self.db.get_session().query(AutoRestartConfig).filter( AutoRestartConfig.host_id == host_id, AutoRestartConfig.enabled == True ).all() for config in configs: # Use host_id:container_id as key to prevent collisions between hosts container_key = f"{config.host_id}:{config.container_id}" self.auto_restart_status[container_key] = True self.restart_attempts[container_key] = config.restart_count logger.info(f"Loaded {len(self.hosts)} hosts from database") except Exception as e: logger.error(f"Error loading persistent config: {e}") def _get_auto_restart_status(self, host_id: str, container_id: str) -> bool: """Get auto-restart status for a container""" # Use host_id:container_id as key to prevent collisions between hosts container_key = f"{host_id}:{container_id}" # Check in-memory cache first if container_key in self.auto_restart_status: return self.auto_restart_status[container_key] # Check database config = self.db.get_auto_restart_config(host_id, container_id) if config: self.auto_restart_status[container_key] = config.enabled return config.enabled return False async def cleanup_old_data(self): """Periodic cleanup of old data""" logger.info("Starting periodic data cleanup...") while True: try: settings = self.db.get_settings() if settings.auto_cleanup_events: # Clean up old events event_deleted = self.db.cleanup_old_events(settings.event_retention_days) if event_deleted > 0: self.event_logger.log_system_event( "Automatic Event Cleanup", f"Cleaned up {event_deleted} events older than {settings.event_retention_days} days", EventSeverity.INFO, EventType.STARTUP ) # Clean up expired sessions (runs daily regardless of event cleanup setting) expired_count = session_manager.cleanup_expired_sessions() if expired_count > 0: logger.info(f"Cleaned up {expired_count} expired sessions") # Sleep for 24 hours before next cleanup await asyncio.sleep(24 * 60 * 60) # 24 hours except Exception as e: logger.error(f"Error in cleanup task: {e}") # Wait 1 hour before retrying await asyncio.sleep(60 * 60) # 1 hour