#!/usr/bin/env python3
"""
DockMon Backend - Docker Container Monitoring System
Supports multiple Docker hosts with auto-restart and alerts
"""
import asyncio
import json
import logging
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import docker
from docker import DockerClient
from docker.errors import DockerException, APIError
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request, Depends, status, Cookie, Response, Query
from fastapi.middleware.cors import CORSMiddleware
# Session-based auth - no longer need HTTPBearer
from fastapi.responses import FileResponse
from database import DatabaseManager
from realtime import RealtimeMonitor
from notifications import NotificationService, AlertProcessor
from event_logger import EventLogger, EventContext, EventCategory, EventType, EventSeverity, PerformanceTimer
# Import extracted modules
from config.settings import AppConfig, get_cors_origins, setup_logging
from models.docker_models import DockerHostConfig, DockerHost
from models.settings_models import GlobalSettings, AlertRule
from models.request_models import (
AutoRestartRequest, AlertRuleCreate, AlertRuleUpdate,
NotificationChannelCreate, NotificationChannelUpdate, EventLogFilter
)
from security.audit import security_audit
from security.rate_limiting import rate_limiter, rate_limit_auth, rate_limit_hosts, rate_limit_containers, rate_limit_notifications, rate_limit_default
from auth.routes import router as auth_router, verify_frontend_session
# Alias for routes declared before the local verify_session_auth definition
# below; FastAPI binds Depends() targets at decoration time
verify_session_auth = verify_frontend_session
from websocket.connection import ConnectionManager, DateTimeEncoder
from websocket.rate_limiter import ws_rate_limiter
from docker_monitor.monitor import DockerMonitor
# Configure logging
setup_logging()
logger = logging.getLogger(__name__)
# ==================== FastAPI Application ====================
# Create monitor instance
monitor = DockerMonitor()
# ==================== Authentication ====================
# Session-based authentication only - no API keys needed
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle"""
# Startup
logger.info("Starting DockMon backend...")
# Ensure default user exists
monitor.db.get_or_create_default_user()
await monitor.event_logger.start()
monitor.event_logger.log_system_event("DockMon Backend Starting", "DockMon backend is initializing", EventSeverity.INFO, EventType.STARTUP)
# Connect security audit logger to event logger
security_audit.set_event_logger(monitor.event_logger)
monitor.monitoring_task = asyncio.create_task(monitor.monitor_containers())
monitor.cleanup_task = asyncio.create_task(monitor.cleanup_old_data())
# Start blackout window monitoring with WebSocket support
await monitor.notification_service.blackout_manager.start_monitoring(
monitor.notification_service,
monitor, # Pass DockerMonitor instance to avoid re-initialization
monitor.manager # Pass ConnectionManager for WebSocket broadcasts
)
yield
# Shutdown
logger.info("Shutting down DockMon backend...")
monitor.event_logger.log_system_event("DockMon Backend Shutting Down", "DockMon backend is shutting down", EventSeverity.INFO, EventType.SHUTDOWN)
if monitor.monitoring_task:
monitor.monitoring_task.cancel()
if monitor.cleanup_task:
monitor.cleanup_task.cancel()
# Stop blackout monitoring
monitor.notification_service.blackout_manager.stop_monitoring()
# Close stats client (HTTP session and WebSocket)
from stats_client import get_stats_client
await get_stats_client().close()
# Close notification service
await monitor.notification_service.close()
# Stop event logger
await monitor.event_logger.stop()
# Dispose SQLAlchemy engine
monitor.db.engine.dispose()
logger.info("SQLAlchemy engine disposed")
app = FastAPI(
title="DockMon API",
version="1.0.0",
lifespan=lifespan
)
# Configure CORS - Production ready with environment-based configuration
app.add_middleware(
CORSMiddleware,
allow_origins=AppConfig.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "Authorization"],
)
logger.info(f"CORS configured for origins: {AppConfig.CORS_ORIGINS}")
# ==================== API Routes ====================
# Register authentication router
app.include_router(auth_router)
@app.get("/")
async def root(authenticated: bool = Depends(verify_session_auth)):
"""Backend API root - frontend is served separately"""
return {"message": "DockMon Backend API", "version": "1.0.0", "docs": "/docs"}
@app.get("/health")
async def health_check():
"""Health check endpoint for Docker health checks - no authentication required"""
return {"status": "healthy", "service": "dockmon-backend"}
def _is_localhost_or_internal(ip: str) -> bool:
"""Check if IP is localhost or internal network (Docker networks, private networks)"""
import ipaddress
try:
addr = ipaddress.ip_address(ip)
# Allow localhost
if addr.is_loopback:
return True
# Allow private networks (RFC 1918) - for Docker networks and internal deployments
if addr.is_private:
return True
return False
except ValueError:
# Invalid IP format
return False
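# Illustrative usage of the helper above (example addresses, not real deployments):
#   _is_localhost_or_internal("127.0.0.1")  -> True   (loopback)
#   _is_localhost_or_internal("172.17.0.2") -> True   (RFC 1918 / Docker bridge)
#   _is_localhost_or_internal("8.8.8.8")    -> False  (public address)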
# ==================== Frontend Authentication ====================
async def verify_session_auth(request: Request):
    """Verify authentication via session cookie only.

    Note: this definition shadows the module-level alias of
    verify_frontend_session above for all routes declared after this point.
    """
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
# Since backend only listens on 127.0.0.1, all requests must come through nginx
# No need to check client IP - the backend binding ensures security
# Check session authentication
session_id = _get_session_from_cookie(request)
if session_id and session_manager.validate_session(session_id, request):
return True
# No valid session found
raise HTTPException(
status_code=401,
detail="Authentication required - please login"
)
@app.get("/api/hosts")
async def get_hosts(authenticated: bool = Depends(verify_session_auth)):
"""Get all configured Docker hosts"""
return list(monitor.hosts.values())
@app.post("/api/hosts")
async def add_host(config: DockerHostConfig, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts, request: Request = None):
"""Add a new Docker host"""
try:
host = monitor.add_host(config)
# Security audit log - successful privileged action
if request:
security_audit.log_privileged_action(
                client_ip=request.client.host if request.client else "unknown",
action="ADD_DOCKER_HOST",
target=f"{config.name} ({config.url})",
success=True,
user_agent=request.headers.get('user-agent', 'unknown')
)
# Broadcast host addition to WebSocket clients so they refresh
await monitor.manager.broadcast({
"type": "host_added",
"data": {"host_id": host.id, "host_name": host.name}
})
return host
except Exception as e:
# Security audit log - failed privileged action
if request:
security_audit.log_privileged_action(
                client_ip=request.client.host if request.client else "unknown",
action="ADD_DOCKER_HOST",
target=f"{config.name} ({config.url})",
success=False,
user_agent=request.headers.get('user-agent', 'unknown')
)
raise
@app.put("/api/hosts/{host_id}")
async def update_host(host_id: str, config: DockerHostConfig, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts):
"""Update an existing Docker host"""
host = monitor.update_host(host_id, config)
return host
@app.delete("/api/hosts/{host_id}")
async def remove_host(host_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_hosts):
"""Remove a Docker host"""
await monitor.remove_host(host_id)
# Broadcast host removal to WebSocket clients so they refresh
await monitor.manager.broadcast({
"type": "host_removed",
"data": {"host_id": host_id}
})
return {"status": "success", "message": f"Host {host_id} removed"}
@app.get("/api/hosts/{host_id}/metrics")
async def get_host_metrics(host_id: str, authenticated: bool = Depends(verify_session_auth)):
"""Get aggregated metrics for a Docker host (CPU, RAM, Network)"""
try:
host = monitor.hosts.get(host_id)
if not host:
raise HTTPException(status_code=404, detail="Host not found")
client = monitor.clients.get(host_id)
if not client:
raise HTTPException(status_code=503, detail="Host client not available")
# Get all running containers on this host
containers = client.containers.list(filters={'status': 'running'})
total_cpu = 0.0
total_memory_used = 0
total_memory_limit = 0
total_net_rx = 0
total_net_tx = 0
container_count = 0
for container in containers:
try:
stats = container.stats(stream=False)
# Calculate CPU percentage
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats']['system_cpu_usage']
                if system_delta > 0:
                    # cgroup v2 hosts omit percpu_usage; prefer online_cpus
                    num_cpus = stats['cpu_stats'].get('online_cpus') or \
                        len(stats['cpu_stats']['cpu_usage'].get('percpu_usage', [1]))
                    cpu_percent = (cpu_delta / system_delta) * num_cpus * 100.0
                    total_cpu += cpu_percent
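                    # Worked example (illustrative numbers): cpu_delta=2e7 ns and
                    # system_delta=4e8 ns on a 4-CPU host ->
                    # (2e7 / 4e8) * 4 * 100 = 20.0 added to total_cpu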
# Memory
mem_usage = stats['memory_stats'].get('usage', 0)
mem_limit = stats['memory_stats'].get('limit', 1)
total_memory_used += mem_usage
total_memory_limit += mem_limit
# Network I/O
networks = stats.get('networks', {})
for net_stats in networks.values():
total_net_rx += net_stats.get('rx_bytes', 0)
total_net_tx += net_stats.get('tx_bytes', 0)
container_count += 1
except Exception as e:
logger.warning(f"Failed to get stats for container {container.id}: {e}")
continue
# Calculate percentages
avg_cpu = round(total_cpu / container_count, 1) if container_count > 0 else 0.0
memory_percent = round((total_memory_used / total_memory_limit) * 100, 1) if total_memory_limit > 0 else 0.0
return {
"cpu_percent": avg_cpu,
"memory_percent": memory_percent,
"memory_used_bytes": total_memory_used,
"memory_limit_bytes": total_memory_limit,
"network_rx_bytes": total_net_rx,
"network_tx_bytes": total_net_tx,
"container_count": container_count,
"timestamp": int(time.time())
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching metrics for host {host_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/containers")
async def get_containers(host_id: Optional[str] = None, authenticated: bool = Depends(verify_session_auth)):
"""Get all containers"""
return await monitor.get_containers(host_id)
@app.post("/api/hosts/{host_id}/containers/{container_id}/restart")
async def restart_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers):
"""Restart a container"""
success = monitor.restart_container(host_id, container_id)
return {"status": "success" if success else "failed"}
@app.post("/api/hosts/{host_id}/containers/{container_id}/stop")
async def stop_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers):
"""Stop a container"""
success = monitor.stop_container(host_id, container_id)
return {"status": "success" if success else "failed"}
@app.post("/api/hosts/{host_id}/containers/{container_id}/start")
async def start_container(host_id: str, container_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_containers):
"""Start a container"""
success = monitor.start_container(host_id, container_id)
return {"status": "success" if success else "failed"}
@app.get("/api/hosts/{host_id}/containers/{container_id}/logs")
async def get_container_logs(
host_id: str,
container_id: str,
tail: int = 100,
since: Optional[str] = None, # ISO timestamp for getting logs since a specific time
authenticated: bool = Depends(verify_session_auth)
# No rate limiting - authenticated users can poll logs freely
):
"""Get container logs - Portainer-style polling approach"""
if host_id not in monitor.clients:
raise HTTPException(status_code=404, detail="Host not found")
try:
client = monitor.clients[host_id]
# Run blocking Docker calls in executor with timeout
        loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
# Get container with timeout
try:
container = await asyncio.wait_for(
loop.run_in_executor(None, client.containers.get, container_id),
timeout=5.0
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Timeout getting container")
# Prepare log options
log_kwargs = {
'timestamps': True,
'tail': tail
}
        # Add since parameter if provided (for getting only new logs)
        if since:
            try:
                # Parse ISO timestamp and convert to a Unix timestamp for Docker
                import dateutil.parser
                dt = dateutil.parser.parse(since)
                # Docker's 'since' expects a Unix timestamp as a float;
                # timestamp() honors the parsed timezone (mktime would assume
                # local time and drop tzinfo)
                log_kwargs['since'] = dt.timestamp()
                log_kwargs['tail'] = 'all'  # Get all logs since timestamp
            except Exception as e:
                logger.debug(f"Could not parse 'since' parameter: {e}")
                # Invalid timestamp, ignore and fall back to the default tail
# Fetch logs with timeout
try:
logs = await asyncio.wait_for(
loop.run_in_executor(
None,
lambda: container.logs(**log_kwargs).decode('utf-8', errors='ignore')
),
timeout=5.0
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Timeout fetching logs")
# Parse logs and extract timestamps
# Docker log format with timestamps: "2025-09-30T19:30:45.123456789Z actual log message"
parsed_logs = []
for line in logs.split('\n'):
if not line.strip():
continue
# Try to extract timestamp (Docker format: ISO8601 with nanoseconds)
try:
# Find the space after timestamp
space_idx = line.find(' ')
if space_idx > 0:
timestamp_str = line[:space_idx]
log_text = line[space_idx + 1:]
# Parse timestamp (remove nanoseconds for Python datetime)
# Format: 2025-09-30T19:30:45.123456789Z -> 2025-09-30T19:30:45.123456Z
if 'T' in timestamp_str and timestamp_str.endswith('Z'):
# Truncate to microseconds (6 digits) if nanoseconds present
parts = timestamp_str[:-1].split('.')
if len(parts) == 2 and len(parts[1]) > 6:
timestamp_str = f"{parts[0]}.{parts[1][:6]}Z"
parsed_logs.append({
"timestamp": timestamp_str,
"log": log_text
})
else:
# No valid timestamp, use current time
parsed_logs.append({
"timestamp": datetime.utcnow().isoformat() + 'Z',
"log": line
})
else:
# No space found, treat whole line as log
parsed_logs.append({
"timestamp": datetime.utcnow().isoformat() + 'Z',
"log": line
})
except Exception:
# If parsing fails, use current time
parsed_logs.append({
"timestamp": datetime.utcnow().isoformat() + 'Z',
"log": line
})
return {
"container_id": container_id,
"logs": parsed_logs,
"last_timestamp": datetime.utcnow().isoformat() + 'Z' # For next 'since' parameter
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get logs for {container_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
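# Illustrative polling flow for the logs endpoint above (client-side sketch;
# host and container ids are placeholders):
#   1) GET /api/hosts/h1/containers/abc123/logs?tail=100
#   2) Remember response["last_timestamp"]
#   3) GET /api/hosts/h1/containers/abc123/logs?since=<last_timestamp>
#      to fetch only lines produced after the previous poll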
# Container exec endpoint removed for security reasons
# Users should use direct SSH, Docker CLI, or other appropriate tools for container access
# WebSocket log streaming removed in favor of HTTP polling (Portainer-style)
# This is more reliable for remote Docker hosts
@app.post("/api/containers/{container_id}/auto-restart")
async def toggle_auto_restart(container_id: str, request: AutoRestartRequest, authenticated: bool = Depends(verify_session_auth)):
"""Toggle auto-restart for a container"""
monitor.toggle_auto_restart(request.host_id, container_id, request.container_name, request.enabled)
return {"container_id": container_id, "auto_restart": request.enabled}
@app.get("/api/rate-limit/stats")
async def get_rate_limit_stats(authenticated: bool = Depends(verify_session_auth)):
"""Get rate limiter statistics - admin only"""
return rate_limiter.get_stats()
@app.get("/api/security/audit")
async def get_security_audit_stats(authenticated: bool = Depends(verify_session_auth), request: Request = None):
"""Get security audit statistics - admin only"""
if request:
security_audit.log_privileged_action(
            client_ip=request.client.host if request.client else "unknown",
action="VIEW_SECURITY_AUDIT",
target="security_audit_logs",
success=True,
user_agent=request.headers.get('user-agent', 'unknown')
)
return security_audit.get_security_stats()
@app.get("/api/settings")
async def get_settings(authenticated: bool = Depends(verify_session_auth)):
"""Get global settings"""
settings = monitor.db.get_settings()
return {
"max_retries": settings.max_retries,
"retry_delay": settings.retry_delay,
"default_auto_restart": settings.default_auto_restart,
"polling_interval": settings.polling_interval,
"connection_timeout": settings.connection_timeout,
"log_retention_days": settings.log_retention_days,
"enable_notifications": settings.enable_notifications,
"alert_template": getattr(settings, 'alert_template', None),
"blackout_windows": getattr(settings, 'blackout_windows', None),
"timezone_offset": getattr(settings, 'timezone_offset', 0),
"show_host_stats": getattr(settings, 'show_host_stats', True),
"show_container_stats": getattr(settings, 'show_container_stats', True)
}
@app.post("/api/settings")
async def update_settings(settings: GlobalSettings, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default):
"""Update global settings"""
# Check if stats settings changed
old_show_host_stats = monitor.settings.show_host_stats
old_show_container_stats = monitor.settings.show_container_stats
updated = monitor.db.update_settings(settings.dict())
monitor.settings = updated # Update in-memory settings
# Log stats collection changes
if old_show_host_stats != updated.show_host_stats:
logger.info(f"Host stats collection {'enabled' if updated.show_host_stats else 'disabled'}")
if old_show_container_stats != updated.show_container_stats:
logger.info(f"Container stats collection {'enabled' if updated.show_container_stats else 'disabled'}")
# Broadcast blackout status change to all clients
is_blackout, window_name = monitor.notification_service.blackout_manager.is_in_blackout_window()
await monitor.manager.broadcast({
'type': 'blackout_status_changed',
'data': {
'is_blackout': is_blackout,
'window_name': window_name
}
})
return settings
@app.get("/api/alerts")
async def get_alert_rules(authenticated: bool = Depends(verify_session_auth)):
"""Get all alert rules"""
rules = monitor.db.get_alert_rules(enabled_only=False)
logger.info(f"Retrieved {len(rules)} alert rules from database")
# Check for orphaned alerts
orphaned = await monitor.check_orphaned_alerts()
return [{
"id": rule.id,
"name": rule.name,
"containers": [{"host_id": c.host_id, "container_name": c.container_name}
for c in rule.containers] if rule.containers else [],
"trigger_events": rule.trigger_events,
"trigger_states": rule.trigger_states,
"notification_channels": rule.notification_channels,
"cooldown_minutes": rule.cooldown_minutes,
"enabled": rule.enabled,
"last_triggered": rule.last_triggered.isoformat() if rule.last_triggered else None,
"created_at": rule.created_at.isoformat(),
"updated_at": rule.updated_at.isoformat(),
"is_orphaned": rule.id in orphaned,
"orphaned_containers": orphaned.get(rule.id, {}).get('orphaned_containers', []) if rule.id in orphaned else []
} for rule in rules]
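# Example list item returned above (illustrative values):
#   {"id": "3f2a...", "name": "web down",
#    "containers": [{"host_id": "h1", "container_name": "web"}],
#    "trigger_states": ["exited"], "notification_channels": [1],
#    "cooldown_minutes": 15, "enabled": true,
#    "is_orphaned": false, "orphaned_containers": []}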
@app.post("/api/alerts")
async def create_alert_rule(rule: AlertRuleCreate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default):
"""Create a new alert rule"""
try:
# Validate cooldown_minutes
if rule.cooldown_minutes < 0 or rule.cooldown_minutes > 10080: # Max 1 week
raise HTTPException(status_code=400, detail="Cooldown must be between 0 and 10080 minutes (1 week)")
rule_id = str(uuid.uuid4())
# Convert ContainerHostPair objects to dicts for database
containers_data = None
if rule.containers:
containers_data = [{"host_id": c.host_id, "container_name": c.container_name}
for c in rule.containers]
logger.info(f"Creating alert rule: {rule.name} with {len(containers_data) if containers_data else 0} container+host pairs")
db_rule = monitor.db.add_alert_rule({
"id": rule_id,
"name": rule.name,
"containers": containers_data,
"trigger_events": rule.trigger_events,
"trigger_states": rule.trigger_states,
"notification_channels": rule.notification_channels,
"cooldown_minutes": rule.cooldown_minutes,
"enabled": rule.enabled
})
logger.info(f"Successfully created alert rule with ID: {db_rule.id}")
# Log alert rule creation
monitor.event_logger.log_alert_rule_created(
rule_name=db_rule.name,
rule_id=db_rule.id,
container_count=len(db_rule.containers) if db_rule.containers else 0,
channels=db_rule.notification_channels or [],
triggered_by="user"
)
return {
"id": db_rule.id,
"name": db_rule.name,
"containers": [{"host_id": c.host_id, "container_name": c.container_name}
for c in db_rule.containers] if db_rule.containers else [],
"trigger_events": db_rule.trigger_events,
"trigger_states": db_rule.trigger_states,
"notification_channels": db_rule.notification_channels,
"cooldown_minutes": db_rule.cooldown_minutes,
"enabled": db_rule.enabled,
"last_triggered": None,
"created_at": db_rule.created_at.isoformat(),
"updated_at": db_rule.updated_at.isoformat()
}
except Exception as e:
logger.error(f"Failed to create alert rule: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.put("/api/alerts/{rule_id}")
async def update_alert_rule(rule_id: str, updates: AlertRuleUpdate, authenticated: bool = Depends(verify_session_auth)):
"""Update an alert rule"""
try:
# Validate cooldown_minutes if provided
if updates.cooldown_minutes is not None:
if updates.cooldown_minutes < 0 or updates.cooldown_minutes > 10080: # Max 1 week
raise HTTPException(status_code=400, detail="Cooldown must be between 0 and 10080 minutes (1 week)")
        # Include only the fields that were explicitly set, even if empty
        # This allows clearing trigger_events or trigger_states, and
        # exclude_unset avoids wiping containers on every partial update
        update_data = {}
        for k, v in updates.dict(exclude_unset=True).items():
if v is not None:
# Convert empty lists to None for trigger fields
if k in ['trigger_events', 'trigger_states'] and isinstance(v, list) and not v:
update_data[k] = None
# Handle containers field separately
elif k == 'containers':
# v is already a list of dicts after .dict() call
# Include it even if None to clear the containers (set to "all containers")
update_data[k] = v
else:
update_data[k] = v
elif k == 'containers':
# Explicitly handle containers=None to clear specific container selection
update_data[k] = None
# Validate that at least one trigger type remains after update
if 'trigger_events' in update_data or 'trigger_states' in update_data:
# Get current rule to check what will remain
current_rule = monitor.db.get_alert_rule(rule_id)
if current_rule:
final_events = update_data.get('trigger_events', current_rule.trigger_events)
final_states = update_data.get('trigger_states', current_rule.trigger_states)
if not final_events and not final_states:
raise HTTPException(status_code=400,
detail="Alert rule must have at least one trigger event or state")
db_rule = monitor.db.update_alert_rule(rule_id, update_data)
if not db_rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
# Refresh in-memory alert rules
return {
"id": db_rule.id,
"name": db_rule.name,
"containers": [{"host_id": c.host_id, "container_name": c.container_name}
for c in db_rule.containers] if db_rule.containers else [],
"trigger_events": db_rule.trigger_events,
"trigger_states": db_rule.trigger_states,
"notification_channels": db_rule.notification_channels,
"cooldown_minutes": db_rule.cooldown_minutes,
"enabled": db_rule.enabled,
"last_triggered": db_rule.last_triggered.isoformat() if db_rule.last_triggered else None,
"created_at": db_rule.created_at.isoformat(),
"updated_at": db_rule.updated_at.isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to update alert rule: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.delete("/api/alerts/{rule_id}")
async def delete_alert_rule(rule_id: str, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_default):
"""Delete an alert rule"""
try:
# Get rule info before deleting for logging
rule = monitor.db.get_alert_rule(rule_id)
if not rule:
raise HTTPException(status_code=404, detail="Alert rule not found")
rule_name = rule.name
success = monitor.db.delete_alert_rule(rule_id)
if not success:
raise HTTPException(status_code=404, detail="Alert rule not found")
# Refresh in-memory alert rules
# Log alert rule deletion
monitor.event_logger.log_alert_rule_deleted(
rule_name=rule_name,
rule_id=rule_id,
triggered_by="user"
)
return {"status": "success", "message": f"Alert rule {rule_id} deleted"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to delete alert rule: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.get("/api/alerts/orphaned")
async def get_orphaned_alerts(authenticated: bool = Depends(verify_session_auth)):
"""Get alert rules that reference non-existent containers"""
try:
orphaned = await monitor.check_orphaned_alerts()
return {
"count": len(orphaned),
"orphaned_rules": orphaned
}
except Exception as e:
logger.error(f"Failed to check orphaned alerts: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== Blackout Window Routes ====================
@app.get("/api/blackout/status")
async def get_blackout_status(authenticated: bool = Depends(verify_session_auth)):
"""Get current blackout window status"""
try:
is_blackout, window_name = monitor.notification_service.blackout_manager.is_in_blackout_window()
return {
"is_blackout": is_blackout,
"current_window": window_name
}
except Exception as e:
logger.error(f"Error getting blackout status: {e}")
return {"is_blackout": False, "current_window": None}
# ==================== Notification Channel Routes ====================
@app.get("/api/notifications/template-variables")
async def get_template_variables(authenticated: bool = Depends(verify_session_auth)):
"""Get available template variables for notification messages"""
return {
"variables": [
{"name": "{CONTAINER_NAME}", "description": "Name of the container"},
{"name": "{CONTAINER_ID}", "description": "Short container ID (12 characters)"},
{"name": "{HOST_NAME}", "description": "Name of the Docker host"},
{"name": "{HOST_ID}", "description": "ID of the Docker host"},
{"name": "{OLD_STATE}", "description": "Previous state of the container"},
{"name": "{NEW_STATE}", "description": "New state of the container"},
{"name": "{IMAGE}", "description": "Docker image name"},
{"name": "{TIMESTAMP}", "description": "Full timestamp (YYYY-MM-DD HH:MM:SS)"},
{"name": "{TIME}", "description": "Time only (HH:MM:SS)"},
{"name": "{DATE}", "description": "Date only (YYYY-MM-DD)"},
{"name": "{RULE_NAME}", "description": "Name of the alert rule"},
{"name": "{RULE_ID}", "description": "ID of the alert rule"},
{"name": "{TRIGGERED_BY}", "description": "What triggered the alert"},
{"name": "{EVENT_TYPE}", "description": "Docker event type (if applicable)"},
{"name": "{EXIT_CODE}", "description": "Container exit code (if applicable)"}
],
"default_template": """🚨 **DockMon Alert**
**Container:** `{CONTAINER_NAME}`
**Host:** {HOST_NAME}
**State Change:** `{OLD_STATE}` → `{NEW_STATE}`
**Image:** {IMAGE}
**Time:** {TIMESTAMP}
**Rule:** {RULE_NAME}
───────────────────────""",
"examples": {
"simple": "Alert: {CONTAINER_NAME} on {HOST_NAME} changed from {OLD_STATE} to {NEW_STATE}",
"detailed": """🔴 Container Alert
Container: {CONTAINER_NAME} ({CONTAINER_ID})
Host: {HOST_NAME}
Status: {OLD_STATE} → {NEW_STATE}
Image: {IMAGE}
Time: {TIMESTAMP}
Triggered by: {RULE_NAME}""",
"minimal": "{CONTAINER_NAME}: {NEW_STATE} at {TIME}"
}
}
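# Example rendering of the "simple" template above, with illustrative values
# CONTAINER_NAME="web", HOST_NAME="prod-host", OLD_STATE="running",
# NEW_STATE="exited":
#   "Alert: web on prod-host changed from running to exited"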
@app.get("/api/notifications/channels")
async def get_notification_channels(authenticated: bool = Depends(verify_session_auth)):
"""Get all notification channels"""
channels = monitor.db.get_notification_channels(enabled_only=False)
return [{
"id": ch.id,
"name": ch.name,
"type": ch.type,
"config": ch.config,
"enabled": ch.enabled,
"created_at": ch.created_at.isoformat(),
"updated_at": ch.updated_at.isoformat()
} for ch in channels]
@app.post("/api/notifications/channels")
async def create_notification_channel(channel: NotificationChannelCreate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications):
"""Create a new notification channel"""
try:
db_channel = monitor.db.add_notification_channel({
"name": channel.name,
"type": channel.type,
"config": channel.config,
"enabled": channel.enabled
})
# Log notification channel creation
monitor.event_logger.log_notification_channel_created(
channel_name=db_channel.name,
channel_type=db_channel.type,
triggered_by="user"
)
return {
"id": db_channel.id,
"name": db_channel.name,
"type": db_channel.type,
"config": db_channel.config,
"enabled": db_channel.enabled,
"created_at": db_channel.created_at.isoformat(),
"updated_at": db_channel.updated_at.isoformat()
}
except Exception as e:
logger.error(f"Failed to create notification channel: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.put("/api/notifications/channels/{channel_id}")
async def update_notification_channel(channel_id: int, updates: NotificationChannelUpdate, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications):
"""Update a notification channel"""
try:
update_data = {k: v for k, v in updates.dict().items() if v is not None}
db_channel = monitor.db.update_notification_channel(channel_id, update_data)
if not db_channel:
raise HTTPException(status_code=404, detail="Channel not found")
return {
"id": db_channel.id,
"name": db_channel.name,
"type": db_channel.type,
"config": db_channel.config,
"enabled": db_channel.enabled,
"created_at": db_channel.created_at.isoformat(),
"updated_at": db_channel.updated_at.isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to update notification channel: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.get("/api/notifications/channels/{channel_id}/dependent-alerts")
async def get_dependent_alerts(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications):
"""Get alerts that would be orphaned if this channel is deleted"""
try:
dependent_alerts = monitor.db.get_alerts_dependent_on_channel(channel_id)
return {"alerts": dependent_alerts}
except Exception as e:
logger.error(f"Failed to get dependent alerts: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.delete("/api/notifications/channels/{channel_id}")
async def delete_notification_channel(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications):
"""Delete a notification channel and cascade delete alerts that would become orphaned"""
try:
# Find alerts that would be orphaned (only have this channel)
affected_alerts = monitor.db.get_alerts_dependent_on_channel(channel_id)
# Find all alerts that use this channel (for removal from multi-channel alerts)
all_alerts = monitor.db.get_alert_rules()
# Delete the channel
success = monitor.db.delete_notification_channel(channel_id)
if not success:
raise HTTPException(status_code=404, detail="Channel not found")
# Delete orphaned alerts
deleted_alerts = []
for alert in affected_alerts:
if monitor.db.delete_alert_rule(alert['id']):
deleted_alerts.append(alert['name'])
# Remove channel from multi-channel alerts
        updated_alerts = []
        affected_ids = {a['id'] for a in affected_alerts}
        for alert in all_alerts:
            # Skip if already deleted
            if alert.id in affected_ids:
continue
# Check if this alert uses the deleted channel
channels = alert.notification_channels if isinstance(alert.notification_channels, list) else []
if channel_id in channels:
# Remove the channel
new_channels = [ch for ch in channels if ch != channel_id]
monitor.db.update_alert_rule(alert.id, {'notification_channels': new_channels})
updated_alerts.append(alert.name)
result = {
"status": "success",
"message": f"Channel {channel_id} deleted"
}
if deleted_alerts:
result["deleted_alerts"] = deleted_alerts
result["message"] += f" and {len(deleted_alerts)} orphaned alert(s) removed"
if updated_alerts:
result["updated_alerts"] = updated_alerts
if "deleted_alerts" in result:
result["message"] += f", {len(updated_alerts)} alert(s) updated"
else:
result["message"] += f" and {len(updated_alerts)} alert(s) updated"
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to delete notification channel: {e}")
raise HTTPException(status_code=400, detail=str(e))
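# Cascade example for the endpoint above (illustrative ids): deleting channel 2
# when alert A uses only channel 2 and alert B uses channels [1, 2] deletes A
# (it would be orphaned) and rewrites B's channels to [1] (it is "updated").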
@app.post("/api/notifications/channels/{channel_id}/test")
async def test_notification_channel(channel_id: int, authenticated: bool = Depends(verify_session_auth), rate_limit_check: bool = rate_limit_notifications):
"""Test a notification channel"""
try:
if not hasattr(monitor, 'notification_service'):
raise HTTPException(status_code=503, detail="Notification service not available")
result = await monitor.notification_service.test_channel(channel_id)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to test notification channel: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== Event Log Routes ====================
@app.get("/api/events")
async def get_events(
category: Optional[List[str]] = Query(None),
event_type: Optional[str] = None,
severity: Optional[List[str]] = Query(None),
host_id: Optional[List[str]] = Query(None),
container_id: Optional[str] = None,
container_name: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
correlation_id: Optional[str] = None,
search: Optional[str] = None,
limit: int = 100,
offset: int = 0,
hours: Optional[int] = None,
authenticated: bool = Depends(verify_session_auth),
rate_limit_check: bool = rate_limit_default
):
"""
Get events with filtering and pagination
Query parameters:
- category: Filter by category (container, host, system, alert, notification)
- event_type: Filter by event type (state_change, action_taken, etc.)
- severity: Filter by severity (debug, info, warning, error, critical)
- host_id: Filter by specific host
- container_id: Filter by specific container
- container_name: Filter by container name (partial match)
- start_date: Filter events after this date (ISO 8601 format)
- end_date: Filter events before this date (ISO 8601 format)
- hours: Shortcut to get events from last X hours (overrides start_date)
- correlation_id: Get related events
- search: Search in title, message, and container name
- limit: Number of results per page (default 100, max 500)
- offset: Pagination offset
"""
try:
# Validate and parse dates
start_datetime = None
end_datetime = None
# If hours parameter is provided, calculate start_date
        if hours is not None:
            # use UTC to match timestamps written with datetime.utcnow() elsewhere
            start_datetime = datetime.utcnow() - timedelta(hours=hours)
elif start_date:
try:
start_datetime = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid start_date format. Use ISO 8601 format.")
if end_date:
try:
end_datetime = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid end_date format. Use ISO 8601 format.")
# Limit maximum results per page
if limit > 500:
limit = 500
# Query events from database
events, total_count = monitor.db.get_events(
category=category,
event_type=event_type,
severity=severity,
host_id=host_id,
container_id=container_id,
container_name=container_name,
start_date=start_datetime,
end_date=end_datetime,
correlation_id=correlation_id,
search=search,
limit=limit,
offset=offset
)
# Convert to JSON-serializable format
events_json = []
for event in events:
events_json.append({
"id": event.id,
"correlation_id": event.correlation_id,
"category": event.category,
"event_type": event.event_type,
"severity": event.severity,
"host_id": event.host_id,
"host_name": event.host_name,
"container_id": event.container_id,
"container_name": event.container_name,
"title": event.title,
"message": event.message,
"old_state": event.old_state,
"new_state": event.new_state,
"triggered_by": event.triggered_by,
"details": event.details,
"duration_ms": event.duration_ms,
"timestamp": event.timestamp.isoformat()
})
return {
"events": events_json,
"total_count": total_count,
"limit": limit,
"offset": offset,
"has_more": (offset + limit) < total_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get events: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/events/{event_id}")
async def get_event_by_id(
event_id: int,
authenticated: bool = Depends(verify_session_auth),
rate_limit_check: bool = rate_limit_default
):
"""Get a specific event by ID"""
try:
event = monitor.db.get_event_by_id(event_id)
if not event:
raise HTTPException(status_code=404, detail="Event not found")
return {
"id": event.id,
"correlation_id": event.correlation_id,
"category": event.category,
"event_type": event.event_type,
"severity": event.severity,
"host_id": event.host_id,
"host_name": event.host_name,
"container_id": event.container_id,
"container_name": event.container_name,
"title": event.title,
"message": event.message,
"old_state": event.old_state,
"new_state": event.new_state,
"triggered_by": event.triggered_by,
"details": event.details,
"duration_ms": event.duration_ms,
"timestamp": event.timestamp.isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get event: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/events/correlation/{correlation_id}")
async def get_events_by_correlation(
correlation_id: str,
authenticated: bool = Depends(verify_session_auth),
rate_limit_check: bool = rate_limit_default
):
"""Get all events with the same correlation ID (related events)"""
try:
events = monitor.db.get_events_by_correlation(correlation_id)
events_json = []
for event in events:
events_json.append({
"id": event.id,
"correlation_id": event.correlation_id,
"category": event.category,
"event_type": event.event_type,
"severity": event.severity,
"host_id": event.host_id,
"host_name": event.host_name,
"container_id": event.container_id,
"container_name": event.container_name,
"title": event.title,
"message": event.message,
"old_state": event.old_state,
"new_state": event.new_state,
"triggered_by": event.triggered_by,
"details": event.details,
"duration_ms": event.duration_ms,
"timestamp": event.timestamp.isoformat()
})
return {"events": events_json, "count": len(events_json)}
except Exception as e:
logger.error(f"Failed to get events by correlation: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== User Dashboard Routes ====================
@app.get("/api/user/dashboard-layout")
async def get_dashboard_layout(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Get dashboard layout for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
layout = monitor.db.get_dashboard_layout(username)
return {"layout": layout}
@app.post("/api/user/dashboard-layout")
async def save_dashboard_layout(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Save dashboard layout for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
try:
body = await request.json()
layout_json = body.get('layout')
if layout_json is None:
raise HTTPException(status_code=400, detail="Layout is required")
# Validate JSON structure
if layout_json:
try:
parsed_layout = json.loads(layout_json) if isinstance(layout_json, str) else layout_json
# Validate it's a list
if not isinstance(parsed_layout, list):
raise HTTPException(status_code=400, detail="Layout must be an array of widget positions")
# Validate each widget has required fields
required_fields = ['x', 'y', 'w', 'h']
for widget in parsed_layout:
if not isinstance(widget, dict):
raise HTTPException(status_code=400, detail="Each widget must be an object")
for field in required_fields:
if field not in widget:
raise HTTPException(status_code=400, detail=f"Widget missing required field: {field}")
if not isinstance(widget[field], (int, float)):
raise HTTPException(status_code=400, detail=f"Widget field '{field}' must be a number")
# Convert back to string for storage
layout_json = json.dumps(parsed_layout)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON format for layout")
success = monitor.db.save_dashboard_layout(username, layout_json)
if not success:
raise HTTPException(status_code=500, detail="Failed to save layout")
return {"success": True}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to save dashboard layout: {e}")
raise HTTPException(status_code=500, detail=str(e))
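# Example request body accepted above (illustrative positions; only x/y/w/h
# are validated, extra fields such as a widget id pass through):
#   {"layout": [{"x": 0, "y": 0, "w": 6, "h": 4},
#               {"x": 6, "y": 0, "w": 6, "h": 4}]}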
@app.get("/api/user/event-sort-order")
async def get_event_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Get event sort order preference for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
sort_order = monitor.db.get_event_sort_order(username)
return {"sort_order": sort_order}
@app.post("/api/user/event-sort-order")
async def save_event_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Save event sort order preference for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
try:
body = await request.json()
sort_order = body.get('sort_order')
if sort_order not in ['asc', 'desc']:
raise HTTPException(status_code=400, detail="sort_order must be 'asc' or 'desc'")
success = monitor.db.save_event_sort_order(username, sort_order)
if not success:
raise HTTPException(status_code=500, detail="Failed to save sort order")
return {"success": True}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to save event sort order: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/user/container-sort-order")
async def get_container_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Get container sort order preference for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
sort_order = monitor.db.get_container_sort_order(username)
return {"sort_order": sort_order}
@app.post("/api/user/container-sort-order")
async def save_container_sort_order(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Save container sort order preference for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
try:
body = await request.json()
sort_order = body.get('sort_order')
valid_sorts = ['name-asc', 'name-desc', 'status', 'memory-desc', 'memory-asc', 'cpu-desc', 'cpu-asc']
if sort_order not in valid_sorts:
raise HTTPException(status_code=400, detail=f"sort_order must be one of: {', '.join(valid_sorts)}")
success = monitor.db.save_container_sort_order(username, sort_order)
if not success:
raise HTTPException(status_code=500, detail="Failed to save sort order")
return {"success": True}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to save container sort order: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/user/modal-preferences")
async def get_modal_preferences(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Get modal preferences for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
preferences = monitor.db.get_modal_preferences(username)
return {"preferences": preferences}
@app.post("/api/user/modal-preferences")
async def save_modal_preferences(request: Request, authenticated: bool = Depends(verify_session_auth)):
"""Save modal preferences for current user"""
from auth.routes import _get_session_from_cookie
from auth.session_manager import session_manager
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
try:
body = await request.json()
preferences = body.get('preferences')
if preferences is None:
raise HTTPException(status_code=400, detail="Preferences are required")
success = monitor.db.save_modal_preferences(username, preferences)
if not success:
raise HTTPException(status_code=500, detail="Failed to save preferences")
return {"success": True}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to save modal preferences: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== Additional Event Log Routes ====================
# Note: the main /api/events listing, /api/events/{event_id},
# /api/events/correlation/{correlation_id}, and /api/events/statistics
# endpoints are defined earlier in this file.
@app.get("/api/events/container/{container_id}")
async def get_container_events(container_id: str, limit: int = 50, authenticated: bool = Depends(verify_session_auth)):
"""Get events for a specific container"""
try:
events, total_count = monitor.db.get_events(
container_id=container_id,
limit=limit,
offset=0
)
return {
"container_id": container_id,
"events": [{
"id": event.id,
"correlation_id": event.correlation_id,
"category": event.category,
"event_type": event.event_type,
"severity": event.severity,
"host_id": event.host_id,
"host_name": event.host_name,
"container_id": event.container_id,
"container_name": event.container_name,
"title": event.title,
"message": event.message,
"old_state": event.old_state,
"new_state": event.new_state,
"triggered_by": event.triggered_by,
"details": event.details,
"duration_ms": event.duration_ms,
"timestamp": event.timestamp.isoformat()
} for event in events],
"total_count": total_count
}
except Exception as e:
logger.error(f"Failed to get events for container {container_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/events/host/{host_id}")
async def get_host_events(host_id: str, limit: int = 50, authenticated: bool = Depends(verify_session_auth)):
"""Get events for a specific host"""
try:
events, total_count = monitor.db.get_events(
host_id=host_id,
limit=limit,
offset=0
)
return {
"host_id": host_id,
"events": [{
"id": event.id,
"correlation_id": event.correlation_id,
"category": event.category,
"event_type": event.event_type,
"severity": event.severity,
"host_id": event.host_id,
"host_name": event.host_name,
"container_id": event.container_id,
"container_name": event.container_name,
"title": event.title,
"message": event.message,
"old_state": event.old_state,
"new_state": event.new_state,
"triggered_by": event.triggered_by,
"details": event.details,
"duration_ms": event.duration_ms,
"timestamp": event.timestamp.isoformat()
} for event in events],
"total_count": total_count
}
except Exception as e:
logger.error(f"Failed to get events for host {host_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/events/cleanup")
async def cleanup_old_events(days: int = 30, authenticated: bool = Depends(verify_session_auth)):
"""Clean up old events - DANGEROUS: Can delete audit logs"""
try:
if days < 1:
raise HTTPException(status_code=400, detail="Days must be at least 1")
deleted_count = monitor.db.cleanup_old_events(days)
monitor.event_logger.log_system_event(
"Event Cleanup Completed",
f"Cleaned up {deleted_count} events older than {days} days",
EventSeverity.INFO,
EventType.STARTUP
)
return {
"status": "success",
"message": f"Cleaned up {deleted_count} events older than {days} days",
"deleted_count": deleted_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to cleanup events: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time updates"""
# Generate a unique connection ID for rate limiting
connection_id = f"ws_{id(websocket)}_{time.time()}"
await monitor.manager.connect(websocket)
await monitor.realtime.subscribe_to_events(websocket)
# Send initial state
settings_dict = {
"max_retries": monitor.settings.max_retries,
"retry_delay": monitor.settings.retry_delay,
"default_auto_restart": monitor.settings.default_auto_restart,
"polling_interval": monitor.settings.polling_interval,
"connection_timeout": monitor.settings.connection_timeout,
"log_retention_days": monitor.settings.log_retention_days,
"enable_notifications": monitor.settings.enable_notifications,
"alert_template": getattr(monitor.settings, 'alert_template', None),
"blackout_windows": getattr(monitor.settings, 'blackout_windows', None),
"timezone_offset": getattr(monitor.settings, 'timezone_offset', 0),
"show_host_stats": getattr(monitor.settings, 'show_host_stats', True),
"show_container_stats": getattr(monitor.settings, 'show_container_stats', True)
}
initial_state = {
"type": "initial_state",
"data": {
"hosts": [h.dict() for h in monitor.hosts.values()],
"containers": [c.dict() for c in await monitor.get_containers()],
"settings": settings_dict
}
}
await websocket.send_text(json.dumps(initial_state, cls=DateTimeEncoder))
try:
while True:
# Keep connection alive and handle incoming messages
message = await websocket.receive_json()
# Check rate limit for incoming messages
allowed, reason = ws_rate_limiter.check_rate_limit(connection_id)
if not allowed:
# Send rate limit error to client
await websocket.send_text(json.dumps({
"type": "error",
"error": "rate_limit",
"message": reason
}))
# Don't process the message
continue
# Handle different message types
if message.get("type") == "subscribe_stats":
container_id = message.get("container_id")
if container_id:
await monitor.realtime.subscribe_to_stats(websocket, container_id)
# Find the host and start monitoring
for host_id, client in monitor.clients.items():
try:
client.containers.get(container_id)
await monitor.realtime.start_container_stats_stream(
client, container_id, interval=2
)
break
except Exception as e:
logger.debug(f"Container {container_id} not found on host {host_id[:8]}: {e}")
continue
elif message.get("type") == "unsubscribe_stats":
container_id = message.get("container_id")
if container_id:
await monitor.realtime.unsubscribe_from_stats(websocket, container_id)
elif message.get("type") == "modal_opened":
# Track that a container modal is open - keep stats running for this container
container_id = message.get("container_id")
host_id = message.get("host_id")
if container_id and host_id:
# Verify container exists and user has access to it
try:
containers = await monitor.get_containers() # Must await async function
container_exists = any(
c.id == container_id and c.host_id == host_id
for c in containers
)
if container_exists:
monitor.stats_manager.add_modal_container(container_id, host_id)
else:
logger.warning(f"User attempted to access stats for non-existent container: {container_id[:12]} on host {host_id[:8]}")
except Exception as e:
logger.error(f"Error validating container access: {e}")
elif message.get("type") == "modal_closed":
# Remove container from modal tracking
container_id = message.get("container_id")
host_id = message.get("host_id")
if container_id and host_id:
monitor.stats_manager.remove_modal_container(container_id, host_id)
elif message.get("type") == "ping":
await websocket.send_text(json.dumps({"type": "pong"}, cls=DateTimeEncoder))
except WebSocketDisconnect:
await monitor.manager.disconnect(websocket)
await monitor.realtime.unsubscribe_from_events(websocket)
# Unsubscribe from all stats
for container_id in list(monitor.realtime.stats_subscribers):
await monitor.realtime.unsubscribe_from_stats(websocket, container_id)
        # Clear modal containers (note: this clears tracking globally, which
        # may also drop stats for other connected users' open modals)
        monitor.stats_manager.clear_modal_containers()
# Clean up rate limiter tracking
ws_rate_limiter.cleanup_connection(connection_id)
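# Example client messages handled by the WebSocket endpoint above
# (illustrative ids):
#   {"type": "subscribe_stats", "container_id": "abc123def456"}
#   {"type": "unsubscribe_stats", "container_id": "abc123def456"}
#   {"type": "modal_opened", "container_id": "abc123def456", "host_id": "h1"}
#   {"type": "modal_closed", "container_id": "abc123def456", "host_id": "h1"}
#   {"type": "ping"}   -> server responds {"type": "pong"}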