"""
Docker Monitoring Core for DockMon
Main monitoring class for Docker containers and hosts
"""
import asyncio
import logging
import os
import re
import shutil
import stat
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional
import docker
from docker import DockerClient
from fastapi import HTTPException
from config.paths import DATABASE_PATH, CERTS_DIR
from database import DatabaseManager, AutoRestartConfig, GlobalSettings, DockerHostDB
from models.docker_models import DockerHost, DockerHostConfig, Container
from models.settings_models import AlertRule, NotificationSettings
from websocket.connection import ConnectionManager
from realtime import RealtimeMonitor
from notifications import NotificationService, AlertProcessor
from event_logger import EventLogger, EventSeverity, EventType
from stats_client import get_stats_client
from docker_monitor.stats_manager import StatsManager
from auth.session_manager import session_manager
logger = logging.getLogger(__name__)
def _handle_task_exception(task: asyncio.Task) -> None:
"""Handle exceptions from fire-and-forget async tasks"""
try:
task.result()
except asyncio.CancelledError:
pass # Task was cancelled, this is normal
except Exception as e:
logger.error(f"Unhandled exception in background task: {e}", exc_info=True)
def sanitize_host_id(host_id: str) -> str:
"""
Sanitize host ID to prevent path traversal attacks.
Only allows valid UUID format or alphanumeric + dash characters.
"""
if not host_id:
raise ValueError("Host ID cannot be empty")
# Check for path traversal attempts
if ".." in host_id or "/" in host_id or "\\" in host_id:
raise ValueError(f"Invalid host ID format: {host_id}")
# Try to validate as UUID first
try:
uuid.UUID(host_id)
return host_id
except ValueError:
# If not a valid UUID, only allow alphanumeric and dashes
if re.match(r'^[a-zA-Z0-9\-]+$', host_id):
return host_id
else:
raise ValueError(f"Invalid host ID format: {host_id}")
class DockerMonitor:
"""Main monitoring class for Docker containers"""
def __init__(self):
self.hosts: Dict[str, DockerHost] = {}
self.clients: Dict[str, DockerClient] = {}
self.db = DatabaseManager(DATABASE_PATH) # Initialize database with centralized path
self.settings = self.db.get_settings() # Load settings from DB
self.notification_settings = NotificationSettings()
self.auto_restart_status: Dict[str, bool] = {}
self.restart_attempts: Dict[str, int] = {}
self.restarting_containers: Dict[str, bool] = {} # Track containers currently being restarted
self.monitoring_task: Optional[asyncio.Task] = None
# Reconnection tracking with exponential backoff
self.reconnect_attempts: Dict[str, int] = {} # Track reconnect attempts per host
self.last_reconnect_attempt: Dict[str, float] = {} # Track last attempt time per host
self.manager = ConnectionManager()
self.realtime = RealtimeMonitor() # Real-time monitoring
self.event_logger = EventLogger(self.db, self.manager) # Event logging service with WebSocket support
self.notification_service = NotificationService(self.db, self.event_logger) # Notification service
self.alert_processor = AlertProcessor(self.notification_service) # Alert processor
self._container_states: Dict[str, str] = {} # Track container states for change detection
self._recent_user_actions: Dict[str, float] = {} # Track recent user actions: {container_key: timestamp}
self.cleanup_task: Optional[asyncio.Task] = None # Background cleanup task
# Locks for shared data structures to prevent race conditions
self._state_lock = asyncio.Lock()
self._actions_lock = asyncio.Lock()
self._restart_lock = asyncio.Lock()
# Stats collection manager
self.stats_manager = StatsManager()
self._load_persistent_config() # Load saved hosts and configs
    def add_host(self, config: DockerHostConfig, existing_id: Optional[str] = None, skip_db_save: bool = False, suppress_event_loop_errors: bool = False) -> DockerHost:
"""Add a new Docker host to monitor"""
client = None # Track client for cleanup on error
try:
# Validate certificates if provided (before trying to use them)
if config.tls_cert or config.tls_key or config.tls_ca:
self._validate_certificates(config)
# Create Docker client
if config.url.startswith("unix://"):
client = docker.DockerClient(base_url=config.url)
else:
# For TCP connections
tls_config = None
if config.tls_cert and config.tls_key:
# Create persistent certificate storage directory
safe_id = sanitize_host_id(existing_id or str(uuid.uuid4()))
cert_dir = os.path.join(CERTS_DIR, safe_id)
# Create with secure permissions - handle TOCTOU race condition
try:
os.makedirs(cert_dir, mode=0o700, exist_ok=False)
except FileExistsError:
# Verify it's actually a directory and not a symlink/file
st = os.lstat(cert_dir) # Use lstat to not follow symlinks
if not stat.S_ISDIR(st.st_mode):
raise ValueError("Certificate path exists but is not a directory")
# Write certificate files
cert_file = os.path.join(cert_dir, 'client-cert.pem')
key_file = os.path.join(cert_dir, 'client-key.pem')
ca_file = os.path.join(cert_dir, 'ca.pem') if config.tls_ca else None
with open(cert_file, 'w') as f:
f.write(config.tls_cert)
with open(key_file, 'w') as f:
f.write(config.tls_key)
if ca_file and config.tls_ca:
with open(ca_file, 'w') as f:
f.write(config.tls_ca)
# Set secure permissions
os.chmod(cert_file, 0o600)
os.chmod(key_file, 0o600)
if ca_file:
os.chmod(ca_file, 0o600)
tls_config = docker.tls.TLSConfig(
client_cert=(cert_file, key_file),
ca_cert=ca_file,
verify=bool(config.tls_ca)
)
client = docker.DockerClient(
base_url=config.url,
tls=tls_config,
timeout=self.settings.connection_timeout
)
# Test connection
client.ping()
# Validate TLS configuration for TCP connections
security_status = self._validate_host_security(config)
# Create host object with existing ID if provided (for persistence after restarts)
# Sanitize the ID to prevent path traversal
host_id = existing_id or str(uuid.uuid4())
try:
host_id = sanitize_host_id(host_id)
except ValueError as e:
logger.error(f"Invalid host ID: {e}")
raise HTTPException(status_code=400, detail=str(e))
host = DockerHost(
id=host_id,
name=config.name,
url=config.url,
status="online",
security_status=security_status
)
# Store client and host
self.clients[host.id] = client
self.hosts[host.id] = host
# Save to database only if not reconnecting to an existing host
if not skip_db_save:
db_host = self.db.add_host({
'id': host.id,
'name': config.name,
'url': config.url,
'tls_cert': config.tls_cert,
'tls_key': config.tls_key,
'tls_ca': config.tls_ca,
'security_status': security_status
})
# Register host with stats and event services
# Only register if we're adding a NEW host (not during startup/reconnect)
# During startup, monitor_containers() handles all registrations
if not skip_db_save: # New host being added by user
try:
stats_client = get_stats_client()
async def register_host():
try:
await stats_client.add_docker_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key)
logger.info(f"Registered {host.name} ({host.id[:8]}) with stats service")
await stats_client.add_event_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key)
logger.info(f"Registered {host.name} ({host.id[:8]}) with event service")
except Exception as e:
logger.error(f"Failed to register {host.name} with Go services: {e}")
# Try to create task if event loop is running
try:
task = asyncio.create_task(register_host())
task.add_done_callback(_handle_task_exception)
except RuntimeError:
# No event loop running - will be registered by monitor_containers()
logger.debug(f"No event loop yet - {host.name} will be registered when monitoring starts")
except Exception as e:
logger.warning(f"Could not register {host.name} with Go services: {e}")
# Log host connection
self.event_logger.log_host_connection(
host_name=host.name,
host_id=host.id,
host_url=config.url,
connected=True
)
# Log host added (only for new hosts, not reconnects)
if not skip_db_save:
self.event_logger.log_host_added(
host_name=host.name,
host_id=host.id,
host_url=config.url,
triggered_by="user"
)
logger.info(f"Added Docker host: {host.name} ({host.url})")
return host
except Exception as e:
# Clean up client if it was created but not stored
if client is not None:
try:
client.close()
logger.debug(f"Closed orphaned Docker client for {config.name}")
except Exception as close_error:
logger.debug(f"Error closing Docker client: {close_error}")
# Suppress event loop errors during first run startup
if suppress_event_loop_errors and "no running event loop" in str(e):
logger.debug(f"Event loop warning for {config.name} (expected during startup): {e}")
# Re-raise so the caller knows host was added but with event loop issue
raise
else:
logger.error(f"Failed to add host {config.name}: {e}")
error_msg = self._get_user_friendly_error(str(e))
raise HTTPException(status_code=400, detail=error_msg)
def _get_user_friendly_error(self, error: str) -> str:
"""Convert technical Docker errors to user-friendly messages"""
error_lower = error.lower()
# SSL/TLS certificate errors
if 'ssl' in error_lower or 'tls' in error_lower:
if 'pem lib' in error_lower or 'pem' in error_lower:
return (
"SSL certificate error: The certificates provided appear to be invalid or don't match. "
"Please verify:\n"
"• The certificates are for the correct server (check hostname/IP)\n"
"• The client certificate and private key are a matching pair\n"
"• The CA certificate matches the server's certificate\n"
"• The certificates haven't expired"
)
elif 'certificate verify failed' in error_lower:
return (
"SSL certificate verification failed: The server's certificate is not trusted by the CA certificate you provided. "
"Make sure you're using the correct CA certificate that signed the server's certificate."
)
elif 'ssleof' in error_lower or 'connection reset' in error_lower:
return (
"SSL connection failed: The server closed the connection during SSL handshake. "
"This usually means the server doesn't recognize the certificates. "
"Verify you're using the correct certificates for this server."
)
else:
return f"SSL/TLS error: Unable to establish secure connection. {error}"
# Connection errors
elif 'connection refused' in error_lower:
return (
"Connection refused: The Docker daemon is not accepting connections on this address. "
"Make sure:\n"
"• Docker is running on the remote host\n"
"• The Docker daemon is configured to listen on the specified port\n"
"• Firewall allows connections to the port"
)
elif 'timeout' in error_lower or 'timed out' in error_lower:
return (
"Connection timeout: Unable to reach the Docker daemon. "
"Check that the host address is correct and the host is reachable on your network."
)
elif 'no route to host' in error_lower or 'network unreachable' in error_lower:
return (
"Network unreachable: Cannot reach the specified host. "
"Verify the IP address/hostname is correct and the host is on your network."
)
elif 'http request to an https server' in error_lower:
return (
"Protocol mismatch: You're trying to connect without TLS to a server that requires TLS. "
"The server expects HTTPS connections. Please provide TLS certificates or change the server configuration."
)
# Return original error if we don't have a friendly version
return error
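    # For example (illustrative): any underlying error whose text contains
    # "connection refused" is mapped to the friendlier guidance above before
    # being surfaced to the UI as an HTTP 400 detail.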
def _validate_certificates(self, config: DockerHostConfig):
"""Validate certificate format before attempting to use them"""
def check_cert_format(cert_data: str, cert_type: str):
"""Check if certificate has proper PEM format markers"""
if not cert_data or not cert_data.strip():
raise HTTPException(
status_code=400,
detail=f"{cert_type} is empty. Please paste the certificate content."
)
cert_data = cert_data.strip()
# Check for BEGIN marker
if "-----BEGIN" not in cert_data:
raise HTTPException(
status_code=400,
detail=f"{cert_type} is missing the '-----BEGIN' header. Make sure you copied the complete certificate including the BEGIN line."
)
# Check for END marker
if "-----END" not in cert_data:
raise HTTPException(
status_code=400,
detail=f"{cert_type} is missing the '-----END' footer. Make sure you copied the complete certificate including the END line."
)
# Check BEGIN comes before END
begin_pos = cert_data.find("-----BEGIN")
end_pos = cert_data.find("-----END")
if begin_pos >= end_pos:
raise HTTPException(
status_code=400,
detail=f"{cert_type} format is invalid. The '-----BEGIN' line should come before the '-----END' line."
)
# Check for certificate data between markers
cert_content = cert_data[begin_pos:end_pos + 50] # Include END marker
lines = cert_content.split('\n')
if len(lines) < 3: # Should have BEGIN, at least one data line, and END
raise HTTPException(
status_code=400,
detail=f"{cert_type} appears to be incomplete. Make sure you copied all lines between BEGIN and END."
)
# Validate each certificate type
if config.tls_ca:
check_cert_format(config.tls_ca, "CA Certificate")
if config.tls_cert:
check_cert_format(config.tls_cert, "Client Certificate")
if config.tls_key:
# Private keys can be PRIVATE KEY or RSA PRIVATE KEY
key_data = config.tls_key.strip()
if "-----BEGIN" not in key_data or "-----END" not in key_data:
raise HTTPException(
status_code=400,
detail="Client Private Key is incomplete. Make sure you copied the complete key including both '-----BEGIN' and '-----END' lines."
)
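    # Illustrative PEM shape the checks above expect (data line is a placeholder):
    #   -----BEGIN CERTIFICATE-----
    #   MIIDdzCCAl+gAwIBAgIE... (base64 data)
    #   -----END CERTIFICATE-----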
def _validate_host_security(self, config: DockerHostConfig) -> str:
"""Validate the security configuration of a Docker host"""
if config.url.startswith("unix://"):
return "secure" # Unix sockets are secure (local only)
elif config.url.startswith("tcp://"):
if config.tls_cert and config.tls_key and config.tls_ca:
return "secure" # Has TLS certificates
else:
logger.warning(f"Host {config.name} configured without TLS - connection is insecure!")
return "insecure" # TCP without TLS
else:
return "unknown" # Unknown protocol
def _cleanup_host_certificates(self, host_id: str):
"""Clean up certificate files for a host"""
safe_id = sanitize_host_id(host_id)
cert_dir = os.path.join(CERTS_DIR, safe_id)
        # Defense in depth: verify the resolved path is inside CERTS_DIR
        abs_cert_dir = os.path.abspath(cert_dir)
        abs_certs_dir = os.path.abspath(CERTS_DIR)
        # commonpath (rather than startswith) avoids matching sibling paths like "<CERTS_DIR>-evil"
        if os.path.commonpath([abs_cert_dir, abs_certs_dir]) != abs_certs_dir:
            logger.error(f"Path traversal attempt detected: {host_id}")
            raise ValueError("Invalid certificate path")
if os.path.exists(cert_dir):
try:
shutil.rmtree(cert_dir)
logger.info(f"Cleaned up certificate files for host {host_id}")
except Exception as e:
logger.warning(f"Failed to clean up certificates for host {host_id}: {e}")
async def remove_host(self, host_id: str):
"""Remove a Docker host"""
# Validate host_id to prevent path traversal
try:
host_id = sanitize_host_id(host_id)
except ValueError as e:
logger.error(f"Invalid host ID: {e}")
raise HTTPException(status_code=400, detail=str(e))
if host_id in self.hosts:
# Get host info before removing
host = self.hosts[host_id]
host_name = host.name
del self.hosts[host_id]
if host_id in self.clients:
self.clients[host_id].close()
del self.clients[host_id]
# Remove from Go stats and event services (await to ensure cleanup completes before returning)
try:
stats_client = get_stats_client()
try:
# Remove from stats service (closes Docker client and stops all container streams)
await stats_client.remove_docker_host(host_id)
logger.info(f"Removed {host_name} ({host_id[:8]}) from stats service")
# Remove from event service
await stats_client.remove_event_host(host_id)
logger.info(f"Removed {host_name} ({host_id[:8]}) from event service")
except asyncio.TimeoutError:
# Timeout during cleanup is expected - Go service closes connections immediately
logger.debug(f"Timeout removing {host_name} from Go services (expected during cleanup)")
except Exception as e:
logger.error(f"Failed to remove {host_name} from Go services: {e}")
except Exception as e:
logger.warning(f"Failed to remove host {host_id} from Go services: {e}")
# Clean up certificate files
self._cleanup_host_certificates(host_id)
# Remove from database
self.db.delete_host(host_id)
# Clean up container state tracking for this host
async with self._state_lock:
containers_to_remove = [key for key in self._container_states.keys() if key.startswith(f"{host_id}:")]
for container_key in containers_to_remove:
del self._container_states[container_key]
# Clean up recent user actions for this host
async with self._actions_lock:
actions_to_remove = [key for key in self._recent_user_actions.keys() if key.startswith(f"{host_id}:")]
for container_key in actions_to_remove:
del self._recent_user_actions[container_key]
# Clean up notification service's container state tracking for this host
notification_states_to_remove = [key for key in self.notification_service._last_container_state.keys() if key.startswith(f"{host_id}:")]
for container_key in notification_states_to_remove:
del self.notification_service._last_container_state[container_key]
# Clean up alert processor's container state tracking for this host
alert_processor_states_to_remove = [key for key in self.alert_processor._container_states.keys() if key.startswith(f"{host_id}:")]
for container_key in alert_processor_states_to_remove:
del self.alert_processor._container_states[container_key]
# Clean up notification service's alert cooldown tracking for this host
alert_cooldowns_to_remove = [key for key in self.notification_service._last_alerts.keys() if key.startswith(f"{host_id}:")]
for container_key in alert_cooldowns_to_remove:
del self.notification_service._last_alerts[container_key]
# Clean up reconnection tracking for this host
if host_id in self.reconnect_attempts:
del self.reconnect_attempts[host_id]
if host_id in self.last_reconnect_attempt:
del self.last_reconnect_attempt[host_id]
# Clean up auto-restart tracking for this host
async with self._restart_lock:
auto_restart_to_remove = [key for key in self.auto_restart_status.keys() if key.startswith(f"{host_id}:")]
for container_key in auto_restart_to_remove:
del self.auto_restart_status[container_key]
if container_key in self.restart_attempts:
del self.restart_attempts[container_key]
if container_key in self.restarting_containers:
del self.restarting_containers[container_key]
# Clean up stats manager's streaming containers for this host
# Remove using the full composite key (format: "host_id:container_id")
for container_key in containers_to_remove:
self.stats_manager.streaming_containers.discard(container_key)
if containers_to_remove:
logger.debug(f"Cleaned up {len(containers_to_remove)} container state entries for removed host {host_id[:8]}")
if notification_states_to_remove:
logger.debug(f"Cleaned up {len(notification_states_to_remove)} notification state entries for removed host {host_id[:8]}")
if alert_processor_states_to_remove:
logger.debug(f"Cleaned up {len(alert_processor_states_to_remove)} alert processor state entries for removed host {host_id[:8]}")
if alert_cooldowns_to_remove:
logger.debug(f"Cleaned up {len(alert_cooldowns_to_remove)} alert cooldown entries for removed host {host_id[:8]}")
if auto_restart_to_remove:
logger.debug(f"Cleaned up {len(auto_restart_to_remove)} auto-restart entries for removed host {host_id[:8]}")
# Log host removed
self.event_logger.log_host_removed(
host_name=host_name,
host_id=host_id,
triggered_by="user"
)
logger.info(f"Removed host {host_id}")
def update_host(self, host_id: str, config: DockerHostConfig):
"""Update an existing Docker host"""
# Validate host_id to prevent path traversal
try:
host_id = sanitize_host_id(host_id)
except ValueError as e:
logger.error(f"Invalid host ID: {e}")
raise HTTPException(status_code=400, detail=str(e))
client = None # Track client for cleanup on error
try:
# Get existing host from database to check if we need to preserve certificates
existing_host = self.db.get_host(host_id)
if not existing_host:
raise HTTPException(status_code=404, detail=f"Host {host_id} not found")
# If certificates are not provided in the update, use existing ones
# This allows updating just the name without providing certificates again
if not config.tls_cert and existing_host.tls_cert:
config.tls_cert = existing_host.tls_cert
if not config.tls_key and existing_host.tls_key:
config.tls_key = existing_host.tls_key
if not config.tls_ca and existing_host.tls_ca:
config.tls_ca = existing_host.tls_ca
# Only validate certificates if NEW ones are provided (not using existing)
# Check if any NEW certificate data was actually sent in the request
if (config.tls_cert and config.tls_cert != existing_host.tls_cert) or \
(config.tls_key and config.tls_key != existing_host.tls_key) or \
(config.tls_ca and config.tls_ca != existing_host.tls_ca):
self._validate_certificates(config)
# Remove the existing host from memory first
if host_id in self.hosts:
# Close existing client first (this should stop the monitoring task)
if host_id in self.clients:
logger.info(f"Closing Docker client for host {host_id}")
self.clients[host_id].close()
del self.clients[host_id]
# Remove from memory
del self.hosts[host_id]
# Validate TLS configuration
security_status = self._validate_host_security(config)
# Update database
updated_db_host = self.db.update_host(host_id, {
'name': config.name,
'url': config.url,
'tls_cert': config.tls_cert,
'tls_key': config.tls_key,
'tls_ca': config.tls_ca,
'security_status': security_status
})
if not updated_db_host:
raise Exception(f"Host {host_id} not found in database")
# Create new Docker client with updated config
if config.url.startswith("unix://"):
client = docker.DockerClient(base_url=config.url)
else:
# For TCP connections
tls_config = None
if config.tls_cert and config.tls_key:
# Create persistent certificate storage directory
safe_id = sanitize_host_id(host_id)
cert_dir = os.path.join(CERTS_DIR, safe_id)
                    # Create with secure permissions (directory may already exist from add_host)
                    os.makedirs(cert_dir, mode=0o700, exist_ok=True)
# Write certificate files
cert_file = os.path.join(cert_dir, 'client-cert.pem')
key_file = os.path.join(cert_dir, 'client-key.pem')
ca_file = os.path.join(cert_dir, 'ca.pem') if config.tls_ca else None
with open(cert_file, 'w') as f:
f.write(config.tls_cert)
with open(key_file, 'w') as f:
f.write(config.tls_key)
if ca_file and config.tls_ca:
with open(ca_file, 'w') as f:
f.write(config.tls_ca)
# Set secure permissions
os.chmod(cert_file, 0o600)
os.chmod(key_file, 0o600)
if ca_file:
os.chmod(ca_file, 0o600)
tls_config = docker.tls.TLSConfig(
client_cert=(cert_file, key_file),
ca_cert=ca_file,
verify=bool(config.tls_ca)
)
client = docker.DockerClient(
base_url=config.url,
tls=tls_config,
timeout=self.settings.connection_timeout
)
# Test connection
client.ping()
# Create host object with existing ID
host = DockerHost(
id=host_id,
name=config.name,
url=config.url,
status="online",
security_status=security_status
)
# Store client and host
self.clients[host.id] = client
self.hosts[host.id] = host
# Re-register host with stats and event services (in case URL changed)
# Note: add_docker_host() automatically closes old client if it exists
try:
stats_client = get_stats_client()
async def reregister_host():
try:
# Re-register with stats service (automatically closes old client)
await stats_client.add_docker_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key)
logger.info(f"Re-registered {host.name} ({host.id[:8]}) with stats service")
# Remove and re-add event monitoring
await stats_client.remove_event_host(host.id)
await stats_client.add_event_host(host.id, host.url, config.tls_ca, config.tls_cert, config.tls_key)
logger.info(f"Re-registered {host.name} ({host.id[:8]}) with event service")
except Exception as e:
logger.error(f"Failed to re-register {host.name} with Go services: {e}")
# Create task to re-register (fire and forget)
task = asyncio.create_task(reregister_host())
task.add_done_callback(_handle_task_exception)
except Exception as e:
logger.warning(f"Could not re-register {host.name} with Go services: {e}")
# Log host update
self.event_logger.log_host_connection(
host_name=host.name,
host_id=host.id,
host_url=config.url,
connected=True
)
logger.info(f"Successfully updated host {host_id}: {host.name} ({host.url})")
return host
except Exception as e:
# Clean up client if it was created but not stored
if client and host_id not in self.clients:
try:
client.close()
logger.debug(f"Closed orphaned Docker client for host {host_id[:8]}")
except Exception as close_error:
logger.debug(f"Error closing Docker client: {close_error}")
logger.error(f"Failed to update host {host_id}: {e}")
error_msg = self._get_user_friendly_error(str(e))
raise HTTPException(status_code=400, detail=error_msg)
async def get_containers(self, host_id: Optional[str] = None) -> List[Container]:
"""Get containers from one or all hosts"""
containers = []
hosts_to_check = [host_id] if host_id else list(self.hosts.keys())
for hid in hosts_to_check:
host = self.hosts.get(hid)
if not host:
continue
# Try to reconnect if host exists but has no client (offline)
if hid not in self.clients:
                # Exponential backoff: 5s, 10s, 20s, 40s, 80s, 160s, capped at 5 minutes
now = time.time()
attempts = self.reconnect_attempts.get(hid, 0)
last_attempt = self.last_reconnect_attempt.get(hid, 0)
backoff_seconds = min(5 * (2 ** attempts), 300)
# Skip reconnection if we're in backoff period
if now - last_attempt < backoff_seconds:
time_remaining = backoff_seconds - (now - last_attempt)
logger.debug(f"Skipping reconnection for {host.name} - backoff active (attempt {attempts}, {time_remaining:.1f}s remaining)")
host.status = "offline"
continue
# Record this reconnection attempt
self.last_reconnect_attempt[hid] = now
logger.info(f"Attempting to reconnect to offline host {host.name} (attempt {attempts + 1})")
# Attempt to reconnect offline hosts
try:
# Fetch TLS certs from database for reconnection
with self.db.get_session() as session:
db_host = session.query(DockerHostDB).filter_by(id=hid).first()
if host.url.startswith("unix://"):
client = docker.DockerClient(base_url=host.url)
elif db_host and db_host.tls_cert and db_host.tls_key and db_host.tls_ca:
# Reconnect with TLS using certs from database
logger.debug(f"Reconnecting to {host.name} with TLS")
                        # Write certs to files for the TLS config (persisted under CERTS_DIR, reused across reconnects)
                        cert_dir = os.path.join(CERTS_DIR, hid)
                        os.makedirs(cert_dir, mode=0o700, exist_ok=True)
cert_file = os.path.join(cert_dir, 'cert.pem')
key_file = os.path.join(cert_dir, 'key.pem')
ca_file = os.path.join(cert_dir, 'ca.pem') if db_host.tls_ca else None
with open(cert_file, 'w') as f:
f.write(db_host.tls_cert)
with open(key_file, 'w') as f:
f.write(db_host.tls_key)
if ca_file:
with open(ca_file, 'w') as f:
f.write(db_host.tls_ca)
# Set secure permissions
os.chmod(cert_file, 0o600)
os.chmod(key_file, 0o600)
if ca_file:
os.chmod(ca_file, 0o600)
tls_config = docker.tls.TLSConfig(
client_cert=(cert_file, key_file),
ca_cert=ca_file,
verify=bool(db_host.tls_ca)
)
client = docker.DockerClient(
base_url=host.url,
tls=tls_config,
timeout=self.settings.connection_timeout
)
else:
# Reconnect without TLS
client = docker.DockerClient(
base_url=host.url,
timeout=self.settings.connection_timeout
)
# Test the connection
client.ping()
# Connection successful - add to clients
self.clients[hid] = client
# Reset reconnection attempts on success
self.reconnect_attempts[hid] = 0
logger.info(f"Reconnected to offline host: {host.name}")
# Re-register with stats and events service
try:
stats_client = get_stats_client()
tls_ca = db_host.tls_ca if db_host else None
tls_cert = db_host.tls_cert if db_host else None
tls_key = db_host.tls_key if db_host else None
await stats_client.add_docker_host(hid, host.url, tls_ca, tls_cert, tls_key)
await stats_client.add_event_host(hid, host.url, tls_ca, tls_cert, tls_key)
logger.info(f"Re-registered {host.name} ({hid[:8]}) with stats/events service after reconnection")
except Exception as e:
logger.warning(f"Failed to re-register {host.name} with Go services after reconnection: {e}")
except Exception as e:
# Increment reconnection attempts on failure
self.reconnect_attempts[hid] = attempts + 1
# Still offline - update status and continue
host.status = "offline"
host.error = f"Connection failed: {str(e)}"
host.last_checked = datetime.now()
# Log with backoff info to help debugging
next_attempt_in = min(5 * (2 ** (attempts + 1)), 300)
logger.debug(f"Host {host.name} still offline (attempt {attempts + 1}). Next retry in {next_attempt_in}s")
continue
client = self.clients[hid]
try:
docker_containers = client.containers.list(all=True)
host.status = "online"
host.container_count = len(docker_containers)
host.error = None
for dc in docker_containers:
try:
container_id = dc.id[:12]
# Try to get image info, but handle missing images gracefully
# Access dc.image first to trigger any errors before accessing its properties
try:
container_image = dc.image
image_name = container_image.tags[0] if container_image.tags else container_image.short_id
except Exception as img_error:
# Image may have been deleted - use image ID from container attrs
# This is common when containers reference deleted images
image_name = dc.attrs.get('Config', {}).get('Image', 'unknown')
if image_name == 'unknown':
# Try to get from ImageID in attrs
image_id = dc.attrs.get('Image', '')
if image_id.startswith('sha256:'):
image_name = image_id[:19] # sha256: + first 12 chars
else:
image_name = image_id[:12] if image_id else 'unknown'
container = Container(
id=dc.id,
short_id=container_id,
name=dc.name,
state=dc.status,
status=dc.attrs['State']['Status'],
host_id=hid,
host_name=host.name,
image=image_name,
created=dc.attrs['Created'],
auto_restart=self._get_auto_restart_status(hid, container_id),
                        restart_attempts=self.restart_attempts.get(f"{hid}:{container_id}", 0)  # Composite key, matching auto-restart tracking
)
containers.append(container)
except Exception as container_error:
# Log but don't fail the whole host for one bad container
logger.warning(f"Skipping container {dc.name if hasattr(dc, 'name') else 'unknown'} on {host.name} due to error: {container_error}")
continue
except Exception as e:
logger.error(f"Error getting containers from {host.name}: {e}")
host.status = "offline"
host.error = str(e)
host.last_checked = datetime.now()
# Fetch stats from Go stats service and populate container stats
try:
stats_client = get_stats_client()
container_stats = await stats_client.get_container_stats()
# Populate stats for each container using composite key (host_id:container_id)
for container in containers:
# Use composite key to support containers with duplicate IDs on different hosts
composite_key = f"{container.host_id}:{container.id}"
stats = container_stats.get(composite_key, {})
if stats:
container.cpu_percent = stats.get('cpu_percent')
container.memory_usage = stats.get('memory_usage')
container.memory_limit = stats.get('memory_limit')
container.memory_percent = stats.get('memory_percent')
container.network_rx = stats.get('network_rx')
container.network_tx = stats.get('network_tx')
container.disk_read = stats.get('disk_read')
container.disk_write = stats.get('disk_write')
logger.debug(f"Populated stats for {container.name} ({container.short_id}) on {container.host_name}: CPU {container.cpu_percent}%")
except Exception as e:
logger.warning(f"Failed to fetch container stats from stats service: {e}")
return containers
def restart_container(self, host_id: str, container_id: str) -> bool:
"""Restart a specific container"""
if host_id not in self.clients:
raise HTTPException(status_code=404, detail="Host not found")
host = self.hosts.get(host_id)
host_name = host.name if host else 'Unknown Host'
        start_time = time.time()
        container_name = container_id  # Fallback so the except block can log even if the lookup fails
try:
client = self.clients[host_id]
container = client.containers.get(container_id)
container_name = container.name
container.restart(timeout=10)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(f"Restarted container '{container_name}' on host '{host_name}'")
# Log the successful restart
self.event_logger.log_container_action(
action="restart",
container_name=container_name,
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=True,
triggered_by="user",
duration_ms=duration_ms
)
return True
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Failed to restart container '{container_name}' on host '{host_name}': {e}")
# Log the failed restart
self.event_logger.log_container_action(
action="restart",
                container_name=container_name,  # Falls back to the container ID if the name lookup failed
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=False,
triggered_by="user",
error_message=str(e),
duration_ms=duration_ms
)
raise HTTPException(status_code=500, detail=str(e))
def stop_container(self, host_id: str, container_id: str) -> bool:
"""Stop a specific container"""
if host_id not in self.clients:
raise HTTPException(status_code=404, detail="Host not found")
host = self.hosts.get(host_id)
host_name = host.name if host else 'Unknown Host'
        start_time = time.time()
        container_name = container_id  # Fallback so the except block can log even if the lookup fails
try:
client = self.clients[host_id]
container = client.containers.get(container_id)
container_name = container.name
container.stop(timeout=10)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(f"Stopped container '{container_name}' on host '{host_name}'")
# Track this user action to suppress critical severity on expected state change
container_key = f"{host_id}:{container_id}"
self._recent_user_actions[container_key] = time.time()
logger.info(f"Tracked user stop action for {container_key}")
# Log the successful stop
self.event_logger.log_container_action(
action="stop",
container_name=container_name,
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=True,
triggered_by="user",
duration_ms=duration_ms
)
return True
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Failed to stop container '{container_name}' on host '{host_name}': {e}")
# Log the failed stop
self.event_logger.log_container_action(
action="stop",
                container_name=container_name,  # Falls back to the container ID if the name lookup failed
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=False,
triggered_by="user",
error_message=str(e),
duration_ms=duration_ms
)
raise HTTPException(status_code=500, detail=str(e))
def start_container(self, host_id: str, container_id: str) -> bool:
"""Start a specific container"""
if host_id not in self.clients:
raise HTTPException(status_code=404, detail="Host not found")
host = self.hosts.get(host_id)
host_name = host.name if host else 'Unknown Host'
        start_time = time.time()
        container_name = container_id  # Fallback so the except block can log even if the lookup fails
try:
client = self.clients[host_id]
container = client.containers.get(container_id)
container_name = container.name
container.start()
duration_ms = int((time.time() - start_time) * 1000)
logger.info(f"Started container '{container_name}' on host '{host_name}'")
# Track this user action to suppress critical severity on expected state change
container_key = f"{host_id}:{container_id}"
self._recent_user_actions[container_key] = time.time()
logger.info(f"Tracked user start action for {container_key}")
# Log the successful start
self.event_logger.log_container_action(
action="start",
container_name=container_name,
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=True,
triggered_by="user",
duration_ms=duration_ms
)
return True
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
logger.error(f"Failed to start container '{container_name}' on host '{host_name}': {e}")
# Log the failed start
self.event_logger.log_container_action(
action="start",
                container_name=container_name,  # Falls back to the container ID if the name lookup failed
container_id=container_id,
host_name=host_name,
host_id=host_id,
success=False,
triggered_by="user",
error_message=str(e),
duration_ms=duration_ms
)
raise HTTPException(status_code=500, detail=str(e))
def toggle_auto_restart(self, host_id: str, container_id: str, container_name: str, enabled: bool):
"""Toggle auto-restart for a container"""
# Get host name for logging
host = self.hosts.get(host_id)
host_name = host.name if host else 'Unknown Host'
# Use host_id:container_id as key to prevent collisions between hosts
container_key = f"{host_id}:{container_id}"
self.auto_restart_status[container_key] = enabled
if not enabled:
self.restart_attempts[container_key] = 0
self.restarting_containers[container_key] = False
# Save to database
self.db.set_auto_restart(host_id, container_id, container_name, enabled)
logger.info(f"Auto-restart {'enabled' if enabled else 'disabled'} for container '{container_name}' on host '{host_name}'")
async def check_orphaned_alerts(self):
"""Check for alert rules that reference non-existent containers
Returns dict mapping alert_rule_id to list of orphaned container entries"""
orphaned = {}
try:
# Get all alert rules
with self.db.get_session() as session:
from database import AlertRuleDB, AlertRuleContainer
alert_rules = session.query(AlertRuleDB).all()
# Get all current containers (name + host_id pairs)
current_containers = {}
for container in await self.get_containers():
key = f"{container.host_id}:{container.name}"
current_containers[key] = True
# Check each alert rule's containers
for rule in alert_rules:
orphaned_containers = []
for alert_container in rule.containers:
key = f"{alert_container.host_id}:{alert_container.container_name}"
if key not in current_containers:
# Container doesn't exist anymore
orphaned_containers.append({
'host_id': alert_container.host_id,
'host_name': alert_container.host.name if alert_container.host else 'Unknown',
'container_name': alert_container.container_name
})
if orphaned_containers:
orphaned[rule.id] = {
'rule_name': rule.name,
'orphaned_containers': orphaned_containers
}
if orphaned:
logger.info(f"Found {len(orphaned)} alert rules with orphaned containers")
return orphaned
except Exception as e:
logger.error(f"Error checking orphaned alerts: {e}")
return {}
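    # Illustrative return shape of check_orphaned_alerts (names and IDs are placeholders):
    #   {
    #       7: {
    #           'rule_name': 'web stack down',
    #           'orphaned_containers': [
    #               {'host_id': '<uuid>', 'host_name': 'prod-1', 'container_name': 'old-api'}
    #           ]
    #       }
    #   }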
async def _handle_docker_event(self, event: dict):
"""Handle Docker events from Go service"""
try:
action = event.get('action', '')
container_id = event.get('container_id', '')
container_name = event.get('container_name', '')
host_id = event.get('host_id', '')
attributes = event.get('attributes', {})
timestamp_str = event.get('timestamp', '')
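            # Illustrative event payload from the Go event service (keys match the
            # accessors above; values are placeholders):
            #   {"action": "die", "container_id": "<64-hex id>", "container_name": "web",
            #    "host_id": "<host uuid>", "attributes": {"exitCode": "137"},
            #    "timestamp": "2024-01-01T00:00:00Z"}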
# Filter out noisy exec_* events (health checks, etc.)
if action.startswith('exec_'):
return
# Only log important events
important_events = ['create', 'start', 'stop', 'die', 'kill', 'destroy', 'pause', 'unpause', 'restart', 'oom', 'health_status']
if action in important_events:
logger.info(f"Docker event: {action} - {container_name} ({container_id[:12]}) on host {host_id[:8]}")
# Process event for notifications/alerts
if self.notification_service and action in ['die', 'oom', 'kill', 'health_status', 'restart']:
# Parse timestamp
try:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
except (ValueError, AttributeError, TypeError) as e:
logger.warning(f"Failed to parse timestamp '{timestamp_str}': {e}, using current time")
timestamp = datetime.now()
# Get exit code for die events
exit_code = None
if action == 'die':
exit_code_str = attributes.get('exitCode', '0')
try:
exit_code = int(exit_code_str)
except (ValueError, TypeError):
exit_code = None
# Create alert event
from notifications import DockerEventAlert
alert_event = DockerEventAlert(
container_id=container_id,
container_name=container_name,
host_id=host_id,
event_type=action,
timestamp=timestamp,
attributes=attributes,
exit_code=exit_code
)
# Process in background to not block event monitoring
task = asyncio.create_task(self.notification_service.process_docker_event(alert_event))
task.add_done_callback(_handle_task_exception)
except Exception as e:
logger.error(f"Error handling Docker event from Go service: {e}")
async def monitor_containers(self):
"""Main monitoring loop"""
logger.info("Starting container monitoring...")
# Get stats client instance
# Note: streaming_containers is now managed by self.stats_manager
stats_client = get_stats_client()
# Register all hosts with the stats and event services on startup
for host_id, host in self.hosts.items():
try:
# Get TLS certificates from database
session = self.db.get_session()
try:
db_host = session.query(DockerHostDB).filter_by(id=host_id).first()
tls_ca = db_host.tls_ca if db_host else None
tls_cert = db_host.tls_cert if db_host else None
tls_key = db_host.tls_key if db_host else None
finally:
session.close()
# Register with stats service
await stats_client.add_docker_host(host_id, host.url, tls_ca, tls_cert, tls_key)
logger.info(f"Registered host {host.name} ({host_id[:8]}) with stats service")
# Register with event service
await stats_client.add_event_host(host_id, host.url, tls_ca, tls_cert, tls_key)
logger.info(f"Registered host {host.name} ({host_id[:8]}) with event service")
except Exception as e:
logger.error(f"Failed to register host {host_id} with services: {e}")
# Connect to event stream WebSocket
try:
await stats_client.connect_event_stream(self._handle_docker_event)
logger.info("Connected to Go event stream")
except Exception as e:
logger.error(f"Failed to connect to event stream: {e}")
while True:
try:
containers = await self.get_containers()
# Centralized stats collection decision using StatsManager
has_viewers = self.manager.has_active_connections()
if has_viewers:
# Determine which containers need stats (centralized logic)
containers_needing_stats = self.stats_manager.determine_containers_needing_stats(
containers,
self.settings
)
# Sync streams with what's needed (start new, stop old)
await self.stats_manager.sync_container_streams(
containers,
containers_needing_stats,
stats_client,
_handle_task_exception
)
else:
# No active viewers - stop all streams
await self.stats_manager.stop_all_streams(stats_client, _handle_task_exception)
# Track container state changes and log them
for container in containers:
container_key = f"{container.host_id}:{container.id}"
current_state = container.status
# Hold lock during entire read-process-write to prevent race conditions
async with self._state_lock:
previous_state = self._container_states.get(container_key)
# Log state changes
if previous_state is not None and previous_state != current_state:
# Check if this state change was expected (recent user action)
async with self._actions_lock:
last_user_action = self._recent_user_actions.get(container_key, 0)
time_since_action = time.time() - last_user_action
is_user_initiated = time_since_action < 30 # Within 30 seconds
logger.info(f"State change for {container_key}: {previous_state}{current_state}, "
f"time_since_action={time_since_action:.1f}s, user_initiated={is_user_initiated}")
# Clean up old tracking entries (5 minutes or older)
if time_since_action >= 300:
async with self._actions_lock:
self._recent_user_actions.pop(container_key, None)
self.event_logger.log_container_state_change(
container_name=container.name,
container_id=container.short_id,
host_name=container.host_name,
host_id=container.host_id,
old_state=previous_state,
new_state=current_state,
triggered_by="user" if is_user_initiated else "system"
)
# Update tracked state (still inside lock)
self._container_states[container_key] = current_state
# Check for containers that need auto-restart
for container in containers:
if (container.status == "exited" and
self._get_auto_restart_status(container.host_id, container.short_id)):
# Use host_id:container_id as key to prevent collisions between hosts
container_key = f"{container.host_id}:{container.short_id}"
attempts = self.restart_attempts.get(container_key, 0)
is_restarting = self.restarting_containers.get(container_key, False)
if attempts < self.settings.max_retries and not is_restarting:
self.restarting_containers[container_key] = True
task = asyncio.create_task(
self.auto_restart_container(container)
)
task.add_done_callback(_handle_task_exception)
# Process alerts for container state changes
await self.alert_processor.process_container_update(containers, self.hosts)
# Only fetch and broadcast stats if there are active viewers
if has_viewers:
# Prepare broadcast data
broadcast_data = {
"containers": [c.dict() for c in containers],
"hosts": [h.dict() for h in self.hosts.values()],
"timestamp": datetime.now().isoformat()
}
# Only include host metrics if host stats are enabled
if self.stats_manager.should_broadcast_host_metrics(self.settings):
# Get host metrics from stats service (fast HTTP call)
host_metrics = await stats_client.get_host_stats()
logger.debug(f"Retrieved metrics for {len(host_metrics)} hosts from stats service")
broadcast_data["host_metrics"] = host_metrics
# Broadcast update to all connected clients
await self.manager.broadcast({
"type": "containers_update",
"data": broadcast_data
})
except Exception as e:
logger.error(f"Error in monitoring loop: {e}")
await asyncio.sleep(self.settings.polling_interval)
async def auto_restart_container(self, container: Container):
"""Attempt to auto-restart a container"""
container_id = container.short_id
# Use host_id:container_id as key to prevent collisions between hosts
container_key = f"{container.host_id}:{container_id}"
self.restart_attempts[container_key] = self.restart_attempts.get(container_key, 0) + 1
attempt = self.restart_attempts[container_key]
correlation_id = self.event_logger.create_correlation_id()
logger.info(
f"Auto-restart attempt {attempt}/{self.settings.max_retries} "
f"for container '{container.name}' on host '{container.host_name}'"
)
# Wait before attempting restart
await asyncio.sleep(self.settings.retry_delay)
try:
success = self.restart_container(container.host_id, container.id)
if success:
self.restart_attempts[container_key] = 0
# Log successful auto-restart
self.event_logger.log_auto_restart_attempt(
container_name=container.name,
container_id=container_id,
host_name=container.host_name,
host_id=container.host_id,
attempt=attempt,
max_attempts=self.settings.max_retries,
success=True,
correlation_id=correlation_id
)
await self.manager.broadcast({
"type": "auto_restart_success",
"data": {
"container_id": container_id,
"container_name": container.name,
"host": container.host_name
}
})
except Exception as e:
logger.error(f"Auto-restart failed for {container.name}: {e}")
# Log failed auto-restart
self.event_logger.log_auto_restart_attempt(
container_name=container.name,
container_id=container_id,
host_name=container.host_name,
host_id=container.host_id,
attempt=attempt,
max_attempts=self.settings.max_retries,
success=False,
error_message=str(e),
correlation_id=correlation_id
)
if attempt >= self.settings.max_retries:
self.auto_restart_status[container_key] = False
await self.manager.broadcast({
"type": "auto_restart_failed",
"data": {
"container_id": container_id,
"container_name": container.name,
"attempts": attempt,
"max_retries": self.settings.max_retries
}
})
finally:
# Always clear the restarting flag when done (success or failure)
self.restarting_containers[container_key] = False
def _load_persistent_config(self):
"""Load saved configuration from database"""
try:
# Load saved hosts
db_hosts = self.db.get_hosts(active_only=True)
# Detect and warn about duplicate hosts (same URL)
seen_urls = {}
for host in db_hosts:
if host.url in seen_urls:
logger.warning(
f"Duplicate host detected: '{host.name}' ({host.id}) and "
f"'{seen_urls[host.url]['name']}' ({seen_urls[host.url]['id']}) "
f"both use URL '{host.url}'. Consider removing duplicates."
)
else:
seen_urls[host.url] = {'name': host.name, 'id': host.id}
# Check if this is first run
with self.db.get_session() as session:
settings = session.query(GlobalSettings).first()
if not settings:
# Create default settings
settings = GlobalSettings()
session.add(settings)
session.commit()
            # Auto-add local Docker only on first run (uses a fresh session, separate from the settings bootstrap above)
with self.db.get_session() as session:
settings = session.query(GlobalSettings).first()
if settings and not settings.first_run_complete and not db_hosts and os.path.exists('/var/run/docker.sock'):
logger.info("First run detected - adding local Docker automatically")
host_added = False
try:
config = DockerHostConfig(
name="Local Docker",
url="unix:///var/run/docker.sock",
tls_cert=None,
tls_key=None,
tls_ca=None
)
self.add_host(config, suppress_event_loop_errors=True)
host_added = True
logger.info("Successfully added local Docker host")
except Exception as e:
# Check if this is the benign "no running event loop" error during startup
# The host is actually added successfully despite this error
error_str = str(e)
if "no running event loop" in error_str:
host_added = True
logger.debug(f"Event loop warning during first run (host added successfully): {e}")
else:
logger.error(f"Failed to add local Docker: {e}")
session.rollback()
# Mark first run as complete if host was added
if host_added:
settings.first_run_complete = True
session.commit()
logger.info("First run setup complete")
for db_host in db_hosts:
try:
config = DockerHostConfig(
name=db_host.name,
url=db_host.url,
tls_cert=db_host.tls_cert,
tls_key=db_host.tls_key,
tls_ca=db_host.tls_ca
)
# Try to connect to the host with existing ID and preserve security status
host = self.add_host(config, existing_id=db_host.id, skip_db_save=True, suppress_event_loop_errors=True)
# Override with stored security status
if hasattr(host, 'security_status') and db_host.security_status:
host.security_status = db_host.security_status
except Exception as e:
# Suppress event loop errors during startup
error_str = str(e)
if "no running event loop" not in error_str:
logger.error(f"Failed to reconnect to saved host {db_host.name}: {e}")
# Add host to UI even if connection failed, mark as offline
# This prevents "disappearing hosts" bug after restart
host = DockerHost(
id=db_host.id,
name=db_host.name,
url=db_host.url,
status="offline",
client=None
)
host.security_status = db_host.security_status or "unknown"
self.hosts[db_host.id] = host
logger.info(f"Added host {db_host.name} in offline mode - connection will retry")
# Load auto-restart configurations
for host_id in self.hosts:
                # Use a context-managed session so it is closed after loading
                # and the ORM objects stay attached while we read them
                with self.db.get_session() as session:
                    configs = session.query(AutoRestartConfig).filter(
                        AutoRestartConfig.host_id == host_id,
                        AutoRestartConfig.enabled == True
                    ).all()
                    for config in configs:
                        # Use host_id:container_id as key to prevent collisions between hosts
                        container_key = f"{config.host_id}:{config.container_id}"
                        self.auto_restart_status[container_key] = True
                        self.restart_attempts[container_key] = config.restart_count
logger.info(f"Loaded {len(self.hosts)} hosts from database")
except Exception as e:
logger.error(f"Error loading persistent config: {e}")
def _get_auto_restart_status(self, host_id: str, container_id: str) -> bool:
"""Get auto-restart status for a container"""
# Use host_id:container_id as key to prevent collisions between hosts
container_key = f"{host_id}:{container_id}"
# Check in-memory cache first
if container_key in self.auto_restart_status:
return self.auto_restart_status[container_key]
# Check database
config = self.db.get_auto_restart_config(host_id, container_id)
if config:
self.auto_restart_status[container_key] = config.enabled
return config.enabled
return False
async def cleanup_old_data(self):
"""Periodic cleanup of old data"""
logger.info("Starting periodic data cleanup...")
while True:
try:
settings = self.db.get_settings()
if settings.auto_cleanup_events:
# Clean up old events
event_deleted = self.db.cleanup_old_events(settings.event_retention_days)
if event_deleted > 0:
self.event_logger.log_system_event(
"Automatic Event Cleanup",
f"Cleaned up {event_deleted} events older than {settings.event_retention_days} days",
EventSeverity.INFO,
EventType.STARTUP
)
# Clean up expired sessions (runs daily regardless of event cleanup setting)
expired_count = session_manager.cleanup_expired_sessions()
if expired_count > 0:
logger.info(f"Cleaned up {expired_count} expired sessions")
# Sleep for 24 hours before next cleanup
await asyncio.sleep(24 * 60 * 60) # 24 hours
except Exception as e:
logger.error(f"Error in cleanup task: {e}")
# Wait 1 hour before retrying
await asyncio.sleep(60 * 60) # 1 hour