ngxstat/scripts/analyze.py

201 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""Utility helpers for ad-hoc log analysis.
This module exposes small helper functions to inspect the ``ngxstat`` SQLite
database. The intent is to allow quick queries from the command line or other
scripts without rewriting SQL each time.

Examples
--------
To list all domains present in the database::

    python scripts/analyze.py domains

The CLI is powered by :mod:`typer` and currently offers a small set of
commands. More analysis routines can be added over time.
"""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Set
import json
import typer
from scripts import nginx_config # noqa: F401 # imported for side effects/usage
# Location of the SQLite database. The path is relative, so it is resolved
# against the current working directory at the time a connection is opened.
DB_PATH = Path("database/ngxstat.db")
# Typer application object; the ``@app.command`` functions below register
# themselves on it.
app = typer.Typer(help="Ad-hoc statistics queries")
def _connect() -> sqlite3.Connection:
    """Open and return a fresh SQLite connection to :data:`DB_PATH`.

    Nothing in this helper closes the connection; that is left to the caller.
    """
    connection = sqlite3.connect(DB_PATH)
    return connection
def load_domains_from_db() -> List[str]:
    """Return a sorted list of distinct domains from the ``logs`` table.

    Returns
    -------
    list of str
        Every distinct ``host`` value, in the order produced by the query's
        ``ORDER BY host`` clause.

    Notes
    -----
    The connection is closed even when the query raises (for example because
    the ``logs`` table does not exist); the exception then propagates to the
    caller.
    """
    conn = _connect()
    try:
        cur = conn.cursor()
        cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
        return [row[0] for row in cur.fetchall()]
    finally:
        # Release the database handle on both the success and the error path.
        conn.close()
def get_hit_count(domain: Optional[str] = None) -> int:
    """Return total request count.

    Parameters
    ----------
    domain:
        Optional domain to filter on. If ``None`` (or an empty string) the
        count includes all logs.

    Returns
    -------
    int
        Number of rows in ``logs`` matching the filter.

    Notes
    -----
    The connection is closed even when the query raises; the exception then
    propagates to the caller.
    """
    query = "SELECT COUNT(*) FROM logs"
    params = ()
    if domain:
        query += " WHERE host = ?"
        params = (domain,)

    conn = _connect()
    try:
        cur = conn.cursor()
        cur.execute(query, params)
        return cur.fetchone()[0] or 0
    finally:
        # Release the database handle on both the success and the error path.
        conn.close()
def get_cache_ratio(domain: Optional[str] = None) -> float:
    """Return the fraction of requests served from cache.

    Parameters
    ----------
    domain:
        Optional domain to filter on. If ``None`` (or an empty string) all
        logs are considered.

    Returns
    -------
    float
        Share of matching rows whose ``cache_status`` is ``'HIT'``, expressed
        as a ratio between ``0.0`` and ``1.0`` (multiply by 100 for a
        percentage). ``0.0`` is returned when no rows match.

    Notes
    -----
    The connection is closed even when the query raises; the exception then
    propagates to the caller.
    """
    query = (
        "SELECT SUM(CASE WHEN cache_status = 'HIT' THEN 1 ELSE 0 END) * 1.0 / "
        "COUNT(*) FROM logs"
    )
    params = ()
    if domain:
        query += " WHERE host = ?"
        params = (domain,)

    conn = _connect()
    try:
        cur = conn.cursor()
        cur.execute(query, params)
        # With no matching rows SUM() is NULL, so the whole expression is
        # NULL and arrives here as None.
        result = cur.fetchone()[0]
    finally:
        # Release the database handle on both the success and the error path.
        conn.close()
    return float(result or 0.0)
@app.command()
def domains() -> None:
    """Print the list of domains discovered in the database."""
    # One domain per output line, in the order the loader returns them.
    discovered = load_domains_from_db()
    for host in discovered:
        typer.echo(host)
@app.command()
def hits(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None:
    """Show request count."""
    total = get_hit_count(domain)
    # A falsy domain (None or "") means the count covers every host, so the
    # label switches to the generic "Total hits" form.
    message = f"{domain}: {total} hits" if domain else f"Total hits: {total}"
    typer.echo(message)
@app.command("cache-ratio")
def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None:
    """Display cache hit ratio as a percentage."""
    # get_cache_ratio() yields a 0-1 ratio; scale it for display.
    percent = get_cache_ratio(domain) * 100
    if not domain:
        typer.echo(f"Cache hit ratio: {percent:.2f}%")
        return
    typer.echo(f"{domain}: {percent:.2f}% cached")
@app.command("check-missing-domains")
def check_missing_domains(json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON")) -> None:
    """Show domains present in the database but absent from Nginx config."""
    # When this function is called directly and ``json_output`` is omitted,
    # the parameter holds the ``typer.Option`` default object rather than a
    # bool; anything that is not a real bool is treated as "off".
    if not isinstance(json_output, bool):
        json_output = False

    # Prefer the report generator's domain loader, falling back to the local
    # one when that module cannot be imported.
    try:
        from scripts.generate_reports import _get_domains as fetch_db_domains
    except Exception:  # pragma: no cover - fallback if import fails
        fetch_db_domains = load_domains_from_db

    db_domains = set(fetch_db_domains())

    # Gather every whitespace-separated name from each server block's
    # ``server_name`` entry; str.split() never yields empty strings.
    config_domains: Set[str] = set()
    for server in nginx_config.parse_servers(nginx_config.discover_configs()):
        config_domains.update(server.get("server_name", "").split())

    missing = sorted(db_domains - config_domains)
    if json_output:
        typer.echo(json.dumps(missing))
        return
    for domain in missing:
        typer.echo(domain)
@app.command("suggest-cache")
def suggest_cache(
    threshold: int = typer.Option(
        10, help="Minimum number of MISS entries to report"
    ),
    json_output: bool = typer.Option(False, "--json", help="Output results as JSON"),
) -> None:
    """Suggest domain/path pairs that could benefit from caching.

    Paths with at least ``threshold`` ``MISS`` entries are shown for domains
    whose server blocks lack a ``proxy_cache`` directive.
    """
    # When this function is called directly and ``json_output`` is omitted,
    # the parameter holds the ``typer.Option`` default object rather than a
    # bool. Treat any non-bool as "off", matching check_missing_domains().
    if not isinstance(json_output, bool):
        json_output = False

    # Collect every server_name from server blocks without a proxy_cache key.
    paths = nginx_config.discover_configs()
    servers = nginx_config.parse_servers(paths)
    no_cache: Set[str] = set()
    for server in servers:
        if "proxy_cache" in server:
            continue
        # str.split() never yields empty strings, so no extra filtering needed.
        no_cache.update(server.get("server_name", "").split())

    conn = _connect()
    try:
        cur = conn.cursor()
        # ``path`` is the text between the first space and " HTTP" in the
        # request column -- presumably a request line such as
        # "GET /path HTTP/1.1"; verify against the log importer.
        cur.execute(
            """
            SELECT host,
                   substr(request, instr(request, ' ')+1,
                          instr(request, ' HTTP') - instr(request, ' ') - 1) AS path,
                   COUNT(*) AS miss_count
            FROM logs
            WHERE cache_status = 'MISS'
            GROUP BY host, path
            HAVING miss_count >= ?
            ORDER BY miss_count DESC
            """,
            (threshold,),
        )
        # Keep only hosts whose server block has no proxy_cache directive.
        rows = [row for row in cur.fetchall() if row[0] in no_cache]
    finally:
        # Release the database handle even if the query fails.
        conn.close()

    if json_output:
        result = [
            {"host": host, "path": path, "misses": count} for host, path, count in rows
        ]
        typer.echo(json.dumps(result))
    else:
        for host, path, count in rows:
            typer.echo(f"{host} {path} {count}")
# Run the Typer CLI only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    app()