diff --git a/scripts/analyze.py b/scripts/analyze.py
index ded224d..219ceeb 100644
--- a/scripts/analyze.py
+++ b/scripts/analyze.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import sqlite3
 from pathlib import Path
 from typing import Dict, List, Optional, Set
+from datetime import datetime, timedelta
 
 import json
@@ -27,6 +28,7 @@ import typer
 
 from scripts import nginx_config  # noqa: F401  # imported for side effects/usage
 
 DB_PATH = Path("database/ngxstat.db")
+ANALYSIS_DIR = Path("output/analysis")
 
 app = typer.Typer(help="Ad-hoc statistics queries")
@@ -197,5 +199,137 @@ def suggest_cache(
         typer.echo(f"{host} {path} {count}")
 
 
+@app.command("detect-threats")
+def detect_threats(
+    hours: int = typer.Option(1, help="Number of recent hours to analyze"),
+    ip_threshold: int = typer.Option(
+        100, help="Requests from a single IP to flag"
+    ),
+) -> None:
+    """Detect potential security threats from recent logs."""
+
+    conn = _connect()
+    cur = conn.cursor()
+
+    cur.execute("SELECT MAX(time) FROM logs")
+    row = cur.fetchone()
+    if not row or not row[0]:
+        typer.echo("No logs found")
+        conn.close()
+        return
+
+    max_dt = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
+    recent_end = max_dt
+    recent_start = recent_end - timedelta(hours=hours)
+    prev_start = recent_start - timedelta(hours=hours)
+    prev_end = recent_start
+
+    fmt = "%Y-%m-%d %H:%M:%S"
+    recent_start_s = recent_start.strftime(fmt)
+    recent_end_s = recent_end.strftime(fmt)
+    prev_start_s = prev_start.strftime(fmt)
+    prev_end_s = prev_end.strftime(fmt)
+
+    cur.execute(
+        """
+        SELECT host,
+               SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
+               COUNT(*) AS total
+        FROM logs
+        WHERE time >= ? AND time < ?
+        GROUP BY host
+        """,
+        (recent_start_s, recent_end_s),
+    )
+    recent_rows = {r[0]: (r[1], r[2]) for r in cur.fetchall()}
+
+    cur.execute(
+        """
+        SELECT host,
+               SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
+               COUNT(*) AS total
+        FROM logs
+        WHERE time >= ? AND time < ?
+        GROUP BY host
+        """,
+        (prev_start_s, prev_end_s),
+    )
+    prev_rows = {r[0]: (r[1], r[2]) for r in cur.fetchall()}
+
+    error_spikes = []
+    for host in set(recent_rows) | set(prev_rows):
+        r_err, r_total = recent_rows.get(host, (0, 0))
+        p_err, p_total = prev_rows.get(host, (0, 0))
+        r_rate = r_err * 100.0 / r_total if r_total else 0.0
+        p_rate = p_err * 100.0 / p_total if p_total else 0.0
+        if r_rate >= 10 and r_rate >= p_rate * 2:
+            error_spikes.append(
+                {
+                    "host": host,
+                    "recent_error_rate": round(r_rate, 2),
+                    "previous_error_rate": round(p_rate, 2),
+                }
+            )
+
+    cur.execute(
+        """
+        SELECT DISTINCT user_agent FROM logs
+        WHERE time >= ? AND time < ?
+        """,
+        (prev_start_s, prev_end_s),
+    )
+    prev_agents = {r[0] for r in cur.fetchall()}
+
+    cur.execute(
+        """
+        SELECT user_agent, COUNT(*) AS c
+        FROM logs
+        WHERE time >= ? AND time < ?
+        GROUP BY user_agent
+        HAVING c >= 10
+        """,
+        (recent_start_s, recent_end_s),
+    )
+    suspicious_agents = [
+        {"user_agent": ua, "requests": cnt}
+        for ua, cnt in cur.fetchall()
+        if ua not in prev_agents
+    ]
+
+    cur.execute(
+        """
+        SELECT ip, COUNT(*) AS c
+        FROM logs
+        WHERE time >= ? AND time < ?
+        GROUP BY ip
+        HAVING c >= ?
+        ORDER BY c DESC
+        """,
+        (recent_start_s, recent_end_s, ip_threshold),
+    )
+    high_ip_requests = [
+        {"ip": ip, "requests": cnt} for ip, cnt in cur.fetchall()
+    ]
+
+    conn.close()
+
+    report = {
+        "time_range": {
+            "recent_start": recent_start_s,
+            "recent_end": recent_end_s,
+            "previous_start": prev_start_s,
+            "previous_end": prev_end_s,
+        },
+        "error_spikes": error_spikes,
+        "suspicious_agents": suspicious_agents,
+        "high_ip_requests": high_ip_requests,
+    }
+
+    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
+    out_path = ANALYSIS_DIR / "threat_report.json"
+    out_path.write_text(json.dumps(report, indent=2))
+    typer.echo(json.dumps(report))
+
+
 if __name__ == "__main__":
     app()
diff --git a/tests/test_analyze.py b/tests/test_analyze.py
index 40bdc40..a4358d7 100644
--- a/tests/test_analyze.py
+++ b/tests/test_analyze.py
@@ -203,3 +203,122 @@ server {
     analyze.suggest_cache(threshold=2, json_output=True)
     out_json = json.loads(capsys.readouterr().out.strip())
     assert out_json == [{"host": "example.com", "path": "/foo", "misses": 3}]
+
+
+def setup_threat_db(path: Path) -> None:
+    path.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(path)
+    cur = conn.cursor()
+    cur.execute(
+        """
+        CREATE TABLE logs (
+            id INTEGER PRIMARY KEY,
+            ip TEXT,
+            host TEXT,
+            time TEXT,
+            request TEXT,
+            status INTEGER,
+            bytes_sent INTEGER,
+            referer TEXT,
+            user_agent TEXT,
+            cache_status TEXT
+        )
+        """
+    )
+
+    # Previous hour traffic with no errors
+    for i in range(10):
+        cur.execute(
+            "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
+            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                "2.2.2.2",
+                "example.com",
+                f"2024-01-01 11:{i:02d}:00",
+                "GET /ok HTTP/1.1",
+                200,
+                100,
+                "-",
+                "curl",
+                "MISS",
+            ),
+        )
+
+    # Recent hour with errors
+    for i in range(10):
+        cur.execute(
+            "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
+            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                "3.3.3.3",
+                "example.com",
+                f"2024-01-01 12:{i:02d}:00",
+                "GET /fail HTTP/1.1",
+                500,
+                100,
+                "-",
+                "curl",
+                "MISS",
+            ),
+        )
+
+    # High traffic from single IP
+    for i in range(101):
+        cur.execute(
+            "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
+            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                "1.1.1.1",
+                "example.net",
+                f"2024-01-01 12:{i % 10:02d}:30",
+                "GET /spam HTTP/1.1",
+                200,
+                100,
+                "-",
+                "curl",
+                "MISS",
+            ),
+        )
+
+    # New suspicious user agent
+    for i in range(15):
+        cur.execute(
+            "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
+            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                "4.4.4.4",
+                "example.org",
+                f"2024-01-01 12:{30 + i:02d}:45",
+                "GET /bot HTTP/1.1",
+                200,
+                100,
+                "-",
+                "newbot",
+                "MISS",
+            ),
+        )
+
+    conn.commit()
+    conn.close()
+
+
+def test_detect_threats(tmp_path, monkeypatch):
+    db_path = tmp_path / "database" / "ngxstat.db"
+    setup_threat_db(db_path)
+
+    out_dir = tmp_path / "analysis"
+    monkeypatch.setattr(analyze, "DB_PATH", db_path)
+    monkeypatch.setattr(analyze, "ANALYSIS_DIR", out_dir)
+
+    analyze.detect_threats(hours=1, ip_threshold=100)
+
+    report = json.loads((out_dir / "threat_report.json").read_text())
+
+    hosts = {e["host"] for e in report.get("error_spikes", [])}
+    assert "example.com" in hosts
+
+    ips = {e["ip"] for e in report.get("high_ip_requests", [])}
+    assert "1.1.1.1" in ips
+
+    agents = {e["user_agent"] for e in report.get("suspicious_agents", [])}
+    assert "newbot" in agents