"""Command-line tool that generates aggregated log reports.

Report definitions are read from ``reports.yml``; each definition's SQL query
is run against the SQLite database at ``DB_PATH`` and the result is written as
JSON plus a rendered HTML snippet under ``OUTPUT_DIR``.
"""

import json
import re
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import List, Dict, Optional

import yaml
import typer
from jinja2 import Environment, FileSystemLoader

DB_PATH = Path("database/ngxstat.db")
OUTPUT_DIR = Path("output")
TEMPLATE_DIR = Path("templates")
REPORT_CONFIG = Path("reports.yml")

# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
    "hourly": "%Y-%m-%d %H:00:00",
    "daily": "%Y-%m-%d",
    "weekly": "%Y-%W",
    "monthly": "%Y-%m",
}

# Matches ``FROM logs`` only when ``logs`` is the complete identifier, so that
# names such as ``logs_view`` or ``logs_archive`` are left untouched when
# queries are redirected to the filtered view.
_LOGS_TABLE_RE = re.compile(r"FROM logs\b")

app = typer.Typer(help="Generate aggregated log reports")


def _get_domains() -> List[str]:
    """Return a sorted list of unique domains from the logs table."""
    # ``closing`` guarantees the connection is released even if the query
    # raises (e.g. the ``logs`` table does not exist).
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()
        cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
        return [row[0] for row in cur.fetchall()]


def _load_config() -> List[Dict]:
    """Load the list of report definitions from ``REPORT_CONFIG``.

    Returns:
        The parsed list of definitions (an empty list for an empty file).

    Raises:
        typer.Exit: With code 1 if the file is missing or does not contain a
            list at the top level.
    """
    if not REPORT_CONFIG.exists():
        typer.echo(f"Config file not found: {REPORT_CONFIG}")
        raise typer.Exit(1)
    with REPORT_CONFIG.open("r") as fh:
        data = yaml.safe_load(fh) or []
    if not isinstance(data, list):
        typer.echo("reports.yml must contain a list of report definitions")
        raise typer.Exit(1)
    return data


def _save_json(path: Path, data: List[Dict]) -> None:
    """Write ``data`` to ``path`` as indented JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2))


def _render_snippet(report: Dict, out_dir: Path) -> None:
    """Render a single report snippet to ``<name>.html`` inside ``out_dir``."""
    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("report_snippet.html")
    snippet_path = out_dir / f"{report['name']}.html"
    snippet_path.write_text(template.render(report=report))


def _bucket_expr(interval: str) -> str:
    """Return the SQLite strftime expression for the given interval.

    Raises:
        typer.Exit: With code 1 if ``interval`` is not a key of
            ``INTERVAL_FORMATS``.
    """
    fmt = INTERVAL_FORMATS.get(interval)
    if not fmt:
        typer.echo(f"Unsupported interval: {interval}")
        raise typer.Exit(1)
    return f"strftime('{fmt}', datetime(time))"


def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
    """Run every configured report for ``interval`` and write its outputs.

    For each report definition this writes ``<name>.json`` and ``<name>.html``
    and finally a ``reports.json`` manifest. Output goes to
    ``OUTPUT_DIR/<interval>``, or ``OUTPUT_DIR/domains/<domain>/<interval>``
    when ``domain`` is given, in which case only rows whose ``host`` equals
    ``domain`` are visible to the queries.

    Args:
        interval: One of the keys of ``INTERVAL_FORMATS``.
        domain: Optional host name to restrict the reports to.

    Raises:
        typer.Exit: If the config is missing/invalid or the interval is
            unsupported.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    bucket = _bucket_expr(interval)

    # The connection was previously never closed; ``closing`` releases it on
    # both the success and the error path.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cur = conn.cursor()

        # Create a temporary view so queries can easily be filtered by domain
        cur.execute("DROP VIEW IF EXISTS logs_view")
        if domain:
            # Parameters are not allowed in CREATE VIEW statements, so we must
            # safely interpolate the domain value ourselves. Escape any single
            # quotes to prevent malformed queries.
            safe_domain = domain.replace("'", "''")
            cur.execute(
                "CREATE TEMP VIEW logs_view AS SELECT * FROM logs "
                f"WHERE host = '{safe_domain}'"
            )
            out_dir = OUTPUT_DIR / "domains" / domain / interval
        else:
            cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
            out_dir = OUTPUT_DIR / interval

        out_dir.mkdir(parents=True, exist_ok=True)

        report_list = []
        for definition in cfg:
            name = definition["name"]
            query = definition["query"].replace("{bucket}", bucket)
            # Redirect the query to the (possibly filtered) view. The word
            # boundary prevents mangling identifiers that merely start with
            # ``logs`` (e.g. an existing ``FROM logs_view``).
            query = _LOGS_TABLE_RE.sub("FROM logs_view", query)
            cur.execute(query)
            rows = cur.fetchall()
            headers = [c[0] for c in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            json_path = out_dir / f"{name}.json"
            _save_json(json_path, data)
            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": definition.get("chart", "line"),
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            # Optional presentation hints are passed through only when set.
            if "color" in definition:
                entry["color"] = definition["color"]
            if "colors" in definition:
                entry["colors"] = definition["colors"]
            _render_snippet(entry, out_dir)
            report_list.append(entry)

    _save_json(out_dir / "reports.json", report_list)
    typer.echo(f"Generated {interval} reports")


def _generate_all_domains(interval: str) -> None:
    """Generate reports for each unique domain."""
    for domain in _get_domains():
        _generate_interval(interval, domain)


def _generate_root_index() -> None:
    """Render the top-level index listing all intervals and domains."""
    # Create the output directory first: ``iterdir`` raises
    # ``FileNotFoundError`` if no report has been generated yet.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    intervals = sorted(
        p.name for p in OUTPUT_DIR.iterdir() if p.is_dir() and p.name != "domains"
    )

    domains_dir = OUTPUT_DIR / "domains"
    domains: List[str] = []
    if domains_dir.is_dir():
        domains = sorted(p.name for p in domains_dir.iterdir() if p.is_dir())

    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("index.html")

    out_path = OUTPUT_DIR / "index.html"
    out_path.write_text(template.render(intervals=intervals, domains=domains))
    typer.echo(f"Generated root index at {out_path}")


def _run_interval(interval: str, domain: Optional[str], all_domains: bool) -> None:
    """Dispatch an interval command; ``all_domains`` wins over ``domain``."""
    if all_domains:
        _generate_all_domains(interval)
    else:
        _generate_interval(interval, domain)


@app.command()
def hourly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate hourly reports."""
    _run_interval("hourly", domain, all_domains)


@app.command()
def daily(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate daily reports."""
    _run_interval("daily", domain, all_domains)


@app.command()
def weekly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate weekly reports."""
    _run_interval("weekly", domain, all_domains)


@app.command()
def monthly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate monthly reports."""
    _run_interval("monthly", domain, all_domains)


@app.command()
def index() -> None:
    """Generate the root index page linking all reports."""
    _generate_root_index()


if __name__ == "__main__":
    app()