Add threat detection analysis

This commit is contained in:
Jordan Wages 2025-07-19 02:12:24 -05:00
commit 350445b167
2 changed files with 253 additions and 0 deletions

View file

@@ -19,6 +19,7 @@ from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta
import json
@@ -27,6 +28,7 @@ import typer
from scripts import nginx_config # noqa: F401 # imported for side effects/usage
DB_PATH = Path("database/ngxstat.db")
ANALYSIS_DIR = Path("output/analysis")
app = typer.Typer(help="Ad-hoc statistics queries")
@@ -197,5 +199,137 @@ def suggest_cache(
typer.echo(f"{host} {path} {count}")
def _host_error_stats(cur, start_s: str, end_s: str, *, include_end: bool = False) -> dict:
    """Return ``{host: (error_count, total_count)}`` for logs in a time window.

    The window is ``[start_s, end_s)`` by default, or ``[start_s, end_s]``
    when *include_end* is true.  The inclusive form is needed for the most
    recent window, whose end bound equals ``MAX(time)`` — an exclusive
    comparison would drop every row carrying the newest timestamp.
    """
    # The operator is chosen from a fixed two-value set, never user input,
    # so interpolating it into the SQL is safe; data values stay parameterized.
    end_op = "<=" if include_end else "<"
    cur.execute(
        f"""
        SELECT host,
               SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
               COUNT(*) AS total
        FROM logs
        WHERE time >= ? AND time {end_op} ?
        GROUP BY host
        """,
        (start_s, end_s),
    )
    return {host: (errors, total) for host, errors, total in cur.fetchall()}


@app.command("detect-threats")
def detect_threats(
    hours: int = typer.Option(1, help="Number of recent hours to analyze"),
    ip_threshold: int = typer.Option(
        100, help="Requests from a single IP to flag"
    ),
) -> None:
    """Detect potential security threats from recent logs.

    Compares the most recent *hours*-long window (anchored at the newest
    log timestamp) against the window immediately preceding it and flags:

    * per-host error-rate spikes (recent rate >= 10% and at least double
      the previous window's rate),
    * user agents absent from the previous window that made >= 10 requests
      in the recent one,
    * IPs making at least *ip_threshold* requests in the recent window.

    The JSON report is written to ``output/analysis/threat_report.json``
    and also echoed to stdout as a single JSON line.
    """
    conn = _connect()
    # try/finally guarantees the connection is released even if a query
    # raises; the original closed it only on the happy path.
    try:
        cur = conn.cursor()
        cur.execute("SELECT MAX(time) FROM logs")
        row = cur.fetchone()
        if not row or not row[0]:
            typer.echo("No logs found")
            return

        fmt = "%Y-%m-%d %H:%M:%S"
        recent_end = datetime.strptime(row[0], fmt)
        recent_start = recent_end - timedelta(hours=hours)
        prev_start = recent_start - timedelta(hours=hours)

        recent_start_s = recent_start.strftime(fmt)
        recent_end_s = recent_end.strftime(fmt)
        prev_start_s = prev_start.strftime(fmt)
        # The previous window ends exactly where the recent one begins, so
        # an exclusive end bound avoids double counting the boundary second.
        prev_end_s = recent_start_s

        # BUGFIX: the recent window must include its end bound — recent_end
        # equals MAX(time), so `time < recent_end` silently excluded every
        # row at the newest timestamp from all three analyses.
        recent_rows = _host_error_stats(
            cur, recent_start_s, recent_end_s, include_end=True
        )
        prev_rows = _host_error_stats(cur, prev_start_s, prev_end_s)

        error_spikes = []
        for host in set(recent_rows) | set(prev_rows):
            r_err, r_total = recent_rows.get(host, (0, 0))
            p_err, p_total = prev_rows.get(host, (0, 0))
            r_rate = r_err * 100.0 / r_total if r_total else 0.0
            p_rate = p_err * 100.0 / p_total if p_total else 0.0
            # Flag only rates that are both high in absolute terms (>= 10%)
            # and at least double the previous window's rate.
            if r_rate >= 10 and r_rate >= p_rate * 2:
                error_spikes.append(
                    {
                        "host": host,
                        "recent_error_rate": round(r_rate, 2),
                        "previous_error_rate": round(p_rate, 2),
                    }
                )

        # User agents never seen in the previous window but active now.
        cur.execute(
            """
            SELECT DISTINCT user_agent FROM logs
            WHERE time >= ? AND time < ?
            """,
            (prev_start_s, prev_end_s),
        )
        prev_agents = {r[0] for r in cur.fetchall()}
        cur.execute(
            """
            SELECT user_agent, COUNT(*) AS c
            FROM logs
            WHERE time >= ? AND time <= ?
            GROUP BY user_agent
            HAVING c >= 10
            """,
            (recent_start_s, recent_end_s),
        )
        suspicious_agents = [
            {"user_agent": ua, "requests": cnt}
            for ua, cnt in cur.fetchall()
            if ua not in prev_agents
        ]

        # Individual IPs generating unusually many requests.
        cur.execute(
            """
            SELECT ip, COUNT(*) AS c
            FROM logs
            WHERE time >= ? AND time <= ?
            GROUP BY ip
            HAVING c >= ?
            ORDER BY c DESC
            """,
            (recent_start_s, recent_end_s, ip_threshold),
        )
        high_ip_requests = [
            {"ip": ip, "requests": cnt} for ip, cnt in cur.fetchall()
        ]
    finally:
        conn.close()

    report = {
        "time_range": {
            "recent_start": recent_start_s,
            "recent_end": recent_end_s,
            "previous_start": prev_start_s,
            "previous_end": prev_end_s,
        },
        "error_spikes": error_spikes,
        "suspicious_agents": suspicious_agents,
        "high_ip_requests": high_ip_requests,
    }
    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
    out_path = ANALYSIS_DIR / "threat_report.json"
    out_path.write_text(json.dumps(report, indent=2))
    typer.echo(json.dumps(report))
# Allow invoking this module directly as a CLI script (python -m / path run);
# dispatches to the Typer application defined above.
if __name__ == "__main__":
    app()