import json import sqlite3 from pathlib import Path import shutil from typing import List, Dict, Optional from datetime import datetime import time import yaml import typer from jinja2 import Environment, FileSystemLoader DB_PATH = Path("database/ngxstat.db") OUTPUT_DIR = Path("output") TEMPLATE_DIR = Path("templates") REPORT_CONFIG = Path("reports.yml") # Mapping of interval names to SQLite strftime formats. These strings are # substituted into report queries whenever the special ``{bucket}`` token is # present so that a single report definition can be reused for multiple # intervals. INTERVAL_FORMATS = { "hourly": "%Y-%m-%d %H:00:00", "daily": "%Y-%m-%d", "weekly": "%Y-%W", "monthly": "%Y-%m", } app = typer.Typer(help="Generate aggregated log reports") def _get_domains() -> List[str]: """Return a sorted list of unique domains from the logs table.""" conn = sqlite3.connect(DB_PATH) cur = conn.cursor() cur.execute("SELECT DISTINCT host FROM logs ORDER BY host") domains = [row[0] for row in cur.fetchall()] conn.close() return domains def _load_config() -> List[Dict]: if not REPORT_CONFIG.exists(): typer.echo(f"Config file not found: {REPORT_CONFIG}") raise typer.Exit(1) with REPORT_CONFIG.open("r") as fh: data = yaml.safe_load(fh) or [] if not isinstance(data, list): typer.echo("reports.yml must contain a list of report definitions") raise typer.Exit(1) return data def _save_json(path: Path, data: List[Dict]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2)) def _copy_icons() -> None: """Copy vendored icons and scripts to the output directory.""" src_dir = Path("static/icons") dst_dir = OUTPUT_DIR / "icons" if src_dir.is_dir(): dst_dir.mkdir(parents=True, exist_ok=True) for icon in src_dir.glob("*.svg"): shutil.copy(icon, dst_dir / icon.name) js_src = Path("static/chartManager.js") if js_src.is_file(): shutil.copy(js_src, OUTPUT_DIR / js_src.name) def _render_snippet(report: Dict, out_dir: Path) -> None: """Render a single report snippet to ``.html`` inside ``out_dir``.""" env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) template = env.get_template("report_snippet.html") snippet_path = out_dir / f"{report['name']}.html" snippet_path.write_text(template.render(report=report)) def _write_stats( generated_at: Optional[str] = None, generation_seconds: Optional[float] = None ) -> None: """Query basic dataset stats and write them to ``output/global/stats.json``.""" conn = sqlite3.connect(DB_PATH) cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM logs") total_logs = cur.fetchone()[0] or 0 cur.execute("SELECT MIN(time), MAX(time) FROM logs") row = cur.fetchone() or (None, None) start_date = row[0] or "" end_date = row[1] or "" cur.execute("SELECT COUNT(DISTINCT host) FROM logs") unique_domains = cur.fetchone()[0] or 0 conn.close() stats = { "total_logs": total_logs, "start_date": start_date, "end_date": end_date, "unique_domains": unique_domains, } if generated_at: stats["generated_at"] = generated_at if generation_seconds is not None: stats["generation_seconds"] = generation_seconds out_path = OUTPUT_DIR / "global" / "stats.json" _save_json(out_path, stats) def _bucket_expr(interval: str) -> str: """Return the SQLite strftime expression for the given interval.""" fmt = INTERVAL_FORMATS.get(interval) if not fmt: typer.echo(f"Unsupported interval: {interval}") raise typer.Exit(1) return f"strftime('{fmt}', datetime(time))" def _generate_interval(interval: str, domain: Optional[str] = None) -> None: cfg = _load_config() if not cfg: typer.echo("No report definitions found") return _copy_icons() bucket = _bucket_expr(interval) conn = sqlite3.connect(DB_PATH) cur = conn.cursor() # Create a temporary view so queries can easily be filtered by domain cur.execute("DROP VIEW IF EXISTS logs_view") if domain: # Parameters are not allowed in CREATE VIEW statements, so we must # safely interpolate the domain value ourselves. Escape any single # quotes to prevent malformed queries. safe_domain = domain.replace("'", "''") cur.execute( f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'" ) out_dir = OUTPUT_DIR / "domains" / domain / interval else: cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs") out_dir = OUTPUT_DIR / interval out_dir.mkdir(parents=True, exist_ok=True) report_list = [] for definition in cfg: if "{bucket}" not in definition["query"] or definition.get("global"): # Global reports are generated separately continue if domain and not definition.get("per_domain", True): # Skip reports marked as not applicable to per-domain runs continue name = definition["name"] query = definition["query"].replace("{bucket}", bucket) query = query.replace("FROM logs", "FROM logs_view") cur.execute(query) rows = cur.fetchall() headers = [c[0] for c in cur.description] data = [dict(zip(headers, row)) for row in rows] json_path = out_dir / f"{name}.json" _save_json(json_path, data) entry = { "name": name, "label": definition.get("label", name.title()), "chart": definition.get("chart", "line"), "json": f"{name}.json", "html": f"{name}.html", } if "icon" in definition: entry["icon"] = definition["icon"] if "bucket" in definition: entry["bucket"] = definition["bucket"] if "buckets" in definition: entry["buckets"] = definition["buckets"] if "bucket_label" in definition: entry["bucket_label"] = definition["bucket_label"] if "color" in definition: entry["color"] = definition["color"] if "colors" in definition: entry["colors"] = definition["colors"] _render_snippet(entry, out_dir) report_list.append(entry) _save_json(out_dir / "reports.json", report_list) if domain: typer.echo(f"Generated {interval} reports for {domain}") else: typer.echo(f"Generated {interval} reports") def _generate_all_domains(interval: str) -> None: """Generate reports for each unique domain.""" for domain in _get_domains(): _generate_interval(interval, domain) def _generate_root_index() -> None: """Render the top-level index listing all intervals and domains.""" _copy_icons() intervals = sorted( [name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()] ) domains_dir = OUTPUT_DIR / "domains" domains: List[str] = [] if domains_dir.is_dir(): domains = [p.name for p in domains_dir.iterdir() if p.is_dir()] domains.sort() env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) template = env.get_template("index.html") OUTPUT_DIR.mkdir(parents=True, exist_ok=True) out_path = OUTPUT_DIR / "index.html" out_path.write_text(template.render(intervals=intervals, domains=domains)) typer.echo(f"Generated root index at {out_path}") def _generate_global() -> None: """Generate reports that do not depend on an interval.""" cfg = _load_config() if not cfg: typer.echo("No report definitions found") return start_time = time.time() generated_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") _copy_icons() conn = sqlite3.connect(DB_PATH) cur = conn.cursor() out_dir = OUTPUT_DIR / "global" out_dir.mkdir(parents=True, exist_ok=True) report_list = [] for definition in cfg: if "{bucket}" in definition["query"] and not definition.get("global"): continue name = definition["name"] query = definition["query"] cur.execute(query) rows = cur.fetchall() headers = [c[0] for c in cur.description] data = [dict(zip(headers, row)) for row in rows] json_path = out_dir / f"{name}.json" _save_json(json_path, data) entry = { "name": name, "label": definition.get("label", name.title()), "chart": definition.get("chart", "line"), "json": f"{name}.json", "html": f"{name}.html", } if "icon" in definition: entry["icon"] = definition["icon"] if "bucket" in definition: entry["bucket"] = definition["bucket"] if "buckets" in definition: entry["buckets"] = definition["buckets"] if "bucket_label" in definition: entry["bucket_label"] = definition["bucket_label"] if "color" in definition: entry["color"] = definition["color"] if "colors" in definition: entry["colors"] = definition["colors"] _render_snippet(entry, out_dir) report_list.append(entry) _save_json(out_dir / "reports.json", report_list) elapsed = round(time.time() - start_time, 2) _write_stats(generated_at, elapsed) typer.echo("Generated global reports") @app.command() def hourly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate hourly reports.""" if all_domains: _generate_all_domains("hourly") else: _generate_interval("hourly", domain) @app.command() def daily( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate daily reports.""" if all_domains: _generate_all_domains("daily") else: _generate_interval("daily", domain) @app.command() def weekly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate weekly reports.""" if all_domains: _generate_all_domains("weekly") else: _generate_interval("weekly", domain) @app.command() def monthly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate monthly reports.""" if all_domains: _generate_all_domains("monthly") else: _generate_interval("monthly", domain) @app.command("global") def global_reports() -> None: """Generate global reports.""" _generate_global() @app.command() def index() -> None: """Generate the root index page linking all reports.""" _generate_root_index() if __name__ == "__main__": app()