"""
|
|
Notification service for DockMon
|
|
Handles sending alerts via Discord, Telegram, and Pushover
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
import requests
|
|
import httpx
|
|
from database import DatabaseManager, NotificationChannel, AlertRuleDB
|
|
from event_logger import EventSeverity, EventCategory, EventType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class AlertEvent:
    """Container alert event"""
    container_id: str
    container_name: str
    host_id: str
    host_name: str
    old_state: str
    new_state: str
    timestamp: datetime
    image: str
    triggered_by: str = "monitor"


@dataclass
class DockerEventAlert:
    """Docker event that might trigger an alert"""
    container_id: str
    container_name: str
    host_id: str
    event_type: str  # e.g., "die", "oom", "kill", "health_status:unhealthy"
    timestamp: datetime
    attributes: Optional[Dict[str, Any]] = None  # Additional event attributes
    exit_code: Optional[int] = None

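# Illustrative sketch (not executed by the service): how the two dataclasses above might
# be populated by a caller. All concrete values here are hypothetical examples.
#
#   oom_event = DockerEventAlert(
#       container_id="3f2a9c1b7d4e",
#       container_name="api-worker",
#       host_id="host-1",
#       event_type="oom",
#       timestamp=datetime.now(),
#       attributes={"image": "example/api-worker:1.4"},
#       exit_code=137,
#   )
#
#   state_event = AlertEvent(
#       container_id="3f2a9c1b7d4e",
#       container_name="api-worker",
#       host_id="host-1",
#       host_name="prod-docker-01",
#       old_state="running",
#       new_state="exited",
#       timestamp=datetime.now(),
#       image="example/api-worker:1.4",
#   )
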
class NotificationService:
    """Handles all notification channels and alert processing"""

    def __init__(self, db: DatabaseManager, event_logger=None):
        self.db = db
        self.event_logger = event_logger
        self.http_client = httpx.AsyncClient(timeout=30.0)
        self._last_alerts: Dict[str, datetime] = {}  # For cooldown tracking
        self._last_container_state: Dict[str, str] = {}  # Track last known state per container
        self._suppressed_alerts: List[AlertEvent] = []  # Alerts suppressed during blackout windows
        self.MAX_SUPPRESSED_ALERTS = 1000  # Prevent unbounded memory growth
        self.MAX_COOLDOWN_ENTRIES = 10000  # Prevent unbounded cooldown dictionary
        self.COOLDOWN_MAX_AGE_DAYS = 7  # Remove cooldown entries older than 7 days

        # Initialize blackout manager
        from blackout_manager import BlackoutManager
        self.blackout_manager = BlackoutManager(db)

    def _cleanup_old_cooldowns(self) -> None:
        """Clean up old cooldown entries to prevent memory leak"""
        now = datetime.now()
        max_age = timedelta(days=self.COOLDOWN_MAX_AGE_DAYS)

        # Remove entries older than max age
        keys_to_remove = [
            key for key, timestamp in self._last_alerts.items()
            if now - timestamp > max_age
        ]

        for key in keys_to_remove:
            del self._last_alerts[key]

        if keys_to_remove:
            logger.info(f"Cleaned up {len(keys_to_remove)} old cooldown entries")

        # If still over limit, remove oldest entries
        if len(self._last_alerts) > self.MAX_COOLDOWN_ENTRIES:
            # Sort by timestamp and keep only the newest MAX_COOLDOWN_ENTRIES
            sorted_alerts = sorted(self._last_alerts.items(), key=lambda x: x[1], reverse=True)
            self._last_alerts = dict(sorted_alerts[:self.MAX_COOLDOWN_ENTRIES])
            logger.warning(f"Cooldown dictionary exceeded limit, truncated to {self.MAX_COOLDOWN_ENTRIES} entries")

    def _get_host_name(self, event) -> str:
        """Get host name from event, handling both AlertEvent and DockerEventAlert types"""
        if hasattr(event, 'host_name'):
            return event.host_name
        elif hasattr(event, 'host_id'):
            # Look up host name from host_id in database
            try:
                host = self.db.get_host(event.host_id)
                return host.name if host else 'Unknown Host'
            except Exception:
                return 'Unknown Host'
        else:
            return 'Unknown Host'

    async def process_docker_event(self, event: DockerEventAlert) -> bool:
        """Process a Docker event and send alerts if rules match"""
        try:
            # Get matching alert rules for this event
            rules = self.db.get_alert_rules(enabled_only=True)
            matching_rules = []

            host_name = self._get_host_name(event)
            logger.info(f"Processing Docker event: {event.event_type} for container '{event.container_name}' on host '{host_name}'")

            for rule in rules:
                # Check if rule has event triggers
                if not rule.trigger_events:
                    continue

                # Check if this event type matches any triggers
                event_matches = False
                for trigger in rule.trigger_events:
                    # Handle special event mappings
                    if trigger == "die-nonzero" and event.event_type == "die" and event.exit_code and event.exit_code != 0:
                        event_matches = True
                        break
                    elif trigger == "die-zero" and event.event_type == "die" and event.exit_code == 0:
                        event_matches = True
                        break
                    elif trigger == event.event_type:
                        event_matches = True
                        break
                    # Handle health status events
                    elif trigger.startswith("health_status:") and event.event_type == "health_status":
                        health_status = event.attributes.get("health_status") if event.attributes else None
                        if health_status and trigger == f"health_status:{health_status}":
                            event_matches = True
                            break

                if not event_matches:
                    continue

                # Check if this container+host matches the rule
                # First check if rule has specific container+host pairs
                if hasattr(rule, 'containers') and rule.containers:
                    # Use specific container+host pairs
                    matches = False
                    for container in rule.containers:
                        if (container.host_id == event.host_id and
                                container.container_name == event.container_name):
                            matches = True
                            break

                    if not matches:
                        continue
                else:
                    # Event rules require explicit container+host pairs; skip rules without any
                    continue

                logger.info(f"Docker event matches rule {rule.id}: {rule.name}")
                matching_rules.append(rule)

            if not matching_rules:
                logger.debug(f"No rules match Docker event {event.event_type} for container '{event.container_name}' on host '{host_name}'")
                return False

            # Send notifications for matching rules
            success_count = 0
            for rule in matching_rules:
                logger.info(f"Processing Docker event rule '{rule.name}' for container '{event.container_name}' on host '{host_name}'")

                # Build cooldown key for this rule + container + event type
                container_key = f"{event.host_id}:{event.container_id}"
                cooldown_key = f"event:{rule.id}:{container_key}:{event.event_type}"

                # Periodic cleanup of old cooldown entries (every ~100 alerts)
                if len(self._last_alerts) % 100 == 0:
                    self._cleanup_old_cooldowns()

                # Check cooldown
                if cooldown_key in self._last_alerts:
                    time_since = datetime.now() - self._last_alerts[cooldown_key]
                    if time_since.total_seconds() < rule.cooldown_minutes * 60:
                        logger.info(f"Skipping Docker event alert for container '{event.container_name}' on host '{host_name}' (rule: {rule.name}) due to cooldown")
                        continue

                # Check if we're in a blackout window
                is_blackout, window_name = self.blackout_manager.is_in_blackout_window()
                logger.info(f"Blackout check for Docker event (rule: {rule.name}): is_blackout={is_blackout}, window_name={window_name}")
                if is_blackout:
                    logger.info(f"Docker event alert suppressed during blackout window '{window_name}' for container '{event.container_name}' on host '{host_name}' (rule: {rule.name})")
                    continue

                # Send notification
                logger.info(f"Sending Docker event notification for rule '{rule.name}'")
                if await self._send_event_notification(rule, event):
                    success_count += 1
                    self._last_alerts[cooldown_key] = datetime.now()

                    # Update rule's last triggered time
                    self.db.update_alert_rule(rule.id, {
                        'last_triggered': datetime.now()
                    })

            return success_count > 0

        except Exception as e:
            host_name = self._get_host_name(event)
            logger.error(f"Error processing Docker event for container '{event.container_name}' on host '{host_name}': {e}")
            return False

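    # Illustrative sketch of the trigger mapping implemented above (the rule shape shown
    # is an assumption for the example, not an exhaustive spec): "die-nonzero" matches a
    # "die" event only when exit_code is non-zero, "die-zero" matches a clean exit, and
    # "health_status:unhealthy" matches a "health_status" event whose attributes carry
    # health_status == "unhealthy".
    #
    #   rule.trigger_events = ["die-nonzero", "oom", "health_status:unhealthy"]
    #   DockerEventAlert(event_type="die", exit_code=1, ...)         -> matches (die-nonzero)
    #   DockerEventAlert(event_type="die", exit_code=0, ...)         -> no match
    #   DockerEventAlert(event_type="oom", ...)                      -> matches (oom)
    #   DockerEventAlert(event_type="health_status",
    #                    attributes={"health_status": "unhealthy"})  -> matches
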
    async def _send_event_notification(self, rule: AlertRuleDB, event: DockerEventAlert) -> bool:
        """Send notifications for a Docker event"""
        try:
            # Get host name
            host_name = event.host_id
            try:
                with self.db.get_session() as session:
                    from database import DockerHostDB
                    host = session.query(DockerHostDB).filter_by(id=event.host_id).first()
                    if host:
                        host_name = host.name
            except Exception as e:
                logger.warning(f"Could not get host name: {e}")

            # Get container image
            image = event.attributes.get('image', 'Unknown') if event.attributes else 'Unknown'

            # Format event type description
            if event.event_type == "die":
                if event.exit_code == 0:
                    event_desc = "Container stopped normally (exit code 0)"
                    emoji = "🟢"
                else:
                    event_desc = f"Container died with exit code {event.exit_code}"
                    emoji = "🔴"
            elif event.event_type == "oom":
                event_desc = "Container killed due to Out Of Memory (OOM)"
                emoji = "💀"
            elif event.event_type == "kill":
                event_desc = "Container was killed"
                emoji = "⚠️"
            elif event.event_type.startswith("health_status"):
                status = event.attributes.get("health_status", "unknown") if event.attributes else "unknown"
                if status == "unhealthy":
                    event_desc = "Container is UNHEALTHY"
                    emoji = "🏥"
                elif status == "healthy":
                    event_desc = "Container is healthy again"
                    emoji = "✅"
                else:
                    event_desc = f"Health status: {status}"
                    emoji = "🏥"
            elif event.event_type == "restart-loop":
                event_desc = "Container is in a restart loop"
                emoji = "🔄"
            else:
                event_desc = f"Docker event: {event.event_type}"
                emoji = "📢"

            # Format structured message like state alerts
            message = f"""{emoji} **DockMon Alert**

**Container:** `{event.container_name}`
**Host:** {host_name}
**Event:** {event_desc}
**Image:** {image}
**Time:** {event.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
**Rule:** {rule.name}"""

            # Get notification channels
            channels = self.db.get_notification_channels_by_ids(rule.notification_channels)

            success_count = 0
            total_channels = len(channels)

            for channel in channels:
                if channel.enabled:
                    try:
                        if channel.type == 'telegram':
                            await self._send_telegram(channel.config, message)
                            success_count += 1
                        elif channel.type == 'discord':
                            await self._send_discord(channel.config, message)
                            success_count += 1
                        elif channel.type == 'pushover':
                            await self._send_pushover(channel.config, message, event)
                            success_count += 1
                        elif channel.type == 'slack':
                            await self._send_slack(channel.config, message, event)
                            success_count += 1
                        elif channel.type == 'gotify':
                            await self._send_gotify(channel.config, message, event)
                            success_count += 1
                        elif channel.type == 'smtp':
                            await self._send_smtp(channel.config, message, event)
                            success_count += 1
                        logger.info(f"Event notification sent via {channel.type} for container '{event.container_name}' on host '{host_name}'")
                    except Exception as e:
                        logger.error(f"Failed to send {channel.type} notification: {e}")

            logger.info(f"Event alert sent to {success_count}/{total_channels} channels for container '{event.container_name}' on host '{host_name}'")
            return success_count > 0

        except Exception as e:
            logger.error(f"Error sending event notification: {e}")
            return False

    async def send_alert(self, event: AlertEvent) -> bool:
        """Process an alert event and send notifications"""
        try:
            # Get matching alert rules
            alert_rules = await self._get_matching_rules(event)

            host_name = self._get_host_name(event)
            logger.info(f"Found {len(alert_rules) if alert_rules else 0} matching alert rules for container '{event.container_name}' on host '{host_name}'")

            if not alert_rules:
                logger.warning(f"No alert rules match container '{event.container_name}' on host '{host_name}' (state: {event.old_state} → {event.new_state})")
                return False

            success_count = 0
            total_rules = len(alert_rules)

            # Check if we're in a blackout window
            is_blackout, window_name = self.blackout_manager.is_in_blackout_window()
            if is_blackout:
                logger.info(f"Suppressed {len(alert_rules)} alerts during blackout window '{window_name}' for container '{event.container_name}' on host '{host_name}'")
                # Track this alert for later (with cap to prevent memory leak)
                if len(self._suppressed_alerts) < self.MAX_SUPPRESSED_ALERTS:
                    self._suppressed_alerts.append(event)
                else:
                    logger.warning(f"Suppressed alerts queue full ({self.MAX_SUPPRESSED_ALERTS}), dropping oldest")
                    self._suppressed_alerts.pop(0)
                    self._suppressed_alerts.append(event)

                # Log suppression event
                if self.event_logger:
                    rule_names = ", ".join([rule.name for rule in alert_rules[:3]])  # First 3 rules
                    if len(alert_rules) > 3:
                        rule_names += f" (+{len(alert_rules) - 3} more)"

                    from event_logger import EventContext
                    context = EventContext(
                        host_id=event.host_id,
                        host_name=host_name,
                        container_id=event.container_id,
                        container_name=event.container_name
                    )

                    self.event_logger.log_event(
                        title=f"Alert Suppressed: {event.container_name}",
                        message=f"Alert suppressed during blackout window '{window_name}'. State change: {event.old_state} → {event.new_state}. Matching rules: {rule_names}",
                        category=EventCategory.ALERT,
                        event_type=EventType.RULE_TRIGGERED,
                        severity=EventSeverity.WARNING,
                        context=context,
                        old_state=event.old_state,
                        new_state=event.new_state,
                        details={"blackout_window": window_name, "rules_count": len(alert_rules), "suppressed": True}
                    )

                return False

            # Process each matching rule
            triggered_rules = []
            for rule in alert_rules:
                if await self._should_send_alert(rule, event):
                    if await self._send_rule_notifications(rule, event):
                        success_count += 1
                        triggered_rules.append(rule.name)
                        # Update last triggered time for this container + rule combination
                        container_key = f"{event.host_id}:{event.container_id}"
                        cooldown_key = f"{rule.id}:{container_key}"
                        self._last_alerts[cooldown_key] = datetime.now()

                        # Also update the rule's global last_triggered for backward compatibility
                        self.db.update_alert_rule(rule.id, {
                            'last_triggered': datetime.now()
                        })

            # Log the event to new event system
            if success_count > 0 and self.event_logger:
                rule_names = ", ".join(triggered_rules[:3])  # First 3 rules
                if len(triggered_rules) > 3:
                    rule_names += f" (+{len(triggered_rules) - 3} more)"

                # Determine severity based on state
                severity = EventSeverity.CRITICAL if event.new_state in ['exited', 'dead'] else EventSeverity.ERROR

                from event_logger import EventContext
                context = EventContext(
                    host_id=event.host_id,
                    host_name=host_name,
                    container_id=event.container_id,
                    container_name=event.container_name
                )

                self.event_logger.log_event(
                    category=EventCategory.ALERT,
                    event_type=EventType.RULE_TRIGGERED,
                    title=f"Alert Triggered: {event.container_name}",
                    message=f"{success_count} alert rule(s) triggered for container state change: {event.old_state} → {event.new_state}. Rules: {rule_names}",
                    severity=severity,
                    context=context,
                    old_state=event.old_state,
                    new_state=event.new_state,
                    details={"rules_triggered": triggered_rules, "rules_count": success_count, "total_rules": total_rules}
                )

            return success_count > 0

        except Exception as e:
            host_name = self._get_host_name(event)
            logger.error(f"Error processing alert for container '{event.container_name}' on host '{host_name}': {e}")
            return False

    async def _get_matching_rules(self, event: AlertEvent) -> List[AlertRuleDB]:
        """Get alert rules that match the container and state change"""
        rules = self.db.get_alert_rules(enabled_only=True)
        matching_rules = []

        host_name = self._get_host_name(event)
        logger.info(f"Checking {len(rules)} alert rules for container '{event.container_name}' on host '{host_name}' (state: {event.old_state} → {event.new_state})")

        for rule in rules:
            container_info = f"{len(rule.containers)} container+host pairs" if hasattr(rule, 'containers') and rule.containers else "no containers"
            logger.debug(f"Rule {rule.id}: name='{rule.name}', containers={container_info}, states={rule.trigger_states}, events={rule.trigger_events}")

            # Check if this container+host matches the rule
            # First check if rule has specific container+host pairs
            if hasattr(rule, 'containers') and rule.containers:
                # Use specific container+host pairs
                matches = False
                for container in rule.containers:
                    if (container.host_id == event.host_id and
                            container.container_name == event.container_name):
                        matches = True
                        break

                if not matches:
                    logger.debug(f"Rule {rule.id} skipped: no matching container+host pair")
                    continue
            else:
                # No containers specified = monitor all containers
                logger.debug(f"Rule {rule.id} matches: monitoring all containers")

            # Check if new state triggers alert (only if trigger_states is defined)
            if rule.trigger_states and event.new_state not in rule.trigger_states:
                logger.debug(f"Rule {rule.id} skipped: state '{event.new_state}' not in triggers {rule.trigger_states}")
                continue
            elif not rule.trigger_states:
                # This rule only has event triggers, not state triggers; skip for state changes
                logger.debug(f"Rule {rule.id} skipped: no state triggers defined (events only)")
                continue

            logger.debug(f"Rule {rule.id} MATCHES!")
            matching_rules.append(rule)

        return matching_rules

    async def _should_send_alert(self, rule: AlertRuleDB, event: AlertEvent) -> bool:
        """Check if alert should be sent based on cooldown per container"""
        container_key = f"{event.host_id}:{event.container_id}"
        cooldown_key = f"{rule.id}:{container_key}"

        # Check if container recovered (went to a non-alert state) since last alert
        # Use old_state from the event, not our tracked state!
        host_name = self._get_host_name(event)
        logger.debug(f"Alert check for container '{event.container_name}' on host '{host_name}': {event.old_state} → {event.new_state}")

        # If the container was in a "good" state (running) and is now in a "bad" state (exited),
        # this is a new incident - reset cooldown
        good_states = ['running', 'created']
        if rule.trigger_states and event.old_state in good_states and event.new_state in rule.trigger_states:
            logger.info(f"Alert allowed for container '{event.container_name}' on host '{host_name}': Container recovered ({event.old_state}) then failed ({event.new_state}) - new incident detected")
            # Remove the cooldown for this container
            if cooldown_key in self._last_alerts:
                del self._last_alerts[cooldown_key]
            return True

        if cooldown_key not in self._last_alerts:
            logger.debug("Alert allowed: No previous alert for this container")
            return True

        # Check cooldown period
        time_since_last = datetime.now() - self._last_alerts[cooldown_key]
        cooldown_minutes = rule.cooldown_minutes or 15
        cooldown_seconds = cooldown_minutes * 60

        if time_since_last.total_seconds() >= cooldown_seconds:
            logger.debug(f"Alert allowed: Cooldown period exceeded ({time_since_last.total_seconds():.1f}s > {cooldown_seconds}s)")
            return True
        else:
            logger.info(f"Alert blocked for container '{event.container_name}' on host '{host_name}': Still in cooldown ({cooldown_seconds - time_since_last.total_seconds():.1f}s remaining)")
            return False

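    # Illustrative cooldown walkthrough (hypothetical rule and container values, not
    # executed), showing how the checks above interact:
    #
    #   rule.id = 7, rule.cooldown_minutes = 15, rule.trigger_states = ["exited"]
    #   10:00  running -> exited   allowed; _last_alerts["7:host-1:abc123"] = 10:00
    #   10:05  exited  -> exited   blocked, still in cooldown (~10 min remaining)
    #   10:20  running -> exited   allowed; old_state "running" marks a new incident,
    #                              so the cooldown entry is cleared before sending
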
    async def _send_rule_notifications(self, rule: AlertRuleDB, event: AlertEvent) -> bool:
        """Send notifications for a specific rule"""
        if not rule.notification_channels:
            return False

        channels = self.db.get_notification_channels(enabled_only=True)
        channel_map = {str(ch.id): ch for ch in channels}

        success_count = 0
        total_channels = len(rule.notification_channels)

        # Send to each configured channel
        for channel_id in rule.notification_channels:
            if str(channel_id) in channel_map:
                channel = channel_map[str(channel_id)]
                try:
                    if await self._send_to_channel(channel, event, rule):
                        success_count += 1
                except Exception as e:
                    logger.error(f"Failed to send to channel {channel.name}: {e}")

        host_name = self._get_host_name(event)
        logger.info(f"Alert sent to {success_count}/{total_channels} channels for container '{event.container_name}' on host '{host_name}'")
        return success_count > 0

    async def _send_to_channel(self, channel: NotificationChannel,
                               event: AlertEvent, rule: AlertRuleDB) -> bool:
        """Send notification to a specific channel"""
        # Get global template from settings
        settings = self.db.get_settings()
        template = getattr(settings, 'alert_template', None) if settings else None

        # Use global template or default
        if not template:
            template = self._get_default_template(channel.type)

        message = self._format_message(event, rule, template)

        if channel.type == "telegram":
            return await self._send_telegram(channel.config, message)
        elif channel.type == "discord":
            return await self._send_discord(channel.config, message)
        elif channel.type == "pushover":
            return await self._send_pushover(channel.config, message, event)
        elif channel.type == "slack":
            return await self._send_slack(channel.config, message, event)
        elif channel.type == "gotify":
            return await self._send_gotify(channel.config, message, event)
        elif channel.type == "smtp":
            return await self._send_smtp(channel.config, message, event)
        else:
            logger.warning(f"Unknown notification channel type: {channel.type}")
            return False

    def _get_default_template(self, channel_type: Optional[str] = None) -> str:
        """Get default template for channel type"""
        # Default template with variables - ends with separator for visual distinction
        default = """🚨 **DockMon Alert**

**Container:** `{CONTAINER_NAME}`
**Host:** {HOST_NAME}
**State Change:** `{OLD_STATE}` → `{NEW_STATE}`
**Image:** {IMAGE}
**Time:** {TIMESTAMP}
**Rule:** {RULE_NAME}
───────────────────────"""

        # Channel-specific defaults (can be customized per platform)
        templates = {
            'slack': default,
            'discord': default,
            'telegram': default,
            'pushover': """DockMon Alert
Container: {CONTAINER_NAME}
Host: {HOST_NAME}
State: {OLD_STATE} → {NEW_STATE}
Image: {IMAGE}
Time: {TIMESTAMP}
Rule: {RULE_NAME}
---"""
        }

        return templates.get(channel_type, default)

    def _format_message(self, event: AlertEvent, rule: AlertRuleDB, template: Optional[str] = None) -> str:
        """Format alert message using template with variable substitution"""
        # Use provided template or default
        if not template:
            template = self._get_default_template()

        # Get timezone offset from settings
        settings = self.db.get_settings()
        timezone_offset = getattr(settings, 'timezone_offset', 0) if settings else 0

        # Convert timestamp to local timezone
        local_timestamp = event.timestamp + timedelta(minutes=timezone_offset)

        # Prepare variables for substitution
        host_name = self._get_host_name(event)
        variables = {
            '{CONTAINER_NAME}': event.container_name,
            '{CONTAINER_ID}': event.container_id[:12],  # Short ID
            '{HOST_NAME}': host_name,
            '{HOST_ID}': event.host_id,
            '{OLD_STATE}': event.old_state,
            '{NEW_STATE}': event.new_state,
            '{IMAGE}': event.image,
            '{TIMESTAMP}': local_timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            '{TIME}': local_timestamp.strftime('%H:%M:%S'),
            '{DATE}': local_timestamp.strftime('%Y-%m-%d'),
            '{RULE_NAME}': rule.name,
            '{RULE_ID}': str(rule.id),
            '{TRIGGERED_BY}': event.triggered_by,
        }

        # Handle optional Docker event attributes
        if hasattr(event, 'event_type'):
            variables['{EVENT_TYPE}'] = event.event_type
        if hasattr(event, 'exit_code') and event.exit_code is not None:
            variables['{EXIT_CODE}'] = str(event.exit_code)

        # Replace all variables in template
        message = template
        for var, value in variables.items():
            message = message.replace(var, value)

        # Clean up any unused variables (remove them); re is imported at module level
        message = re.sub(r'\{[A-Z_]+\}', '', message)

        return message

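    # Illustrative sketch of the substitution performed above (the template string here
    # is a hypothetical example, not a shipped default):
    #
    #   template = "{CONTAINER_NAME} on {HOST_NAME}: {OLD_STATE} -> {NEW_STATE} ({UNUSED_VAR})"
    #   result   = "api-worker on prod-docker-01: running -> exited ()"
    #
    # Known variables are replaced literally; any leftover {UPPER_CASE} placeholder is
    # stripped by the final re.sub, so unknown variables simply disappear from the message.
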
    async def _send_telegram(self, config: Dict[str, Any], message: str) -> bool:
        """Send notification via Telegram"""
        try:
            # Support both 'token' and 'bot_token' for backward compatibility
            token = config.get('token') or config.get('bot_token')
            chat_id = config.get('chat_id')

            if not token or not chat_id:
                logger.error("Telegram config missing token or chat_id")
                return False

            url = f"https://api.telegram.org/bot{token}/sendMessage"
            payload = {
                'chat_id': chat_id,
                'text': message,
                'parse_mode': 'Markdown'
            }

            response = await self.http_client.post(url, json=payload)
            response.raise_for_status()

            logger.info("Telegram notification sent successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to send Telegram notification: {e}")
            return False

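    # Example channel.config for Telegram, as read by _send_telegram above
    # (values are placeholders; 'token' and 'bot_token' are interchangeable):
    #
    #   {
    #       "bot_token": "123456:ABC-DEF...",
    #       "chat_id": "-1001234567890"
    #   }
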
    async def _send_discord(self, config: Dict[str, Any], message: str) -> bool:
        """Send notification via Discord webhook"""
        try:
            webhook_url = config.get('webhook_url')

            if not webhook_url:
                logger.error("Discord config missing webhook_url")
                return False

            # Discord renders Markdown natively, so the message can be sent as-is
            payload = {
                'content': message,
                'username': 'DockMon',
                'avatar_url': 'https://cdn-icons-png.flaticon.com/512/919/919853.png'
            }

            response = await self.http_client.post(webhook_url, json=payload)
            response.raise_for_status()

            logger.info("Discord notification sent successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to send Discord notification: {e}")
            return False

    async def _send_pushover(self, config: Dict[str, Any], message: str,
                             event) -> bool:
        """Send notification via Pushover"""
        try:
            app_token = config.get('app_token')
            user_key = config.get('user_key')

            if not app_token or not user_key:
                logger.error("Pushover config missing app_token or user_key")
                return False

            # Strip markdown for Pushover
            plain_message = re.sub(r'\*\*(.*?)\*\*', r'\1', message)  # Bold
            plain_message = re.sub(r'`(.*?)`', r'\1', plain_message)  # Code
            plain_message = re.sub(r'🚨', '', plain_message)  # Remove alert emoji

            # Determine priority based on event type
            priority = 0  # Normal
            # Handle both AlertEvent and DockerEventAlert
            if hasattr(event, 'new_state') and event.new_state in ['exited', 'dead']:
                priority = 1  # High priority for state failures
            elif hasattr(event, 'event_type') and event.event_type in ['die', 'oom', 'kill']:
                priority = 1  # High priority for critical Docker events

            payload = {
                'token': app_token,
                'user': user_key,
                'message': plain_message,
                'title': f"DockMon: {event.container_name}",
                'priority': priority,
                'url': config.get('url', ''),
                'url_title': 'Open DockMon'
            }

            response = await self.http_client.post(
                'https://api.pushover.net/1/messages.json',
                data=payload
            )
            response.raise_for_status()

            logger.info("Pushover notification sent successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to send Pushover notification: {e}")
            return False

    async def _send_slack(self, config: Dict[str, Any], message: str, event: AlertEvent) -> bool:
        """Send notification via Slack webhook"""
        try:
            webhook_url = config.get('webhook_url')

            if not webhook_url:
                logger.error("Slack config missing webhook_url")
                return False

            # Convert Markdown to Slack's mrkdwn format:
            # bold uses single asterisks; inline code backticks are unchanged
            slack_message = message.replace('**', '*')

            # Determine color based on event type
            color = "#ff0000"  # Default red for critical
            if hasattr(event, 'new_state'):
                if event.new_state == 'running':
                    color = "#00ff00"  # Green for running
                elif event.new_state in ['stopped', 'paused']:
                    color = "#ffaa00"  # Orange for stopped/paused
            elif hasattr(event, 'event_type'):
                if event.event_type in ['start', 'unpause']:
                    color = "#00ff00"  # Green for recovery events
                elif event.event_type in ['stop', 'pause']:
                    color = "#ffaa00"  # Orange for controlled stops

            # Create rich Slack message with attachments
            payload = {
                'attachments': [{
                    'color': color,
                    'fallback': slack_message,
                    'title': '🚨 DockMon Alert',
                    'text': slack_message,
                    'mrkdwn_in': ['text'],
                    'footer': 'DockMon',
                    'footer_icon': 'https://raw.githubusercontent.com/docker/compose/v2/logo.png',
                    'ts': int(event.timestamp.timestamp())
                }]
            }

            response = await self.http_client.post(webhook_url, json=payload)
            response.raise_for_status()

            logger.info("Slack notification sent successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to send Slack notification: {e}")
            return False

    async def _send_gotify(self, config: Dict[str, Any], message: str, event) -> bool:
        """Send notification via Gotify"""
        try:
            # Validate required config fields
            server_url = config.get('server_url', '').strip()
            app_token = config.get('app_token', '').strip()

            if not server_url:
                logger.error("Gotify config missing server_url")
                return False

            if not app_token:
                logger.error("Gotify config missing app_token")
                return False

            # Validate server URL format
            if not server_url.startswith(('http://', 'https://')):
                logger.error(f"Gotify server_url must start with http:// or https://: {server_url}")
                return False

            # Strip markdown formatting for plain text
            plain_message = re.sub(r'\*\*(.*?)\*\*', r'\1', message)
            plain_message = re.sub(r'`(.*?)`', r'\1', plain_message)
            plain_message = re.sub(r'[🚨🔴🟢💀⚠️🏥✅🔄📢]', '', plain_message)  # Remove emojis

            # Determine priority (0-10, default 5)
            priority = 5
            if hasattr(event, 'new_state') and event.new_state in ['exited', 'dead']:
                priority = 8  # High priority for critical states
            elif hasattr(event, 'event_type') and event.event_type in ['die', 'oom', 'kill']:
                priority = 8  # High priority for critical events

            # Build URL with proper path handling
            base_url = server_url.rstrip('/')
            url = f"{base_url}/message?token={app_token}"

            # Create payload
            payload = {
                'title': f"DockMon: {event.container_name}",
                'message': plain_message,
                'priority': priority
            }

            # Send request with timeout
            response = await self.http_client.post(url, json=payload)
            response.raise_for_status()

            logger.info("Gotify notification sent successfully")
            return True

        except httpx.HTTPStatusError as e:
            logger.error(f"Gotify HTTP error {e.response.status_code}: {e}")
            return False
        except httpx.RequestError as e:
            logger.error(f"Gotify connection error: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to send Gotify notification: {e}")
            return False

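    # Example channel.config for Gotify, as read by _send_gotify above
    # (placeholder values; the token is sent as a ?token= query parameter):
    #
    #   {
    #       "server_url": "https://gotify.example.com",
    #       "app_token": "AbCdEfGhIjKlMnO"
    #   }
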
    async def _send_smtp(self, config: Dict[str, Any], message: str, event) -> bool:
        """Send notification via SMTP (Email)"""
        try:
            # Import SMTP libraries (only when needed to avoid dependency issues)
            try:
                import aiosmtplib
                from email.mime.text import MIMEText
                from email.mime.multipart import MIMEMultipart
            except ImportError:
                logger.error("SMTP support requires 'aiosmtplib' package. Install with: pip install aiosmtplib")
                return False

            # Validate required config fields
            smtp_host = config.get('smtp_host', '').strip()
            smtp_port = config.get('smtp_port', 587)
            smtp_user = config.get('smtp_user', '').strip()
            smtp_password = config.get('smtp_password', '').strip()
            from_email = config.get('from_email', '').strip()
            to_email = config.get('to_email', '').strip()
            use_tls = config.get('use_tls', True)

            # Validate all required fields
            if not smtp_host:
                logger.error("SMTP config missing smtp_host")
                return False
            if not smtp_user:
                logger.error("SMTP config missing smtp_user")
                return False
            if not smtp_password:
                logger.error("SMTP config missing smtp_password")
                return False
            if not from_email:
                logger.error("SMTP config missing from_email")
                return False
            if not to_email:
                logger.error("SMTP config missing to_email")
                return False

            # Validate port range
            try:
                smtp_port = int(smtp_port)
                if smtp_port < 1 or smtp_port > 65535:
                    logger.error(f"SMTP port must be between 1-65535: {smtp_port}")
                    return False
            except (ValueError, TypeError):
                logger.error(f"Invalid SMTP port: {smtp_port}")
                return False

            # Validate email format (basic check)
            email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
            if not email_pattern.match(from_email):
                logger.error(f"Invalid from_email format: {from_email}")
                return False
            if not email_pattern.match(to_email):
                logger.error(f"Invalid to_email format: {to_email}")
                return False

            # Create multipart email
            msg = MIMEMultipart('alternative')
            msg['Subject'] = f"DockMon Alert: {event.container_name}"
            msg['From'] = from_email
            msg['To'] = to_email

            # Plain text version (strip markdown and emojis)
            plain_text = re.sub(r'\*\*(.*?)\*\*', r'\1', message)
            plain_text = re.sub(r'`(.*?)`', r'\1', plain_text)
            plain_text = re.sub(r'[🚨🔴🟢💀⚠️🏥✅🔄📢]', '', plain_text)

            # HTML version with basic styling (light theme for better email compatibility)
            html_text = message.replace('\n', '<br>')
            html_text = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', html_text)
            html_text = re.sub(r'`(.*?)`', r'<code style="background:#f5f5f5;color:#333;padding:2px 6px;border-radius:3px;font-family:monospace;">\1</code>', html_text)

            html_body = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="margin:0;padding:0;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,'Helvetica Neue',Arial,sans-serif;background:#f9f9f9;color:#333;">
<div style="max-width:600px;margin:20px auto;background:#ffffff;padding:24px;border-radius:8px;border:1px solid #e0e0e0;box-shadow:0 1px 3px rgba(0,0,0,0.1);">
<div style="line-height:1.6;font-size:14px;">
{html_text}
</div>
<div style="margin-top:20px;padding-top:20px;border-top:1px solid #e0e0e0;font-size:12px;color:#666;">
Sent by DockMon Container Monitoring
</div>
</div>
</body>
</html>"""

            # Attach both versions
            part1 = MIMEText(plain_text, 'plain', 'utf-8')
            part2 = MIMEText(html_body, 'html', 'utf-8')
            msg.attach(part1)
            msg.attach(part2)

            # Send email with proper connection handling
            # Port 587 uses STARTTLS, port 465 uses direct TLS/SSL
            if smtp_port == 587:
                smtp_kwargs = {
                    'hostname': smtp_host,
                    'port': smtp_port,
                    'start_tls': use_tls,  # Use STARTTLS for port 587
                    'timeout': 30
                }
            elif smtp_port == 465:
                smtp_kwargs = {
                    'hostname': smtp_host,
                    'port': smtp_port,
                    'use_tls': use_tls,  # Use direct TLS for port 465
                    'timeout': 30
                }
            else:
                # Other ports (like 25) - no encryption by default unless use_tls is True
                smtp_kwargs = {
                    'hostname': smtp_host,
                    'port': smtp_port,
                    'start_tls': use_tls if use_tls else False,
                    'timeout': 30
                }

            async with aiosmtplib.SMTP(**smtp_kwargs) as smtp:
                await smtp.login(smtp_user, smtp_password)
                await smtp.send_message(msg)

            logger.info(f"SMTP notification sent successfully to {to_email}")
            return True

        except aiosmtplib.SMTPAuthenticationError as e:
            logger.error(f"SMTP authentication failed: {e}")
            return False
        except aiosmtplib.SMTPException as e:
            logger.error(f"SMTP error: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to send SMTP notification: {e}")
            return False

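    # Example channel.config for SMTP, as read by _send_smtp above (placeholder values).
    # Port 587 negotiates STARTTLS, port 465 uses implicit TLS, and other ports send
    # plain unless use_tls is set:
    #
    #   {
    #       "smtp_host": "smtp.example.com",
    #       "smtp_port": 587,
    #       "smtp_user": "alerts@example.com",
    #       "smtp_password": "app-password",
    #       "from_email": "alerts@example.com",
    #       "to_email": "ops@example.com",
    #       "use_tls": True
    #   }
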
    async def test_channel(self, channel_id: int) -> Dict[str, Any]:
        """Test a notification channel"""
        try:
            channels = self.db.get_notification_channels(enabled_only=False)
            channel = next((ch for ch in channels if ch.id == channel_id), None)

            if not channel:
                return {'success': False, 'error': 'Channel not found'}

            # Create test event
            test_event = AlertEvent(
                container_id='test_container',
                container_name='test-container',
                host_id='test_host',
                host_name='Test Host',
                old_state='running',
                new_state='stopped',
                timestamp=datetime.now(),
                image='nginx:latest',
                triggered_by='test'
            )

            # Create test rule
            test_rule = type('TestRule', (), {
                'id': 'test_rule',
                'name': 'Test Notification',
                'notification_channels': [channel_id]
            })()

            success = await self._send_to_channel(channel, test_event, test_rule)

            return {
                'success': success,
                'channel_name': channel.name,
                'channel_type': channel.type
            }

        except Exception as e:
            logger.error(f"Error testing channel {channel_id}: {e}")
            return {'success': False, 'error': str(e)}

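    # Minimal usage sketch for test_channel (assumed to run inside an existing event
    # loop, e.g. an API handler; `notification_service` is a hypothetical instance name):
    #
    #   result = await notification_service.test_channel(channel_id=3)
    #   if not result["success"]:
    #       logger.warning("Channel test failed: %s", result.get("error"))
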
    async def process_suppressed_alerts(self, monitor):
        """Process alerts that were suppressed during blackout windows

        Args:
            monitor: The DockerMonitor instance (reused, not created)
        """
        if not self._suppressed_alerts:
            logger.info("No suppressed alerts to process")
            return

        total_suppressed = len(self._suppressed_alerts)
        logger.info(f"Processing {total_suppressed} suppressed alerts from blackout windows")

        alerts_to_send = []

        # For each suppressed alert, check if the container is still in that problematic state
        for alert in self._suppressed_alerts:
            # Get current state of this container
            try:
                client = monitor.clients.get(alert.host_id)
                if not client:
                    logger.debug(f"No client found for host {alert.host_id}")
                    continue

                try:
                    container = client.containers.get(alert.container_id)
                    current_state = container.status

                    # If container is still in the problematic state from the alert, send it
                    if current_state == alert.new_state:
                        logger.info(f"Container {alert.container_name} still in '{current_state}' state - sending suppressed alert")
                        alerts_to_send.append(alert)
                    else:
                        logger.info(f"Container {alert.container_name} recovered from '{alert.new_state}' to '{current_state}' during blackout window - skipping alert")

                except Exception as e:
                    # Container might have been removed
                    logger.debug(f"Could not check container {alert.container_id}: {e}")

            except Exception as e:
                logger.error(f"Error checking suppressed alert for {alert.container_name}: {e}")

        # Clear the suppressed alerts list
        self._suppressed_alerts.clear()

        # Send alerts for containers still in problematic states
        for alert in alerts_to_send:
            try:
                await self.send_alert(alert)
            except Exception as e:
                logger.error(f"Failed to send suppressed alert for {alert.container_name}: {e}")

        logger.info(f"Sent {len(alerts_to_send)} of {total_suppressed} suppressed alerts")

    async def close(self):
        """Clean up resources"""
        await self.http_client.aclose()

    async def __aenter__(self):
        """Context manager entry"""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensure cleanup"""
        await self.close()
        return False


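# Minimal usage sketch for the service above (assumed wiring; DatabaseManager setup and
# the concrete AlertEvent are application-specific):
#
#   async def notify_example(db: DatabaseManager, event: AlertEvent) -> None:
#       async with NotificationService(db) as service:
#           await service.send_alert(event)
#
# The async context manager guarantees the shared httpx client is closed via close().
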
class AlertProcessor:
    """Processes container state changes and triggers alerts"""

    def __init__(self, notification_service: NotificationService):
        self.notification_service = notification_service
        self._container_states: Dict[str, str] = {}  # Track previous states

    async def process_container_update(self, containers: List[Any], hosts: Dict[str, Any]):
        """Process container updates and trigger alerts for state changes"""
        for container in containers:
            container_key = f"{container.host_id}:{container.id}"
            current_state = container.status
            previous_state = self._container_states.get(container_key)

            # Skip if no state change
            if previous_state == current_state:
                continue

            # Update tracked state
            self._container_states[container_key] = current_state

            # IMPORTANT: Update the notification service's state tracking too
            # This ensures recovery states are tracked even when they don't trigger alerts
            self.notification_service._last_container_state[container_key] = current_state
            logger.debug(f"State transition for {container.name}: {previous_state} → {current_state}")

            # Skip if this is the first time we see this container
            if previous_state is None:
                continue

            # Create alert event
            host = hosts.get(container.host_id)
            host_name = host.name if host else 'Unknown Host'

            alert_event = AlertEvent(
                container_id=container.id,
                container_name=container.name,
                host_id=container.host_id,
                host_name=host_name,
                old_state=previous_state,
                new_state=current_state,
                timestamp=datetime.now(),
                image=container.image,
                triggered_by='monitor'
            )

            # Send alert
            logger.debug(f"Processing state change for {container.name}: {previous_state} → {current_state}")
            await self.notification_service.send_alert(alert_event)

    def get_container_states(self) -> Dict[str, str]:
        """Get current container states for debugging"""
        return self._container_states.copy()

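# Minimal end-to-end sketch (assumed polling loop; `monitor`, `get_containers()` and
# `monitor.hosts` are hypothetical stand-ins for the DockerMonitor integration):
#
#   import asyncio
#
#   async def monitor_loop(db: DatabaseManager, monitor) -> None:
#       async with NotificationService(db) as service:
#           processor = AlertProcessor(service)
#           while True:
#               containers = await monitor.get_containers()
#               await processor.process_container_update(containers, monitor.hosts)
#               await asyncio.sleep(30)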