"""Tests for the log-analysis helpers in ``scripts.analyze``.

Each test builds a throwaway SQLite database containing a ``logs`` table,
points the modules under test at it via ``monkeypatch``, and checks either
the printed output or the JSON report written to disk.
"""

import json
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

import pytest  # noqa: F401

REPO_ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable so ``scripts`` resolves when the tests
# are run from an arbitrary working directory.
sys.path.append(str(REPO_ROOT))
from scripts import analyze  # noqa: E402
from scripts import generate_reports as gr  # noqa: E402

# Schema of the ``logs`` table shared by every fixture database in this file.
_CREATE_LOGS_SQL = """
    CREATE TABLE logs (
        id INTEGER PRIMARY KEY,
        ip TEXT,
        host TEXT,
        time TEXT,
        request TEXT,
        status INTEGER,
        bytes_sent INTEGER,
        referer TEXT,
        user_agent TEXT,
        cache_status TEXT
    )
"""

# Parameterized insert matching the column order produced by ``_log_row``.
_INSERT_LOG_SQL = (
    "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
    " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
)


def _log_row(ip, host, timestamp, request, status=200, user_agent="curl"):
    """Return one ``logs`` row tuple in ``_INSERT_LOG_SQL`` column order.

    ``bytes_sent`` (100), ``referer`` ("-") and ``cache_status`` ("MISS") are
    the same for every fixture row in this file, so they are fixed here.
    """
    return (ip, host, timestamp, request, status, 100, "-", user_agent, "MISS")


def _build_db(path: Path, rows) -> None:
    """Create a fresh database at *path* with a ``logs`` table holding *rows*.

    Parent directories are created as needed. The connection is closed even
    if a statement raises, so a failing fixture does not leak a handle.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # ``closing`` is required: a sqlite3 connection used directly as a context
    # manager only manages the transaction, it does not close the connection.
    with closing(sqlite3.connect(path)) as conn:
        cur = conn.cursor()
        cur.execute(_CREATE_LOGS_SQL)
        cur.executemany(_INSERT_LOG_SQL, rows)
        conn.commit()


def setup_db(path: Path) -> None:
    """Create a database with one request each for example.com and missing.com."""
    _build_db(
        path,
        [
            _log_row("127.0.0.1", "example.com", "2024-01-01 10:00:00", "GET / HTTP/1.1"),
            _log_row("127.0.0.1", "missing.com", "2024-01-01 11:00:00", "GET / HTTP/1.1"),
        ],
    )


def test_check_missing_domains(tmp_path, monkeypatch, capsys):
    """A host present in the logs but absent from the nginx config is reported."""
    db_path = tmp_path / "database" / "ngxstat.db"
    setup_db(db_path)

    # Only example.com is configured; missing.com appears in the logs alone.
    conf = tmp_path / "nginx.conf"
    conf.write_text(
        """
server {
    listen 80;
    server_name example.com;
}
"""
    )

    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(gr, "DB_PATH", db_path)
    monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])

    analyze.check_missing_domains(json_output=False)
    out = capsys.readouterr().out.strip().splitlines()
    assert out == ["missing.com"]

    analyze.check_missing_domains(json_output=True)
    out_json = json.loads(capsys.readouterr().out.strip())
    assert out_json == ["missing.com"]


def test_suggest_cache(tmp_path, monkeypatch, capsys):
    """Only the host without ``proxy_cache`` is suggested for caching."""
    db_path = tmp_path / "database" / "ngxstat.db"
    # Three MISS requests for example.com/foo and two for cached.com/bar.
    entries = [
        _log_row("127.0.0.1", "example.com", "2024-01-01 10:00:00", "GET /foo HTTP/1.1"),
        _log_row("127.0.0.1", "example.com", "2024-01-01 10:01:00", "GET /foo HTTP/1.1"),
        _log_row("127.0.0.1", "example.com", "2024-01-01 10:02:00", "GET /foo HTTP/1.1"),
        _log_row("127.0.0.1", "cached.com", "2024-01-01 10:00:00", "GET /bar HTTP/1.1"),
        _log_row("127.0.0.1", "cached.com", "2024-01-01 10:01:00", "GET /bar HTTP/1.1"),
    ]
    _build_db(db_path, entries)

    # cached.com declares proxy_cache; example.com does not.
    conf = tmp_path / "nginx.conf"
    conf.write_text(
        """
server {
    listen 80;
    server_name example.com;
}

server {
    listen 80;
    server_name cached.com;
    proxy_cache cache1;
}
"""
    )

    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(gr, "DB_PATH", db_path)
    monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])

    analyze.suggest_cache(threshold=2, json_output=False)
    out = capsys.readouterr().out.strip().splitlines()
    assert out == ["example.com /foo 3"]

    analyze.suggest_cache(threshold=2, json_output=True)
    out_json = json.loads(capsys.readouterr().out.strip())
    assert out_json == [{"host": "example.com", "path": "/foo", "misses": 3}]


def setup_threat_db(path: Path) -> None:
    """Create a database seeded with the traffic patterns the threat test checks.

    Rows are inserted in the same order as the groups below so that the
    auto-assigned ``id`` values are deterministic.
    """
    # Previous hour traffic with no errors
    rows = [
        _log_row("2.2.2.2", "example.com", f"2024-01-01 11:{i:02d}:00", "GET /ok HTTP/1.1")
        for i in range(10)
    ]
    # Recent hour with errors
    rows += [
        _log_row(
            "3.3.3.3",
            "example.com",
            f"2024-01-01 12:{i:02d}:00",
            "GET /fail HTTP/1.1",
            status=500,
        )
        for i in range(10)
    ]
    # High traffic from single IP (101 requests, one above the test threshold)
    rows += [
        _log_row("1.1.1.1", "example.net", f"2024-01-01 12:{i % 10:02d}:30", "GET /spam HTTP/1.1")
        for i in range(101)
    ]
    # New suspicious user agent
    rows += [
        _log_row(
            "4.4.4.4",
            "example.org",
            f"2024-01-01 12:{30 + i:02d}:45",
            "GET /bot HTTP/1.1",
            user_agent="newbot",
        )
        for i in range(15)
    ]
    _build_db(path, rows)


def test_detect_threats(tmp_path, monkeypatch):
    """The threat report flags the error spike, the noisy IP and the new agent."""
    db_path = tmp_path / "database" / "ngxstat.db"
    setup_threat_db(db_path)

    out_dir = tmp_path / "analysis"
    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(analyze, "ANALYSIS_DIR", out_dir)

    analyze.detect_threats(hours=1, ip_threshold=100)

    report = json.loads((out_dir / "threat_report.json").read_text())

    hosts = {e["host"] for e in report.get("error_spikes", [])}
    assert "example.com" in hosts

    ips = {e["ip"] for e in report.get("high_ip_requests", [])}
    assert "1.1.1.1" in ips

    agents = {e["user_agent"] for e in report.get("suspicious_agents", [])}
    assert "newbot" in agents