Your immune system doesn’t wait for you to notice you’re sick. It continuously monitors for threats, identifies anomalies, and responds automatically. What if your servers worked the same way?
This post builds a “digital immune system” in Go - a monitor that watches system activity, uses local AI to detect anomalies, and automatically remediates threats. No cloud APIs, no security vendor lock-in, just a binary that runs on your servers.
The system has three layers:
┌─────────────────────────────────────────────────────┐
│ Collectors │
│ (processes, network, filesystem, auth logs) │
└──────────────────────┬──────────────────────────────┘
│ events
▼
┌─────────────────────────────────────────────────────┐
│ Anomaly Detection (ML) │
│ (statistical models, fast scoring, filtering) │
└──────────────────────┬──────────────────────────────┘
│ anomalies
▼
┌─────────────────────────────────────────────────────┐
│ AI Reasoning (Ollama) │
│ (threat analysis, remediation planning) │
└──────────────────────┬──────────────────────────────┘
│ actions
▼
┌─────────────────────────────────────────────────────┐
│ Remediation │
│ (kill process, block IP, restore file, alert) │
└─────────────────────────────────────────────────────┘
The ML layer does fast, cheap filtering. Only anomalies that score above a threshold get sent to the LLM for deeper analysis. This keeps latency low and prevents the LLM from being overwhelmed.
First, define the core types:
package main
import (
"time"
)
// EventType names the category of system activity that an Event describes.
type EventType string
// The event categories used in this program; Score dispatches on these values.
const (
EventProcess EventType = "process"
EventNetwork EventType = "network"
EventFilesystem EventType = "filesystem"
EventAuth EventType = "auth"
)
// Event is a single observation emitted by a collector onto the shared
// events channel.
type Event struct {
// Type is the category of activity observed.
Type EventType `json:"type"`
// Timestamp is set by the collector when it builds the event.
Timestamp time.Time `json:"timestamp"`
// Data holds collector-specific key/value details (for example "pid",
// "remote_port", "path", "action"); the keys differ per event type.
Data map[string]any `json:"data"`
// Source is the name of the emitting collector, e.g. "process_collector".
Source string `json:"source"`
}
// Anomaly is an Event that scored at or above the detection threshold,
// together with the detector's and the LLM's findings about it.
type Anomaly struct {
// Event is the triggering observation.
Event Event `json:"event"`
Score float64 `json:"score"` // 0-1, higher = more anomalous
Reason string `json:"reason"` // comma-separated reasons from the detector
Threat string `json:"threat"` // LLM threat assessment, "[level] explanation"
Remediation string `json:"remediation"` // LLM suggested action, "action:target (reason)"
}
// Action is a remediation step for the remediation engine to execute.
type Action struct {
Type string `json:"type"` // kill, block, restore, alert
Target string `json:"target"` // PID, IP, filepath
// Params carries extra inputs read by individual actions ("name" for kill,
// "backup" for restore, "message" for alert).
Params map[string]any `json:"params"`
// NOTE(review): the Engine methods in this file consult the engine's own
// dryRun setting, not this field — confirm whether DryRun is meant to be
// honored per action.
DryRun bool `json:"dry_run"`
}
Each collector watches a different aspect of the system and emits events.
Monitors for suspicious processes - unusual binaries, crypto miners, reverse shells:
package collectors
import (
"bufio"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
// ProcessInfo is a snapshot of one process as read from /proc/<pid>.
type ProcessInfo struct {
PID int
// Name is the contents of /proc/<pid>/comm with whitespace trimmed.
Name string
// Cmdline is /proc/<pid>/cmdline with NUL separators replaced by spaces.
Cmdline string
// User is the first UID listed on the "Uid:" line of /proc/<pid>/status,
// formatted as a decimal string (not a user name).
User string
// CPU, Memory and StartTime are not populated by getProcessInfo.
CPU float64
Memory float64
StartTime time.Time
// ParentPID comes from the "PPid:" line of /proc/<pid>/status.
ParentPID int
// Executable is the target of the /proc/<pid>/exe symlink; empty when the
// link cannot be read.
Executable string
}
// ProcessCollector polls /proc on a fixed interval and emits a "spawn" event
// for every PID it has not seen before.
type ProcessCollector struct {
// events is the channel spawn events are sent on.
events chan Event
known map[int]ProcessInfo // processes seen on previous polls, keyed by PID
// interval is the delay between polls.
interval time.Duration
}
// NewProcessCollector builds a process collector that publishes to events
// and polls every five seconds. The set of known processes starts empty.
func NewProcessCollector(events chan Event) *ProcessCollector {
	c := new(ProcessCollector)
	c.events = events
	c.known = map[int]ProcessInfo{}
	c.interval = 5 * time.Second
	return c
}
// Run polls the process table once per interval. It blocks the calling
// goroutine and never returns; the first poll happens after one full interval.
func (c *ProcessCollector) Run() {
	tick := time.NewTicker(c.interval)
	for {
		<-tick.C
		c.collect()
	}
}
// collect takes one snapshot of /proc, emits a "spawn" event for each PID that
// was not present in the previous snapshot, and forgets PIDs that have exited.
func (c *ProcessCollector) collect() {
	entries, _ := filepath.Glob("/proc/[0-9]*")
	alive := make(map[int]bool)

	for _, entry := range entries {
		pid, _ := strconv.Atoi(filepath.Base(entry))
		alive[pid] = true

		proc, err := c.getProcessInfo(pid)
		if err != nil {
			// The process most likely exited between the glob and the read.
			continue
		}

		if _, seen := c.known[pid]; !seen {
			details := map[string]any{
				"action":     "spawn",
				"pid":        proc.PID,
				"name":       proc.Name,
				"cmdline":    proc.Cmdline,
				"user":       proc.User,
				"parent_pid": proc.ParentPID,
				"executable": proc.Executable,
			}
			c.events <- Event{
				Type:      EventProcess,
				Timestamp: time.Now(),
				Source:    "process_collector",
				Data:      details,
			}
		}
		c.known[pid] = proc
	}

	// Drop bookkeeping for processes that are no longer running.
	for pid := range c.known {
		if alive[pid] {
			continue
		}
		delete(c.known, pid)
	}
}
// getProcessInfo reads name, command line, executable path, UID and parent PID
// for pid from /proc/<pid>.
//
// Only a failure to read /proc/<pid>/comm is returned as an error (it is taken
// to mean the process is gone). Every other field is best-effort and is left
// at its zero value when the corresponding file cannot be read or parsed.
func (c *ProcessCollector) getProcessInfo(pid int) (ProcessInfo, error) {
	info := ProcessInfo{PID: pid}

	comm, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid))
	if err != nil {
		return info, err
	}
	info.Name = strings.TrimSpace(string(comm))

	// cmdline is a NUL-separated, NUL-terminated argument list. Strip the
	// terminator first so the joined string does not end in a stray space.
	// Read errors are ignored: kernel threads and racing exits yield "".
	cmdline, _ := os.ReadFile(fmt.Sprintf("/proc/%d/cmdline", pid))
	args := strings.TrimRight(string(cmdline), "\x00")
	info.Cmdline = strings.ReplaceAll(args, "\x00", " ")

	// Readlink fails for processes we lack permission on; leave it empty.
	exe, _ := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
	info.Executable = exe

	status, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
	if err != nil {
		return info, nil
	}
	defer status.Close()

	scanner := bufio.NewScanner(status)
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		if len(fields) < 2 {
			continue
		}
		switch fields[0] {
		case "Uid:":
			// Only record a UID that actually parsed; defaulting to 0 would
			// mislabel the process as running as root.
			if uid, err := strconv.Atoi(fields[1]); err == nil {
				info.User = strconv.Itoa(uid)
			}
		case "PPid:":
			if ppid, err := strconv.Atoi(fields[1]); err == nil {
				info.ParentPID = ppid
			}
		}
	}
	return info, nil
}
Watches for suspicious connections - unusual ports, known bad IPs, data exfiltration patterns:
package collectors
import (
"bufio"
"encoding/hex"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
)
// Connection is one row of /proc/net/tcp or /proc/net/tcp6 after decoding.
type Connection struct {
LocalAddr string
LocalPort int
RemoteAddr string
RemotePort int
// State is the decoded TCP state name, or "" for states parseState does
// not map.
State string
// PID is not populated by parseLine, so it stays 0.
PID int
}
// NetworkCollector polls the kernel TCP tables and emits a "connect" event for
// each newly seen ESTABLISHED connection.
type NetworkCollector struct {
// events is the channel connect events are sent on.
events chan Event
// known holds connections already reported, keyed by
// "local:port-remote:port".
known map[string]Connection
// interval is the delay between polls.
interval time.Duration
// NOTE(review): suspPorts is initialized by the constructor but not read by
// the collector methods in this file; port scoring happens in the detector.
suspPorts map[int]bool // Suspicious ports to watch
}
// NewNetworkCollector builds a network collector that publishes to events,
// polls every three seconds, and carries the default suspicious-port set.
func NewNetworkCollector(events chan Event) *NetworkCollector {
	watched := make(map[int]bool)
	for _, port := range []int{
		4444,  // Metasploit default
		5555,  // Common backdoor
		6666,  // IRC (often C2)
		1337,  // Leet port
		31337, // Back Orifice
	} {
		watched[port] = true
	}

	c := new(NetworkCollector)
	c.events = events
	c.known = map[string]Connection{}
	c.interval = 3 * time.Second
	c.suspPorts = watched
	return c
}
// Run polls the TCP tables once per interval. It blocks the calling goroutine
// and never returns; the first poll happens after one full interval.
func (c *NetworkCollector) Run() {
	tick := time.NewTicker(c.interval)
	for {
		<-tick.C
		c.collect()
	}
}
// collect scans /proc/net/tcp and /proc/net/tcp6, reports newly established
// connections, and then forgets connections that are no longer established.
//
// Pruning keeps c.known bounded on a long-running host and lets a connection
// that reuses an old address/port tuple be reported again. It is skipped when
// a table could not be read completely, so a transient read failure does not
// cause every live connection to be re-reported on the next poll.
func (c *NetworkCollector) collect() {
	seen := make(map[string]bool)
	complete := true
	for _, path := range []string{"/proc/net/tcp", "/proc/net/tcp6"} {
		// A missing table (e.g. IPv6 disabled) simply contributes no rows.
		if err := c.scanNetFile(path, seen); err != nil && !os.IsNotExist(err) {
			complete = false
		}
	}
	if !complete {
		return
	}
	for key := range c.known {
		if !seen[key] {
			delete(c.known, key)
		}
	}
}

// parseNetFile scans a single TCP table and emits events for new established
// connections. Errors are ignored (best-effort) and nothing is pruned.
func (c *NetworkCollector) parseNetFile(path string) {
	_ = c.scanNetFile(path, nil)
}

// scanNetFile does the work for parseNetFile. When seen is non-nil, the key of
// every established connection found is recorded in it. It returns the error
// from opening or reading the file, if any.
func (c *NetworkCollector) scanNetFile(path string, seen map[string]bool) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Scan() // Skip header
	for scanner.Scan() {
		conn := c.parseLine(scanner.Text())
		if conn == nil || conn.State != "ESTABLISHED" {
			continue
		}
		key := fmt.Sprintf("%s:%d-%s:%d",
			conn.LocalAddr, conn.LocalPort,
			conn.RemoteAddr, conn.RemotePort)
		if seen != nil {
			seen[key] = true
		}
		if _, exists := c.known[key]; exists {
			continue
		}
		c.events <- Event{
			Type:      EventNetwork,
			Timestamp: time.Now(),
			Source:    "network_collector",
			Data: map[string]any{
				"action":      "connect",
				"local_addr":  conn.LocalAddr,
				"local_port":  conn.LocalPort,
				"remote_addr": conn.RemoteAddr,
				"remote_port": conn.RemotePort,
				"pid":         conn.PID,
			},
		}
		c.known[key] = *conn
	}
	return scanner.Err()
}
// parseLine decodes one data row of a /proc/net/tcp-style table. It returns
// nil when the row has fewer than ten whitespace-separated columns. Column 1
// is the local address, column 2 the remote address and column 3 the state.
func (c *NetworkCollector) parseLine(line string) *Connection {
	cols := strings.Fields(line)
	if len(cols) >= 10 {
		conn := new(Connection)
		conn.LocalAddr, conn.LocalPort = parseAddr(cols[1])
		conn.RemoteAddr, conn.RemotePort = parseAddr(cols[2])
		conn.State = parseState(cols[3])
		return conn
	}
	return nil
}
// parseAddr decodes an "ADDRHEX:PORTHEX" field from /proc/net/tcp or
// /proc/net/tcp6 into a textual IP address and a port number.
//
// The address is 8 hex digits for IPv4 or 32 for IPv6. The kernel prints it as
// a sequence of 32-bit words in host byte order, so each 4-byte group is
// reversed here (this assumes a little-endian host). Malformed input of any
// kind yields ("", 0) instead of panicking.
func parseAddr(s string) (string, int) {
	addrHex, portHex, ok := strings.Cut(s, ":")
	if !ok || strings.Contains(portHex, ":") {
		return "", 0
	}

	raw, err := hex.DecodeString(addrHex)
	if err != nil || (len(raw) != net.IPv4len && len(raw) != net.IPv6len) {
		return "", 0
	}

	ip := make(net.IP, len(raw))
	for i := 0; i < len(raw); i += 4 {
		ip[i], ip[i+1], ip[i+2], ip[i+3] = raw[i+3], raw[i+2], raw[i+1], raw[i]
	}

	port, err := strconv.ParseUint(portHex, 16, 16)
	if err != nil {
		return "", 0
	}
	return ip.String(), int(port)
}
// parseState maps the hexadecimal TCP state code from /proc/net/tcp to a
// state name. Only the three states this program cares about are recognized;
// any other code yields the empty string.
func parseState(s string) string {
	switch s {
	case "01":
		return "ESTABLISHED"
	case "02":
		return "SYN_SENT"
	case "0A":
		return "LISTEN"
	default:
		return ""
	}
}
Uses the fsnotify library (backed by inotify on Linux) to watch for suspicious file changes:
package collectors
import (
"crypto/sha256"
"encoding/hex"
"io"
"os"
"path/filepath"
"time"
"github.com/fsnotify/fsnotify"
)
// FilesystemCollector reports file changes under a fixed set of directories
// and tracks a SHA-256 digest per file so events can carry old and new hashes.
type FilesystemCollector struct {
// events is the channel filesystem events are sent on.
events chan Event
// watcher is the fsnotify watcher created by the constructor.
watcher *fsnotify.Watcher
// watchDirs lists the directory roots walked by Run.
watchDirs []string
hashes map[string]string // last known hex SHA-256 digest, keyed by file path
}
// NewFilesystemCollector builds a filesystem collector that publishes to
// events and covers the default set of sensitive directories.
//
// The error from fsnotify.NewWatcher is discarded, so the watcher field is
// whatever NewWatcher returned alongside it.
func NewFilesystemCollector(events chan Event) *FilesystemCollector {
	w, _ := fsnotify.NewWatcher()

	c := new(FilesystemCollector)
	c.events = events
	c.watcher = w
	c.watchDirs = []string{"/etc", "/usr/bin", "/usr/sbin", "/root", "/home"}
	c.hashes = map[string]string{}
	return c
}
// Run registers watches for every directory under watchDirs, records a
// baseline hash for each regular file found, and then dispatches watcher
// events to handleEvent until the watcher is closed.
//
// Directories are watched rather than individual files: a per-file watch can
// never report the creation of a new file, and one watch per file quickly
// exhausts the kernel's inotify watch limit on trees like /home. Directories
// created after startup are not picked up.
func (c *FilesystemCollector) Run() {
	if c.watcher == nil {
		// The constructor could not create a watcher; nothing to monitor.
		return
	}

	for _, dir := range c.watchDirs {
		// Walk errors are ignored so one unreadable subtree does not stop
		// the remaining roots from being registered.
		_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return nil
			}
			if info.IsDir() {
				// Best-effort: a directory we cannot watch is skipped.
				_ = c.watcher.Add(path)
				return nil
			}
			// Only hash regular files; opening a FIFO or device could block.
			if info.Mode().IsRegular() {
				c.hashes[path] = c.hashFile(path)
			}
			return nil
		})
	}

	for {
		select {
		case event, ok := <-c.watcher.Events:
			if !ok {
				return // watcher closed
			}
			c.handleEvent(event)
		case _, ok := <-c.watcher.Errors:
			if !ok {
				return // watcher closed
			}
			// Watcher errors are dropped; this file has no logger in scope.
		}
	}
}
// handleEvent converts an fsnotify event into a filesystem Event carrying the
// previous and current content hashes, then updates the hash table.
//
// The action is one of "modify", "create", "delete", "rename" or "chmod"; when
// several operation bits are set the first match in that order wins. For
// "delete" and "rename" the path no longer exists under that name, so no new
// hash is computed and the path's entry is removed from the table instead of
// being kept forever with an empty hash.
func (c *FilesystemCollector) handleEvent(e fsnotify.Event) {
	var action string
	switch {
	case e.Op&fsnotify.Write == fsnotify.Write:
		action = "modify"
	case e.Op&fsnotify.Create == fsnotify.Create:
		action = "create"
	case e.Op&fsnotify.Remove == fsnotify.Remove:
		action = "delete"
	case e.Op&fsnotify.Rename == fsnotify.Rename:
		action = "rename"
	case e.Op&fsnotify.Chmod == fsnotify.Chmod:
		action = "chmod"
	default:
		return
	}

	gone := action == "delete" || action == "rename"
	newHash := ""
	if !gone {
		newHash = c.hashFile(e.Name)
	}

	c.events <- Event{
		Type:      EventFilesystem,
		Timestamp: time.Now(),
		Source:    "filesystem_collector",
		Data: map[string]any{
			"action":   action,
			"path":     e.Name,
			"old_hash": c.hashes[e.Name],
			"new_hash": newHash,
		},
	}

	if gone {
		delete(c.hashes, e.Name)
		return
	}
	c.hashes[e.Name] = newHash
}
// hashFile returns the hex-encoded SHA-256 digest of the file at path, or ""
// when the file cannot be opened or fully read. A partial read is treated as
// a failure so a truncated digest is never mistaken for the file's real hash.
func (c *FilesystemCollector) hashFile(path string) string {
	f, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return ""
	}
	return hex.EncodeToString(h.Sum(nil))
}
Watches authentication logs for failed logins, sudo abuse, SSH anomalies:
package collectors
import (
"bufio"
"os"
"regexp"
"strings"
"time"
"github.com/hpcloud/tail"
)
// AuthCollector follows an authentication log and emits an auth Event for each
// line that matches one of its patterns.
type AuthCollector struct {
// events is the channel auth events are sent on.
events chan Event
// logPath is the log file to follow.
logPath string
// patterns maps an action name (used as the event's "action") to the
// regular expression that recognizes it.
patterns map[string]*regexp.Regexp
}
// NewAuthCollector builds an auth collector that publishes to events, follows
// /var/log/auth.log, and recognizes failed logins, invalid users, sudo
// commands, accepted SSH logins and SSH disconnects.
func NewAuthCollector(events chan Event) *AuthCollector {
	compiled := make(map[string]*regexp.Regexp)
	compiled["failed_login"] = regexp.MustCompile(`Failed password for (\S+) from (\S+)`)
	compiled["invalid_user"] = regexp.MustCompile(`Invalid user (\S+) from (\S+)`)
	compiled["sudo"] = regexp.MustCompile(`sudo:\s+(\S+).*COMMAND=(.*)`)
	compiled["ssh_accepted"] = regexp.MustCompile(`Accepted (\S+) for (\S+) from (\S+)`)
	compiled["ssh_disconnect"] = regexp.MustCompile(`Disconnected from (\S+)`)

	c := new(AuthCollector)
	c.events = events
	c.logPath = "/var/log/auth.log"
	c.patterns = compiled
	return c
}
// Run follows the auth log and feeds each new line to parseLine. It blocks
// until the tailer's line channel is closed, and returns immediately if the
// tailer cannot be started.
//
// Tailing starts at the current end of the file. Starting at the beginning
// would replay historical entries as if they were happening now (events are
// stamped with time.Now), which would inflate failure counts at startup.
func (c *AuthCollector) Run() {
	// Same value as io.SeekEnd; spelled out because this file does not
	// import "io".
	const seekEnd = 2

	t, err := tail.TailFile(c.logPath, tail.Config{
		Follow:    true,
		ReOpen:    true, // keep following across log rotation
		MustExist: false,
		Location:  &tail.SeekInfo{Offset: 0, Whence: seekEnd},
	})
	if err != nil {
		return
	}
	for line := range t.Lines {
		if line.Err != nil {
			// A read error carries no usable text; skip it.
			continue
		}
		c.parseLine(line.Text)
	}
}
// parseLine tests line against every configured pattern and emits one auth
// Event per matching pattern. Each event carries the pattern's name as
// "action", the raw line as "raw_log", and the captured fields relevant to
// that action. "ssh_disconnect" events carry no extra fields.
func (c *AuthCollector) parseLine(line string) {
	for name, re := range c.patterns {
		groups := re.FindStringSubmatch(line)
		if groups == nil {
			continue
		}

		fields := map[string]any{
			"action":  name,
			"raw_log": line,
		}
		switch name {
		case "failed_login", "invalid_user":
			fields["user"] = groups[1]
			fields["source_ip"] = groups[2]
		case "sudo":
			fields["user"] = groups[1]
			fields["command"] = groups[2]
		case "ssh_accepted":
			fields["method"] = groups[1]
			fields["user"] = groups[2]
			fields["source_ip"] = groups[3]
		}

		evt := Event{
			Type:      EventAuth,
			Timestamp: time.Now(),
			Source:    "auth_collector",
			Data:      fields,
		}
		c.events <- evt
	}
}
The ML layer does fast anomaly scoring. We use a simple statistical approach - no external dependencies, runs in microseconds.
package detector
import (
	"math"
	"net"
	"strings"
	"sync"
	"time"
)
// RollingStats keeps a sliding window of the most recent samples together
// with their mean and population variance. It is safe for concurrent use.
type RollingStats struct {
	mu       sync.RWMutex
	values   []float64 // window contents, oldest first
	maxSize  int       // maximum number of samples retained
	mean     float64   // mean of values, refreshed by Add
	variance float64   // population variance of values, refreshed by Add
}

// NewRollingStats returns an empty window holding at most size samples.
// A size below 1 is treated as 1 so the window can never be empty after Add,
// which would otherwise make the mean and variance NaN.
func NewRollingStats(size int) *RollingStats {
	if size < 1 {
		size = 1
	}
	return &RollingStats{
		values:  make([]float64, 0, size),
		maxSize: size,
	}
}

// Add appends v to the window, evicting the oldest samples if the window is
// full, and recomputes the mean and variance over the retained samples.
func (s *RollingStats) Add(v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	limit := s.maxSize
	if limit < 1 {
		limit = 1 // keeps a zero-value RollingStats usable
	}

	// Make room by shifting in place; reslicing with values[1:] would walk
	// off the backing array and force a reallocation every maxSize adds.
	if len(s.values) >= limit {
		drop := len(s.values) - limit + 1
		kept := copy(s.values, s.values[drop:])
		s.values = s.values[:kept]
	}
	s.values = append(s.values, v)

	count := float64(len(s.values))

	s.mean = 0
	for _, x := range s.values {
		s.mean += x
	}
	s.mean /= count

	s.variance = 0
	for _, x := range s.values {
		s.variance += (x - s.mean) * (x - s.mean)
	}
	s.variance /= count
}

// ZScore returns how many standard deviations v lies from the window mean.
// It returns 0 when the variance is zero (empty window, a single sample, or
// all samples equal), since no meaningful deviation can be computed.
func (s *RollingStats) ZScore(v float64) float64 {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.variance == 0 {
		return 0
	}
	return (v - s.mean) / math.Sqrt(s.variance)
}
// AnomalyDetector assigns each Event an anomaly score in [0, 1] using fixed
// heuristics plus a rolling connection-rate baseline.
//
// NOTE(review): the scoring methods mutate the counters and lastReset without
// any locking — confirm the detector is only ever called from one goroutine.
type AnomalyDetector struct {
// Rolling rate baselines. Only networkRate is fed by the scoring code in
// this file; processRate and authFailRate are allocated but not updated.
processRate *RollingStats
networkRate *RollingStats
authFailRate *RollingStats
// Counters for rate calculation. processCount is not touched by the scoring
// code in this file.
processCount int
networkCount int
authFailCount int
// lastReset is the start of the current network-rate measurement window.
lastReset time.Time
// Known bad patterns
suspiciousProcesses map[string]bool
suspiciousPorts map[int]bool
}
// NewAnomalyDetector returns a detector with 100-sample rolling windows, the
// rate window starting now, and the built-in lists of suspicious process
// names and remote ports.
func NewAnomalyDetector() *AnomalyDetector {
	badNames := []string{
		"nc", "ncat", "netcat",
		"nmap", "masscan",
		"hydra", "john", "hashcat", "mimikatz",
		"xmrig", "minerd", // crypto miners
	}
	badPorts := []int{4444, 5555, 6666, 1337, 31337}

	d := new(AnomalyDetector)
	d.processRate = NewRollingStats(100)
	d.networkRate = NewRollingStats(100)
	d.authFailRate = NewRollingStats(100)
	d.lastReset = time.Now()

	d.suspiciousProcesses = make(map[string]bool)
	for _, name := range badNames {
		d.suspiciousProcesses[name] = true
	}
	d.suspiciousPorts = make(map[int]bool)
	for _, port := range badPorts {
		d.suspiciousPorts[port] = true
	}
	return d
}
// Score routes event to the scorer for its type and returns the anomaly score
// together with a comma-separated list of reasons. Events of an unrecognized
// type score 0 with no reason.
func (d *AnomalyDetector) Score(event Event) (float64, string) {
	var scorer func(Event) (float64, string)
	switch event.Type {
	case EventProcess:
		scorer = d.scoreProcess
	case EventNetwork:
		scorer = d.scoreNetwork
	case EventAuth:
		scorer = d.scoreAuth
	case EventFilesystem:
		scorer = d.scoreFilesystem
	default:
		return 0, ""
	}
	return scorer(event)
}
// scoreProcess scores a process event by summing the weights of every
// indicator that fires and capping the total at 1.0. Missing or non-string
// "name", "cmdline" and "executable" values are treated as empty strings.
func (d *AnomalyDetector) scoreProcess(event Event) (float64, string) {
	name, _ := event.Data["name"].(string)
	cmdline, _ := event.Data["cmdline"].(string)
	executable, _ := event.Data["executable"].(string)

	// Indicators are evaluated in this order; the order also fixes the order
	// of the reported reasons.
	indicators := []struct {
		hit    bool
		weight float64
		label  string
	}{
		// Process name is on the known-bad list.
		{d.suspiciousProcesses[name], 0.7, "known_suspicious_process"},
		// Binary was deleted after launch (common malware technique).
		{strings.Contains(executable, "(deleted)"), 0.8, "deleted_executable"},
		// Binary lives in a world-writable scratch location.
		{strings.HasPrefix(executable, "/tmp") || strings.HasPrefix(executable, "/dev/shm"), 0.6, "execution_from_tmp"},
		// base64 on the command line suggests obfuscation.
		{strings.Contains(cmdline, "base64"), 0.4, "base64_in_cmdline"},
		// Typical reverse-shell fragments.
		{strings.Contains(cmdline, "/dev/tcp") || strings.Contains(cmdline, "bash -i"), 0.9, "reverse_shell_pattern"},
	}

	total := 0.0
	labels := []string{}
	for _, ind := range indicators {
		if !ind.hit {
			continue
		}
		total += ind.weight
		labels = append(labels, ind.label)
	}

	if total > 1.0 {
		total = 1.0
	}
	return total, strings.Join(labels, ", ")
}
// scoreNetwork scores a network connection event.
//
// It adds 0.7 for a remote port on the suspicious list and 0.1 (with no
// reason text) for a private remote address. Once more than 60 seconds have
// passed since the last rate sample, it computes the connection rate for the
// elapsed window and adds 0.5 if that rate is more than three standard
// deviations above the rolling baseline. The total is capped at 1.0.
func (d *AnomalyDetector) scoreNetwork(event Event) (float64, string) {
	score := 0.0
	var reasons []string

	remotePort, _ := event.Data["remote_port"].(int)
	remoteAddr, _ := event.Data["remote_addr"].(string)

	if d.suspiciousPorts[remotePort] {
		score += 0.7
		reasons = append(reasons, "suspicious_port")
	}

	// Connections to private addresses may indicate lateral movement but are
	// usually normal, hence the small weight.
	if isPrivateIP(remoteAddr) {
		score += 0.1
	}

	d.networkCount++
	if elapsed := time.Since(d.lastReset).Seconds(); elapsed > 60 {
		rate := float64(d.networkCount) / elapsed

		// Compare against the baseline BEFORE folding this sample into it.
		// Scoring a sample against a window that already contains it caps the
		// z-score at (n-1)/sqrt(n), which stays below 3 until the window
		// holds more than 10 samples, so early spikes could never trigger.
		zScore := d.networkRate.ZScore(rate)

		d.networkRate.Add(rate)
		d.networkCount = 0
		d.lastReset = time.Now()

		if zScore > 3 {
			score += 0.5
			reasons = append(reasons, "unusual_connection_rate")
		}
	}

	if score > 1.0 {
		score = 1.0
	}
	return score, strings.Join(reasons, ", ")
}
// scoreAuth scores an authentication event. Failed logins and invalid users
// score 0.3, plus 0.4 once more than five failures have been counted. A sudo
// invocation scores 0.6 when its command contains a risky fragment. The total
// is capped at 1.0.
//
// NOTE(review): authFailCount is only ever incremented here, so after the
// sixth failure every later failure is labelled brute_force_pattern no matter
// how much time has passed — confirm whether a time window was intended.
func (d *AnomalyDetector) scoreAuth(event Event) (float64, string) {
	var total float64
	labels := []string{}

	action, _ := event.Data["action"].(string)
	switch action {
	case "failed_login", "invalid_user":
		total += 0.3
		labels = append(labels, "failed_auth")

		d.authFailCount++
		if d.authFailCount > 5 {
			total += 0.4
			labels = append(labels, "brute_force_pattern")
		}
	case "sudo":
		command, _ := event.Data["command"].(string)
		risky := false
		for _, fragment := range []string{"chmod 777", "rm -rf", "/etc/shadow"} {
			if strings.Contains(command, fragment) {
				risky = true
				break
			}
		}
		if risky {
			total += 0.6
			labels = append(labels, "suspicious_sudo_command")
		}
	}

	if total > 1.0 {
		total = 1.0
	}
	return total, strings.Join(labels, ", ")
}
// scoreFilesystem scores a filesystem event. Any event on one of the critical
// account/credential files scores 0.8, and a "create" under /usr/bin or
// /usr/sbin scores 0.7. The total is capped at 1.0.
func (d *AnomalyDetector) scoreFilesystem(event Event) (float64, string) {
	path, _ := event.Data["path"].(string)
	action, _ := event.Data["action"].(string)

	var total float64
	labels := []string{}

	// Critical files: any reported action on these is significant.
	switch path {
	case "/etc/passwd", "/etc/shadow", "/etc/sudoers", "/root/.ssh/authorized_keys":
		total += 0.8
		labels = append(labels, "critical_file_modified")
	}

	// New executables appearing in system binary directories.
	inSystemBin := strings.HasPrefix(path, "/usr/bin") || strings.HasPrefix(path, "/usr/sbin")
	if action == "create" && inSystemBin {
		total += 0.7
		labels = append(labels, "new_system_binary")
	}

	if total > 1.0 {
		total = 1.0
	}
	return total, strings.Join(labels, ", ")
}
// isPrivateIP reports whether ip is a syntactically valid address inside a
// private range: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 or, for IPv6,
// fc00::/7. Strings that do not parse as an IP address are not private.
//
// The whole 172.16.0.0/12 block is covered, not just 172.16–172.19.
func isPrivateIP(ip string) bool {
	parsed := net.ParseIP(ip)
	return parsed != nil && parsed.IsPrivate()
}
High-scoring anomalies get sent to Ollama for deeper analysis. The LLM determines if it’s a real threat and suggests remediation.
package ai
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
)
// OllamaClient sends anomaly descriptions to an Ollama server for analysis.
type OllamaClient struct {
// baseURL is the server root; "/api/generate" is appended to it.
baseURL string
// model is the model name placed in each request.
model string
}
// NewOllamaClient returns a client that talks to the Ollama server at baseURL
// and asks it to use the given model.
func NewOllamaClient(baseURL, model string) *OllamaClient {
	client := new(OllamaClient)
	client.baseURL = baseURL
	client.model = model
	return client
}
// OllamaRequest is the JSON body posted to the /api/generate endpoint.
type OllamaRequest struct {
Model string `json:"model"`
Prompt string `json:"prompt"`
// Stream is sent as false by Analyze, which then decodes a single JSON
// object from the response body.
Stream bool `json:"stream"`
}
// OllamaResponse is the part of the /api/generate reply that Analyze reads.
type OllamaResponse struct {
// Response is the model's generated text.
Response string `json:"response"`
}
// Analyze asks the model to assess anomaly and returns its verdict.
//
// threat is formatted as "[level] explanation". remediation is formatted as
// "action:target (reason)" and is empty unless the model both flags a threat
// and names an action other than "none". An error is returned when the
// request cannot be encoded or sent, the server answers with a non-200
// status, or the reply contains no parseable JSON object.
//
// NOTE(review): event data is interpolated into the prompt verbatim and may
// contain attacker-controlled text (command lines, log lines), while the
// model's answer can drive remediation — treat the result as untrusted.
func (c *OllamaClient) Analyze(anomaly Anomaly) (threat string, remediation string, err error) {
	prompt := fmt.Sprintf(`You are a security analyst AI. Analyze this system anomaly and respond with JSON.
Event Type: %s
Event Data: %v
Anomaly Score: %.2f
Detection Reason: %s
Analyze this event and respond with ONLY a JSON object (no other text):
{
"is_threat": true/false,
"threat_level": "critical/high/medium/low/none",
"explanation": "brief explanation of what this event means",
"remediation": {
"action": "kill/block/restore/alert/none",
"target": "the target (PID, IP, filepath, etc)",
"reason": "why this action"
}
}`,
		anomaly.Event.Type,
		anomaly.Event.Data,
		anomaly.Score,
		anomaly.Reason,
	)

	reqBody, err := json.Marshal(OllamaRequest{
		Model:  c.model,
		Prompt: prompt,
		Stream: false,
	})
	if err != nil {
		return "", "", fmt.Errorf("encoding Ollama request: %w", err)
	}

	resp, err := http.Post(
		c.baseURL+"/api/generate",
		"application/json",
		bytes.NewReader(reqBody),
	)
	if err != nil {
		return "", "", err
	}
	defer resp.Body.Close()

	// An error page would otherwise decode into an empty response and be
	// reported as a clean "no threat" verdict.
	if resp.StatusCode != http.StatusOK {
		return "", "", fmt.Errorf("ollama returned status %s", resp.Status)
	}

	var ollamaResp OllamaResponse
	if err := json.NewDecoder(resp.Body).Decode(&ollamaResp); err != nil {
		return "", "", err
	}

	var analysis struct {
		IsThreat    bool   `json:"is_threat"`
		ThreatLevel string `json:"threat_level"`
		Explanation string `json:"explanation"`
		Remediation struct {
			Action string `json:"action"`
			Target string `json:"target"`
			Reason string `json:"reason"`
		} `json:"remediation"`
	}

	// The model may wrap the JSON in extra text, so take the outermost
	// brace-delimited span.
	responseText := ollamaResp.Response
	startIdx := strings.Index(responseText, "{")
	endIdx := strings.LastIndex(responseText, "}")
	if startIdx < 0 || endIdx <= startIdx {
		return "", "", fmt.Errorf("failed to parse LLM response: no JSON object found")
	}
	if err := json.Unmarshal([]byte(responseText[startIdx:endIdx+1]), &analysis); err != nil {
		return "", "", fmt.Errorf("failed to parse LLM response: %w", err)
	}

	threat = fmt.Sprintf("[%s] %s", analysis.ThreatLevel, analysis.Explanation)

	action := analysis.Remediation.Action
	if analysis.IsThreat && action != "" && action != "none" {
		remediation = fmt.Sprintf("%s:%s (%s)",
			action,
			analysis.Remediation.Target,
			analysis.Remediation.Reason,
		)
	}
	return threat, remediation, nil
}
The remediation engine executes actions suggested by the AI. It has safety checks and dry-run support.
package remediation
import (
"fmt"
"os/exec"
"strconv"
"strings"
)
// Engine executes remediation actions, subject to safety checks and an
// engine-wide dry-run switch.
type Engine struct {
// dryRun makes kill, block and restore print what they would do instead of
// doing it.
dryRun bool
allowlist map[string]bool // Processes that should never be killed
blockRules []string // IPs blocked so far, appended by blockIP
}
// NewEngine returns a remediation engine with the given dry-run setting and
// the default allowlist of process names that must never be killed.
func NewEngine(dryRun bool) *Engine {
	protected := make(map[string]bool)
	for _, name := range []string{"systemd", "init", "sshd", "kernel", "dockerd", "containerd"} {
		protected[name] = true
	}

	e := new(Engine)
	e.dryRun = dryRun
	e.allowlist = protected
	return e
}
// Execute runs the handler that matches action.Type ("kill", "block",
// "restore" or "alert") and returns its result. Any other type is an error.
func (e *Engine) Execute(action Action) error {
	if action.Type == "kill" {
		return e.killProcess(action)
	}
	if action.Type == "block" {
		return e.blockIP(action)
	}
	if action.Type == "restore" {
		return e.restoreFile(action)
	}
	if action.Type == "alert" {
		return e.sendAlert(action)
	}
	return fmt.Errorf("unknown action type: %s", action.Type)
}
// killProcess sends SIGKILL to the process whose PID is action.Target.
//
// It refuses, returning an error, when the target is not a number, when
// action.Params has no string "name" (the allowlist check cannot be performed
// without it), when the name is allowlisted, or when the PID is 100 or lower
// (which also rejects negative values that kill(1) would treat as process
// groups). In dry-run mode it only prints what it would do.
func (e *Engine) killProcess(action Action) error {
	pidStr := action.Target
	pid, err := strconv.Atoi(pidStr)
	if err != nil {
		return fmt.Errorf("invalid PID: %s", pidStr)
	}

	// Checked assertion: Params is frequently populated from event data that
	// has no "name" key, and an unchecked assertion would panic the daemon.
	processName, ok := action.Params["name"].(string)
	if !ok || processName == "" {
		return fmt.Errorf("refusing to kill PID %d: process name unknown, cannot check allowlist", pid)
	}
	if e.allowlist[processName] {
		return fmt.Errorf("refusing to kill allowlisted process: %s", processName)
	}

	// Don't kill PID 1 or low PIDs
	if pid <= 100 {
		return fmt.Errorf("refusing to kill low PID: %d", pid)
	}

	if e.dryRun {
		fmt.Printf("[DRY RUN] Would kill process %d (%s)\n", pid, processName)
		return nil
	}

	// Pass the re-formatted number, not the raw target, so forms such as
	// "+500" are normalized before reaching the command line.
	return exec.Command("kill", "-9", strconv.Itoa(pid)).Run()
}
// blockIP adds an iptables INPUT DROP rule for the IPv4 address in
// action.Target.
//
// It refuses, returning an error, when the target is not a valid dotted-quad
// address or lies in loopback (127/8) or a private range (10/8, 172.16/12,
// 192.168/16). An address that this engine has already blocked is a no-op, so
// repeated events do not pile up duplicate rules. In dry-run mode it only
// prints what it would do.
func (e *Engine) blockIP(action Action) error {
	ip := action.Target

	if !isValidIP(ip) {
		return fmt.Errorf("invalid IP address: %s", ip)
	}

	// isValidIP guarantees four numeric octets, so these conversions cannot
	// fail. Comparing numbers rather than string prefixes also catches
	// spellings with leading zeros.
	octets := strings.Split(ip, ".")
	first, _ := strconv.Atoi(octets[0])
	second, _ := strconv.Atoi(octets[1])
	protected := first == 127 ||
		first == 10 ||
		(first == 192 && second == 168) ||
		(first == 172 && second >= 16 && second <= 31)
	if protected {
		return fmt.Errorf("refusing to block private/localhost IP: %s", ip)
	}

	if e.dryRun {
		fmt.Printf("[DRY RUN] Would block IP %s\n", ip)
		return nil
	}

	for _, blocked := range e.blockRules {
		if blocked == ip {
			return nil
		}
	}

	cmd := exec.Command("iptables", "-A", "INPUT", "-s", ip, "-j", "DROP")
	if err := cmd.Run(); err != nil {
		return err
	}
	e.blockRules = append(e.blockRules, ip)
	return nil
}
// restoreFile copies the backup named by action.Params["backup"] over the
// file at action.Target using cp. It returns an error when the target is
// empty or no string "backup" parameter is present. In dry-run mode it only
// prints what it would do.
func (e *Engine) restoreFile(action Action) error {
	path := action.Target
	if path == "" {
		return fmt.Errorf("restore: empty target path")
	}

	// Checked assertion: an unchecked one panics whenever Params lacks a
	// "backup" entry.
	backupPath, ok := action.Params["backup"].(string)
	if !ok || backupPath == "" {
		return fmt.Errorf("restore: no backup path provided for %s", path)
	}

	if e.dryRun {
		fmt.Printf("[DRY RUN] Would restore %s from %s\n", path, backupPath)
		return nil
	}

	// "--" stops option parsing so a path beginning with "-" is not read as
	// a cp flag.
	cmd := exec.Command("cp", "--", backupPath, path)
	return cmd.Run()
}
// sendAlert prints an alert line to stdout and, unless in dry-run mode, also
// submits it to the system log through logger(1) under the tag
// "immune-system".
//
// The text comes from action.Params["message"]; when that is missing or not a
// string, a generic message naming the action's target is used instead.
func (e *Engine) sendAlert(action Action) error {
	// Checked assertion: callers may pass raw event data as Params, which has
	// no "message" key, and an unchecked assertion would panic.
	message, ok := action.Params["message"].(string)
	if !ok || message == "" {
		message = fmt.Sprintf("security alert for target %q", action.Target)
	}

	// Log locally
	fmt.Printf("[ALERT] %s\n", message)

	// Could integrate with PagerDuty, Slack, etc.
	if e.dryRun {
		return nil
	}

	// "--" stops option parsing so a message beginning with "-" is not read
	// as a logger flag.
	cmd := exec.Command("logger", "-t", "immune-system", "--", message)
	return cmd.Run()
}
// isValidIP reports whether ip is a dotted-quad IPv4 address: exactly four
// parts separated by ".", each one to three decimal digits with a value of at
// most 255. Signs, whitespace and empty parts are rejected, because this
// value is handed to iptables as a command-line argument.
func isValidIP(ip string) bool {
	parts := strings.Split(ip, ".")
	if len(parts) != 4 {
		return false
	}
	for _, part := range parts {
		if len(part) == 0 || len(part) > 3 {
			return false
		}
		// strconv.Atoi alone would accept "+1" and "-0".
		for _, r := range part {
			if r < '0' || r > '9' {
				return false
			}
		}
		n, err := strconv.Atoi(part)
		if err != nil || n > 255 {
			return false
		}
	}
	return true
}
Tie it all together:
package main
import (
"flag"
"fmt"
"log"
"time"
)
// main wires the collectors, detector, LLM client and remediation engine
// together and then processes events until the events channel is closed.
//
// Each event is scored; events below the threshold are dropped. The rest are
// sent to the LLM, and any remediation it proposes is parsed into an Action
// and handed to the engine. Failures at any stage are logged and the loop
// moves on to the next event.
func main() {
	var (
		dryRun      = flag.Bool("dry-run", true, "Don't execute remediation actions")
		threshold   = flag.Float64("threshold", 0.5, "Anomaly score threshold for AI analysis")
		ollamaURL   = flag.String("ollama-url", "http://localhost:11434", "Ollama API URL")
		ollamaModel = flag.String("ollama-model", "llama3.2", "Ollama model to use")
	)
	flag.Parse()

	// Shared pipeline components.
	events := make(chan Event, 1000)
	detector := NewAnomalyDetector()
	aiClient := NewOllamaClient(*ollamaURL, *ollamaModel)
	remediator := NewEngine(*dryRun)

	// One goroutine per collector, all feeding the same channel.
	go NewProcessCollector(events).Run()
	go NewNetworkCollector(events).Run()
	go NewFilesystemCollector(events).Run()
	go NewAuthCollector(events).Run()

	log.Printf("Digital Immune System started (dry-run=%v, threshold=%.2f)", *dryRun, *threshold)

	for event := range events {
		// Stage 1: cheap heuristic scoring filters out ordinary activity.
		score, reason := detector.Score(event)
		if score < *threshold {
			continue
		}

		anomaly := Anomaly{Event: event, Score: score, Reason: reason}
		log.Printf("Anomaly detected: type=%s score=%.2f reason=%s",
			event.Type, score, reason)

		// Stage 2: ask the LLM what the anomaly means.
		threat, remediation, err := aiClient.Analyze(anomaly)
		if err != nil {
			log.Printf("AI analysis failed: %v", err)
			continue
		}
		anomaly.Threat = threat
		anomaly.Remediation = remediation
		log.Printf("AI analysis: threat=%s remediation=%s", threat, remediation)

		// Stage 3: act on the suggestion, if there is a usable one.
		if remediation == "" {
			continue
		}
		action := parseRemediation(remediation, event)
		if action == nil {
			continue
		}
		err = remediator.Execute(*action)
		if err != nil {
			log.Printf("Remediation failed: %v", err)
			continue
		}
		log.Printf("Remediation executed: %s", action.Type)
	}
}
// parseRemediation turns a remediation string of the form
// "action:target (reason)" into an Action whose Params are the triggering
// event's data. It returns nil when the string has no ":" separator or the
// action part is empty.
//
// The target is everything between the first ":" and the " (" that opens the
// reason, so targets containing spaces (such as file paths) are kept whole.
// The action is lower-cased and both parts are trimmed, because the text
// originates from an LLM and its casing and spacing are not guaranteed.
func parseRemediation(remediation string, event Event) *Action {
	actionType, rest, ok := strings.Cut(remediation, ":")
	if !ok {
		return nil
	}
	actionType = strings.ToLower(strings.TrimSpace(actionType))
	if actionType == "" {
		return nil
	}

	target, _, _ := strings.Cut(rest, " (")
	target = strings.TrimSpace(target)

	return &Action{
		Type:   actionType,
		Target: target,
		Params: event.Data,
	}
}
# Start Ollama with a model
ollama pull llama3.2
ollama serve
# Build and run (dry-run mode first!)
go build -o immune-system .
sudo ./immune-system --dry-run=true --threshold=0.5
# Once confident, enable remediation
sudo ./immune-system --dry-run=false --threshold=0.6
Example output:
2024/02/19 10:23:45 Digital Immune System started (dry-run=true, threshold=0.50)
2024/02/19 10:24:01 Anomaly detected: type=process score=0.70 reason=known_suspicious_process
2024/02/19 10:24:02 AI analysis: threat=[high] nmap is a network scanning tool commonly used for reconnaissance remediation=alert:nmap (reconnaissance activity detected)
2024/02/19 10:24:02 Remediation executed: alert
2024/02/19 10:25:33 Anomaly detected: type=auth score=0.70 reason=failed_auth, brute_force_pattern
2024/02/19 10:25:34 AI analysis: threat=[critical] Multiple failed SSH attempts from same IP indicates brute force attack remediation=block:192.0.2.50 (block attacking IP)
2024/02/19 10:25:34 [DRY RUN] Would block IP 192.0.2.50
What works well:
What’s tricky:
Improvements to consider: