ngxstat/tests/test_reports.py

327 lines
9.8 KiB
Python

import sqlite3
from pathlib import Path
import json
import sys
from datetime import datetime
import pytest
from typer.testing import CliRunner
# Directory that contains this file's own directory (one level above it).
REPO_ROOT = Path(__file__).resolve().parents[1]
# Extend the import search path with that directory before the import below.
sys.path.append(str(REPO_ROOT))
# Module under test; deliberately imported after the sys.path tweak above.
from scripts import generate_reports as gr
def setup_db(path: Path) -> None:
    """Create a SQLite database at *path* holding two sample ``logs`` rows.

    The parent directory is created when missing. Both rows belong to host
    ``example.com``: one request with status 200 at ``2024-01-01 10:00:00``
    and one with status 500 at ``2024-01-01 10:05:00``.

    Args:
        path: Location of the database file to create.

    Raises:
        sqlite3.OperationalError: If the ``logs`` table already exists in the
            database at *path*.
    """
    insert_sql = (
        "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    sample_rows = [
        (
            "127.0.0.1",
            "example.com",
            "2024-01-01 10:00:00",
            "GET / HTTP/1.1",
            200,
            100,
            "-",
            "curl",
            "MISS",
        ),
        (
            "127.0.0.1",
            "example.com",
            "2024-01-01 10:05:00",
            "GET /err HTTP/1.1",
            500,
            100,
            "-",
            "curl",
            "MISS",
        ),
    ]

    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    try:
        cur = conn.cursor()
        cur.execute(
            """
            CREATE TABLE logs (
                id INTEGER PRIMARY KEY,
                ip TEXT,
                host TEXT,
                time TEXT,
                request TEXT,
                status INTEGER,
                bytes_sent INTEGER,
                referer TEXT,
                user_agent TEXT,
                cache_status TEXT
            )
            """
        )
        cur.executemany(insert_sql, sample_rows)
        conn.commit()
    finally:
        # Always release the file handle, even when a statement above fails,
        # so the temporary database can be removed afterwards.
        conn.close()
@pytest.fixture()
def sample_reports(tmp_path):
    """Write a sample report configuration file and return its path.

    The file ``reports.yml`` is created inside ``tmp_path`` and defines five
    reports: ``hits`` and ``error_rate`` (bucketed queries), ``domain_traffic``
    (``per_domain: false``, no ``{bucket}`` placeholder), ``skip_report``
    (``per_domain: false``, bucketed) and ``domain_totals`` (``global: true``).
    """
    cfg = tmp_path / "reports.yml"
    # The literal is kept flush-left on purpose: YAML nesting depends on
    # indentation, so each key must sit under its "- name:" item and each
    # query body must be indented below its "|" block-scalar indicator.
    cfg.write_text(
        """
- name: hits
  query: |
    SELECT {bucket} AS bucket, COUNT(*) AS value
    FROM logs
    GROUP BY bucket
    ORDER BY bucket
- name: error_rate
  query: |
    SELECT {bucket} AS bucket,
           SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS value
    FROM logs
    GROUP BY bucket
    ORDER BY bucket
- name: domain_traffic
  per_domain: false
  query: |
    SELECT host AS bucket,
           COUNT(*) AS value
    FROM logs
    GROUP BY host
    ORDER BY value DESC
- name: skip_report
  per_domain: false
  query: |
    SELECT {bucket} AS bucket, COUNT(*) AS value
    FROM logs
    GROUP BY bucket
    ORDER BY bucket
- name: domain_totals
  global: true
  query: |
    SELECT host AS bucket,
           COUNT(*) AS value
    FROM logs
    GROUP BY host
    ORDER BY value DESC
"""
    )
    return cfg
def test_generate_interval(tmp_path, sample_reports, monkeypatch):
    """Hourly generation without a domain writes report JSON and HTML files."""
    out_root = tmp_path / "output"
    database = tmp_path / "database" / "ngxstat.db"
    setup_db(database)

    # Point the generator module's path settings at the temporary sandbox.
    for attr, value in (
        ("DB_PATH", database),
        ("OUTPUT_DIR", out_root),
        ("REPORT_CONFIG", sample_reports),
        ("TEMPLATE_DIR", REPO_ROOT / "templates"),
    ):
        monkeypatch.setattr(gr, attr, value)

    gr._generate_interval("hourly")

    hourly_dir = out_root / "hourly"

    def load(filename):
        """Parse one JSON file from the hourly output directory."""
        return json.loads((hourly_dir / filename).read_text())

    # Both sample rows are counted in the first hits entry.
    assert load("hits.json")[0]["value"] == 2
    # One of the two sample rows has status >= 400.
    assert load("error_rate.json")[0]["value"] == pytest.approx(50.0)

    listing = load("reports.json")
    assert {item["name"] for item in listing} == {"hits", "error_rate", "skip_report"}
    # Every listed report must have its HTML snippet on disk.
    for item in listing:
        assert (hourly_dir / item["html"]).exists()
def test_generate_interval_domain_filter(tmp_path, sample_reports, monkeypatch):
    """Generation for one domain writes under ``domains/<host>/<interval>``.

    The ``skip_report`` entry (``per_domain: false``) must be absent from
    both the report listing and the output directory.
    """
    out_root = tmp_path / "output"
    database = tmp_path / "database" / "ngxstat.db"
    setup_db(database)

    overrides = {
        "DB_PATH": database,
        "OUTPUT_DIR": out_root,
        "REPORT_CONFIG": sample_reports,
        "TEMPLATE_DIR": REPO_ROOT / "templates",
    }
    for attr, value in overrides.items():
        monkeypatch.setattr(gr, attr, value)

    gr._generate_interval("hourly", "example.com")

    domain_dir = out_root / "domains" / "example.com" / "hourly"

    hits = json.loads((domain_dir / "hits.json").read_text())
    assert hits[0]["value"] == 2

    listing = json.loads((domain_dir / "reports.json").read_text())
    names = {entry["name"] for entry in listing}
    assert names == {"hits", "error_rate"}

    skipped = domain_dir / "skip_report.json"
    assert not skipped.exists()
def test_generate_root_index(tmp_path, sample_reports, monkeypatch):
    """The root index offers the generated intervals and nothing else."""
    out_root = tmp_path / "output"
    database = tmp_path / "database" / "ngxstat.db"
    setup_db(database)

    for attr, value in (
        ("DB_PATH", database),
        ("OUTPUT_DIR", out_root),
        ("REPORT_CONFIG", sample_reports),
        ("TEMPLATE_DIR", REPO_ROOT / "templates"),
    ):
        monkeypatch.setattr(gr, attr, value)

    for interval in ("hourly", "daily"):
        gr._generate_interval(interval)

    # Dummy domain directories.
    for domain in ("foo.com", "bar.com"):
        (out_root / "domains" / domain).mkdir(parents=True)

    # An extra directory with a capitalized name, which must be ignored.
    (out_root / "Global").mkdir(parents=True)
    # An analysis directory, which must be excluded as well.
    (out_root / "analysis").mkdir(parents=True)

    gr._generate_root_index()

    index_file = out_root / "index.html"
    assert index_file.exists()
    html = index_file.read_text()

    # Interval options that must be offered.
    for option in ('<option value="hourly">', '<option value="daily">'):
        assert option in html

    # Directories that must not show up as interval options.
    for option in (
        '<option value="global">',
        '<option value="Global">',
        '<option value="analysis">',
    ):
        assert option not in html
def test_generated_marker_written(tmp_path, monkeypatch):
    """The ``index`` CLI command writes a timestamp into the marker file."""
    out_dir = tmp_path / "output"
    marker = out_dir / "generated.txt"

    monkeypatch.setattr(gr, "OUTPUT_DIR", out_dir)
    monkeypatch.setattr(gr, "TEMPLATE_DIR", REPO_ROOT / "templates")
    monkeypatch.setattr(gr, "GENERATED_MARKER", marker)
    # Replace the icon copy step with a no-op for this test.
    monkeypatch.setattr(gr, "_copy_icons", lambda: None)

    (out_dir / "hourly").mkdir(parents=True)

    outcome = CliRunner().invoke(gr.app, ["index"])
    assert outcome.exit_code == 0, outcome.output

    assert marker.exists()
    stamp = marker.read_text().strip()
    # strptime raises ValueError when the text does not match the format,
    # which is what fails the test on a malformed timestamp.
    datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
def test_global_reports_once(tmp_path, sample_reports, monkeypatch):
    """A ``global: true`` report appears under ``global`` only, not hourly."""
    out_root = tmp_path / "output"
    database = tmp_path / "database" / "ngxstat.db"
    setup_db(database)

    overrides = {
        "DB_PATH": database,
        "OUTPUT_DIR": out_root,
        "REPORT_CONFIG": sample_reports,
        "TEMPLATE_DIR": REPO_ROOT / "templates",
    }
    for attr, value in overrides.items():
        monkeypatch.setattr(gr, attr, value)

    gr._generate_global()
    gr._generate_interval("hourly")

    snippet_name = "domain_totals.html"
    assert (out_root / "global" / snippet_name).exists()
    assert not (out_root / "hourly" / snippet_name).exists()
def test_global_stats_file(tmp_path, sample_reports, monkeypatch):
    """Global generation writes ``stats.json`` summarising the sample logs."""
    out_root = tmp_path / "output"
    database = tmp_path / "database" / "ngxstat.db"
    setup_db(database)

    for attr, value in (
        ("DB_PATH", database),
        ("OUTPUT_DIR", out_root),
        ("REPORT_CONFIG", sample_reports),
        ("TEMPLATE_DIR", REPO_ROOT / "templates"),
    ):
        monkeypatch.setattr(gr, attr, value)

    gr._generate_global()

    stats_path = out_root / "global" / "stats.json"
    assert stats_path.exists()
    stats = json.loads(stats_path.read_text())

    expected_keys = {
        "total_logs",
        "start_date",
        "end_date",
        "unique_domains",
        "generated_at",
        "generation_seconds",
    }
    assert set(stats) == expected_keys

    # Values that follow directly from the two rows inserted by setup_db.
    assert stats["total_logs"] == 2
    assert stats["start_date"] == "2024-01-01 10:00:00"
    assert stats["end_date"] == "2024-01-01 10:05:00"
    assert stats["unique_domains"] == 1

    # Run-dependent values: only their type and range are checked.
    assert isinstance(stats["generated_at"], str)
    assert stats["generation_seconds"] >= 0
def test_multi_bucket_table(tmp_path, monkeypatch):
    """A global table report with several bucket columns keeps its metadata.

    A third log row for ``foo.com`` is added to the sample database, and a
    single ``multi`` report with two bucket columns (``domain``, ``agent``) is
    configured. The generated JSON rows must carry both bucket columns plus
    ``value``, and the global report listing must echo ``buckets`` and
    ``bucket_label`` unchanged.
    """
    db_path = tmp_path / "database" / "ngxstat.db"
    setup_db(db_path)

    # Add a second domain entry.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                "127.0.0.1",
                "foo.com",
                "2024-01-01 10:10:00",
                "GET /foo HTTP/1.1",
                200,
                100,
                "-",
                "curl",
                "MISS",
            ),
        )
        conn.commit()
    finally:
        # Close the handle even if the insert fails, so the temporary
        # database file is never left locked.
        conn.close()

    cfg = tmp_path / "reports.yml"
    # Flush-left literal: YAML nesting depends on indentation, so every key
    # must sit under the "- name:" item and the query body under its "|".
    cfg.write_text(
        """
- name: multi
  chart: table
  global: true
  buckets: [domain, agent]
  bucket_label: [Domain, Agent]
  query: |
    SELECT host AS domain, user_agent AS agent, COUNT(*) AS value
    FROM logs
    GROUP BY host, agent
"""
    )

    monkeypatch.setattr(gr, "DB_PATH", db_path)
    monkeypatch.setattr(gr, "OUTPUT_DIR", tmp_path / "output")
    monkeypatch.setattr(gr, "REPORT_CONFIG", cfg)
    # Same directory as Path(__file__).resolve().parents[1] / "templates".
    monkeypatch.setattr(gr, "TEMPLATE_DIR", REPO_ROOT / "templates")

    gr._generate_global()
    gr._generate_interval("hourly")

    global_dir = tmp_path / "output" / "global"

    data = json.loads((global_dir / "multi.json").read_text())
    assert {"domain", "agent", "value"} <= data[0].keys()

    reports = json.loads((global_dir / "reports.json").read_text())
    entry = next(r for r in reports if r["name"] == "multi")
    assert entry["buckets"] == ["domain", "agent"]
    assert entry["bucket_label"] == ["Domain", "Agent"]