package main
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"strings"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
)
// ContainerInfo holds basic container information
type ContainerInfo struct {
ID string
Name string
HostID string
}
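// Note: StatsCache, ContainerStats, HostStats, truncateID and roundToDecimal are
// defined elsewhere in this package. For orientation, these are the methods this
// file calls on *StatsCache; the signatures are inferred from the call sites below
// and should be treated as assumptions:
//
//	UpdateHostStats(*HostStats)
//	UpdateContainerStats(*ContainerStats)
//	RemoveHostStats(hostID string)
//	RemoveContainerStats(containerID, hostID string)
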
// createTLSOption creates a Docker client TLS option from PEM-encoded certificates
func createTLSOption(caCertPEM, certPEM, keyPEM string) (client.Opt, error) {
// Parse CA certificate
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM([]byte(caCertPEM)) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
// Parse client certificate and key
clientCert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
if err != nil {
return nil, fmt.Errorf("failed to parse client certificate/key: %v", err)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
// Create HTTP client with TLS transport and timeouts
// Note: No overall Timeout set because Docker API streaming operations (stats, events)
// are long-running connections that should not be killed by a timeout
httpClient := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, // Connection establishment timeout
KeepAlive: 30 * time.Second, // TCP keepalive interval
}).DialContext,
TLSClientConfig: tlsConfig,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
},
}
return client.WithHTTPClient(httpClient), nil
}
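// Usage sketch (not part of this file; the paths and address are illustrative
// assumptions). The PEM inputs are typically read from disk before being passed in:
//
//	ca, _ := os.ReadFile("/certs/ca.pem")
//	cert, _ := os.ReadFile("/certs/cert.pem")
//	key, _ := os.ReadFile("/certs/key.pem")
//	tlsOpt, err := createTLSOption(string(ca), string(cert), string(key))
//	if err != nil {
//		log.Fatal(err)
//	}
//	cli, err := client.NewClientWithOpts(client.WithHost("tcp://10.0.0.5:2376"), tlsOpt, client.WithAPIVersionNegotiation())
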
// StreamManager manages persistent stats streams for all containers
type StreamManager struct {
cache *StatsCache
clients map[string]*client.Client // hostID -> Docker client
clientsMu sync.RWMutex
streams map[string]context.CancelFunc // composite key (hostID:containerID) -> cancel function
streamsMu sync.RWMutex
containers map[string]*ContainerInfo // composite key (hostID:containerID) -> info
containersMu sync.RWMutex
}
// NewStreamManager creates a new stream manager
func NewStreamManager(cache *StatsCache) *StreamManager {
return &StreamManager{
cache: cache,
clients: make(map[string]*client.Client),
streams: make(map[string]context.CancelFunc),
containers: make(map[string]*ContainerInfo),
}
}
// AddDockerHost adds a Docker host client
func (sm *StreamManager) AddDockerHost(hostID, hostAddress, tlsCACert, tlsCert, tlsKey string) error {
// Create Docker client for this host FIRST (before acquiring lock)
var cli *client.Client
var err error
if hostAddress == "" || hostAddress == "unix:///var/run/docker.sock" {
// Local Docker socket
cli, err = client.NewClientWithOpts(
client.FromEnv,
client.WithAPIVersionNegotiation(),
)
} else {
// Remote Docker host - check if TLS is needed
clientOpts := []client.Opt{
client.WithHost(hostAddress),
client.WithAPIVersionNegotiation(),
}
// If TLS certificates provided, configure TLS
if tlsCACert != "" && tlsCert != "" && tlsKey != "" {
tlsOpt, err := createTLSOption(tlsCACert, tlsCert, tlsKey)
if err != nil {
return fmt.Errorf("failed to create TLS config: %v", err)
}
clientOpts = append(clientOpts, tlsOpt)
}
cli, err = client.NewClientWithOpts(clientOpts...)
}
if err != nil {
return err
}
// Track whether client was successfully stored to prevent leak
clientStored := false
defer func() {
if !clientStored && cli != nil {
cli.Close()
log.Printf("Cleaned up unstored Docker client for host %s", truncateID(hostID, 8))
}
}()
// Now that new client is successfully created, acquire lock and swap
sm.clientsMu.Lock()
defer sm.clientsMu.Unlock()
// Close existing client if it exists (only after new one succeeds)
if existingClient, exists := sm.clients[hostID]; exists {
existingClient.Close()
log.Printf("Closed existing Docker client for host %s", truncateID(hostID, 8))
}
sm.clients[hostID] = cli
clientStored = true // Mark as successfully stored
log.Printf("Added Docker host: %s (%s)", truncateID(hostID, 8), hostAddress)
// Initialize host stats with zero values so the host appears immediately in the UI
sm.cache.UpdateHostStats(&HostStats{
HostID: hostID,
ContainerCount: 0,
})
return nil
}
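// Usage sketch (host IDs, address, and PEM variables below are illustrative
// assumptions, not values used elsewhere in this file):
//
//	sm.AddDockerHost("local", "", "", "", "")                                 // local socket via env
//	sm.AddDockerHost("prod-1", "tcp://10.0.0.5:2376", caPEM, certPEM, keyPEM) // remote host with mutual TLS
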
// RemoveDockerHost removes a Docker host client and stops all its streams
func (sm *StreamManager) RemoveDockerHost(hostID string) {
// First, find all containers for this host
sm.containersMu.RLock()
containersToStop := make([]string, 0)
for compositeKey, info := range sm.containers {
if info.HostID == hostID {
containersToStop = append(containersToStop, compositeKey)
}
}
sm.containersMu.RUnlock()
// Stop all streams for containers on this host
// Do this BEFORE closing the client to avoid streams trying to use a closed client
for _, compositeKey := range containersToStop {
// Extract container ID from composite key (format: hostID:containerID)
parts := strings.SplitN(compositeKey, ":", 2)
if len(parts) == 2 {
sm.StopStream(parts[1], parts[0]) // containerID, hostID
}
}
// Now close and remove the Docker client
sm.clientsMu.Lock()
defer sm.clientsMu.Unlock()
if cli, exists := sm.clients[hostID]; exists {
cli.Close()
delete(sm.clients, hostID)
log.Printf("Removed Docker host: %s", truncateID(hostID, 8))
}
// Remove all stats for this host from cache
sm.cache.RemoveHostStats(hostID)
}
// StartStream starts a persistent stats stream for a container
func (sm *StreamManager) StartStream(ctx context.Context, containerID, containerName, hostID string) error {
// Create composite key to support containers with duplicate IDs on different hosts
compositeKey := fmt.Sprintf("%s:%s", hostID, containerID)
// Acquire locks in consistent order: clientsMu → streamsMu → containersMu (when needed)
sm.clientsMu.RLock()
sm.streamsMu.Lock()
// Check if stream already exists
if _, exists := sm.streams[compositeKey]; exists {
sm.streamsMu.Unlock()
sm.clientsMu.RUnlock()
return nil // Already streaming
}
// Check if client exists
_, clientExists := sm.clients[hostID]
if !clientExists {
sm.streamsMu.Unlock()
sm.clientsMu.RUnlock()
log.Printf("Warning: No Docker client for host %s", truncateID(hostID, 8))
return nil
}
// Create cancellable context for this stream
streamCtx, cancel := context.WithCancel(ctx)
sm.streams[compositeKey] = cancel
// Release locks before acquiring containersMu to prevent nested locking
sm.streamsMu.Unlock()
sm.clientsMu.RUnlock()
// Store container info with separate lock
sm.containersMu.Lock()
sm.containers[compositeKey] = &ContainerInfo{
ID: containerID,
Name: containerName,
HostID: hostID,
}
sm.containersMu.Unlock()
// Start streaming goroutine (no locks held)
go sm.streamStats(streamCtx, containerID, containerName, hostID)
log.Printf("Started stats stream for container %s (%s) on host %s", containerName, truncateID(containerID, 12), truncateID(hostID, 12))
return nil
}
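// Typical call site (sketch): after listing containers on a host, start one stream
// per container. The ContainerList options type varies between docker client
// versions, so this is an assumption for the version pinned by this module:
//
//	containers, err := cli.ContainerList(ctx, types.ContainerListOptions{})
//	if err == nil {
//		for _, c := range containers {
//			name := strings.TrimPrefix(c.Names[0], "/")
//			sm.StartStream(ctx, c.ID, name, hostID)
//		}
//	}
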
// StopStream stops the stats stream for a container
func (sm *StreamManager) StopStream(containerID, hostID string) {
// Create composite key to support containers with duplicate IDs on different hosts
compositeKey := fmt.Sprintf("%s:%s", hostID, containerID)
sm.streamsMu.Lock()
defer sm.streamsMu.Unlock()
cancel, exists := sm.streams[compositeKey]
if exists {
cancel()
delete(sm.streams, compositeKey)
}
sm.containersMu.Lock()
defer sm.containersMu.Unlock()
delete(sm.containers, compositeKey)
// Remove from cache
sm.cache.RemoveContainerStats(containerID, hostID)
log.Printf("Stopped stats stream for container %s", truncateID(containerID, 12))
}
// streamStats maintains a persistent stats stream for a single container
func (sm *StreamManager) streamStats(ctx context.Context, containerID, containerName, hostID string) {
defer func() {
if r := recover(); r != nil {
log.Printf("Recovered from panic in stats stream for %s: %v", truncateID(containerID, 12), r)
}
}()
// Retry loop - restart stream if it fails
backoff := time.Second
maxBackoff := 30 * time.Second
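// With these values the retry waits grow 1s, 2s, 4s, 8s, 16s, 30s, 30s, ... and
// drop back to 1s once a stream connects successfully.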
for {
select {
case <-ctx.Done():
return
default:
}
// Get current Docker client (may have changed if host was updated)
sm.clientsMu.RLock()
cli, ok := sm.clients[hostID]
sm.clientsMu.RUnlock() // Manual unlock needed - we're in a loop
if !ok {
log.Printf("No Docker client for host %s (container %s), retrying in %v", truncateID(hostID, 8), truncateID(containerID, 12), backoff)
time.Sleep(backoff)
backoff = min(backoff*2, maxBackoff)
continue
}
// Open stats stream
stats, err := cli.ContainerStats(ctx, containerID, true) // stream=true
if err != nil {
log.Printf("Error opening stats stream for %s: %v (retrying in %v)", truncateID(containerID, 12), err, backoff)
time.Sleep(backoff)
backoff = min(backoff*2, maxBackoff)
continue
}
// Reset backoff on successful connection
backoff = time.Second
// Read stats from stream
decoder := json.NewDecoder(stats.Body)
for {
select {
case <-ctx.Done():
stats.Body.Close()
return
default:
}
var stat types.StatsJSON
if err := decoder.Decode(&stat); err != nil {
stats.Body.Close()
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
log.Printf("Stats stream ended for %s", truncateID(containerID, 12))
} else {
log.Printf("Error decoding stats for %s: %v", truncateID(containerID, 12), err)
}
break // Break inner loop, will retry in outer loop
}
// Calculate and cache stats
sm.processStats(&stat, containerID, containerName, hostID)
}
// Brief pause before reconnecting
time.Sleep(time.Second)
}
}
// processStats calculates metrics from raw Docker stats
func (sm *StreamManager) processStats(stat *types.StatsJSON, containerID, containerName, hostID string) {
// Calculate CPU percentage
cpuPercent := calculateCPUPercent(stat)
// Memory stats
memUsage := stat.MemoryStats.Usage
memLimit := stat.MemoryStats.Limit
memPercent := 0.0
if memLimit > 0 {
memPercent = (float64(memUsage) / float64(memLimit)) * 100.0
}
// Network stats (loop variable named nw so it does not shadow the net package)
var netRx, netTx uint64
for _, nw := range stat.Networks {
netRx += nw.RxBytes
netTx += nw.TxBytes
}
// Disk I/O stats. Match op names case-insensitively: cgroup v1 reports
// "Read"/"Write" while cgroup v2 reports lowercase "read"/"write".
var diskRead, diskWrite uint64
for _, bio := range stat.BlkioStats.IoServiceBytesRecursive {
if strings.EqualFold(bio.Op, "read") {
diskRead += bio.Value
} else if strings.EqualFold(bio.Op, "write") {
diskWrite += bio.Value
}
}
// Update cache
sm.cache.UpdateContainerStats(&ContainerStats{
ContainerID: containerID,
ContainerName: containerName,
HostID: hostID,
CPUPercent: roundToDecimal(cpuPercent, 1),
MemoryUsage: memUsage,
MemoryLimit: memLimit,
MemoryPercent: roundToDecimal(memPercent, 1),
NetworkRx: netRx,
NetworkTx: netTx,
DiskRead: diskRead,
DiskWrite: diskWrite,
})
}
// calculateCPUPercent calculates CPU percentage from Docker stats
func calculateCPUPercent(stat *types.StatsJSON) float64 {
// CPU calculation similar to `docker stats` command
cpuDelta := float64(stat.CPUStats.CPUUsage.TotalUsage) - float64(stat.PreCPUStats.CPUUsage.TotalUsage)
systemDelta := float64(stat.CPUStats.SystemUsage) - float64(stat.PreCPUStats.SystemUsage)
if systemDelta > 0.0 && cpuDelta > 0.0 {
// Prefer OnlineCPUs (populated on cgroup v2 hosts, where PercpuUsage is empty),
// then fall back to the per-CPU sample count, then to 1
numCPUs := float64(stat.CPUStats.OnlineCPUs)
if numCPUs == 0 {
numCPUs = float64(len(stat.CPUStats.CPUUsage.PercpuUsage))
}
if numCPUs == 0 {
numCPUs = 1.0
}
return (cpuDelta / systemDelta) * numCPUs * 100.0
}
return 0.0
}
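// Worked example: if the container used 0.4s of CPU time between samples while the
// host accumulated 4s of total CPU time across 8 online CPUs, the result is
// (0.4/4.0)*8*100 = 80%, matching the scaling used by `docker stats`.
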
// GetStreamCount returns the number of active streams
func (sm *StreamManager) GetStreamCount() int {
sm.streamsMu.RLock()
defer sm.streamsMu.RUnlock()
return len(sm.streams)
}
// HasHost checks if a Docker host is registered
func (sm *StreamManager) HasHost(hostID string) bool {
sm.clientsMu.RLock()
defer sm.clientsMu.RUnlock()
_, exists := sm.clients[hostID]
return exists
}
// StopAllStreams stops all active streams and closes all Docker clients
func (sm *StreamManager) StopAllStreams() {
// Stop all streams
sm.streamsMu.Lock()
for compositeKey, cancel := range sm.streams {
cancel()
log.Printf("Stopped stream for %s", compositeKey)
}
sm.streams = make(map[string]context.CancelFunc)
sm.streamsMu.Unlock()
// Clear tracked container info to match the per-container StopStream cleanup
sm.containersMu.Lock()
sm.containers = make(map[string]*ContainerInfo)
sm.containersMu.Unlock()
// Close all Docker clients
sm.clientsMu.Lock()
for hostID, cli := range sm.clients {
cli.Close()
log.Printf("Closed Docker client for host %s", truncateID(hostID, 8))
}
sm.clients = make(map[string]*client.Client)
sm.clientsMu.Unlock()
}
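// min returns the smaller of two durations. On Go 1.21+ the predeclared min builtin
// could be used instead; this helper keeps the file building on older toolchains.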
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
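// End-to-end lifecycle sketch (NewStatsCache, the context wiring, and the IDs below
// are assumptions for illustration; only the StreamManager calls come from this file):
//
//	cache := NewStatsCache()
//	sm := NewStreamManager(cache)
//	defer sm.StopAllStreams()
//	if err := sm.AddDockerHost("local", "", "", "", ""); err != nil {
//		log.Fatal(err)
//	}
//	// ...discover containers, then per container:
//	_ = sm.StartStream(ctx, containerID, containerName, "local")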