Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

rockfish alert

Publish detection events to MQTT and/or Apache Kafka for automated response.

Overview

The alert command reads Suricata alert and hunt finding Parquet data, normalizes events into a common JSON payload, and publishes them to MQTT and/or Kafka topics. It supports deduplication, rate limiting, confidence filtering, and continuous polling.

This is the “R” (Response) in NDR — enabling closed-loop automated response via n8n, Node-RED, or any MQTT/Kafka consumer.

Usage

rockfish alert [OPTIONS]

Alert Payload

All alerts are normalized to a common JSON schema:

{
  "alert_id": "RF-2026-00042",
  "timestamp": "2026-02-16T14:32:07Z",
  "detection_type": "signature",
  "confidence": 0.95,
  "source": {
    "ip": "10.0.12.45"
  },
  "destinations": [
    { "ip": "185.220.101.34", "port": 443 }
  ],
  "metadata": {
    "protocol": "TCP",
    "suricata_sid": 2025001,
    "suricata_signature": "ET MALWARE Cobalt Strike Beacon",
    "suricata_category": "A Network Trojan was detected",
    "community_id": "1:abc123"
  },
  "recommended_action": "block_ip"
}

Topic Mapping

MQTT Topics (forward slashes)

SourceTopic
Suricata alertrockfish/alerts/signature
Hunt: beaconingrockfish/alerts/c2_beacon
Hunt: lateral movementrockfish/alerts/lateral_movement
Hunt: exfiltrationrockfish/alerts/exfiltration
Hunt: DNS tunnelingrockfish/alerts/anomaly
Heartbeatrockfish/status/heartbeat

Kafka Topics (dots)

SourceTopic
Suricata alertrockfish.alerts.signature
Hunt: beaconingrockfish.alerts.c2_beacon
Heartbeatrockfish.status.heartbeat

MQTT Options

OptionEnv VarDefault
--mqtt-brokerMQTT_BROKERlocalhost
--mqtt-portMQTT_PORT1883
--mqtt-client-idMQTT_CLIENT_IDrockfish-alert
--mqtt-qosMQTT_QOS1
--mqtt-topic-prefixMQTT_TOPIC_PREFIXrockfish
--mqtt-usernameMQTT_USERNAME
--mqtt-passwordMQTT_PASSWORD
--mqtt-tlsMQTT_TLS_ENABLEDfalse

Kafka Options

Requires building with the kafka feature.

OptionEnv VarDefault
--kafkaKAFKA_ENABLEDfalse
--kafka-brokersKAFKA_BROKERSlocalhost:9092
--kafka-topic-prefixKAFKA_TOPIC_PREFIXrockfish
--kafka-client-idKAFKA_CLIENT_IDrockfish-alert
--kafka-usernameKAFKA_USERNAME
--kafka-passwordKAFKA_PASSWORD
--kafka-security-protocolKAFKA_SECURITY_PROTOCOLplaintext
--kafka-compressionKAFKA_COMPRESSIONnone

Supported security protocols: plaintext, ssl, sasl_plaintext, sasl_ssl

Supported compression: none, gzip, snappy, lz4, zstd

Confidence Mapping

Suricata severity:

SeverityConfidence
10.95
20.85
30.70
4+0.50

Hunt severity:

SeverityConfidence
critical0.95
high0.85
medium0.70
low0.55

Deduplication

Identical alerts (same source IP, destination IP, detection type, and SID) are suppressed within a configurable time window (default: 5 minutes).

YAML Configuration

alert:
  mqtt:
    broker: mosquitto
    port: 1883
    client_id: rockfish-alert
    qos: 1
    topic_prefix: rockfish
  kafka:
    brokers: "kafka1:9092,kafka2:9092"
    topic_prefix: rockfish
    security_protocol: sasl_ssl
    compression: snappy
  confidence_threshold: 0.75
  poll_interval_secs: 30
  dedup_window_secs: 300
  enabled_types:
    - signature
    - lateral_movement
    - c2_beacon

Heartbeat

Periodic heartbeat published to {prefix}/status/heartbeat:

{
  "timestamp": "2026-02-16T14:32:07Z",
  "uptime_secs": 3600,
  "alerts_published": 142,
  "status": "running"
}

Examples

# Single-shot MQTT publish
rockfish alert -d /data --sensor my-sensor --hive \
  --mqtt-broker mosquitto -t "1 hour"

# Continuous MQTT + Kafka publishing
rockfish alert -d /data --sensor my-sensor --hive \
  --mqtt-broker mosquitto \
  --kafka --kafka-brokers kafka1:9092,kafka2:9092 \
  --continuous

# High-confidence only with TLS
rockfish alert -d /data --sensor prod-01 --hive \
  --mqtt-broker mqtt.internal --mqtt-tls \
  --confidence-threshold 0.85 --continuous

n8n Integration

Subscribe to MQTT topics in n8n for automated response:

  1. Add an MQTT Trigger node subscribing to rockfish/alerts/#
  2. Parse the alert JSON payload
  3. Route by detection_type
  4. Execute response actions (block IP, quarantine host, create ticket)