#!/usr/bin/env python3 """Utility helpers for ad-hoc log analysis. This module exposes small helper functions to inspect the ``ngxstat`` SQLite database. The intent is to allow quick queries from the command line or other scripts without rewriting SQL each time. Examples -------- To list all domains present in the database:: python scripts/analyze.py domains The CLI is powered by :mod:`typer` and currently only offers a couple of commands. More analysis routines can be added over time. """ from __future__ import annotations import sqlite3 from pathlib import Path from typing import Dict, List, Optional, Set import json import typer from scripts import nginx_config # noqa: F401 # imported for side effects/usage DB_PATH = Path("database/ngxstat.db") app = typer.Typer(help="Ad-hoc statistics queries") def _connect() -> sqlite3.Connection: """Return a new SQLite connection to :data:`DB_PATH`.""" return sqlite3.connect(DB_PATH) def load_domains_from_db() -> List[str]: """Return a sorted list of distinct domains from the ``logs`` table.""" conn = _connect() cur = conn.cursor() cur.execute("SELECT DISTINCT host FROM logs ORDER BY host") domains = [row[0] for row in cur.fetchall()] conn.close() return domains def get_hit_count(domain: Optional[str] = None) -> int: """Return total request count. Parameters ---------- domain: Optional domain to filter on. If ``None`` the count includes all logs. """ conn = _connect() cur = conn.cursor() if domain: cur.execute("SELECT COUNT(*) FROM logs WHERE host = ?", (domain,)) else: cur.execute("SELECT COUNT(*) FROM logs") count = cur.fetchone()[0] or 0 conn.close() return count def get_cache_ratio(domain: Optional[str] = None) -> float: """Return the percentage of requests served from cache.""" conn = _connect() cur = conn.cursor() if domain: cur.execute( "SELECT SUM(CASE WHEN cache_status = 'HIT' THEN 1 ELSE 0 END) * 1.0 / " "COUNT(*) FROM logs WHERE host = ?", (domain,), ) else: cur.execute( "SELECT SUM(CASE WHEN cache_status = 'HIT' THEN 1 ELSE 0 END) * 1.0 / " "COUNT(*) FROM logs" ) result = cur.fetchone()[0] conn.close() return float(result or 0.0) @app.command() def domains() -> None: """Print the list of domains discovered in the database.""" for d in load_domains_from_db(): typer.echo(d) @app.command() def hits(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None: """Show request count.""" count = get_hit_count(domain) if domain: typer.echo(f"{domain}: {count} hits") else: typer.echo(f"Total hits: {count}") @app.command("cache-ratio") def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None: """Display cache hit ratio as a percentage.""" ratio = get_cache_ratio(domain) * 100 if domain: typer.echo(f"{domain}: {ratio:.2f}% cached") else: typer.echo(f"Cache hit ratio: {ratio:.2f}%") @app.command("check-missing-domains") def check_missing_domains(json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON")) -> None: """Show domains present in the database but absent from Nginx config.""" try: from scripts.generate_reports import _get_domains as _db_domains except Exception: # pragma: no cover - fallback if import fails _db_domains = load_domains_from_db if not isinstance(json_output, bool): json_output = False db_domains = set(_db_domains()) paths = nginx_config.discover_configs() servers = nginx_config.parse_servers(paths) config_domains: Set[str] = set() for server in servers: names = server.get("server_name", "") for name in names.split(): if name: config_domains.add(name) missing = sorted(db_domains - config_domains) if json_output: typer.echo(json.dumps(missing)) else: for d in missing: typer.echo(d) @app.command("suggest-cache") def suggest_cache( threshold: int = typer.Option( 10, help="Minimum number of MISS entries to report" ), json_output: bool = typer.Option(False, "--json", help="Output results as JSON"), ) -> None: """Suggest domain/path pairs that could benefit from caching. Paths with at least ``threshold`` ``MISS`` entries are shown for domains whose server blocks lack a ``proxy_cache`` directive. """ # Discover domains without explicit proxy_cache paths = nginx_config.discover_configs() servers = nginx_config.parse_servers(paths) no_cache: Set[str] = set() for server in servers: if "proxy_cache" in server: continue for name in server.get("server_name", "").split(): if name: no_cache.add(name) conn = _connect() cur = conn.cursor() cur.execute( """ SELECT host, substr(request, instr(request, ' ')+1, instr(request, ' HTTP') - instr(request, ' ') - 1) AS path, COUNT(*) AS miss_count FROM logs WHERE cache_status = 'MISS' GROUP BY host, path HAVING miss_count >= ? ORDER BY miss_count DESC """, (threshold,), ) rows = [r for r in cur.fetchall() if r[0] in no_cache] conn.close() if json_output: result = [ {"host": host, "path": path, "misses": count} for host, path, count in rows ] typer.echo(json.dumps(result)) else: for host, path, count in rows: typer.echo(f"{host} {path} {count}") if __name__ == "__main__": app()