ngxstat/scripts/generate_reports.py

380 lines
12 KiB
Python

import json
import sqlite3
from pathlib import Path
import shutil
from typing import List, Dict, Optional
from datetime import datetime
import time
import yaml
import typer
from jinja2 import Environment, FileSystemLoader
DB_PATH = Path("database/ngxstat.db")
OUTPUT_DIR = Path("output")
TEMPLATE_DIR = Path("templates")
REPORT_CONFIG = Path("reports.yml")
GENERATED_MARKER = OUTPUT_DIR / "generated.txt"
# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
"hourly": "%Y-%m-%d %H:00:00",
"daily": "%Y-%m-%d",
"weekly": "%Y-%W",
"monthly": "%Y-%m",
}
app = typer.Typer(help="Generate aggregated log reports")
@app.callback()
def _cli_callback(ctx: typer.Context) -> None:
"""Register post-command hook to note generation time."""
def _write_marker() -> None:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
GENERATED_MARKER.write_text(f"{timestamp}\n")
ctx.call_on_close(_write_marker)
def _get_domains() -> List[str]:
"""Return a sorted list of unique domains from the logs table."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
domains = [row[0] for row in cur.fetchall()]
conn.close()
return domains
def _load_config() -> List[Dict]:
if not REPORT_CONFIG.exists():
typer.echo(f"Config file not found: {REPORT_CONFIG}")
raise typer.Exit(1)
with REPORT_CONFIG.open("r") as fh:
data = yaml.safe_load(fh) or []
if not isinstance(data, list):
typer.echo("reports.yml must contain a list of report definitions")
raise typer.Exit(1)
return data
def _save_json(path: Path, data: List[Dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
def _copy_icons() -> None:
"""Copy vendored icons and scripts to the output directory."""
src_dir = Path("static/icons")
dst_dir = OUTPUT_DIR / "icons"
if src_dir.is_dir():
dst_dir.mkdir(parents=True, exist_ok=True)
for icon in src_dir.glob("*.svg"):
shutil.copy(icon, dst_dir / icon.name)
js_src = Path("static/chartManager.js")
if js_src.is_file():
shutil.copy(js_src, OUTPUT_DIR / js_src.name)
def _render_snippet(report: Dict, out_dir: Path) -> None:
"""Render a single report snippet to ``<name>.html`` inside ``out_dir``."""
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
template = env.get_template("report_snippet.html")
snippet_path = out_dir / f"{report['name']}.html"
snippet_path.write_text(template.render(report=report))
def _write_stats(
generated_at: Optional[str] = None, generation_seconds: Optional[float] = None
) -> None:
"""Query basic dataset stats and write them to ``output/global/stats.json``."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM logs")
total_logs = cur.fetchone()[0] or 0
cur.execute("SELECT MIN(time), MAX(time) FROM logs")
row = cur.fetchone() or (None, None)
start_date = row[0] or ""
end_date = row[1] or ""
cur.execute("SELECT COUNT(DISTINCT host) FROM logs")
unique_domains = cur.fetchone()[0] or 0
conn.close()
stats = {
"total_logs": total_logs,
"start_date": start_date,
"end_date": end_date,
"unique_domains": unique_domains,
}
if generated_at:
stats["generated_at"] = generated_at
if generation_seconds is not None:
stats["generation_seconds"] = generation_seconds
out_path = OUTPUT_DIR / "global" / "stats.json"
_save_json(out_path, stats)
def _bucket_expr(interval: str) -> str:
"""Return the SQLite strftime expression for the given interval."""
fmt = INTERVAL_FORMATS.get(interval)
if not fmt:
typer.echo(f"Unsupported interval: {interval}")
raise typer.Exit(1)
return f"strftime('{fmt}', datetime(time))"
def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
cfg = _load_config()
if not cfg:
typer.echo("No report definitions found")
return
_copy_icons()
bucket = _bucket_expr(interval)
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
# Create a temporary view so queries can easily be filtered by domain
cur.execute("DROP VIEW IF EXISTS logs_view")
if domain:
# Parameters are not allowed in CREATE VIEW statements, so we must
# safely interpolate the domain value ourselves. Escape any single
# quotes to prevent malformed queries.
safe_domain = domain.replace("'", "''")
cur.execute(
f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
)
out_dir = OUTPUT_DIR / "domains" / domain / interval
else:
cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
out_dir = OUTPUT_DIR / interval
out_dir.mkdir(parents=True, exist_ok=True)
report_list = []
for definition in cfg:
if "{bucket}" not in definition["query"] or definition.get("global"):
# Global reports are generated separately
continue
if domain and not definition.get("per_domain", True):
# Skip reports marked as not applicable to per-domain runs
continue
name = definition["name"]
query = definition["query"].replace("{bucket}", bucket)
query = query.replace("FROM logs", "FROM logs_view")
cur.execute(query)
rows = cur.fetchall()
headers = [c[0] for c in cur.description]
data = [dict(zip(headers, row)) for row in rows]
json_path = out_dir / f"{name}.json"
_save_json(json_path, data)
entry = {
"name": name,
"label": definition.get("label", name.title()),
"chart": definition.get("chart", "line"),
"json": f"{name}.json",
"html": f"{name}.html",
}
if "icon" in definition:
entry["icon"] = definition["icon"]
if "bucket" in definition:
entry["bucket"] = definition["bucket"]
if "buckets" in definition:
entry["buckets"] = definition["buckets"]
if "bucket_label" in definition:
entry["bucket_label"] = definition["bucket_label"]
if "color" in definition:
entry["color"] = definition["color"]
if "colors" in definition:
entry["colors"] = definition["colors"]
_render_snippet(entry, out_dir)
report_list.append(entry)
_save_json(out_dir / "reports.json", report_list)
if domain:
typer.echo(f"Generated {interval} reports for {domain}")
else:
typer.echo(f"Generated {interval} reports")
def _generate_all_domains(interval: str) -> None:
"""Generate reports for each unique domain."""
for domain in _get_domains():
_generate_interval(interval, domain)
def _generate_root_index() -> None:
"""Render the top-level index listing all intervals and domains."""
_copy_icons()
intervals = sorted(
[name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()]
)
domains_dir = OUTPUT_DIR / "domains"
domains: List[str] = []
if domains_dir.is_dir():
domains = [p.name for p in domains_dir.iterdir() if p.is_dir()]
domains.sort()
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
template = env.get_template("index.html")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
out_path = OUTPUT_DIR / "index.html"
out_path.write_text(template.render(intervals=intervals, domains=domains))
typer.echo(f"Generated root index at {out_path}")
def _generate_global() -> None:
"""Generate reports that do not depend on an interval."""
cfg = _load_config()
if not cfg:
typer.echo("No report definitions found")
return
start_time = time.time()
generated_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
_copy_icons()
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
out_dir = OUTPUT_DIR / "global"
out_dir.mkdir(parents=True, exist_ok=True)
report_list = []
for definition in cfg:
if "{bucket}" in definition["query"] and not definition.get("global"):
continue
name = definition["name"]
query = definition["query"]
cur.execute(query)
rows = cur.fetchall()
headers = [c[0] for c in cur.description]
data = [dict(zip(headers, row)) for row in rows]
json_path = out_dir / f"{name}.json"
_save_json(json_path, data)
entry = {
"name": name,
"label": definition.get("label", name.title()),
"chart": definition.get("chart", "line"),
"json": f"{name}.json",
"html": f"{name}.html",
}
if "icon" in definition:
entry["icon"] = definition["icon"]
if "bucket" in definition:
entry["bucket"] = definition["bucket"]
if "buckets" in definition:
entry["buckets"] = definition["buckets"]
if "bucket_label" in definition:
entry["bucket_label"] = definition["bucket_label"]
if "color" in definition:
entry["color"] = definition["color"]
if "colors" in definition:
entry["colors"] = definition["colors"]
_render_snippet(entry, out_dir)
report_list.append(entry)
_save_json(out_dir / "reports.json", report_list)
elapsed = round(time.time() - start_time, 2)
_write_stats(generated_at, elapsed)
typer.echo("Generated global reports")
@app.command()
def hourly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate hourly reports."""
if all_domains:
_generate_all_domains("hourly")
else:
_generate_interval("hourly", domain)
@app.command()
def daily(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate daily reports."""
if all_domains:
_generate_all_domains("daily")
else:
_generate_interval("daily", domain)
@app.command()
def weekly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate weekly reports."""
if all_domains:
_generate_all_domains("weekly")
else:
_generate_interval("weekly", domain)
@app.command()
def monthly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate monthly reports."""
if all_domains:
_generate_all_domains("monthly")
else:
_generate_interval("monthly", domain)
@app.command("global")
def global_reports() -> None:
"""Generate global reports."""
_generate_global()
@app.command()
def index() -> None:
"""Generate the root index page linking all reports."""
_generate_root_index()
if __name__ == "__main__":
app()