ngxstat/scripts/generate_reports.py

460 lines
14 KiB
Python

import json
import sqlite3
from pathlib import Path
import shutil
from typing import List, Dict, Optional
from datetime import datetime, timezone
import time
import yaml
import typer
from jinja2 import Environment, FileSystemLoader
DB_PATH = Path("database/ngxstat.db")
OUTPUT_DIR = Path("output")
TEMPLATE_DIR = Path("templates")
REPORT_CONFIG = Path("reports.yml")
GENERATED_MARKER = OUTPUT_DIR / "generated.txt"
# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
"hourly": "%Y-%m-%d %H:00:00",
"daily": "%Y-%m-%d",
"weekly": "%Y-%W",
"monthly": "%Y-%m",
}
app = typer.Typer(help="Generate aggregated log reports")
@app.callback()
def _cli_callback(ctx: typer.Context) -> None:
"""Register post-command hook to note generation time."""
def _write_marker() -> None:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Use timezone-aware UTC to avoid deprecation warnings and ambiguity
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
GENERATED_MARKER.write_text(f"{timestamp}\n")
ctx.call_on_close(_write_marker)
def _get_domains() -> List[str]:
"""Return a sorted list of unique domains from the logs table."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
domains = [row[0] for row in cur.fetchall()]
conn.close()
return domains
def _load_config() -> List[Dict]:
if not REPORT_CONFIG.exists():
typer.echo(f"Config file not found: {REPORT_CONFIG}")
raise typer.Exit(1)
with REPORT_CONFIG.open("r") as fh:
data = yaml.safe_load(fh) or []
if not isinstance(data, list):
typer.echo("reports.yml must contain a list of report definitions")
raise typer.Exit(1)
return data
def _save_json(path: Path, data: List[Dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
def _copy_icons() -> None:
"""Copy vendored icons and scripts to the output directory."""
src_dir = Path("static/icons")
dst_dir = OUTPUT_DIR / "icons"
if src_dir.is_dir():
dst_dir.mkdir(parents=True, exist_ok=True)
for icon in src_dir.glob("*.svg"):
shutil.copy(icon, dst_dir / icon.name)
js_src = Path("static/chartManager.js")
if js_src.is_file():
shutil.copy(js_src, OUTPUT_DIR / js_src.name)
def _render_snippet(report: Dict, out_dir: Path) -> None:
"""Render a single report snippet to ``<name>.html`` inside ``out_dir``."""
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
template = env.get_template("report_snippet.html")
snippet_path = out_dir / f"{report['name']}.html"
snippet_path.write_text(template.render(report=report))
def _write_stats(
generated_at: Optional[str] = None, generation_seconds: Optional[float] = None
) -> None:
"""Query basic dataset stats and write them to ``output/global/stats.json``."""
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM logs")
total_logs = cur.fetchone()[0] or 0
cur.execute("SELECT MIN(time), MAX(time) FROM logs")
row = cur.fetchone() or (None, None)
start_date = row[0] or ""
end_date = row[1] or ""
cur.execute("SELECT COUNT(DISTINCT host) FROM logs")
unique_domains = cur.fetchone()[0] or 0
conn.close()
stats = {
"total_logs": total_logs,
"start_date": start_date,
"end_date": end_date,
"unique_domains": unique_domains,
}
if generated_at:
stats["generated_at"] = generated_at
if generation_seconds is not None:
stats["generation_seconds"] = generation_seconds
out_path = OUTPUT_DIR / "global" / "stats.json"
_save_json(out_path, stats)
def _bucket_expr(interval: str) -> str:
"""Return the SQLite strftime expression for the given interval."""
fmt = INTERVAL_FORMATS.get(interval)
if not fmt:
typer.echo(f"Unsupported interval: {interval}")
raise typer.Exit(1)
return f"strftime('{fmt}', datetime(time))"
def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
cfg = _load_config()
if not cfg:
typer.echo("No report definitions found")
return
_copy_icons()
bucket = _bucket_expr(interval)
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
# Create a temporary view so queries can easily be filtered by domain
cur.execute("DROP VIEW IF EXISTS logs_view")
if domain:
# Parameters are not allowed in CREATE VIEW statements, so we must
# safely interpolate the domain value ourselves. Escape any single
# quotes to prevent malformed queries.
safe_domain = domain.replace("'", "''")
cur.execute(
f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
)
out_dir = OUTPUT_DIR / "domains" / domain / interval
else:
cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
out_dir = OUTPUT_DIR / interval
out_dir.mkdir(parents=True, exist_ok=True)
report_list = []
for definition in cfg:
if "{bucket}" not in definition["query"] or definition.get("global"):
# Global reports are generated separately
continue
if domain and not definition.get("per_domain", True):
# Skip reports marked as not applicable to per-domain runs
continue
name = definition["name"]
query = definition["query"].replace("{bucket}", bucket)
query = query.replace("FROM logs", "FROM logs_view")
# Apply top_n limit for tables (performance-friendly), if configured
top_n = definition.get("top_n")
chart_type = definition.get("chart", "line")
if top_n and chart_type == "table":
try:
n = int(top_n)
if "LIMIT" not in query.upper():
query = f"{query}\nLIMIT {n}"
except Exception:
pass
cur.execute(query)
rows = cur.fetchall()
headers = [c[0] for c in cur.description]
data = [dict(zip(headers, row)) for row in rows]
json_path = out_dir / f"{name}.json"
_save_json(json_path, data)
entry = {
"name": name,
"label": definition.get("label", name.title()),
"chart": definition.get("chart", "line"),
"json": f"{name}.json",
"html": f"{name}.html",
}
if "icon" in definition:
entry["icon"] = definition["icon"]
if "bucket" in definition:
entry["bucket"] = definition["bucket"]
if "buckets" in definition:
entry["buckets"] = definition["buckets"]
if "bucket_label" in definition:
entry["bucket_label"] = definition["bucket_label"]
if "color" in definition:
entry["color"] = definition["color"]
if "colors" in definition:
entry["colors"] = definition["colors"]
# Optional UX metadata passthrough for frontend-only transforms
for key in (
"windows_supported",
"window_default",
"group_others_threshold",
"exclude_values",
"top_n",
"stacked",
"palette",
):
if key in definition:
entry[key] = definition[key]
_render_snippet(entry, out_dir)
report_list.append(entry)
_save_json(out_dir / "reports.json", report_list)
if domain:
typer.echo(f"Generated {interval} reports for {domain}")
else:
typer.echo(f"Generated {interval} reports")
def _generate_all_domains(interval: str) -> None:
"""Generate reports for each unique domain."""
for domain in _get_domains():
_generate_interval(interval, domain)
def _generate_root_index() -> None:
"""Render the top-level index listing all intervals and domains."""
_copy_icons()
intervals = sorted(
[name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()]
)
domains_dir = OUTPUT_DIR / "domains"
domains: List[str] = []
if domains_dir.is_dir():
domains = [p.name for p in domains_dir.iterdir() if p.is_dir()]
domains.sort()
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
template = env.get_template("index.html")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
out_path = OUTPUT_DIR / "index.html"
out_path.write_text(template.render(intervals=intervals, domains=domains))
typer.echo(f"Generated root index at {out_path}")
def _generate_global() -> None:
"""Generate reports that do not depend on an interval."""
cfg = _load_config()
if not cfg:
typer.echo("No report definitions found")
return
start_time = time.time()
# Use timezone-aware UTC for generated_at (string remains unchanged format)
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
_copy_icons()
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
out_dir = OUTPUT_DIR / "global"
out_dir.mkdir(parents=True, exist_ok=True)
report_list = []
for definition in cfg:
if "{bucket}" in definition["query"] and not definition.get("global"):
continue
name = definition["name"]
query = definition["query"]
# Apply top_n limit for tables (performance-friendly), if configured
top_n = definition.get("top_n")
chart_type = definition.get("chart", "line")
if top_n and chart_type == "table":
try:
n = int(top_n)
if "LIMIT" not in query.upper():
query = f"{query}\nLIMIT {n}"
except Exception:
pass
cur.execute(query)
rows = cur.fetchall()
headers = [c[0] for c in cur.description]
data = [dict(zip(headers, row)) for row in rows]
json_path = out_dir / f"{name}.json"
_save_json(json_path, data)
entry = {
"name": name,
"label": definition.get("label", name.title()),
"chart": definition.get("chart", "line"),
"json": f"{name}.json",
"html": f"{name}.html",
}
if "icon" in definition:
entry["icon"] = definition["icon"]
if "bucket" in definition:
entry["bucket"] = definition["bucket"]
if "buckets" in definition:
entry["buckets"] = definition["buckets"]
if "bucket_label" in definition:
entry["bucket_label"] = definition["bucket_label"]
if "color" in definition:
entry["color"] = definition["color"]
if "colors" in definition:
entry["colors"] = definition["colors"]
# Optional UX metadata passthrough for frontend-only transforms
for key in (
"windows_supported",
"window_default",
"group_others_threshold",
"exclude_values",
"top_n",
"stacked",
"palette",
):
if key in definition:
entry[key] = definition[key]
_render_snippet(entry, out_dir)
report_list.append(entry)
_save_json(out_dir / "reports.json", report_list)
elapsed = round(time.time() - start_time, 2)
_write_stats(generated_at, elapsed)
typer.echo("Generated global reports")
def _generate_analysis() -> None:
"""Generate analysis JSON files consumed by the Analysis tab."""
try:
# Import lazily to avoid circulars and keep dependencies optional
from scripts import analyze
except Exception as exc: # pragma: no cover - defensive
typer.echo(f"Failed to import analysis module: {exc}")
return
# Ensure output root and icons present for parity
_copy_icons()
# These commands write JSON files under output/analysis/
try:
analyze.check_missing_domains(json_output=True)
except Exception as exc: # pragma: no cover - continue best-effort
typer.echo(f"check_missing_domains failed: {exc}")
try:
analyze.suggest_cache(json_output=True)
except Exception as exc: # pragma: no cover
typer.echo(f"suggest_cache failed: {exc}")
try:
analyze.detect_threats()
except Exception as exc: # pragma: no cover
typer.echo(f"detect_threats failed: {exc}")
typer.echo("Generated analysis JSON files")
@app.command()
def hourly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate hourly reports."""
if all_domains:
_generate_all_domains("hourly")
else:
_generate_interval("hourly", domain)
@app.command()
def daily(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate daily reports."""
if all_domains:
_generate_all_domains("daily")
else:
_generate_interval("daily", domain)
@app.command()
def weekly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate weekly reports."""
if all_domains:
_generate_all_domains("weekly")
else:
_generate_interval("weekly", domain)
@app.command()
def monthly(
domain: Optional[str] = typer.Option(
None, help="Generate reports for a specific domain"
),
all_domains: bool = typer.Option(
False, "--all-domains", help="Generate reports for each domain"
),
) -> None:
"""Generate monthly reports."""
if all_domains:
_generate_all_domains("monthly")
else:
_generate_interval("monthly", domain)
@app.command("global")
def global_reports() -> None:
"""Generate global reports."""
_generate_global()
@app.command()
def analysis() -> None:
"""Generate analysis JSON files for the Analysis tab."""
_generate_analysis()
@app.command()
def index() -> None:
"""Generate the root index page linking all reports."""
_generate_root_index()
if __name__ == "__main__":
app()