import json import sys import sqlite3 from pathlib import Path import shutil from typing import List, Dict, Optional from datetime import datetime, timezone import time import yaml import typer from jinja2 import Environment, FileSystemLoader # Ensure project root is importable when running as a script (python scripts/generate_reports.py) PROJECT_ROOT = Path(__file__).resolve().parent.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) DB_PATH = Path("database/ngxstat.db") OUTPUT_DIR = Path("output") TEMPLATE_DIR = Path("templates") REPORT_CONFIG = Path("reports.yml") GENERATED_MARKER = OUTPUT_DIR / "generated.txt" # Mapping of interval names to SQLite strftime formats. These strings are # substituted into report queries whenever the special ``{bucket}`` token is # present so that a single report definition can be reused for multiple # intervals. INTERVAL_FORMATS = { "hourly": "%Y-%m-%d %H:00:00", "daily": "%Y-%m-%d", "weekly": "%Y-%W", "monthly": "%Y-%m", } app = typer.Typer(help="Generate aggregated log reports") @app.callback() def _cli_callback(ctx: typer.Context) -> None: """Register post-command hook to note generation time.""" def _write_marker() -> None: OUTPUT_DIR.mkdir(parents=True, exist_ok=True) # Use timezone-aware UTC to avoid deprecation warnings and ambiguity timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") GENERATED_MARKER.write_text(f"{timestamp}\n") ctx.call_on_close(_write_marker) def _get_domains() -> List[str]: """Return a sorted list of unique domains from the logs table.""" conn = sqlite3.connect(DB_PATH) cur = conn.cursor() cur.execute("SELECT DISTINCT host FROM logs ORDER BY host") domains = [row[0] for row in cur.fetchall()] conn.close() return domains def _load_config() -> List[Dict]: if not REPORT_CONFIG.exists(): typer.echo(f"Config file not found: {REPORT_CONFIG}") raise typer.Exit(1) with REPORT_CONFIG.open("r") as fh: data = yaml.safe_load(fh) or [] if not isinstance(data, list): typer.echo("reports.yml must contain a list of report definitions") raise typer.Exit(1) return data def _save_json(path: Path, data: List[Dict]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2)) def _copy_icons() -> None: """Copy vendored icons and scripts to the output directory.""" src_dir = Path("static/icons") dst_dir = OUTPUT_DIR / "icons" if src_dir.is_dir(): dst_dir.mkdir(parents=True, exist_ok=True) for icon in src_dir.glob("*.svg"): shutil.copy(icon, dst_dir / icon.name) js_src = Path("static/chartManager.js") if js_src.is_file(): shutil.copy(js_src, OUTPUT_DIR / js_src.name) def _render_snippet(report: Dict, out_dir: Path) -> None: """Render a single report snippet to ``.html`` inside ``out_dir``.""" env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) template = env.get_template("report_snippet.html") snippet_path = out_dir / f"{report['name']}.html" snippet_path.write_text(template.render(report=report)) def _write_stats( generated_at: Optional[str] = None, generation_seconds: Optional[float] = None ) -> None: """Query basic dataset stats and write them to ``output/global/stats.json``.""" conn = sqlite3.connect(DB_PATH) cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM logs") total_logs = cur.fetchone()[0] or 0 cur.execute("SELECT MIN(time), MAX(time) FROM logs") row = cur.fetchone() or (None, None) start_date = row[0] or "" end_date = row[1] or "" cur.execute("SELECT COUNT(DISTINCT host) FROM logs") unique_domains = cur.fetchone()[0] or 0 conn.close() stats = { "total_logs": total_logs, "start_date": start_date, "end_date": end_date, "unique_domains": unique_domains, } if generated_at: stats["generated_at"] = generated_at if generation_seconds is not None: stats["generation_seconds"] = generation_seconds out_path = OUTPUT_DIR / "global" / "stats.json" _save_json(out_path, stats) def _bucket_expr(interval: str) -> str: """Return the SQLite strftime expression for the given interval.""" fmt = INTERVAL_FORMATS.get(interval) if not fmt: typer.echo(f"Unsupported interval: {interval}") raise typer.Exit(1) return f"strftime('{fmt}', datetime(time))" def _generate_interval(interval: str, domain: Optional[str] = None) -> None: cfg = _load_config() if not cfg: typer.echo("No report definitions found") return _copy_icons() bucket = _bucket_expr(interval) conn = sqlite3.connect(DB_PATH) cur = conn.cursor() # Create a temporary view so queries can easily be filtered by domain cur.execute("DROP VIEW IF EXISTS logs_view") if domain: # Parameters are not allowed in CREATE VIEW statements, so we must # safely interpolate the domain value ourselves. Escape any single # quotes to prevent malformed queries. safe_domain = domain.replace("'", "''") cur.execute( f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'" ) out_dir = OUTPUT_DIR / "domains" / domain / interval else: cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs") out_dir = OUTPUT_DIR / interval out_dir.mkdir(parents=True, exist_ok=True) report_list = [] for definition in cfg: if "{bucket}" not in definition["query"] or definition.get("global"): # Global reports are generated separately continue if domain and not definition.get("per_domain", True): # Skip reports marked as not applicable to per-domain runs continue name = definition["name"] query = definition["query"].replace("{bucket}", bucket) query = query.replace("FROM logs", "FROM logs_view") # Apply top_n limit for tables (performance-friendly), if configured top_n = definition.get("top_n") chart_type = definition.get("chart", "line") if top_n and chart_type == "table": try: n = int(top_n) if "LIMIT" not in query.upper(): query = f"{query}\nLIMIT {n}" except Exception: pass cur.execute(query) rows = cur.fetchall() headers = [c[0] for c in cur.description] data = [dict(zip(headers, row)) for row in rows] json_path = out_dir / f"{name}.json" _save_json(json_path, data) entry = { "name": name, "label": definition.get("label", name.title()), "chart": definition.get("chart", "line"), "json": f"{name}.json", "html": f"{name}.html", } if "icon" in definition: entry["icon"] = definition["icon"] if "bucket" in definition: entry["bucket"] = definition["bucket"] if "buckets" in definition: entry["buckets"] = definition["buckets"] if "bucket_label" in definition: entry["bucket_label"] = definition["bucket_label"] if "color" in definition: entry["color"] = definition["color"] if "colors" in definition: entry["colors"] = definition["colors"] # Optional UX metadata passthrough for frontend-only transforms for key in ( "windows_supported", "window_default", "group_others_threshold", "exclude_values", "top_n", "stacked", "palette", ): if key in definition: entry[key] = definition[key] _render_snippet(entry, out_dir) report_list.append(entry) _save_json(out_dir / "reports.json", report_list) if domain: typer.echo(f"Generated {interval} reports for {domain}") else: typer.echo(f"Generated {interval} reports") def _generate_all_domains(interval: str) -> None: """Generate reports for each unique domain.""" for domain in _get_domains(): _generate_interval(interval, domain) def _generate_root_index() -> None: """Render the top-level index listing all intervals and domains.""" _copy_icons() intervals = sorted( [name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()] ) domains_dir = OUTPUT_DIR / "domains" domains: List[str] = [] if domains_dir.is_dir(): domains = [p.name for p in domains_dir.iterdir() if p.is_dir()] domains.sort() env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) template = env.get_template("index.html") OUTPUT_DIR.mkdir(parents=True, exist_ok=True) out_path = OUTPUT_DIR / "index.html" out_path.write_text(template.render(intervals=intervals, domains=domains)) typer.echo(f"Generated root index at {out_path}") def _generate_global() -> None: """Generate reports that do not depend on an interval.""" cfg = _load_config() if not cfg: typer.echo("No report definitions found") return start_time = time.time() # Use timezone-aware UTC for generated_at (string remains unchanged format) generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") _copy_icons() conn = sqlite3.connect(DB_PATH) cur = conn.cursor() out_dir = OUTPUT_DIR / "global" out_dir.mkdir(parents=True, exist_ok=True) report_list = [] for definition in cfg: if "{bucket}" in definition["query"] and not definition.get("global"): continue name = definition["name"] query = definition["query"] # Apply top_n limit for tables (performance-friendly), if configured top_n = definition.get("top_n") chart_type = definition.get("chart", "line") if top_n and chart_type == "table": try: n = int(top_n) if "LIMIT" not in query.upper(): query = f"{query}\nLIMIT {n}" except Exception: pass cur.execute(query) rows = cur.fetchall() headers = [c[0] for c in cur.description] data = [dict(zip(headers, row)) for row in rows] json_path = out_dir / f"{name}.json" _save_json(json_path, data) entry = { "name": name, "label": definition.get("label", name.title()), "chart": definition.get("chart", "line"), "json": f"{name}.json", "html": f"{name}.html", } if "icon" in definition: entry["icon"] = definition["icon"] if "bucket" in definition: entry["bucket"] = definition["bucket"] if "buckets" in definition: entry["buckets"] = definition["buckets"] if "bucket_label" in definition: entry["bucket_label"] = definition["bucket_label"] if "color" in definition: entry["color"] = definition["color"] if "colors" in definition: entry["colors"] = definition["colors"] # Optional UX metadata passthrough for frontend-only transforms for key in ( "windows_supported", "window_default", "group_others_threshold", "exclude_values", "top_n", "stacked", "palette", ): if key in definition: entry[key] = definition[key] _render_snippet(entry, out_dir) report_list.append(entry) _save_json(out_dir / "reports.json", report_list) elapsed = round(time.time() - start_time, 2) _write_stats(generated_at, elapsed) typer.echo("Generated global reports") def _generate_analysis() -> None: """Generate analysis JSON files consumed by the Analysis tab.""" try: # Import lazily to avoid circulars and keep dependencies optional from scripts import analyze except Exception as exc: # pragma: no cover - defensive typer.echo(f"Failed to import analysis module: {exc}") return # Ensure output root and icons present for parity _copy_icons() # These commands write JSON files under output/analysis/ try: analyze.check_missing_domains(json_output=True) except Exception as exc: # pragma: no cover - continue best-effort typer.echo(f"check_missing_domains failed: {exc}") try: analyze.suggest_cache(json_output=True) except Exception as exc: # pragma: no cover typer.echo(f"suggest_cache failed: {exc}") try: analyze.detect_threats() except Exception as exc: # pragma: no cover typer.echo(f"detect_threats failed: {exc}") typer.echo("Generated analysis JSON files") @app.command() def hourly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate hourly reports.""" if all_domains: _generate_all_domains("hourly") else: _generate_interval("hourly", domain) @app.command() def daily( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate daily reports.""" if all_domains: _generate_all_domains("daily") else: _generate_interval("daily", domain) @app.command() def weekly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate weekly reports.""" if all_domains: _generate_all_domains("weekly") else: _generate_interval("weekly", domain) @app.command() def monthly( domain: Optional[str] = typer.Option( None, help="Generate reports for a specific domain" ), all_domains: bool = typer.Option( False, "--all-domains", help="Generate reports for each domain" ), ) -> None: """Generate monthly reports.""" if all_domains: _generate_all_domains("monthly") else: _generate_interval("monthly", domain) @app.command("global") def global_reports() -> None: """Generate global reports.""" _generate_global() @app.command() def analysis() -> None: """Generate analysis JSON files for the Analysis tab.""" _generate_analysis() @app.command() def index() -> None: """Generate the root index page linking all reports.""" _generate_root_index() if __name__ == "__main__": app()