Security: Log Aggregation & Analysis
Overview
Security operations require correlating events across multiple systems: firewalls, IDS/IPS, application logs, and authentication systems. Each system logs in its own format, making correlation difficult. Normalizing all security logs into a unified format enables attack detection, incident response, and compliance reporting.
Core Problem Statement
"Security events are scattered across systems with incompatible log formats." Detecting attacks requires correlating firewall blocks, IDS alerts, and authentication failures, but different formats prevent automated analysis.
Example Scenario
Your security infrastructure includes:
- Firewall: Linux iptables logging kernel messages
- IDS: Snort-style alerts with ISO timestamps
- Application: Custom authentication logs
An attacker (198.51.100.23) attempts:
- SQL injection against web server
- SSH brute force
- Multiple failed application logins
You need to correlate these events to detect the coordinated attack.
Input Data
Firewall Logs (iptables)
Nov 15 10:23:45 fw01 kernel: ACCEPT IN=eth0 OUT=eth1 SRC=203.0.113.45 DST=192.168.1.100 PROTO=TCP SPT=54321 DPT=443
Nov 15 10:23:46 fw01 kernel: DROP IN=eth0 OUT= SRC=198.51.100.23 DST=192.168.1.100 PROTO=TCP SPT=12345 DPT=22 REASON=blocked_port
Nov 15 10:23:47 fw01 kernel: ACCEPT IN=eth0 OUT=eth1 SRC=203.0.113.67 DST=192.168.1.200 PROTO=TCP SPT=43210 DPT=80
Nov 15 10:23:50 fw01 kernel: DROP IN=eth0 OUT= SRC=192.0.2.15 DST=192.168.1.100 PROTO=TCP SPT=56789 DPT=3389 REASON=suspicious_source
Nov 15 10:23:52 fw01 kernel: ACCEPT IN=eth0 OUT=eth1 SRC=203.0.113.45 DST=192.168.1.100 PROTO=TCP SPT=54322 DPT=443
Kernel-format firewall logs showing ACCEPT and DROP rules.
IDS Alerts (Snort-style)
[2024-11-15T10:23:48.123Z] ALERT: SQL Injection attempt from 198.51.100.23 to 192.168.1.100:80 - Signature: "UNION SELECT" detected
[2024-11-15T10:23:49.456Z] ALERT: Port scan detected from 192.0.2.15 targeting 192.168.1.0/24 - 100 ports scanned in 5 seconds
[2024-11-15T10:23:51.789Z] ALERT: Brute force login attempt from 198.51.100.23 to 192.168.1.100:22 - 15 failed attempts in 60 seconds
Intrusion detection alerts with timestamps and attack signatures.
Application Authentication Logs
2024-11-15 10:23:45 [AUTH] user=alice@example.com ip=203.0.113.45 action=login status=success
2024-11-15 10:23:46 [AUTH] user=admin ip=198.51.100.23 action=login status=failure reason=invalid_password
2024-11-15 10:23:47 [AUTH] user=bob@example.com ip=203.0.113.67 action=login status=success
2024-11-15 10:23:48 [AUTH] user=admin ip=198.51.100.23 action=login status=failure reason=invalid_password
2024-11-15 10:23:49 [AUTH] user=admin ip=198.51.100.23 action=login status=failure reason=invalid_password
2024-11-15 10:23:50 [AUTH] user=root ip=192.0.2.15 action=login status=failure reason=account_disabled
Application-level authentication events with user context.
Normalization Rules
Create rules that extract security-relevant fields while normalizing format differences:
Security Log Normalization Rules
rules:
# Firewall: ACCEPT traffic
- name: firewall_accept
pattern:
- field: date
- text: " "
- field: host
- text: " kernel: ACCEPT IN="
- field: in_iface
- text: " OUT="
- field: out_iface
- text: " SRC="
- field: src_ip
- text: " DST="
- field: dst_ip
- text: " PROTO="
- field: protocol
- text: " SPT="
- field: src_port
- text: " DPT="
- field: dst_port
output: "[firewall-accept:src={src_ip},dst={dst_ip}:{dst_port},proto={protocol}]"
# Firewall: DROP traffic
- name: firewall_drop
pattern:
- field: date
- text: " "
- field: host
- text: " kernel: DROP IN="
- field: in_iface
- text: " OUT="
- field: out_iface
- text: " SRC="
- field: src_ip
- text: " DST="
- field: dst_ip
- text: " PROTO="
- field: protocol
- text: " SPT="
- field: src_port
- text: " DPT="
- field: dst_port
- text: " REASON="
- field: reason
output: "[firewall-drop:src={src_ip},dst={dst_ip}:{dst_port},reason={reason}]"
# IDS: SQL Injection
- name: ids_sqli
pattern:
- text: "["
- field: timestamp
- text: "] ALERT: SQL Injection attempt from "
- field: src_ip
- text: " to "
- field: dst
- text: " - Signature: "
- field: signature
output: "[ids-sqli:src={src_ip},dst={dst}]"
# IDS: Port Scan
- name: ids_portscan
pattern:
- text: "["
- field: timestamp
- text: "] ALERT: Port scan detected from "
- field: src_ip
- text: " targeting "
- field: target
- text: " - "
- field: details
output: "[ids-portscan:src={src_ip},target={target}]"
# IDS: Brute Force
- name: ids_bruteforce
pattern:
- text: "["
- field: timestamp
- text: "] ALERT: Brute force login attempt from "
- field: src_ip
- text: " to "
- field: dst
- text: " - "
- field: details
output: "[ids-bruteforce:src={src_ip},dst={dst}]"
# App Auth: Successful Login
- name: auth_success
pattern:
- field: timestamp
- text: " [AUTH] user="
- field: username
- text: " ip="
- field: ip
- text: " action=login status=success"
output: "[auth-success:user={username},ip={ip}]"
# App Auth: Failed Login
- name: auth_failure
pattern:
- field: timestamp
- text: " [AUTH] user="
- field: username
- text: " ip="
- field: ip
- text: " action=login status=failure reason="
- field: reason
output: "[auth-failure:user={username},ip={ip},reason={reason}]"
Rules preserve: source IPs, destinations, attack types, user accounts, failure reasons. Rules ignore: timestamps (use log ingestion time), hostnames, interface names.
Implementation
# Combine all security logs
cat firewall.log ids.log app-auth.log | \
patterndb-yaml --rules security-rules.yaml \
--quiet > unified-security.log
# Correlate by source IP
echo "Events by source IP:"
grep -o 'src=[0-9.]*\|ip=[0-9.]*' unified-security.log | \
sed 's/src=\|ip=//' | sort | uniq -c | sort -rn
# Find coordinated attacks (same IP in multiple systems)
echo "\nSuspicious IPs (multiple event types):"
for ip in $(grep -o 'src=[0-9.]*\|ip=[0-9.]*' unified-security.log | \
sed 's/src=\|ip=//' | sort -u); do
event_types=$(grep "$ip" unified-security.log | \
grep -o '^\[[^:]*' | sort -u | wc -l)
if [ "$event_types" -gt 2 ]; then
echo " $ip: $event_types different event types"
grep "$ip" unified-security.log | head -5
fi
done
import sys
from patterndb_yaml import PatterndbYaml
from pathlib import Path
from collections import defaultdict
import re
# Redirect stdout to file for testing
_original_stdout = sys.stdout
output_file = open("output.txt", "w")
sys.stdout = output_file
# Normalize all security logs
processor = PatterndbYaml(rules_path=Path("security-rules.yaml"))
# Combine all log sources
all_logs = []
for log_file in ["firewall.log", "ids.log", "app-auth.log"]:
with open(log_file) as f:
all_logs.extend(f.readlines())
# Normalize combined logs
from io import StringIO
input_data = StringIO("".join(all_logs))
output_data = StringIO()
processor.process(input_data, output_data)
# Parse normalized events
output_data.seek(0)
events = []
for line in output_data:
line = line.strip()
if not line:
continue
# Extract event type and attributes
match = re.match(r'\[([^:]+):(.+)\]', line)
if match:
event_type, attrs = match.groups()
# Parse attributes
attr_dict = {}
for attr in attrs.split(','):
if '=' in attr:
key, value = attr.split('=', 1)
attr_dict[key] = value
events.append({
'type': event_type,
'attributes': attr_dict,
'raw': line
})
# Correlate by source IP
ip_events = defaultdict(list)
for event in events:
attrs = event['attributes']
source_ip = attrs.get('src') or attrs.get('ip')
if source_ip:
ip_events[source_ip].append(event)
# Detect coordinated attacks
print("Coordinated Attack Detection:\n")
for ip, ip_event_list in sorted(ip_events.items(),
key=lambda x: len(x[1]),
reverse=True):
event_types = set(e['type'] for e in ip_event_list)
if len(event_types) >= 3:
print(f"⚠ HIGH PRIORITY: {ip}")
print(f" Event types: {', '.join(sorted(event_types))}")
print(f" Total events: {len(ip_event_list)}")
print(" Timeline:")
for event in ip_event_list[:5]:
print(f" - {event['raw']}")
print()
elif len(event_types) >= 2:
print(f"⚠ MEDIUM: {ip} - {len(event_types)} event types")
# Restore stdout and close output file
sys.stdout = _original_stdout
output_file.close()
Expected Output
Unified Security Log
[firewall-accept:src=203.0.113.45,dst=192.168.1.100:443,proto=TCP]
[firewall-drop:src=198.51.100.23,dst=192.168.1.100:22,reason=blocked_port]
[firewall-accept:src=203.0.113.67,dst=192.168.1.200:80,proto=TCP]
[firewall-drop:src=192.0.2.15,dst=192.168.1.100:3389,reason=suspicious_source]
[firewall-accept:src=203.0.113.45,dst=192.168.1.100:443,proto=TCP]
[ids-sqli:src=198.51.100.23,dst=192.168.1.100:80]
[ids-portscan:src=192.0.2.15,target=192.168.1.0/24]
[ids-bruteforce:src=198.51.100.23,dst=192.168.1.100:22]
[auth-success:user=alice@example.com,ip=203.0.113.45]
[auth-failure:user=admin,ip=198.51.100.23,reason=invalid_password]
[auth-success:user=bob@example.com,ip=203.0.113.67]
[auth-failure:user=admin,ip=198.51.100.23,reason=invalid_password]
[auth-failure:user=admin,ip=198.51.100.23,reason=invalid_password]
[auth-failure:user=root,ip=192.0.2.15,reason=account_disabled]
All security events normalized to consistent format for correlation.
Attack Correlation
From the normalized logs, we can identify the attack pattern:
Attacker: 198.51.100.23
- [firewall-drop:...] - Blocked at firewall (port 22)
- [ids-sqli:...] - SQL injection attempt detected
- [ids-bruteforce:...] - Brute force attack on SSH
- [auth-failure:...] - Three failed application logins
Attacker: 192.0.2.15
- [firewall-drop:...] - Blocked suspicious source (port 3389)
- [ids-portscan:...] - Network port scan detected
- [auth-failure:...] - Failed login with disabled account
Practical Workflows
1. Real-Time Attack Detection
Stream logs from multiple sources and detect attacks in real-time:
#!/bin/bash
# Tail all security log sources
tail -f /var/log/firewall.log /var/log/ids/alerts.log /var/log/app/auth.log | \
patterndb-yaml --rules security-rules.yaml --quiet | \
while read event; do
# Extract source IP
if [[ "$event" =~ src=([0-9.]+)|ip=([0-9.]+) ]]; then
ip="${BASH_REMATCH[1]}${BASH_REMATCH[2]}"
# Count recent events from this IP
recent_count=$(grep "$ip" /tmp/recent-events.log \
2>/dev/null | wc -l)
# Alert on threshold
if [ "$recent_count" -gt 5 ]; then
echo "🚨 ALERT: Suspicious activity from $ip" \
"($recent_count events)"
grep "$ip" /tmp/recent-events.log
fi
# Track event
echo "$event" >> /tmp/recent-events.log
fi
# Rotate recent events (keep last 100)
tail -100 /tmp/recent-events.log > /tmp/recent-events.tmp
mv /tmp/recent-events.tmp /tmp/recent-events.log
done
2. Incident Timeline Reconstruction
Build attack timeline from multiple sources:
import sys
from patterndb_yaml import PatterndbYaml
from pathlib import Path
from datetime import datetime
from io import StringIO
import re
# Redirect stdout to file for testing
_original_stdout = sys.stdout
output_file = open("output.txt", "w")
sys.stdout = output_file
processor = PatterndbYaml(rules_path=Path("security-rules.yaml"))
# Normalize logs from incident timeframe
incident_logs = [
"firewall-incident.log",
"ids-incident.log",
"auth-incident.log"
]
all_events = []
for log_file in incident_logs:
with open(log_file) as f:
output = StringIO()
processor.process(f, output)
output.seek(0)
all_events.extend([line.strip() for line in output if line.strip()])
# Filter by attacker IP
attacker_ip = "198.51.100.23"
attacker_events = [e for e in all_events if attacker_ip in e]
# Generate incident report
print(f"Incident Timeline for {attacker_ip}")
print("=" * 60)
for i, event in enumerate(attacker_events, 1):
# Extract event type
match = re.match(r'\[([^:]+):', event)
if match:
event_type = match.group(1)
print(f"{i}. [{event_type.upper()}] {event}")
print(f"\nTotal events: {len(attacker_events)}")
# Restore stdout and close output file
sys.stdout = _original_stdout
output_file.close()
3. Compliance Reporting (Failed Access Attempts)
Generate compliance reports for failed access attempts:
# Normalize security logs
cat firewall.log ids.log app-auth.log | \
patterndb-yaml --rules security-rules.yaml --quiet > unified.log
# Extract failed access attempts
echo "Failed Access Attempts Report"
echo "=============================="
echo
echo "Firewall Blocks:"
grep '^\[firewall-drop:' unified.log | wc -l
grep '^\[firewall-drop:' unified.log | head -10
echo "\nAuthentication Failures:"
grep '^\[auth-failure:' unified.log | wc -l
grep '^\[auth-failure:' unified.log
echo "\nIntrusion Attempts:"
grep '^\[ids-' unified.log | wc -l
grep '^\[ids-' unified.log
4. Threat Intelligence Enrichment
Correlate with threat intelligence feeds:
import sys
from patterndb_yaml import PatterndbYaml
from pathlib import Path
import re
# Redirect stdout to file for testing
_original_stdout = sys.stdout
output_file = open("output.txt", "w")
sys.stdout = output_file
# Known malicious IPs (from threat feed)
threat_ips = {"198.51.100.23", "192.0.2.15"}
processor = PatterndbYaml(rules_path=Path("security-rules.yaml"))
# Normalize logs
with open("combined-security.log") as f:
from io import StringIO
output = StringIO()
processor.process(f, output)
output.seek(0)
events = [line.strip() for line in output if line.strip()]
# Check against threat feed
print("Threat Intelligence Matches:\n")
for event in events:
# Extract IPs from event
ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', event)
for ip in ips:
if ip in threat_ips:
print(f"⚠ KNOWN THREAT: {ip}")
print(f" Event: {event}")
print()
# Restore stdout and close output file
sys.stdout = _original_stdout
output_file.close()
5. SIEM Integration
Export normalized logs to SIEM in CEF format:
import sys
from patterndb_yaml import PatterndbYaml
from pathlib import Path
import re
# Redirect stdout to file for testing
_original_stdout = sys.stdout
output_file = open("output.txt", "w")
sys.stdout = output_file
processor = PatterndbYaml(rules_path=Path("security-rules.yaml"))
def to_cef(event):
"""Convert normalized event to CEF format"""
match = re.match(r'\[([^:]+):(.+)\]', event)
if not match:
return None
event_type, attrs = match.groups()
# Parse attributes
attr_dict = {}
for attr in attrs.split(','):
if '=' in attr:
key, value = attr.split('=', 1)
attr_dict[key] = value
# Map to CEF fields
severity = 5 if 'drop' in event_type or 'failure' in event_type else 3
src_ip = attr_dict.get('src', attr_dict.get('ip', ''))
dst_ip = attr_dict.get('dst', '').split(':')[0]
# CEF format:
# CEF:Version|Device Vendor|Device Product|Device Version|
# Signature ID|Name|Severity|Extension
return (
f"CEF:0|PatternDB|SecurityAggregator|1.0|"
f"{event_type}|{event_type}|{severity}|"
f"src={src_ip} dst={dst_ip} act={event_type}"
)
# Process and export
with open("combined-security.log") as f:
from io import StringIO
output = StringIO()
processor.process(f, output)
output.seek(0)
with open("security-cef.log", "w") as cef_out:
for line in output:
line = line.strip()
if line:
cef_event = to_cef(line)
if cef_event:
cef_out.write(cef_event + "\n")
# Print the generated CEF log for verification
with open("security-cef.log") as f:
print(f.read(), end='')
# Restore stdout and close output file
sys.stdout = _original_stdout
output_file.close()
Key Benefits
- Unified visibility: Single view across all security systems
- Attack correlation: Connect events from firewall, IDS, and application logs
- Rapid incident response: Quickly identify coordinated attacks
- Compliance reporting: Aggregate failed access attempts across systems
- SIEM integration: Export normalized logs in standard formats
Related Topics
- Rules - Pattern matching and normalization
- Statistics - Measure match coverage
- Explain Mode - Debug pattern matching