"""Tests for ngxstat analysis commands: missing-domain detection, cache
suggestions, and threat reporting."""
import sys
import json
import sqlite3
from pathlib import Path

import pytest

# Make the repository root importable so ``from scripts import ...`` resolves
# regardless of the directory pytest is launched from.
REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(REPO_ROOT))

from scripts import analyze
from scripts import generate_reports as gr
def setup_db(path: Path) -> None:
    """Create a minimal ngxstat ``logs`` database with two sample rows.

    One row is for ``example.com`` (present in the nginx config used by the
    tests) and one for ``missing.com`` (absent from it), so the
    missing-domain check has exactly one hit.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE logs (
            id INTEGER PRIMARY KEY,
            ip TEXT,
            host TEXT,
            time TEXT,
            request TEXT,
            status INTEGER,
            bytes_sent INTEGER,
            referer TEXT,
            user_agent TEXT,
            cache_status TEXT
        )
        """
    )
    # Both rows in one statement; insertion order matches the original
    # (example.com first, missing.com second) so row ids are identical.
    sample_rows = [
        ("127.0.0.1", "example.com", "2024-01-01 10:00:00",
         "GET / HTTP/1.1", 200, 100, "-", "curl", "MISS"),
        ("127.0.0.1", "missing.com", "2024-01-01 11:00:00",
         "GET / HTTP/1.1", 200, 100, "-", "curl", "MISS"),
    ]
    cur.executemany(
        "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        sample_rows,
    )
    conn.commit()
    conn.close()
def test_check_missing_domains(tmp_path, monkeypatch, capsys):
    """check_missing_domains reports hosts seen in logs but absent from nginx."""
    db_path = tmp_path / "database" / "ngxstat.db"
    setup_db(db_path)

    # Config declares only example.com, so missing.com should be flagged.
    conf = tmp_path / "nginx.conf"
    conf.write_text(
        """
server {
    listen 80;
    server_name example.com;
}
"""
    )

    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(gr, "DB_PATH", db_path)
    monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])

    # Plain-text mode: one missing host per line.
    analyze.check_missing_domains(json_output=False)
    text_lines = capsys.readouterr().out.strip().splitlines()
    assert text_lines == ["missing.com"]

    # JSON mode: same hosts as a JSON list.
    analyze.check_missing_domains(json_output=True)
    payload = json.loads(capsys.readouterr().out.strip())
    assert payload == ["missing.com"]
def test_suggest_cache(tmp_path, monkeypatch, capsys):
    """suggest_cache flags host/path pairs whose MISS count exceeds the
    threshold and that are not already behind a proxy_cache.

    Fixture data: three MISSes for example.com /foo (no cache configured,
    should be suggested) and two for cached.com /bar (already cached via
    proxy_cache, should be ignored).
    """
    db_path = tmp_path / "database" / "ngxstat.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE logs (
            id INTEGER PRIMARY KEY,
            ip TEXT,
            host TEXT,
            time TEXT,
            request TEXT,
            status INTEGER,
            bytes_sent INTEGER,
            referer TEXT,
            user_agent TEXT,
            cache_status TEXT
        )
        """
    )
    # Build the rows programmatically instead of five verbatim tuples;
    # timestamps advance one minute per request, matching the original data.
    entries = [
        ("127.0.0.1", "example.com", f"2024-01-01 10:{i:02d}:00",
         "GET /foo HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(3)
    ] + [
        ("127.0.0.1", "cached.com", f"2024-01-01 10:{i:02d}:00",
         "GET /bar HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(2)
    ]
    cur.executemany(
        "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
        " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        entries,
    )
    conn.commit()
    conn.close()

    # cached.com already has proxy_cache, so only example.com is a candidate.
    conf = tmp_path / "nginx.conf"
    conf.write_text(
        """
server {
    listen 80;
    server_name example.com;
}

server {
    listen 80;
    server_name cached.com;
    proxy_cache cache1;
}
"""
    )

    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(gr, "DB_PATH", db_path)
    monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])

    # Plain-text mode: "host path misses" per line.
    analyze.suggest_cache(threshold=2, json_output=False)
    out = capsys.readouterr().out.strip().splitlines()
    assert out == ["example.com /foo 3"]

    # JSON mode: list of structured records.
    analyze.suggest_cache(threshold=2, json_output=True)
    out_json = json.loads(capsys.readouterr().out.strip())
    assert out_json == [{"host": "example.com", "path": "/foo", "misses": 3}]
def setup_threat_db(path: Path) -> None:
    """Populate a logs database with traffic shaped to trip each detector:
    an error spike, a single high-volume IP, and a brand-new user agent.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE logs (
            id INTEGER PRIMARY KEY,
            ip TEXT,
            host TEXT,
            time TEXT,
            request TEXT,
            status INTEGER,
            bytes_sent INTEGER,
            referer TEXT,
            user_agent TEXT,
            cache_status TEXT
        )
        """
    )

    insert_sql = (
        "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status)"
        " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )

    rows = []
    # Previous hour of clean 200s — baseline for the error-spike comparison.
    rows += [
        ("2.2.2.2", "example.com", f"2024-01-01 11:{i:02d}:00",
         "GET /ok HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(10)
    ]
    # Recent hour of 500s on example.com — the error spike.
    rows += [
        ("3.3.3.3", "example.com", f"2024-01-01 12:{i:02d}:00",
         "GET /fail HTTP/1.1", 500, 100, "-", "curl", "MISS")
        for i in range(10)
    ]
    # 101 requests from one IP — exceeds the test's ip_threshold of 100.
    rows += [
        ("1.1.1.1", "example.net", f"2024-01-01 12:{i % 10:02d}:30",
         "GET /spam HTTP/1.1", 200, 100, "-", "curl", "MISS")
        for i in range(101)
    ]
    # A user agent absent from the baseline hour — the suspicious agent.
    rows += [
        ("4.4.4.4", "example.org", f"2024-01-01 12:{30 + i:02d}:45",
         "GET /bot HTTP/1.1", 200, 100, "-", "newbot", "MISS")
        for i in range(15)
    ]

    # Same rows in the same order as the original per-row loops, so the
    # autoincrement ids are identical.
    cur.executemany(insert_sql, rows)
    conn.commit()
    conn.close()
def test_detect_threats(tmp_path, monkeypatch):
    """detect_threats writes a JSON report covering all three threat classes."""
    db_path = tmp_path / "database" / "ngxstat.db"
    setup_threat_db(db_path)

    out_dir = tmp_path / "analysis"
    monkeypatch.setattr(analyze, "DB_PATH", db_path)
    monkeypatch.setattr(analyze, "ANALYSIS_DIR", out_dir)

    analyze.detect_threats(hours=1, ip_threshold=100)

    report = json.loads((out_dir / "threat_report.json").read_text())

    # Each fixture signal must surface in its corresponding report section.
    assert "example.com" in {e["host"] for e in report.get("error_spikes", [])}
    assert "1.1.1.1" in {e["ip"] for e in report.get("high_ip_requests", [])}
    assert "newbot" in {e["user_agent"] for e in report.get("suspicious_agents", [])}