# ngxstat/scripts/generate_reports.py
#
# 196 lines
# 5.8 KiB
# Python

import json
import re
import sqlite3
from pathlib import Path
from typing import List, Dict, Optional

import typer
import yaml
from jinja2 import Environment, FileSystemLoader
# Filesystem locations used by the generator, all relative to the current
# working directory.
DB_PATH = Path("database") / "ngxstat.db"
OUTPUT_DIR = Path("output")
TEMPLATE_DIR = Path("templates")
REPORT_CONFIG = Path("reports.yml")

# Interval name -> SQLite strftime format. Whenever a report query contains
# the special ``{bucket}`` token, the expression built from one of these
# formats is substituted in, so one report definition serves every interval.
INTERVAL_FORMATS = dict(
    hourly="%Y-%m-%d %H:00:00",
    daily="%Y-%m-%d",
    weekly="%Y-%W",
    monthly="%Y-%m",
)
# Typer application object; the interval commands in this module register
# themselves on it through the ``@app.command()`` decorator.
app = typer.Typer(help="Generate aggregated log reports")
def _get_domains() -> List[str]:
    """Return a sorted list of unique domains from the logs table.

    Opens a connection to ``DB_PATH``, selects the distinct ``host`` values
    from ``logs`` ordered by host, and closes the connection again.

    Returns:
        The ``host`` values in the order produced by the query.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT DISTINCT host FROM logs ORDER BY host"
        ).fetchall()
    finally:
        # Close even when the query fails (e.g. missing table) so the
        # database handle is never leaked.
        conn.close()
    return [row[0] for row in rows]
def _load_config() -> List[Dict]:
    """Load the report definitions from ``REPORT_CONFIG``.

    Returns:
        The parsed top-level list of report definitions. A document that
        parses to a falsy value (for example an empty file) yields ``[]``.

    Raises:
        typer.Exit: With exit code 1 when the config file does not exist or
            its top-level value is not a list.
    """
    if not REPORT_CONFIG.exists():
        typer.echo(f"Config file not found: {REPORT_CONFIG}")
        raise typer.Exit(1)
    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's locale encoding.
    with REPORT_CONFIG.open("r", encoding="utf-8") as fh:
        data = yaml.safe_load(fh) or []
    if not isinstance(data, list):
        typer.echo("reports.yml must contain a list of report definitions")
        raise typer.Exit(1)
    return data
def _save_json(path: Path, data: List[Dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
def _render_html(interval: str, reports: List[Dict], out_path: Path) -> None:
    """Render ``report.html`` from ``TEMPLATE_DIR`` and write it to *out_path*.

    Args:
        interval: Interval name passed to the template as ``interval``.
        reports: Report descriptors passed to the template as ``reports``.
        out_path: Destination file; missing parent directories are created.
    """
    # NOTE(review): the Environment is created without an ``autoescape``
    # setting; confirm the template escapes report values if the report
    # definitions can ever contain untrusted text.
    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    template = env.get_template("report.html")
    html = template.render(interval=interval, reports=reports)
    # Mirror _save_json: make sure the target directory exists.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Write UTF-8 explicitly so non-ASCII labels do not depend on (or fail
    # under) the platform's locale encoding.
    out_path.write_text(html, encoding="utf-8")
def _bucket_expr(interval: str) -> str:
    """Build the SQLite ``strftime`` expression used to bucket rows.

    The format is looked up in ``INTERVAL_FORMATS``; an interval without a
    format is reported and terminates the command with exit code 1.
    """
    pattern = INTERVAL_FORMATS.get(interval)
    if pattern:
        return f"strftime('{pattern}', datetime(time))"
    typer.echo(f"Unsupported interval: {interval}")
    raise typer.Exit(1)
# Matches the ``FROM logs`` table reference in a report query regardless of
# keyword case or the amount of whitespace. The trailing ``\b`` prevents a
# match on longer identifiers such as ``logs_view``.
_FROM_LOGS_RE = re.compile(r"\bFROM\s+logs\b", re.IGNORECASE)


def _prepare_query(template: str, bucket: str) -> str:
    """Substitute ``{bucket}`` and redirect the query to ``logs_view``."""
    return _FROM_LOGS_RE.sub(
        "FROM logs_view", template.replace("{bucket}", bucket)
    )


def _create_logs_view(cur: sqlite3.Cursor, domain: Optional[str]) -> None:
    """(Re)create the temporary ``logs_view``, filtered to *domain* if given."""
    cur.execute("DROP VIEW IF EXISTS logs_view")
    if domain:
        # Parameters are not allowed in CREATE VIEW statements, so the
        # domain value is interpolated directly. Single quotes are doubled
        # so the value stays inside the SQL string literal.
        safe_domain = domain.replace("'", "''")
        cur.execute(
            f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
        )
    else:
        cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")


def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
    """Run every configured report for *interval* and write the results.

    For each report definition the query is executed against ``logs_view``
    and its rows are stored as ``<name>.json``. A ``reports.json`` index and
    an ``index.html`` page are written next to them.

    Args:
        interval: Key of ``INTERVAL_FORMATS`` selecting the time bucket.
        domain: When given (and non-empty), only rows whose ``host`` equals
            this value are reported and output goes to
            ``OUTPUT_DIR / domain / interval``; otherwise all rows are used
            and output goes to ``OUTPUT_DIR / interval``.

    Raises:
        typer.Exit: Propagated from the config loader or the bucket lookup.
        KeyError: When a report definition lacks ``name`` or ``query``.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    bucket = _bucket_expr(interval)

    # NOTE(review): ``domain`` is used verbatim as a path component; a value
    # containing path separators would place output outside OUTPUT_DIR.
    # Confirm the values passed in are trusted.
    if domain:
        out_dir = OUTPUT_DIR / domain / interval
    else:
        out_dir = OUTPUT_DIR / interval

    report_list = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        # Create a temporary view so queries can easily be filtered by domain
        _create_logs_view(cur, domain)
        out_dir.mkdir(parents=True, exist_ok=True)

        for definition in cfg:
            name = definition["name"]
            cur.execute(_prepare_query(definition["query"], bucket))
            rows = cur.fetchall()
            # ``description`` is None for statements without a result set.
            headers = [col[0] for col in cur.description or ()]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)
            report_list.append(
                {
                    "name": name,
                    "label": definition.get("label", name.title()),
                    "chart": definition.get("chart", "line"),
                    "json": f"{name}.json",
                }
            )
    finally:
        # Always release the database handle, including on query errors.
        conn.close()

    _save_json(out_dir / "reports.json", report_list)
    _render_html(interval, report_list, out_dir / "index.html")
    typer.echo(f"Generated {interval} reports")
def _generate_all_domains(interval: str) -> None:
    """Run the *interval* reports once per domain returned by ``_get_domains``."""
    hosts = _get_domains()
    for host in hosts:
        _generate_interval(interval, host)
@app.command()
def hourly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate hourly reports."""
    # --all-domains takes precedence over --domain.
    if not all_domains:
        _generate_interval("hourly", domain)
        return
    _generate_all_domains("hourly")
@app.command()
def daily(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate daily reports."""
    # --all-domains takes precedence over --domain.
    if not all_domains:
        _generate_interval("daily", domain)
        return
    _generate_all_domains("daily")
@app.command()
def weekly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate weekly reports."""
    # --all-domains takes precedence over --domain.
    if not all_domains:
        _generate_interval("weekly", domain)
        return
    _generate_all_domains("weekly")
@app.command()
def monthly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate monthly reports."""
    # --all-domains takes precedence over --domain.
    if not all_domains:
        _generate_interval("monthly", domain)
        return
    _generate_all_domains("monthly")
# Run the Typer CLI only when this file is executed as a script.
if __name__ == "__main__":
    app()