# rockfish alert

Publish detection events to MQTT and/or Apache Kafka for automated response.
## Overview
The alert command reads Suricata alert and hunt finding Parquet data, normalizes events into a common JSON payload, and publishes them to MQTT and/or Kafka topics. It supports deduplication, rate limiting, confidence filtering, and continuous polling.
This is the “R” (Response) in NDR — enabling closed-loop automated response via n8n, Node-RED, or any MQTT/Kafka consumer.
## Usage

```
rockfish alert [OPTIONS]
```
## Alert Payload

All alerts are normalized to a common JSON schema:

```json
{
  "alert_id": "RF-2026-00042",
  "timestamp": "2026-02-16T14:32:07Z",
  "detection_type": "signature",
  "confidence": 0.95,
  "source": {
    "ip": "10.0.12.45"
  },
  "destinations": [
    { "ip": "185.220.101.34", "port": 443 }
  ],
  "metadata": {
    "protocol": "TCP",
    "suricata_sid": 2025001,
    "suricata_signature": "ET MALWARE Cobalt Strike Beacon",
    "suricata_category": "A Network Trojan was detected",
    "community_id": "1:abc123"
  },
  "recommended_action": "block_ip"
}
```
## Topic Mapping

### MQTT Topics (forward slashes)
| Source | Topic |
|---|---|
| Suricata alert | rockfish/alerts/signature |
| Hunt: beaconing | rockfish/alerts/c2_beacon |
| Hunt: lateral movement | rockfish/alerts/lateral_movement |
| Hunt: exfiltration | rockfish/alerts/exfiltration |
| Hunt: DNS tunneling | rockfish/alerts/anomaly |
| Heartbeat | rockfish/status/heartbeat |
### Kafka Topics (dots)
| Source | Topic |
|---|---|
| Suricata alert | rockfish.alerts.signature |
| Hunt: beaconing | rockfish.alerts.c2_beacon |
| Heartbeat | rockfish.status.heartbeat |
## MQTT Options
| Option | Env Var | Default |
|---|---|---|
| --mqtt-broker | MQTT_BROKER | localhost |
| --mqtt-port | MQTT_PORT | 1883 |
| --mqtt-client-id | MQTT_CLIENT_ID | rockfish-alert |
| --mqtt-qos | MQTT_QOS | 1 |
| --mqtt-topic-prefix | MQTT_TOPIC_PREFIX | rockfish |
| --mqtt-username | MQTT_USERNAME | — |
| --mqtt-password | MQTT_PASSWORD | — |
| --mqtt-tls | MQTT_TLS_ENABLED | false |
## Kafka Options

Requires building with the `kafka` feature.
| Option | Env Var | Default |
|---|---|---|
| --kafka | KAFKA_ENABLED | false |
| --kafka-brokers | KAFKA_BROKERS | localhost:9092 |
| --kafka-topic-prefix | KAFKA_TOPIC_PREFIX | rockfish |
| --kafka-client-id | KAFKA_CLIENT_ID | rockfish-alert |
| --kafka-username | KAFKA_USERNAME | — |
| --kafka-password | KAFKA_PASSWORD | — |
| --kafka-security-protocol | KAFKA_SECURITY_PROTOCOL | plaintext |
| --kafka-compression | KAFKA_COMPRESSION | none |
Supported security protocols: `plaintext`, `ssl`, `sasl_plaintext`, `sasl_ssl`

Supported compression: `none`, `gzip`, `snappy`, `lz4`, `zstd`
## Confidence Mapping

Suricata severity:
| Severity | Confidence |
|---|---|
| 1 | 0.95 |
| 2 | 0.85 |
| 3 | 0.70 |
| 4+ | 0.50 |
Hunt severity:
| Severity | Confidence |
|---|---|
| critical | 0.95 |
| high | 0.85 |
| medium | 0.70 |
| low | 0.55 |
## Deduplication
Identical alerts (same source IP, destination IP, detection type, and SID) are suppressed within a configurable time window (default: 5 minutes).
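The following is an added example, not rockfish's own code: it walks the pipeline described in the sections above in Python, mapping severity to confidence, choosing the topic per transport (slashes for MQTT, dots for Kafka), applying the confidence threshold, and suppressing repeats of the same (source IP, destination IP, detection type, SID) inside the dedup window. The Suricata record uses Suricata EVE JSON field names; the hunt finding layout, the `alert_id` numbering scheme and the `recommended_action` defaults are assumptions, and the sample events are constructed.

```python
# Added example, not rockfish's own code. Layouts marked "assumed" are not from the docs.
from datetime import datetime, timedelta
import json

# Severity -> confidence, from the Confidence Mapping tables.
SURICATA_CONFIDENCE = {1: 0.95, 2: 0.85, 3: 0.70}   # 4 and above fall back to 0.50
HUNT_CONFIDENCE = {"critical": 0.95, "high": 0.85, "medium": 0.70, "low": 0.55}
# Hunt kind -> detection_type (also the topic leaf), from the Topic Mapping tables.
HUNT_TYPE = {"beaconing": "c2_beacon", "lateral_movement": "lateral_movement",
             "exfiltration": "exfiltration", "dns_tunneling": "anomaly"}


def topic_for(transport: str, prefix: str, detection_type: str) -> str:
    """MQTT topics are '/'-separated, Kafka topics '.'-separated."""
    sep = "/" if transport == "mqtt" else "."
    return sep.join([prefix, "alerts", detection_type])


def from_suricata(ev: dict, seq: int) -> dict:
    """Build the payload from a Suricata EVE alert record."""
    a = ev["alert"]
    return {
        "alert_id": f"RF-{ev['timestamp'][:4]}-{seq:05d}",   # id scheme is assumed
        "timestamp": ev["timestamp"],
        "detection_type": "signature",
        "confidence": SURICATA_CONFIDENCE.get(a["severity"], 0.50),
        "source": {"ip": ev["src_ip"]},
        "destinations": [{"ip": ev["dest_ip"], "port": ev["dest_port"]}],
        "metadata": {"protocol": ev["proto"], "suricata_sid": a["signature_id"],
                     "suricata_signature": a["signature"], "suricata_category": a["category"],
                     "community_id": ev.get("community_id")},
        "recommended_action": "block_ip",   # assumed default for signature hits
    }


def from_hunt(f: dict, seq: int) -> dict:
    """Build the payload from a hunt finding (record layout is assumed)."""
    return {
        "alert_id": f"RF-{f['timestamp'][:4]}-{seq:05d}",
        "timestamp": f["timestamp"],
        "detection_type": HUNT_TYPE[f["kind"]],
        "confidence": HUNT_CONFIDENCE[f["severity"]],
        "source": {"ip": f["src_ip"]},
        "destinations": [{"ip": d, "port": f["dst_port"]} for d in f["dst_ips"]],
        "metadata": {"protocol": f["proto"], "community_id": f.get("community_id")},
        "recommended_action": f.get("recommended_action", "investigate"),
    }


class Deduplicator:
    """Suppress repeats of (src ip, dst ip, detection_type, sid) inside a window.
    Event timestamps drive the window, so replaying a Parquet batch is deterministic."""

    def __init__(self, window_secs: int = 300):
        self.window = timedelta(seconds=window_secs)
        self.last_seen: dict[tuple, datetime] = {}

    @staticmethod
    def key(alert: dict) -> tuple:
        dst = alert["destinations"][0]["ip"] if alert["destinations"] else None
        return (alert["source"]["ip"], dst, alert["detection_type"],
                alert["metadata"].get("suricata_sid"))

    def accept(self, alert: dict) -> bool:
        ts = datetime.fromisoformat(alert["timestamp"].replace("Z", "+00:00"))
        k = self.key(alert)
        prev = self.last_seen.get(k)
        if prev is not None and ts - prev < self.window:
            return False          # same alert seen inside the window: suppress
        self.last_seen[k] = ts
        return True


def publish_batch(alerts, transport="mqtt", prefix="rockfish", threshold=0.75, dedup=None):
    """Return the (topic, JSON payload) pairs that survive the threshold and dedup."""
    dedup = dedup or Deduplicator()
    out = []
    for a in alerts:
        if a["confidence"] < threshold or not dedup.accept(a):
            continue
        out.append((topic_for(transport, prefix, a["detection_type"]), json.dumps(a, sort_keys=True)))
    return out


# Constructed sample input: one Suricata alert repeated at +1 min (inside the
# 5-minute window) and +7 min (outside it), plus one medium-severity hunt finding.
BASE = {"timestamp": "2026-02-16T14:32:07Z", "src_ip": "10.0.12.45",
        "dest_ip": "185.220.101.34", "dest_port": 443, "proto": "TCP", "community_id": "1:abc123",
        "alert": {"signature_id": 2025001, "signature": "ET MALWARE Cobalt Strike Beacon",
                  "category": "A Network Trojan was detected", "severity": 1}}
SURICATA_EVENTS = [BASE, dict(BASE, timestamp="2026-02-16T14:33:07Z"),
                   dict(BASE, timestamp="2026-02-16T14:39:07Z")]
HUNT_FINDINGS = [{"timestamp": "2026-02-16T14:35:00Z", "kind": "beaconing", "severity": "medium",
                  "src_ip": "10.0.12.45", "dst_ips": ["185.220.101.34"], "dst_port": 443, "proto": "TCP"}]


def demo(transport="mqtt", threshold=0.75):
    """Normalize the constructed events and return what would be published."""
    alerts = [from_suricata(e, i + 1) for i, e in enumerate(SURICATA_EVENTS)]
    alerts += [from_hunt(f, len(alerts) + i + 1) for i, f in enumerate(HUNT_FINDINGS)]
    return publish_batch(alerts, transport=transport, threshold=threshold)
```

With the defaults, `demo()` publishes only the first and third Suricata alerts: the second is an in-window duplicate and the hunt finding's 0.70 confidence is below the 0.75 threshold. Lowering the threshold to 0.5 lets the `c2_beacon` alert through; it has its own dedup key because the detection type differs.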
## YAML Configuration

```yaml
alert:
  mqtt:
    broker: mosquitto
    port: 1883
    client_id: rockfish-alert
    qos: 1
    topic_prefix: rockfish
  kafka:
    brokers: "kafka1:9092,kafka2:9092"
    topic_prefix: rockfish
    security_protocol: sasl_ssl
    compression: snappy
  confidence_threshold: 0.75
  poll_interval_secs: 30
  dedup_window_secs: 300
  enabled_types:
    - signature
    - lateral_movement
    - c2_beacon
```
## Heartbeat

Periodic heartbeat published to `{prefix}/status/heartbeat`:

```json
{
  "timestamp": "2026-02-16T14:32:07Z",
  "uptime_secs": 3600,
  "alerts_published": 142,
  "status": "running"
}
```
## Examples

```bash
# Single-shot MQTT publish
rockfish alert -d /data --sensor my-sensor --hive \
  --mqtt-broker mosquitto -t "1 hour"

# Continuous MQTT + Kafka publishing
rockfish alert -d /data --sensor my-sensor --hive \
  --mqtt-broker mosquitto \
  --kafka --kafka-brokers kafka1:9092,kafka2:9092 \
  --continuous

# High-confidence only with TLS
rockfish alert -d /data --sensor prod-01 --hive \
  --mqtt-broker mqtt.internal --mqtt-tls \
  --confidence-threshold 0.85 --continuous
```
## n8n Integration

Subscribe to MQTT topics in n8n for automated response:

- Add an MQTT Trigger node subscribing to `rockfish/alerts/#`
- Parse the alert JSON payload
- Route by `detection_type`
- Execute response actions (block IP, quarantine host, create ticket)
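The same four steps as a plain consumer, written here as an added example rather than code from rockfish or n8n: match the subscription filter (with MQTT `+`/`#` wildcard semantics), parse and validate the payload against the documented schema, route by `detection_type`, and pick a response action. It also consumes the heartbeat topic so a stalled sensor can be noticed. The routing policy, the 0.85 auto-act threshold, the handler names and the sample messages are assumptions; a real workflow would call a firewall, EDR or ticketing API in place of returning the decision.

```python
# Added example, not code from rockfish or n8n. Policy values are assumptions.
import json

REQUIRED = ("alert_id", "timestamp", "detection_type", "confidence",
            "source", "destinations", "recommended_action")

# detection_type -> default action (assumed policy, not from the docs)
ROUTES = {"signature": "block_ip", "c2_beacon": "block_ip",
          "lateral_movement": "quarantine_host", "exfiltration": "quarantine_host",
          "anomaly": "create_ticket"}
AUTOMATED = {"block_ip", "quarantine_host"}   # actions that change network state


def mqtt_topic_matches(filt: str, topic: str) -> bool:
    """MQTT filter matching: '+' matches one level, '#' matches the remainder."""
    f, t = filt.split("/"), topic.split("/")
    for i, part in enumerate(f):
        if part == "#":
            return True
        if i >= len(t) or (part != "+" and part != t[i]):
            return False
    return len(f) == len(t)


def parse_alert(topic: str, payload: bytes) -> dict:
    """Decode one rockfish alert and reject malformed or mismatched messages early."""
    alert = json.loads(payload)
    missing = [k for k in REQUIRED if k not in alert]
    if missing:
        raise ValueError(f"alert missing fields: {missing}")
    if not 0.0 <= float(alert["confidence"]) <= 1.0:
        raise ValueError("confidence must be within [0, 1]")
    if topic.rsplit("/", 1)[-1] != alert["detection_type"]:
        raise ValueError("topic leaf does not match detection_type")
    return alert


def decide(alert: dict, auto_threshold: float = 0.85) -> dict:
    """Route by detection_type; alerts below the auto-act threshold only open a ticket."""
    action = ROUTES.get(alert["detection_type"], "create_ticket")
    if action in AUTOMATED and alert["confidence"] < auto_threshold:
        action = "create_ticket"
    if action == "block_ip":              # act on the remote side of the flow
        targets = [d["ip"] for d in alert["destinations"]]
    else:                                 # quarantine or ticket concerns the internal host
        targets = [alert["source"]["ip"]]
    return {"alert_id": alert["alert_id"], "action": action, "targets": targets,
            "recommended_by_sensor": alert["recommended_action"]}


def handle_message(topic: str, payload: bytes, state: dict) -> dict:
    """Dispatch one MQTT message; `state` records the last heartbeat seen."""
    if mqtt_topic_matches("rockfish/status/heartbeat", topic):
        hb = json.loads(payload)
        state["last_heartbeat"] = hb["timestamp"]
        state["alerts_published"] = hb["alerts_published"]
        return {"action": "heartbeat", "status": hb["status"]}
    if not mqtt_topic_matches("rockfish/alerts/#", topic):
        return {"action": "ignore", "topic": topic}
    return decide(parse_alert(topic, payload))


# Constructed sample messages; the first carries the payload shown in the docs.
SAMPLE_MESSAGES = [
    ("rockfish/alerts/signature", json.dumps({
        "alert_id": "RF-2026-00042", "timestamp": "2026-02-16T14:32:07Z",
        "detection_type": "signature", "confidence": 0.95,
        "source": {"ip": "10.0.12.45"},
        "destinations": [{"ip": "185.220.101.34", "port": 443}],
        "metadata": {"suricata_sid": 2025001}, "recommended_action": "block_ip"}).encode()),
    ("rockfish/alerts/c2_beacon", json.dumps({
        "alert_id": "RF-2026-00043", "timestamp": "2026-02-16T14:35:00Z",
        "detection_type": "c2_beacon", "confidence": 0.70,
        "source": {"ip": "10.0.12.45"},
        "destinations": [{"ip": "185.220.101.34", "port": 443}],
        "metadata": {}, "recommended_action": "block_ip"}).encode()),
    ("rockfish/status/heartbeat", json.dumps({
        "timestamp": "2026-02-16T14:36:00Z", "uptime_secs": 3600,
        "alerts_published": 142, "status": "running"}).encode()),
    ("rockfish/other", b"{}"),
]


def run_samples():
    """Feed the constructed messages through the consumer; returns (decisions, state)."""
    state = {}
    return [handle_message(t, p, state) for t, p in SAMPLE_MESSAGES], state
```

The topic-leaf check in `parse_alert` is cheap insurance: a publisher bug or a misconfigured topic prefix that puts a `signature` alert on the `anomaly` topic fails loudly instead of being routed by whichever field the workflow happens to read.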