Occam's razor Archive Pages Categories Tags

The Digital Immune System: Building an AI-Powered Security Monitor in Go

19 June 2025

Your immune system doesn’t wait for you to notice you’re sick. It continuously monitors for threats, identifies anomalies, and responds automatically. What if your servers worked the same way?

This post builds a “digital immune system” in Go - a monitor that watches system activity, uses local AI to detect anomalies, and automatically remediates threats. No cloud APIs, no security vendor lock-in, just a binary that runs on your servers.


Architecture

The system has three layers:

┌─────────────────────────────────────────────────────┐
│                    Collectors                        │
│  (processes, network, filesystem, auth logs)        │
└──────────────────────┬──────────────────────────────┘
                       │ events
                       ▼
┌─────────────────────────────────────────────────────┐
│              Anomaly Detection (ML)                  │
│  (statistical models, fast scoring, filtering)      │
└──────────────────────┬──────────────────────────────┘
                       │ anomalies
                       ▼
┌─────────────────────────────────────────────────────┐
│              AI Reasoning (Ollama)                   │
│  (threat analysis, remediation planning)            │
└──────────────────────┬──────────────────────────────┘
                       │ actions
                       ▼
┌─────────────────────────────────────────────────────┐
│                  Remediation                         │
│  (kill process, block IP, restore file, alert)      │
└─────────────────────────────────────────────────────┘

The ML layer does fast, cheap filtering. Only anomalies that score above a threshold get sent to the LLM for deeper analysis. This keeps latency low and prevents the LLM from being overwhelmed.


Data Structures

First, define the core types:

package main

import (
    "time"
)

type EventType string

const (
    EventProcess    EventType = "process"
    EventNetwork    EventType = "network"
    EventFilesystem EventType = "filesystem"
    EventAuth       EventType = "auth"
)

type Event struct {
    Type      EventType         `json:"type"`
    Timestamp time.Time         `json:"timestamp"`
    Data      map[string]any    `json:"data"`
    Source    string            `json:"source"`
}

type Anomaly struct {
    Event       Event     `json:"event"`
    Score       float64   `json:"score"`       // 0-1, higher = more anomalous
    Reason      string    `json:"reason"`      // ML explanation
    Threat      string    `json:"threat"`      // LLM threat assessment
    Remediation string    `json:"remediation"` // LLM suggested action
}

type Action struct {
    Type    string         `json:"type"`    // kill, block, restore, alert
    Target  string         `json:"target"`  // PID, IP, filepath
    Params  map[string]any `json:"params"`
    DryRun  bool           `json:"dry_run"`
}


Collectors

Each collector watches a different aspect of the system and emits events.


Process Collector

Monitors for suspicious processes - unusual binaries, crypto miners, reverse shells:

package collectors

import (
    "bufio"
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "time"
)

type ProcessInfo struct {
    PID        int
    Name       string
    Cmdline    string
    User       string
    CPU        float64
    Memory     float64
    StartTime  time.Time
    ParentPID  int
    Executable string
}

type ProcessCollector struct {
    events   chan Event
    known    map[int]ProcessInfo // Track known processes
    interval time.Duration
}

func NewProcessCollector(events chan Event) *ProcessCollector {
    return &ProcessCollector{
        events:   events,
        known:    make(map[int]ProcessInfo),
        interval: 5 * time.Second,
    }
}

func (c *ProcessCollector) Run() {
    ticker := time.NewTicker(c.interval)
    for range ticker.C {
        c.collect()
    }
}

func (c *ProcessCollector) collect() {
    procs, _ := filepath.Glob("/proc/[0-9]*")

    currentPIDs := make(map[int]bool)

    for _, procPath := range procs {
        pid, _ := strconv.Atoi(filepath.Base(procPath))
        currentPIDs[pid] = true

        info, err := c.getProcessInfo(pid)
        if err != nil {
            continue
        }

        // Check if this is a new process
        if _, exists := c.known[pid]; !exists {
            c.events <- Event{
                Type:      EventProcess,
                Timestamp: time.Now(),
                Source:    "process_collector",
                Data: map[string]any{
                    "action":     "spawn",
                    "pid":        info.PID,
                    "name":       info.Name,
                    "cmdline":    info.Cmdline,
                    "user":       info.User,
                    "parent_pid": info.ParentPID,
                    "executable": info.Executable,
                },
            }
        }

        c.known[pid] = info
    }

    // Detect terminated processes
    for pid := range c.known {
        if !currentPIDs[pid] {
            delete(c.known, pid)
        }
    }
}

func (c *ProcessCollector) getProcessInfo(pid int) (ProcessInfo, error) {
    info := ProcessInfo{PID: pid}

    // Read comm (process name)
    comm, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid))
    if err != nil {
        return info, err
    }
    info.Name = strings.TrimSpace(string(comm))

    // Read cmdline
    cmdline, _ := os.ReadFile(fmt.Sprintf("/proc/%d/cmdline", pid))
    info.Cmdline = strings.ReplaceAll(string(cmdline), "\x00", " ")

    // Read executable path
    exe, _ := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
    info.Executable = exe

    // Read status for user and parent PID
    status, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
    if err == nil {
        defer status.Close()
        scanner := bufio.NewScanner(status)
        for scanner.Scan() {
            line := scanner.Text()
            if strings.HasPrefix(line, "Uid:") {
                fields := strings.Fields(line)
                if len(fields) >= 2 {
                    uid, _ := strconv.Atoi(fields[1])
                    info.User = fmt.Sprintf("%d", uid)
                }
            }
            if strings.HasPrefix(line, "PPid:") {
                fields := strings.Fields(line)
                if len(fields) >= 2 {
                    info.ParentPID, _ = strconv.Atoi(fields[1])
                }
            }
        }
    }

    return info, nil
}


Network Collector

Watches for suspicious connections - unusual ports, known bad IPs, data exfiltration patterns:

package collectors

import (
    "bufio"
    "encoding/hex"
    "fmt"
    "net"
    "os"
    "strconv"
    "strings"
    "time"
)

type Connection struct {
    LocalAddr  string
    LocalPort  int
    RemoteAddr string
    RemotePort int
    State      string
    PID        int
}

type NetworkCollector struct {
    events     chan Event
    known      map[string]Connection
    interval   time.Duration
    suspPorts  map[int]bool // Suspicious ports to watch
}

func NewNetworkCollector(events chan Event) *NetworkCollector {
    return &NetworkCollector{
        events:   events,
        known:    make(map[string]Connection),
        interval: 3 * time.Second,
        suspPorts: map[int]bool{
            4444: true,  // Metasploit default
            5555: true,  // Common backdoor
            6666: true,  // IRC (often C2)
            1337: true,  // Leet port
            31337: true, // Back Orifice
        },
    }
}

func (c *NetworkCollector) Run() {
    ticker := time.NewTicker(c.interval)
    for range ticker.C {
        c.collect()
    }
}

func (c *NetworkCollector) collect() {
    // Parse /proc/net/tcp and /proc/net/tcp6
    for _, path := range []string{"/proc/net/tcp", "/proc/net/tcp6"} {
        c.parseNetFile(path)
    }
}

func (c *NetworkCollector) parseNetFile(path string) {
    file, err := os.Open(path)
    if err != nil {
        return
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    scanner.Scan() // Skip header

    for scanner.Scan() {
        conn := c.parseLine(scanner.Text())
        if conn == nil {
            continue
        }

        key := fmt.Sprintf("%s:%d-%s:%d",
            conn.LocalAddr, conn.LocalPort,
            conn.RemoteAddr, conn.RemotePort)

        // New connection
        if _, exists := c.known[key]; !exists && conn.State == "ESTABLISHED" {
            c.events <- Event{
                Type:      EventNetwork,
                Timestamp: time.Now(),
                Source:    "network_collector",
                Data: map[string]any{
                    "action":      "connect",
                    "local_addr":  conn.LocalAddr,
                    "local_port":  conn.LocalPort,
                    "remote_addr": conn.RemoteAddr,
                    "remote_port": conn.RemotePort,
                    "pid":         conn.PID,
                },
            }
            c.known[key] = *conn
        }
    }
}

func (c *NetworkCollector) parseLine(line string) *Connection {
    fields := strings.Fields(line)
    if len(fields) < 10 {
        return nil
    }

    localAddr, localPort := parseAddr(fields[1])
    remoteAddr, remotePort := parseAddr(fields[2])
    state := parseState(fields[3])

    return &Connection{
        LocalAddr:  localAddr,
        LocalPort:  localPort,
        RemoteAddr: remoteAddr,
        RemotePort: remotePort,
        State:      state,
    }
}

func parseAddr(s string) (string, int) {
    parts := strings.Split(s, ":")
    if len(parts) != 2 {
        return "", 0
    }

    // IP is in hex, reversed
    ipHex := parts[0]
    ip := make(net.IP, 4)
    for i := 0; i < 4; i++ {
        b, _ := hex.DecodeString(ipHex[i*2 : i*2+2])
        ip[3-i] = b[0]
    }

    port, _ := strconv.ParseInt(parts[1], 16, 32)
    return ip.String(), int(port)
}

func parseState(s string) string {
    states := map[string]string{
        "01": "ESTABLISHED",
        "02": "SYN_SENT",
        "0A": "LISTEN",
    }
    return states[s]
}


Filesystem Collector

Uses inotify to watch for suspicious file changes:

package collectors

import (
    "crypto/sha256"
    "encoding/hex"
    "io"
    "os"
    "path/filepath"
    "time"

    "github.com/fsnotify/fsnotify"
)

type FilesystemCollector struct {
    events    chan Event
    watcher   *fsnotify.Watcher
    watchDirs []string
    hashes    map[string]string // Track file hashes
}

func NewFilesystemCollector(events chan Event) *FilesystemCollector {
    watcher, _ := fsnotify.NewWatcher()
    return &FilesystemCollector{
        events:  events,
        watcher: watcher,
        watchDirs: []string{
            "/etc",
            "/usr/bin",
            "/usr/sbin",
            "/root",
            "/home",
        },
        hashes: make(map[string]string),
    }
}

func (c *FilesystemCollector) Run() {
    // Add watches
    for _, dir := range c.watchDirs {
        filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
            if err != nil || info.IsDir() {
                return nil
            }
            c.watcher.Add(path)
            c.hashes[path] = c.hashFile(path)
            return nil
        })
    }

    // Process events
    for {
        select {
        case event := <-c.watcher.Events:
            c.handleEvent(event)
        case err := <-c.watcher.Errors:
            if err != nil {
                // Log error
            }
        }
    }
}

func (c *FilesystemCollector) handleEvent(e fsnotify.Event) {
    eventType := ""
    switch {
    case e.Op&fsnotify.Write == fsnotify.Write:
        eventType = "modify"
    case e.Op&fsnotify.Create == fsnotify.Create:
        eventType = "create"
    case e.Op&fsnotify.Remove == fsnotify.Remove:
        eventType = "delete"
    case e.Op&fsnotify.Chmod == fsnotify.Chmod:
        eventType = "chmod"
    default:
        return
    }

    newHash := ""
    if eventType != "delete" {
        newHash = c.hashFile(e.Name)
    }

    c.events <- Event{
        Type:      EventFilesystem,
        Timestamp: time.Now(),
        Source:    "filesystem_collector",
        Data: map[string]any{
            "action":   eventType,
            "path":     e.Name,
            "old_hash": c.hashes[e.Name],
            "new_hash": newHash,
        },
    }

    c.hashes[e.Name] = newHash
}

func (c *FilesystemCollector) hashFile(path string) string {
    f, err := os.Open(path)
    if err != nil {
        return ""
    }
    defer f.Close()

    h := sha256.New()
    io.Copy(h, f)
    return hex.EncodeToString(h.Sum(nil))
}


Auth Log Collector

Watches authentication logs for failed logins, sudo abuse, SSH anomalies:

package collectors

import (
    "bufio"
    "os"
    "regexp"
    "strings"
    "time"

    "github.com/hpcloud/tail"
)

type AuthCollector struct {
    events   chan Event
    logPath  string
    patterns map[string]*regexp.Regexp
}

func NewAuthCollector(events chan Event) *AuthCollector {
    return &AuthCollector{
        events:  events,
        logPath: "/var/log/auth.log",
        patterns: map[string]*regexp.Regexp{
            "failed_login":   regexp.MustCompile(`Failed password for (\S+) from (\S+)`),
            "invalid_user":   regexp.MustCompile(`Invalid user (\S+) from (\S+)`),
            "sudo":           regexp.MustCompile(`sudo:\s+(\S+).*COMMAND=(.*)`),
            "ssh_accepted":   regexp.MustCompile(`Accepted (\S+) for (\S+) from (\S+)`),
            "ssh_disconnect": regexp.MustCompile(`Disconnected from (\S+)`),
        },
    }
}

func (c *AuthCollector) Run() {
    t, err := tail.TailFile(c.logPath, tail.Config{
        Follow:    true,
        ReOpen:    true,
        MustExist: false,
    })
    if err != nil {
        return
    }

    for line := range t.Lines {
        c.parseLine(line.Text)
    }
}

func (c *AuthCollector) parseLine(line string) {
    for eventType, pattern := range c.patterns {
        matches := pattern.FindStringSubmatch(line)
        if matches == nil {
            continue
        }

        data := map[string]any{
            "action":  eventType,
            "raw_log": line,
        }

        switch eventType {
        case "failed_login":
            data["user"] = matches[1]
            data["source_ip"] = matches[2]
        case "invalid_user":
            data["user"] = matches[1]
            data["source_ip"] = matches[2]
        case "sudo":
            data["user"] = matches[1]
            data["command"] = matches[2]
        case "ssh_accepted":
            data["method"] = matches[1]
            data["user"] = matches[2]
            data["source_ip"] = matches[3]
        }

        c.events <- Event{
            Type:      EventAuth,
            Timestamp: time.Now(),
            Source:    "auth_collector",
            Data:      data,
        }
    }
}


Anomaly Detection with ML

The ML layer does fast anomaly scoring. We use a simple statistical approach - no external dependencies, runs in microseconds.

package detector

import (
    "math"
    "sync"
    "time"
)

// RollingStats tracks statistics for anomaly detection
type RollingStats struct {
    mu       sync.RWMutex
    values   []float64
    maxSize  int
    mean     float64
    variance float64
}

func NewRollingStats(size int) *RollingStats {
    return &RollingStats{
        values:  make([]float64, 0, size),
        maxSize: size,
    }
}

func (s *RollingStats) Add(v float64) {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.values = append(s.values, v)
    if len(s.values) > s.maxSize {
        s.values = s.values[1:]
    }

    // Update running stats
    s.mean = 0
    for _, x := range s.values {
        s.mean += x
    }
    s.mean /= float64(len(s.values))

    s.variance = 0
    for _, x := range s.values {
        s.variance += (x - s.mean) * (x - s.mean)
    }
    s.variance /= float64(len(s.values))
}

func (s *RollingStats) ZScore(v float64) float64 {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if s.variance == 0 {
        return 0
    }
    return (v - s.mean) / math.Sqrt(s.variance)
}

// AnomalyDetector scores events for anomalousness
type AnomalyDetector struct {
    // Track stats per event type
    processRate   *RollingStats
    networkRate   *RollingStats
    authFailRate  *RollingStats

    // Counters for rate calculation
    processCount  int
    networkCount  int
    authFailCount int
    lastReset     time.Time

    // Known bad patterns
    suspiciousProcesses map[string]bool
    suspiciousPorts     map[int]bool
}

func NewAnomalyDetector() *AnomalyDetector {
    return &AnomalyDetector{
        processRate:  NewRollingStats(100),
        networkRate:  NewRollingStats(100),
        authFailRate: NewRollingStats(100),
        lastReset:    time.Now(),

        suspiciousProcesses: map[string]bool{
            "nc":       true,
            "ncat":     true,
            "netcat":   true,
            "nmap":     true,
            "masscan":  true,
            "hydra":    true,
            "john":     true,
            "hashcat":  true,
            "mimikatz": true,
            "xmrig":    true, // Crypto miner
            "minerd":   true,
        },

        suspiciousPorts: map[int]bool{
            4444:  true,
            5555:  true,
            6666:  true,
            1337:  true,
            31337: true,
        },
    }
}

func (d *AnomalyDetector) Score(event Event) (float64, string) {
    switch event.Type {
    case EventProcess:
        return d.scoreProcess(event)
    case EventNetwork:
        return d.scoreNetwork(event)
    case EventAuth:
        return d.scoreAuth(event)
    case EventFilesystem:
        return d.scoreFilesystem(event)
    }
    return 0, ""
}

func (d *AnomalyDetector) scoreProcess(event Event) (float64, string) {
    score := 0.0
    reasons := []string{}

    name, _ := event.Data["name"].(string)
    cmdline, _ := event.Data["cmdline"].(string)
    executable, _ := event.Data["executable"].(string)

    // Check known suspicious processes
    if d.suspiciousProcesses[name] {
        score += 0.7
        reasons = append(reasons, "known_suspicious_process")
    }

    // Check for deleted executables (common malware technique)
    if strings.Contains(executable, "(deleted)") {
        score += 0.8
        reasons = append(reasons, "deleted_executable")
    }

    // Check for processes running from /tmp or /dev/shm
    if strings.HasPrefix(executable, "/tmp") || strings.HasPrefix(executable, "/dev/shm") {
        score += 0.6
        reasons = append(reasons, "execution_from_tmp")
    }

    // Check for base64 in command line (obfuscation)
    if strings.Contains(cmdline, "base64") {
        score += 0.4
        reasons = append(reasons, "base64_in_cmdline")
    }

    // Check for reverse shell patterns
    if strings.Contains(cmdline, "/dev/tcp") || strings.Contains(cmdline, "bash -i") {
        score += 0.9
        reasons = append(reasons, "reverse_shell_pattern")
    }

    // Normalize score
    if score > 1.0 {
        score = 1.0
    }

    return score, strings.Join(reasons, ", ")
}

func (d *AnomalyDetector) scoreNetwork(event Event) (float64, string) {
    score := 0.0
    reasons := []string{}

    remotePort, _ := event.Data["remote_port"].(int)
    remoteAddr, _ := event.Data["remote_addr"].(string)

    // Check suspicious ports
    if d.suspiciousPorts[remotePort] {
        score += 0.7
        reasons = append(reasons, "suspicious_port")
    }

    // Check for connections to private IPs from public-facing services
    // (potential lateral movement)
    if isPrivateIP(remoteAddr) {
        // This might be normal, lower score
        score += 0.1
    }

    // Rate-based anomaly
    d.networkCount++
    elapsed := time.Since(d.lastReset).Seconds()
    if elapsed > 60 {
        rate := float64(d.networkCount) / elapsed
        d.networkRate.Add(rate)
        d.networkCount = 0
        d.lastReset = time.Now()

        zScore := d.networkRate.ZScore(rate)
        if zScore > 3 {
            score += 0.5
            reasons = append(reasons, "unusual_connection_rate")
        }
    }

    if score > 1.0 {
        score = 1.0
    }

    return score, strings.Join(reasons, ", ")
}

func (d *AnomalyDetector) scoreAuth(event Event) (float64, string) {
    score := 0.0
    reasons := []string{}

    action, _ := event.Data["action"].(string)

    if action == "failed_login" || action == "invalid_user" {
        score += 0.3
        reasons = append(reasons, "failed_auth")

        // Track rate of failures
        d.authFailCount++
        if d.authFailCount > 5 {
            score += 0.4
            reasons = append(reasons, "brute_force_pattern")
        }
    }

    if action == "sudo" {
        command, _ := event.Data["command"].(string)
        // Check for suspicious sudo commands
        if strings.Contains(command, "chmod 777") ||
            strings.Contains(command, "rm -rf") ||
            strings.Contains(command, "/etc/shadow") {
            score += 0.6
            reasons = append(reasons, "suspicious_sudo_command")
        }
    }

    if score > 1.0 {
        score = 1.0
    }

    return score, strings.Join(reasons, ", ")
}

func (d *AnomalyDetector) scoreFilesystem(event Event) (float64, string) {
    score := 0.0
    reasons := []string{}

    path, _ := event.Data["path"].(string)
    action, _ := event.Data["action"].(string)

    // Critical files
    criticalFiles := []string{
        "/etc/passwd",
        "/etc/shadow",
        "/etc/sudoers",
        "/root/.ssh/authorized_keys",
    }

    for _, critical := range criticalFiles {
        if path == critical {
            score += 0.8
            reasons = append(reasons, "critical_file_modified")
            break
        }
    }

    // New executables in sensitive directories
    if action == "create" && (strings.HasPrefix(path, "/usr/bin") || strings.HasPrefix(path, "/usr/sbin")) {
        score += 0.7
        reasons = append(reasons, "new_system_binary")
    }

    if score > 1.0 {
        score = 1.0
    }

    return score, strings.Join(reasons, ", ")
}

func isPrivateIP(ip string) bool {
    return strings.HasPrefix(ip, "10.") ||
        strings.HasPrefix(ip, "192.168.") ||
        strings.HasPrefix(ip, "172.16.") ||
        strings.HasPrefix(ip, "172.17.") ||
        strings.HasPrefix(ip, "172.18.") ||
        strings.HasPrefix(ip, "172.19.")
}


AI Reasoning with Ollama

High-scoring anomalies get sent to Ollama for deeper analysis. The LLM determines if it’s a real threat and suggests remediation.

package ai

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

type OllamaClient struct {
    baseURL string
    model   string
}

func NewOllamaClient(baseURL, model string) *OllamaClient {
    return &OllamaClient{
        baseURL: baseURL,
        model:   model,
    }
}

type OllamaRequest struct {
    Model  string `json:"model"`
    Prompt string `json:"prompt"`
    Stream bool   `json:"stream"`
}

type OllamaResponse struct {
    Response string `json:"response"`
}

func (c *OllamaClient) Analyze(anomaly Anomaly) (threat string, remediation string, err error) {
    prompt := fmt.Sprintf(`You are a security analyst AI. Analyze this system anomaly and respond with JSON.

Event Type: %s
Event Data: %v
Anomaly Score: %.2f
Detection Reason: %s

Analyze this event and respond with ONLY a JSON object (no other text):
{
    "is_threat": true/false,
    "threat_level": "critical/high/medium/low/none",
    "explanation": "brief explanation of what this event means",
    "remediation": {
        "action": "kill/block/restore/alert/none",
        "target": "the target (PID, IP, filepath, etc)",
        "reason": "why this action"
    }
}`,
        anomaly.Event.Type,
        anomaly.Event.Data,
        anomaly.Score,
        anomaly.Reason,
    )

    reqBody, _ := json.Marshal(OllamaRequest{
        Model:  c.model,
        Prompt: prompt,
        Stream: false,
    })

    resp, err := http.Post(
        c.baseURL+"/api/generate",
        "application/json",
        bytes.NewBuffer(reqBody),
    )
    if err != nil {
        return "", "", err
    }
    defer resp.Body.Close()

    var ollamaResp OllamaResponse
    if err := json.NewDecoder(resp.Body).Decode(&ollamaResp); err != nil {
        return "", "", err
    }

    // Parse the JSON response
    var analysis struct {
        IsThreat    bool   `json:"is_threat"`
        ThreatLevel string `json:"threat_level"`
        Explanation string `json:"explanation"`
        Remediation struct {
            Action string `json:"action"`
            Target string `json:"target"`
            Reason string `json:"reason"`
        } `json:"remediation"`
    }

    // Extract JSON from response (LLM might include extra text)
    responseText := ollamaResp.Response
    startIdx := strings.Index(responseText, "{")
    endIdx := strings.LastIndex(responseText, "}")
    if startIdx >= 0 && endIdx > startIdx {
        jsonStr := responseText[startIdx : endIdx+1]
        if err := json.Unmarshal([]byte(jsonStr), &analysis); err != nil {
            return "", "", fmt.Errorf("failed to parse LLM response: %w", err)
        }
    }

    threat = fmt.Sprintf("[%s] %s", analysis.ThreatLevel, analysis.Explanation)

    if analysis.IsThreat && analysis.Remediation.Action != "none" {
        remediation = fmt.Sprintf("%s:%s (%s)",
            analysis.Remediation.Action,
            analysis.Remediation.Target,
            analysis.Remediation.Reason,
        )
    }

    return threat, remediation, nil
}


Remediation Engine

The remediation engine executes actions suggested by the AI. It has safety checks and dry-run support.

package remediation

import (
    "fmt"
    "os/exec"
    "strconv"
    "strings"
)

type Engine struct {
    dryRun     bool
    allowlist  map[string]bool // Processes that should never be killed
    blockRules []string        // Track blocked IPs for cleanup
}

func NewEngine(dryRun bool) *Engine {
    return &Engine{
        dryRun: dryRun,
        allowlist: map[string]bool{
            "systemd":   true,
            "init":      true,
            "sshd":      true,
            "kernel":    true,
            "dockerd":   true,
            "containerd": true,
        },
    }
}

func (e *Engine) Execute(action Action) error {
    switch action.Type {
    case "kill":
        return e.killProcess(action)
    case "block":
        return e.blockIP(action)
    case "restore":
        return e.restoreFile(action)
    case "alert":
        return e.sendAlert(action)
    default:
        return fmt.Errorf("unknown action type: %s", action.Type)
    }
}

func (e *Engine) killProcess(action Action) error {
    pidStr := action.Target
    pid, err := strconv.Atoi(pidStr)
    if err != nil {
        return fmt.Errorf("invalid PID: %s", pidStr)
    }

    // Safety check: don't kill system processes
    processName := action.Params["name"].(string)
    if e.allowlist[processName] {
        return fmt.Errorf("refusing to kill allowlisted process: %s", processName)
    }

    // Don't kill PID 1 or low PIDs
    if pid <= 100 {
        return fmt.Errorf("refusing to kill low PID: %d", pid)
    }

    if e.dryRun {
        fmt.Printf("[DRY RUN] Would kill process %d (%s)\n", pid, processName)
        return nil
    }

    cmd := exec.Command("kill", "-9", pidStr)
    return cmd.Run()
}

func (e *Engine) blockIP(action Action) error {
    ip := action.Target

    // Validate IP format
    if !isValidIP(ip) {
        return fmt.Errorf("invalid IP address: %s", ip)
    }

    // Don't block private IPs or localhost
    if strings.HasPrefix(ip, "127.") || strings.HasPrefix(ip, "10.") ||
        strings.HasPrefix(ip, "192.168.") {
        return fmt.Errorf("refusing to block private/localhost IP: %s", ip)
    }

    if e.dryRun {
        fmt.Printf("[DRY RUN] Would block IP %s\n", ip)
        return nil
    }

    // Use iptables to block
    cmd := exec.Command("iptables", "-A", "INPUT", "-s", ip, "-j", "DROP")
    if err := cmd.Run(); err != nil {
        return err
    }

    e.blockRules = append(e.blockRules, ip)
    return nil
}

func (e *Engine) restoreFile(action Action) error {
    path := action.Target
    backupPath := action.Params["backup"].(string)

    if e.dryRun {
        fmt.Printf("[DRY RUN] Would restore %s from %s\n", path, backupPath)
        return nil
    }

    cmd := exec.Command("cp", backupPath, path)
    return cmd.Run()
}

func (e *Engine) sendAlert(action Action) error {
    message := action.Params["message"].(string)

    // Log locally
    fmt.Printf("[ALERT] %s\n", message)

    // Could integrate with PagerDuty, Slack, etc.
    // For now, just write to a file
    if !e.dryRun {
        cmd := exec.Command("logger", "-t", "immune-system", message)
        return cmd.Run()
    }

    return nil
}

func isValidIP(ip string) bool {
    parts := strings.Split(ip, ".")
    if len(parts) != 4 {
        return false
    }
    for _, part := range parts {
        n, err := strconv.Atoi(part)
        if err != nil || n < 0 || n > 255 {
            return false
        }
    }
    return true
}


Main Loop

Tie it all together:

package main

import (
    "flag"
    "fmt"
    "log"
    "time"
)

func main() {
    dryRun := flag.Bool("dry-run", true, "Don't execute remediation actions")
    threshold := flag.Float64("threshold", 0.5, "Anomaly score threshold for AI analysis")
    ollamaURL := flag.String("ollama-url", "http://localhost:11434", "Ollama API URL")
    ollamaModel := flag.String("ollama-model", "llama3.2", "Ollama model to use")
    flag.Parse()

    // Initialize components
    events := make(chan Event, 1000)
    detector := NewAnomalyDetector()
    aiClient := NewOllamaClient(*ollamaURL, *ollamaModel)
    remediator := NewEngine(*dryRun)

    // Start collectors
    go NewProcessCollector(events).Run()
    go NewNetworkCollector(events).Run()
    go NewFilesystemCollector(events).Run()
    go NewAuthCollector(events).Run()

    log.Printf("Digital Immune System started (dry-run=%v, threshold=%.2f)", *dryRun, *threshold)

    // Main processing loop
    for event := range events {
        // Fast ML scoring
        score, reason := detector.Score(event)

        if score < *threshold {
            continue // Not anomalous enough
        }

        anomaly := Anomaly{
            Event:  event,
            Score:  score,
            Reason: reason,
        }

        log.Printf("Anomaly detected: type=%s score=%.2f reason=%s",
            event.Type, score, reason)

        // Send to AI for deeper analysis
        threat, remediation, err := aiClient.Analyze(anomaly)
        if err != nil {
            log.Printf("AI analysis failed: %v", err)
            continue
        }

        anomaly.Threat = threat
        anomaly.Remediation = remediation

        log.Printf("AI analysis: threat=%s remediation=%s", threat, remediation)

        // Execute remediation if suggested
        if remediation != "" {
            action := parseRemediation(remediation, event)
            if action != nil {
                if err := remediator.Execute(*action); err != nil {
                    log.Printf("Remediation failed: %v", err)
                } else {
                    log.Printf("Remediation executed: %s", action.Type)
                }
            }
        }
    }
}

func parseRemediation(remediation string, event Event) *Action {
    parts := strings.SplitN(remediation, ":", 2)
    if len(parts) != 2 {
        return nil
    }

    actionType := parts[0]
    target := strings.Split(parts[1], " ")[0]

    return &Action{
        Type:   actionType,
        Target: target,
        Params: event.Data,
    }
}


Running It

# Start Ollama with a model
ollama pull llama3.2
ollama serve

# Build and run (dry-run mode first!)
go build -o immune-system .
sudo ./immune-system --dry-run=true --threshold=0.5

# Once confident, enable remediation
sudo ./immune-system --dry-run=false --threshold=0.6

Example output:

2024/02/19 10:23:45 Digital Immune System started (dry-run=true, threshold=0.50)
2024/02/19 10:24:01 Anomaly detected: type=process score=0.70 reason=known_suspicious_process
2024/02/19 10:24:02 AI analysis: threat=[high] nmap is a network scanning tool commonly used for reconnaissance remediation=alert:nmap (reconnaissance activity detected)
2024/02/19 10:24:02 Remediation executed: alert
2024/02/19 10:25:33 Anomaly detected: type=auth score=0.70 reason=failed_auth, brute_force_pattern
2024/02/19 10:25:34 AI analysis: threat=[critical] Multiple failed SSH attempts from same IP indicates brute force attack remediation=block:192.0.2.50 (block attacking IP)
2024/02/19 10:25:34 [DRY RUN] Would block IP 192.0.2.50


Tradeoffs

What works well:

What’s tricky:

Improvements to consider:

blog comments powered by Disqus