ngxstat/scripts/generate_reports.py
2025-07-19 03:30:08 -05:00

319 lines
9.5 KiB
Python

import json
import re
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Dict, List, Optional, Union

import typer
import yaml
from jinja2 import Environment, FileSystemLoader
# Filesystem locations used by this script. All of them are relative paths,
# so they are resolved against the working directory of the running process.

# SQLite database that every query helper in this module connects to.
DB_PATH = Path("database/ngxstat.db")
# Root directory that receives the generated JSON and HTML files.
OUTPUT_DIR = Path("output")
# Directory handed to the Jinja2 ``FileSystemLoader`` when rendering.
TEMPLATE_DIR = Path("templates")
# YAML file holding the list of report definitions.
REPORT_CONFIG = Path("reports.yml")

# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
    "hourly": "%Y-%m-%d %H:00:00",
    "daily": "%Y-%m-%d",
    "weekly": "%Y-%W",
    "monthly": "%Y-%m",
}

# Typer application object; the CLI commands defined further down register
# themselves on it through ``@app.command``.
app = typer.Typer(help="Generate aggregated log reports")
def _get_domains() -> List[str]:
"""Return a sorted list of unique domains from the logs table."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
domains = [row[0] for row in cur.fetchall()]
conn.close()
return domains
def _load_config() -> List[Dict]:
    """Load and validate the report definitions stored in ``REPORT_CONFIG``.

    Returns:
        The list of report definitions. An empty file yields an empty list.

    Raises:
        typer.Exit: With exit code 1 when the file does not exist, when its
            top-level value is not a list, or when an entry is not a mapping
            that provides a ``name`` and a string ``query``.
    """
    if not REPORT_CONFIG.exists():
        typer.echo(f"Config file not found: {REPORT_CONFIG}")
        raise typer.Exit(1)

    # Decode explicitly as UTF-8 so the result does not depend on the locale
    # of the machine that runs the report job.
    with REPORT_CONFIG.open("r", encoding="utf-8") as fh:
        data = yaml.safe_load(fh) or []

    if not isinstance(data, list):
        typer.echo("reports.yml must contain a list of report definitions")
        raise typer.Exit(1)

    # Both report generators index ``definition["query"]`` and
    # ``definition["name"]`` directly; reject malformed entries here with a
    # readable message instead of a KeyError/TypeError traceback later on.
    for position, definition in enumerate(data, start=1):
        is_valid = (
            isinstance(definition, dict)
            and "name" in definition
            and isinstance(definition.get("query"), str)
        )
        if not is_valid:
            typer.echo(
                f"Report definition #{position} in {REPORT_CONFIG} must be a "
                "mapping with a 'name' and a string 'query'"
            )
            raise typer.Exit(1)
    return data
def _save_json(path: Path, data: List[Dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
def _render_snippet(report: Dict, out_dir: Path) -> None:
    """Write the HTML snippet for one report into ``out_dir``.

    The ``report_snippet.html`` template from ``TEMPLATE_DIR`` is rendered
    with the report entry exposed as ``report``; the result is stored as
    ``<name>.html`` where ``<name>`` is ``report["name"]``. ``out_dir`` is
    expected to exist already.
    """
    loader = FileSystemLoader(TEMPLATE_DIR)
    snippet_template = Environment(loader=loader).get_template("report_snippet.html")
    target = out_dir / f"{report['name']}.html"
    html = snippet_template.render(report=report)
    target.write_text(html)
def _write_stats() -> None:
    """Query basic dataset stats and write them to ``output/global/stats.json``.

    The file contains the total number of log rows, the smallest and largest
    ``time`` value (empty strings when the table has no rows) and the number
    of distinct hosts.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    # One aggregate query replaces three separate scans of the table, and
    # ``closing`` makes sure the connection is released even if it fails.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            "SELECT COUNT(*), MIN(time), MAX(time), COUNT(DISTINCT host) FROM logs"
        ).fetchone()

    total_logs, start_date, end_date, unique_domains = row or (0, None, None, 0)
    stats = {
        "total_logs": total_logs or 0,
        # MIN/MAX are NULL on an empty table; publish "" rather than null.
        "start_date": start_date or "",
        "end_date": end_date or "",
        "unique_domains": unique_domains or 0,
    }
    _save_json(OUTPUT_DIR / "global" / "stats.json", stats)
def _bucket_expr(interval: str) -> str:
    """Build the SQL expression that groups ``time`` into ``interval`` buckets.

    The format string is looked up in ``INTERVAL_FORMATS``. An interval with
    no (or an empty) format is reported on the console and terminates the
    command with exit code 1 via ``typer.Exit``.
    """
    pattern = INTERVAL_FORMATS.get(interval)
    if pattern:
        return f"strftime('{pattern}', datetime(time))"

    typer.echo(f"Unsupported interval: {interval}")
    raise typer.Exit(1)
def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
    """Generate every interval-based report for ``interval``.

    A report definition is processed when its query contains the ``{bucket}``
    token and it is not flagged ``global``. For each one the query is run
    against a temporary ``logs_view`` view, the rows are saved as
    ``<name>.json``, an HTML snippet is rendered next to it, and finally a
    ``reports.json`` listing of all generated reports is written.

    Args:
        interval: Key of ``INTERVAL_FORMATS`` selecting the time bucket.
        domain: When given (and non-empty), only rows whose ``host`` equals
            this value are visible to the queries, definitions with
            ``per_domain: false`` are skipped, and output goes to
            ``OUTPUT_DIR/domains/<domain>/<interval>``. Otherwise all rows
            are used and output goes to ``OUTPUT_DIR/<interval>``.

    Raises:
        typer.Exit: Propagated from the config loader or from
            ``_bucket_expr`` for an unsupported interval.
        sqlite3.Error: If the database cannot be opened or a query fails.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    bucket = _bucket_expr(interval)

    # Match the ``logs`` table as a whole word, in any letter case and with
    # any amount of whitespace after FROM. A plain ``str.replace`` would turn
    # ``FROM logs_view`` into ``FROM logs_view_view`` and would leave
    # ``from logs`` untouched, silently dropping the domain filter.
    from_logs = re.compile(r"\bFROM\s+logs\b", re.IGNORECASE)

    report_list = []
    # ``closing`` releases the connection on every exit path; without it one
    # connection per call stayed open, i.e. one per domain for --all-domains.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()

        # Create a temporary view so queries can easily be filtered by domain
        cur.execute("DROP VIEW IF EXISTS logs_view")
        if domain:
            # Parameters are not allowed in CREATE VIEW statements, so we must
            # safely interpolate the domain value ourselves. Escape any single
            # quotes to prevent malformed queries.
            safe_domain = domain.replace("'", "''")
            cur.execute(
                f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
            )
            out_dir = OUTPUT_DIR / "domains" / domain / interval
        else:
            cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
            out_dir = OUTPUT_DIR / interval

        out_dir.mkdir(parents=True, exist_ok=True)

        for definition in cfg:
            if "{bucket}" not in definition["query"] or definition.get("global"):
                # Global reports are generated separately
                continue
            if domain and not definition.get("per_domain", True):
                # Skip reports marked as not applicable to per-domain runs
                continue

            name = definition["name"]
            query = definition["query"].replace("{bucket}", bucket)
            query = from_logs.sub("FROM logs_view", query)

            cur.execute(query)
            rows = cur.fetchall()
            headers = [column[0] for column in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": definition.get("chart", "line"),
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            # Optional presentation hints are copied through only when set.
            for key in ("color", "colors"):
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)

    _save_json(out_dir / "reports.json", report_list)
    typer.echo(f"Generated {interval} reports")
def _generate_all_domains(interval: str) -> None:
    """Run the ``interval`` report generation once per host found in the logs."""
    hosts = _get_domains()
    for host in hosts:
        _generate_interval(interval, host)
def _generate_root_index() -> None:
    """Render the top-level index listing all intervals and domains.

    Intervals are the sub-directories of ``OUTPUT_DIR`` other than the
    reserved ``domains``, ``global`` and ``analysis`` folders (compared
    case-insensitively); domains are the sub-directories of
    ``OUTPUT_DIR/domains``. Both lists are sorted and passed to the
    ``index.html`` template, whose output is written to
    ``OUTPUT_DIR/index.html``.
    """
    # Create the output root before listing it: ``iterdir`` on a missing
    # directory raises FileNotFoundError, so creating it only afterwards (as
    # the code used to do) could never help on a fresh checkout.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    reserved = {"domains", "global", "analysis"}
    intervals = sorted(
        p.name
        for p in OUTPUT_DIR.iterdir()
        if p.is_dir() and p.name.lower() not in reserved
    )

    domains_dir = OUTPUT_DIR / "domains"
    domains: List[str] = []
    if domains_dir.is_dir():
        domains = sorted(p.name for p in domains_dir.iterdir() if p.is_dir())

    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("index.html")

    out_path = OUTPUT_DIR / "index.html"
    out_path.write_text(template.render(intervals=intervals, domains=domains))
    typer.echo(f"Generated root index at {out_path}")
def _generate_global() -> None:
    """Generate reports that do not depend on an interval.

    A definition is processed when its query has no ``{bucket}`` token or
    when it is flagged ``global``. Each query runs directly against the
    database; the rows are saved as ``<name>.json`` under
    ``OUTPUT_DIR/global`` together with a rendered HTML snippet, followed by
    a ``reports.json`` listing and the dataset statistics file.

    Raises:
        typer.Exit: Propagated from the config loader.
        sqlite3.Error: If the database cannot be opened or a query fails.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    report_list = []
    # ``closing`` releases the connection on every exit path, including a
    # failing report query; previously it was never closed at all.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()

        out_dir = OUTPUT_DIR / "global"
        out_dir.mkdir(parents=True, exist_ok=True)

        for definition in cfg:
            if "{bucket}" in definition["query"] and not definition.get("global"):
                # Interval-based report; handled by the interval commands.
                continue

            name = definition["name"]
            # NOTE(review): a definition flagged ``global`` whose query still
            # contains ``{bucket}`` is executed with the token left in place;
            # confirm that no such definition is intended.
            query = definition["query"]

            cur.execute(query)
            rows = cur.fetchall()
            headers = [column[0] for column in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": definition.get("chart", "line"),
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            # Optional presentation hints are copied through only when set.
            for key in ("color", "colors"):
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)

    _save_json(out_dir / "reports.json", report_list)
    _write_stats()
    typer.echo("Generated global reports")
@app.command()
def hourly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate hourly reports."""
    if not all_domains:
        # Single run: limited to ``domain`` when one was given, otherwise
        # covering all hosts.
        _generate_interval("hourly", domain)
        return
    # --all-domains wins over a simultaneously supplied ``domain`` value.
    _generate_all_domains("hourly")
@app.command()
def daily(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate daily reports."""
    if not all_domains:
        # Single run: limited to ``domain`` when one was given, otherwise
        # covering all hosts.
        _generate_interval("daily", domain)
        return
    # --all-domains wins over a simultaneously supplied ``domain`` value.
    _generate_all_domains("daily")
@app.command()
def weekly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate weekly reports."""
    if not all_domains:
        # Single run: limited to ``domain`` when one was given, otherwise
        # covering all hosts.
        _generate_interval("weekly", domain)
        return
    # --all-domains wins over a simultaneously supplied ``domain`` value.
    _generate_all_domains("weekly")
@app.command()
def monthly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate monthly reports."""
    if not all_domains:
        # Single run: limited to ``domain`` when one was given, otherwise
        # covering all hosts.
        _generate_interval("monthly", domain)
        return
    # --all-domains wins over a simultaneously supplied ``domain`` value.
    _generate_all_domains("monthly")
@app.command("global")
def global_reports() -> None:
    """Generate global reports."""
    # Registered under the explicit command name "global"; the function
    # itself cannot carry that name because ``global`` is a reserved keyword
    # in Python. All work is delegated to _generate_global().
    _generate_global()
@app.command()
def index() -> None:
    """Generate the root index page linking all reports."""
    # Thin CLI wrapper around _generate_root_index(), which builds the page
    # from the directories already present under OUTPUT_DIR; the index thus
    # reflects whichever report commands were run beforehand.
    _generate_root_index()
# Start the Typer CLI only when this file is executed as a script, so that
# importing the module has no side effects beyond defining the commands.
if __name__ == "__main__":
    app()