ngxstat/scripts/generate_reports.py
ngxstat-bot fab91d2e04 Phase 1 UX + JS transforms: tabs, windowing, percent/grouping, smoothing, stacked series, metadata pass-through, top_n
- Replace tabs with Recent/Trends/Distribution/Tables/Analysis and add sticky controls (interval, domain, window [default 7d], percent, group small, exclude '-' -> Uncached, smoothing toggle).

- Client-side transforms: time-window slicing, percent mode, group others (3%), per-report exclusions; stackedBar multi-series; moving average for error_rate.

- Generator: pass through optional UX metadata (windows_supported, window_default, group_others_threshold, exclude_values, top_n, stacked, palette) and enforce top_n LIMIT for table reports.

- Reports: add status_classes_timeseries and cache_status_timeseries; apply top_n=50 to heavy tables.

- Chart manager: add helpers (sliceWindow, excludeValues, toPercent, groupOthers, movingAverage).

- URL state + localStorage for context; per-tab filtering for Trends/Distribution/Tables.
2025-08-18 23:01:00 -05:00

424 lines
13 KiB
Python

import json
import shutil
import sqlite3
import time
from datetime import datetime
from datetime import timezone
from pathlib import Path
from typing import List, Dict, Optional

import typer
import yaml
from jinja2 import Environment, FileSystemLoader
# Filesystem locations used by the generator. All of them are relative
# paths, so they resolve against the current working directory.
DB_PATH = Path("database/ngxstat.db")
OUTPUT_DIR = Path("output")
TEMPLATE_DIR = Path("templates")
REPORT_CONFIG = Path("reports.yml")
# File that receives a UTC timestamp from the close hook registered in
# ``_cli_callback``.
GENERATED_MARKER = OUTPUT_DIR / "generated.txt"
# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
    "hourly": "%Y-%m-%d %H:00:00",
    "daily": "%Y-%m-%d",
    "weekly": "%Y-%W",
    "monthly": "%Y-%m",
}
# Typer application object; the commands below register themselves on it.
app = typer.Typer(help="Generate aggregated log reports")
@app.callback()
def _cli_callback(ctx: typer.Context) -> None:
    """Register post-command hook to note generation time."""
    # The hook is handed to ``ctx.call_on_close`` so it runs when the CLI
    # context is torn down; it overwrites ``GENERATED_MARKER`` with the
    # current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``.

    def _write_marker() -> None:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        # ``datetime.utcnow()`` is deprecated since Python 3.12. An aware UTC
        # timestamp produces exactly the same formatted string.
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        GENERATED_MARKER.write_text(f"{timestamp}\n")

    ctx.call_on_close(_write_marker)
def _get_domains() -> List[str]:
    """Return a sorted list of unique domains from the logs table.

    Returns:
        Every distinct ``host`` value of ``logs``, in the order produced by
        SQLite's ``ORDER BY host``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT DISTINCT host FROM logs ORDER BY host"
        ).fetchall()
    finally:
        # Release the connection even when the query fails (for example when
        # the ``logs`` table does not exist yet).
        conn.close()
    return [row[0] for row in rows]
def _load_config() -> List[Dict]:
    """Load the report definitions from ``REPORT_CONFIG``.

    Returns:
        The list of report definitions. An empty file (or any other falsy
        top-level YAML value) yields an empty list.

    Raises:
        typer.Exit: With exit code 1 when the file does not exist or its
            top-level value is not a list.
    """
    if not REPORT_CONFIG.exists():
        typer.echo(f"Config file not found: {REPORT_CONFIG}")
        raise typer.Exit(1)
    # Decode explicitly as UTF-8 so non-ASCII labels do not depend on the
    # locale's default encoding.
    with REPORT_CONFIG.open("r", encoding="utf-8") as fh:
        data = yaml.safe_load(fh) or []
    if not isinstance(data, list):
        typer.echo("reports.yml must contain a list of report definitions")
        raise typer.Exit(1)
    return data
def _save_json(path: Path, data: object) -> None:
    """Serialise ``data`` as indented JSON and write it to ``path``.

    Missing parent directories of ``path`` are created first.

    Args:
        path: Destination file; overwritten when it already exists.
        data: Any JSON-serialisable value. Report rows arrive as a list of
            dicts, while ``_write_stats`` passes a single dict.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # ``json.dumps`` escapes non-ASCII characters by default, so the payload
    # is plain ASCII; naming the encoding keeps the write locale-independent.
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")
def _copy_icons() -> None:
    """Copy vendored icons and scripts to the output directory.

    Every ``*.svg`` file in ``static/icons`` is copied to
    ``OUTPUT_DIR/icons`` and ``static/chartManager.js`` is copied to
    ``OUTPUT_DIR``. Sources that do not exist are skipped.
    """
    icons_src = Path("static/icons")
    if icons_src.is_dir():
        icons_dst = OUTPUT_DIR / "icons"
        icons_dst.mkdir(parents=True, exist_ok=True)
        for icon in icons_src.glob("*.svg"):
            shutil.copy(icon, icons_dst / icon.name)

    js_src = Path("static/chartManager.js")
    if js_src.is_file():
        # The icons branch above may not have run, so the destination
        # directory is not guaranteed to exist at this point.
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        shutil.copy(js_src, OUTPUT_DIR / js_src.name)
def _render_snippet(report: Dict, out_dir: Path) -> None:
    """Render a single report snippet to ``<name>.html`` inside ``out_dir``.

    Args:
        report: Report entry passed to the template as ``report``; its
            ``"name"`` key determines the output file name.
        out_dir: Directory that receives the snippet. It is not created
            here, so it must already exist.
    """
    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("report_snippet.html")
    snippet_path = out_dir / f"{report['name']}.html"
    # FileSystemLoader decodes templates as UTF-8 by default; write the
    # rendered text with the same encoding instead of the locale default.
    snippet_path.write_text(template.render(report=report), encoding="utf-8")
def _write_stats(
    generated_at: Optional[str] = None, generation_seconds: Optional[float] = None
) -> None:
    """Query basic dataset stats and write them to ``output/global/stats.json``.

    The JSON object always holds ``total_logs``, ``start_date``,
    ``end_date`` and ``unique_domains``. The two optional arguments are
    added only when supplied.

    Args:
        generated_at: Timestamp string stored as ``generated_at``; omitted
            when empty or ``None``.
        generation_seconds: Duration stored as ``generation_seconds``;
            omitted only when ``None`` (``0`` is kept).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        cur.execute("SELECT COUNT(*) FROM logs")
        total_logs = cur.fetchone()[0] or 0

        cur.execute("SELECT MIN(time), MAX(time) FROM logs")
        first_seen, last_seen = cur.fetchone() or (None, None)

        cur.execute("SELECT COUNT(DISTINCT host) FROM logs")
        unique_domains = cur.fetchone()[0] or 0
    finally:
        # Close the connection even when one of the queries raises.
        conn.close()

    stats = {
        "total_logs": total_logs,
        # MIN/MAX return NULL on an empty table; store "" in that case.
        "start_date": first_seen or "",
        "end_date": last_seen or "",
        "unique_domains": unique_domains,
    }
    if generated_at:
        stats["generated_at"] = generated_at
    if generation_seconds is not None:
        stats["generation_seconds"] = generation_seconds

    _save_json(OUTPUT_DIR / "global" / "stats.json", stats)
def _bucket_expr(interval: str) -> str:
    """Build the SQLite ``strftime`` expression that buckets ``time``.

    Args:
        interval: Key of ``INTERVAL_FORMATS``.

    Returns:
        SQL text of the form ``strftime('<format>', datetime(time))``.

    Raises:
        typer.Exit: With exit code 1 when ``interval`` has no format.
    """
    pattern = INTERVAL_FORMATS.get(interval)
    if pattern:
        return f"strftime('{pattern}', datetime(time))"
    # Unknown interval: report it and abort the command.
    typer.echo(f"Unsupported interval: {interval}")
    raise typer.Exit(1)
def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
    """Generate every bucketed report for ``interval``.

    Each report definition whose query contains the ``{bucket}`` token and
    that is not flagged ``global`` is executed against a temporary
    ``logs_view``. Its rows are written to ``<name>.json``, a snippet is
    rendered to ``<name>.html`` and ``reports.json`` lists all entries.

    Args:
        interval: Key of ``INTERVAL_FORMATS`` selecting the bucket size.
        domain: When given, only rows whose ``host`` equals this value are
            used and output goes to ``OUTPUT_DIR/domains/<domain>/<interval>``
            instead of ``OUTPUT_DIR/<interval>``.

    Raises:
        typer.Exit: Propagated from ``_load_config`` or ``_bucket_expr``.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    _copy_icons()

    bucket = _bucket_expr(interval)

    # Keys copied verbatim from a definition into its report entry. The
    # order matters: it fixes the key order written to reports.json.
    passthrough_keys = (
        "icon",
        "bucket",
        "buckets",
        "bucket_label",
        "color",
        "colors",
        # Optional UX metadata passthrough for frontend-only transforms
        "windows_supported",
        "window_default",
        "group_others_threshold",
        "exclude_values",
        "top_n",
        "stacked",
        "palette",
    )

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # Create a temporary view so queries can easily be filtered by domain
        cur.execute("DROP VIEW IF EXISTS logs_view")
        if domain:
            # Parameters are not allowed in CREATE VIEW statements, so we must
            # safely interpolate the domain value ourselves. Escape any single
            # quotes to prevent malformed queries.
            safe_domain = domain.replace("'", "''")
            cur.execute(
                f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
            )
            # NOTE(review): ``domain`` is used as a path component unchanged;
            # confirm host values can never contain path separators.
            out_dir = OUTPUT_DIR / "domains" / domain / interval
        else:
            cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
            out_dir = OUTPUT_DIR / interval

        out_dir.mkdir(parents=True, exist_ok=True)

        report_list = []
        for definition in cfg:
            if "{bucket}" not in definition["query"] or definition.get("global"):
                # Global reports are generated separately
                continue
            if domain and not definition.get("per_domain", True):
                # Skip reports marked as not applicable to per-domain runs
                continue

            name = definition["name"]
            query = definition["query"].replace("{bucket}", bucket)
            query = query.replace("FROM logs", "FROM logs_view")

            # Apply top_n limit for tables (performance-friendly), if
            # configured and the query does not already mention LIMIT.
            top_n = definition.get("top_n")
            chart_type = definition.get("chart", "line")
            if top_n and chart_type == "table" and "LIMIT" not in query.upper():
                try:
                    limit = int(top_n)
                except (TypeError, ValueError):
                    # Best effort: run the query unlimited, but say so.
                    typer.echo(
                        f"Ignoring invalid top_n for report {name}: {top_n!r}",
                        err=True,
                    )
                else:
                    # A trailing ';' would turn the appended clause into a
                    # second statement, which sqlite3 refuses to execute.
                    query = f"{query.rstrip().rstrip(';')}\nLIMIT {limit}"

            cur.execute(query)
            rows = cur.fetchall()
            headers = [col[0] for col in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": chart_type,
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            for key in passthrough_keys:
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)

        _save_json(out_dir / "reports.json", report_list)
    finally:
        # One connection is opened per call (once per domain with
        # --all-domains), so always release it.
        conn.close()

    if domain:
        typer.echo(f"Generated {interval} reports for {domain}")
    else:
        typer.echo(f"Generated {interval} reports")
def _generate_all_domains(interval: str) -> None:
    """Run ``_generate_interval`` once for every domain in the logs table.

    Args:
        interval: Interval name forwarded unchanged to ``_generate_interval``.
    """
    hosts = _get_domains()
    for host in hosts:
        _generate_interval(interval, host)
def _generate_root_index() -> None:
    """Render the top-level index listing all intervals and domains.

    Intervals are the ``INTERVAL_FORMATS`` names that have a directory
    under ``OUTPUT_DIR``; domains are the sub-directories of
    ``OUTPUT_DIR/domains``. Both lists are sorted and passed to the
    ``index.html`` template, whose output goes to ``OUTPUT_DIR/index.html``.
    """
    _copy_icons()
    intervals = sorted(
        name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()
    )

    domains_dir = OUTPUT_DIR / "domains"
    domains: List[str] = []
    if domains_dir.is_dir():
        domains = sorted(p.name for p in domains_dir.iterdir() if p.is_dir())

    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("index.html")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_path = OUTPUT_DIR / "index.html"
    # FileSystemLoader decodes templates as UTF-8 by default; write the page
    # with the same encoding instead of the locale default.
    out_path.write_text(
        template.render(intervals=intervals, domains=domains), encoding="utf-8"
    )
    typer.echo(f"Generated root index at {out_path}")
def _generate_global() -> None:
    """Generate reports that do not depend on an interval.

    A definition is processed when its query has no ``{bucket}`` token or
    when it is flagged ``global``. Rows are written to
    ``OUTPUT_DIR/global/<name>.json`` with a rendered ``<name>.html``
    snippet, ``reports.json`` lists all entries, and ``_write_stats``
    records the start timestamp and the elapsed seconds.

    Raises:
        typer.Exit: Propagated from ``_load_config``.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    start_time = time.time()
    # ``datetime.utcnow()`` is deprecated since Python 3.12. An aware UTC
    # timestamp produces exactly the same formatted string.
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    _copy_icons()

    # Keys copied verbatim from a definition into its report entry. The
    # order matters: it fixes the key order written to reports.json.
    passthrough_keys = (
        "icon",
        "bucket",
        "buckets",
        "bucket_label",
        "color",
        "colors",
        # Optional UX metadata passthrough for frontend-only transforms
        "windows_supported",
        "window_default",
        "group_others_threshold",
        "exclude_values",
        "top_n",
        "stacked",
        "palette",
    )

    out_dir = OUTPUT_DIR / "global"
    out_dir.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        report_list = []
        for definition in cfg:
            if "{bucket}" in definition["query"] and not definition.get("global"):
                # Interval reports are generated by the interval commands
                continue

            name = definition["name"]
            query = definition["query"]

            # Apply top_n limit for tables (performance-friendly), if
            # configured and the query does not already mention LIMIT.
            top_n = definition.get("top_n")
            chart_type = definition.get("chart", "line")
            if top_n and chart_type == "table" and "LIMIT" not in query.upper():
                try:
                    limit = int(top_n)
                except (TypeError, ValueError):
                    # Best effort: run the query unlimited, but say so.
                    typer.echo(
                        f"Ignoring invalid top_n for report {name}: {top_n!r}",
                        err=True,
                    )
                else:
                    # A trailing ';' would turn the appended clause into a
                    # second statement, which sqlite3 refuses to execute.
                    query = f"{query.rstrip().rstrip(';')}\nLIMIT {limit}"

            cur.execute(query)
            rows = cur.fetchall()
            headers = [col[0] for col in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": chart_type,
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            for key in passthrough_keys:
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)

        _save_json(out_dir / "reports.json", report_list)
    finally:
        # Release the connection even when a report query fails.
        conn.close()

    elapsed = round(time.time() - start_time, 2)
    _write_stats(generated_at, elapsed)
    typer.echo("Generated global reports")
@app.command()
def hourly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate hourly reports."""
    # ``--all-domains`` wins over ``--domain`` when both are supplied.
    if not all_domains:
        _generate_interval("hourly", domain)
        return
    _generate_all_domains("hourly")
@app.command()
def daily(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate daily reports."""
    # ``--all-domains`` wins over ``--domain`` when both are supplied.
    if not all_domains:
        _generate_interval("daily", domain)
        return
    _generate_all_domains("daily")
@app.command()
def weekly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate weekly reports."""
    # ``--all-domains`` wins over ``--domain`` when both are supplied.
    if not all_domains:
        _generate_interval("weekly", domain)
        return
    _generate_all_domains("weekly")
@app.command()
def monthly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate monthly reports."""
    # ``--all-domains`` wins over ``--domain`` when both are supplied.
    if not all_domains:
        _generate_interval("monthly", domain)
        return
    _generate_all_domains("monthly")
# Registered under the CLI name "global"; the function itself cannot use
# that name because ``global`` is a reserved word in Python.
@app.command("global")
def global_reports() -> None:
    """Generate global reports."""
    # Thin CLI wrapper: all work happens in ``_generate_global``.
    _generate_global()
@app.command()
def index() -> None:
    """Generate the root index page linking all reports."""
    # Thin CLI wrapper: all work happens in ``_generate_root_index``.
    _generate_root_index()
# Run the Typer application when the file is executed as a script.
if __name__ == "__main__":
    app()