Add threat detection command #35

Merged
wagesj45 merged 1 commit from codex/add-detect_threats-command-in-analyze.py into main 2025-07-19 02:12:51 -05:00
2 changed files with 253 additions and 0 deletions

View file

@@ -19,6 +19,7 @@ from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta
import json
@@ -27,6 +28,7 @@ import typer
from scripts import nginx_config  # noqa: F401  # imported for side effects/usage
DB_PATH = Path("database/ngxstat.db")
ANALYSIS_DIR = Path("output/analysis")
app = typer.Typer(help="Ad-hoc statistics queries")
@@ -197,5 +199,137 @@ def suggest_cache(
typer.echo(f"{host} {path} {count}")
@app.command("detect-threats")
def detect_threats(
    hours: int = typer.Option(1, help="Number of recent hours to analyze"),
    ip_threshold: int = typer.Option(
        100, help="Requests from a single IP to flag"
    ),
) -> None:
    """Detect potential security threats from recent logs.

    The most recent ``hours``-long window (anchored at the newest log
    entry, not wall-clock time) is compared against the window of the
    same length immediately before it.  Three signals are reported:

    * ``error_spikes`` -- hosts whose recent error rate (status >= 400)
      is at least 10% and at least double the previous window's rate.
    * ``suspicious_agents`` -- user agents with >= 10 recent requests
      that never appeared in the previous window.
    * ``high_ip_requests`` -- IPs with at least ``ip_threshold``
      requests in the recent window.

    The report is written to ``ANALYSIS_DIR/threat_report.json`` and
    echoed to stdout as compact JSON.
    """
    conn = _connect()
    # Close the connection even if a query raises (the original leaked it).
    try:
        cur = conn.cursor()
        cur.execute("SELECT MAX(time) FROM logs")
        row = cur.fetchone()
        if not row or not row[0]:
            typer.echo("No logs found")
            return

        fmt = "%Y-%m-%d %H:%M:%S"
        recent_end = datetime.strptime(row[0], fmt)
        recent_start = recent_end - timedelta(hours=hours)
        prev_start = recent_start - timedelta(hours=hours)
        recent_start_s = recent_start.strftime(fmt)
        recent_end_s = recent_end.strftime(fmt)
        prev_start_s = prev_start.strftime(fmt)
        # The previous window ends exactly where the recent one begins.
        prev_end_s = recent_start_s

        def _error_stats(start: str, end: str) -> dict:
            """Return {host: (error_count, total_count)} for time in [start, end)."""
            cur.execute(
                """
                SELECT host,
                       SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
                       COUNT(*) AS total
                FROM logs
                WHERE time >= ? AND time < ?
                GROUP BY host
                """,
                (start, end),
            )
            return {r[0]: (r[1], r[2]) for r in cur.fetchall()}

        recent_rows = _error_stats(recent_start_s, recent_end_s)
        prev_rows = _error_stats(prev_start_s, prev_end_s)

        error_spikes = []
        for host in set(recent_rows) | set(prev_rows):
            r_err, r_total = recent_rows.get(host, (0, 0))
            p_err, p_total = prev_rows.get(host, (0, 0))
            r_rate = r_err * 100.0 / r_total if r_total else 0.0
            p_rate = p_err * 100.0 / p_total if p_total else 0.0
            # Flag only meaningful spikes: >= 10% errors AND at least a
            # doubling versus the previous window (hosts unseen before
            # have p_rate == 0, so any >= 10% rate qualifies).
            if r_rate >= 10 and r_rate >= p_rate * 2:
                error_spikes.append(
                    {
                        "host": host,
                        "recent_error_rate": round(r_rate, 2),
                        "previous_error_rate": round(p_rate, 2),
                    }
                )

        # Agents never seen in the previous window but busy recently.
        cur.execute(
            """
            SELECT DISTINCT user_agent FROM logs
            WHERE time >= ? AND time < ?
            """,
            (prev_start_s, prev_end_s),
        )
        prev_agents = {r[0] for r in cur.fetchall()}
        cur.execute(
            """
            SELECT user_agent, COUNT(*) AS c
            FROM logs
            WHERE time >= ? AND time < ?
            GROUP BY user_agent
            HAVING c >= 10
            """,
            (recent_start_s, recent_end_s),
        )
        suspicious_agents = [
            {"user_agent": ua, "requests": cnt}
            for ua, cnt in cur.fetchall()
            if ua not in prev_agents
        ]

        # Single IPs generating unusually many recent requests.
        cur.execute(
            """
            SELECT ip, COUNT(*) AS c
            FROM logs
            WHERE time >= ? AND time < ?
            GROUP BY ip
            HAVING c >= ?
            ORDER BY c DESC
            """,
            (recent_start_s, recent_end_s, ip_threshold),
        )
        high_ip_requests = [
            {"ip": ip, "requests": cnt} for ip, cnt in cur.fetchall()
        ]
    finally:
        conn.close()

    report = {
        "time_range": {
            "recent_start": recent_start_s,
            "recent_end": recent_end_s,
            "previous_start": prev_start_s,
            "previous_end": prev_end_s,
        },
        "error_spikes": error_spikes,
        "suspicious_agents": suspicious_agents,
        "high_ip_requests": high_ip_requests,
    }
    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
    out_path = ANALYSIS_DIR / "threat_report.json"
    out_path.write_text(json.dumps(report, indent=2))
    typer.echo(json.dumps(report))
if __name__ == "__main__":
    app()

View file

@@ -203,3 +203,122 @@ server {
analyze.suggest_cache(threshold=2, json_output=True)
out_json = json.loads(capsys.readouterr().out.strip())
assert out_json == [{"host": "example.com", "path": "/foo", "misses": 3}]
def setup_threat_db(path: Path) -> None:
    """Build a sqlite fixture with synthetic traffic for threat detection.

    Populates four scenarios: a clean previous hour, a recent hour of
    500s (error spike), 101 requests from one IP (flood), and 15 hits
    from a brand-new user agent.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE logs (
            id INTEGER PRIMARY KEY,
            ip TEXT,
            host TEXT,
            time TEXT,
            request TEXT,
            status INTEGER,
            bytes_sent INTEGER,
            referer TEXT,
            user_agent TEXT,
            cache_status TEXT
        )
        """
    )
    insert_sql = (
        "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
        " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    rows = []
    # Previous hour traffic with no errors
    rows += [
        ("2.2.2.2", "example.com", f"2024-01-01 11:{i:02d}:00",
         "GET /ok HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(10)
    ]
    # Recent hour with errors
    rows += [
        ("3.3.3.3", "example.com", f"2024-01-01 12:{i:02d}:00",
         "GET /fail HTTP/1.1", 500, 100, "-", "curl", "MISS")
        for i in range(10)
    ]
    # High traffic from single IP
    rows += [
        ("1.1.1.1", "example.net", f"2024-01-01 12:{i % 10:02d}:30",
         "GET /spam HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(101)
    ]
    # New suspicious user agent
    rows += [
        ("4.4.4.4", "example.org", f"2024-01-01 12:{30 + i:02d}:45",
         "GET /bot HTTP/1.1", 200, 100, "-", "newbot", "MISS")
        for i in range(15)
    ]
    cur.executemany(insert_sql, rows)
    conn.commit()
    conn.close()
def test_detect_threats(tmp_path, monkeypatch):
    """detect_threats flags the seeded error spike, hot IP, and new agent."""
    database = tmp_path / "database" / "ngxstat.db"
    setup_threat_db(database)
    analysis_dir = tmp_path / "analysis"
    # Point the command at the temporary database and output directory.
    monkeypatch.setattr(analyze, "DB_PATH", database)
    monkeypatch.setattr(analyze, "ANALYSIS_DIR", analysis_dir)

    analyze.detect_threats(hours=1, ip_threshold=100)

    report = json.loads((analysis_dir / "threat_report.json").read_text())
    spiked_hosts = {entry["host"] for entry in report.get("error_spikes", [])}
    assert "example.com" in spiked_hosts
    flagged_ips = {entry["ip"] for entry in report.get("high_ip_requests", [])}
    assert "1.1.1.1" in flagged_ips
    new_agents = {entry["user_agent"] for entry in report.get("suspicious_agents", [])}
    assert "newbot" in new_agents