458 lines
14 KiB
Python
458 lines
14 KiB
Python
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
import shutil
|
|
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
import time
|
|
|
|
import yaml
|
|
|
|
import typer
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
# Filesystem layout (all paths relative to the project root).
DB_PATH = Path("database/ngxstat.db")  # SQLite database of ingested nginx logs
OUTPUT_DIR = Path("output")  # root directory for every generated artifact
TEMPLATE_DIR = Path("templates")  # Jinja2 templates used for HTML rendering
REPORT_CONFIG = Path("reports.yml")  # user-editable report definitions
GENERATED_MARKER = OUTPUT_DIR / "generated.txt"  # timestamp written after each CLI run

# Mapping of interval names to SQLite strftime formats. These strings are
# substituted into report queries whenever the special ``{bucket}`` token is
# present so that a single report definition can be reused for multiple
# intervals.
INTERVAL_FORMATS = {
    "hourly": "%Y-%m-%d %H:00:00",
    "daily": "%Y-%m-%d",
    "weekly": "%Y-%W",  # %W = week of year; buckets look like "2024-07"
    "monthly": "%Y-%m",
}

# Typer application object; commands below register themselves on it.
app = typer.Typer(help="Generate aggregated log reports")
|
|
|
|
|
|
@app.callback()
def _cli_callback(ctx: typer.Context) -> None:
    """Register post-command hook to note generation time."""

    def _note_generation_time() -> None:
        # Record the UTC completion time so external tooling can see when
        # the output tree was last refreshed.
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        stamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        GENERATED_MARKER.write_text(f"{stamp}\n")

    # Runs after whichever subcommand was invoked finishes.
    ctx.call_on_close(_note_generation_time)
|
|
|
|
|
|
def _get_domains() -> List[str]:
    """Return a sorted list of unique domains from the logs table.

    Sorting is delegated to SQLite via the ``ORDER BY`` clause.

    Returns:
        Distinct ``host`` values, one per domain seen in the logs.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
        return [row[0] for row in cur.fetchall()]
    finally:
        # Always release the connection — the original leaked it when
        # the query raised.
        conn.close()
|
|
|
|
|
|
def _load_config() -> List[Dict]:
    """Load report definitions from ``reports.yml``.

    Exits with status 1 when the file is missing or does not parse to a
    list; an empty/absent document yields an empty list.
    """
    if not REPORT_CONFIG.exists():
        typer.echo(f"Config file not found: {REPORT_CONFIG}")
        raise typer.Exit(1)

    with REPORT_CONFIG.open("r") as handle:
        definitions = yaml.safe_load(handle) or []

    if isinstance(definitions, list):
        return definitions

    typer.echo("reports.yml must contain a list of report definitions")
    raise typer.Exit(1)
|
|
|
|
|
|
def _save_json(path: Path, data: List[Dict]) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
def _copy_icons() -> None:
    """Copy vendored icons and scripts to the output directory."""
    icon_src = Path("static/icons")
    icon_dst = OUTPUT_DIR / "icons"
    if icon_src.is_dir():
        icon_dst.mkdir(parents=True, exist_ok=True)
        # Only SVG assets are vendored; copy them flat into output/icons.
        for svg in icon_src.glob("*.svg"):
            shutil.copy(svg, icon_dst / svg.name)

    # The chart bootstrap script lives next to index.html in the output root.
    script = Path("static/chartManager.js")
    if script.is_file():
        shutil.copy(script, OUTPUT_DIR / script.name)
|
|
|
|
|
|
def _render_snippet(report: Dict, out_dir: Path) -> None:
    """Render a single report snippet to ``<name>.html`` inside ``out_dir``."""
    jinja_env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    html = jinja_env.get_template("report_snippet.html").render(report=report)
    (out_dir / f"{report['name']}.html").write_text(html)
|
|
|
|
|
|
def _write_stats(
    generated_at: Optional[str] = None, generation_seconds: Optional[float] = None
) -> None:
    """Query basic dataset stats and write them to ``output/global/stats.json``.

    Args:
        generated_at: optional ``YYYY-MM-DD HH:MM:SS`` timestamp to record
            in the output when provided.
        generation_seconds: optional wall-clock duration of the run.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        cur.execute("SELECT COUNT(*) FROM logs")
        total_logs = cur.fetchone()[0] or 0

        # MIN/MAX over an empty table yield NULLs; coerce to empty strings.
        cur.execute("SELECT MIN(time), MAX(time) FROM logs")
        row = cur.fetchone() or (None, None)
        start_date = row[0] or ""
        end_date = row[1] or ""

        cur.execute("SELECT COUNT(DISTINCT host) FROM logs")
        unique_domains = cur.fetchone()[0] or 0
    finally:
        # Close the connection even when a query raises — the original
        # leaked it on error.
        conn.close()

    stats = {
        "total_logs": total_logs,
        "start_date": start_date,
        "end_date": end_date,
        "unique_domains": unique_domains,
    }
    if generated_at:
        stats["generated_at"] = generated_at
    if generation_seconds is not None:
        stats["generation_seconds"] = generation_seconds

    out_path = OUTPUT_DIR / "global" / "stats.json"
    _save_json(out_path, stats)
|
|
|
|
|
|
def _bucket_expr(interval: str) -> str:
    """Return the SQLite strftime expression for the given interval.

    Exits with status 1 when ``interval`` is not a known bucket size.
    """
    try:
        fmt = INTERVAL_FORMATS[interval]
    except KeyError:
        typer.echo(f"Unsupported interval: {interval}")
        raise typer.Exit(1)
    return f"strftime('{fmt}', datetime(time))"
|
|
|
|
|
|
def _generate_interval(interval: str, domain: Optional[str] = None) -> None:
    """Generate per-interval reports, optionally restricted to one domain.

    Writes one ``<name>.json`` + ``<name>.html`` pair per interval-aware
    report definition, plus an index ``reports.json``, under
    ``output/<interval>`` (or ``output/domains/<domain>/<interval>``).

    Args:
        interval: one of the keys of ``INTERVAL_FORMATS``.
        domain: when given, restrict all queries to that ``host`` value.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    _copy_icons()

    bucket = _bucket_expr(interval)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # Create a temporary view so queries can easily be filtered by domain
        cur.execute("DROP VIEW IF EXISTS logs_view")
        if domain:
            # Parameters are not allowed in CREATE VIEW statements, so we must
            # safely interpolate the domain value ourselves. Escape any single
            # quotes to prevent malformed queries.
            safe_domain = domain.replace("'", "''")
            cur.execute(
                f"CREATE TEMP VIEW logs_view AS SELECT * FROM logs WHERE host = '{safe_domain}'"
            )
            out_dir = OUTPUT_DIR / "domains" / domain / interval
        else:
            cur.execute("CREATE TEMP VIEW logs_view AS SELECT * FROM logs")
            out_dir = OUTPUT_DIR / interval

        out_dir.mkdir(parents=True, exist_ok=True)

        report_list = []
        for definition in cfg:
            if "{bucket}" not in definition["query"] or definition.get("global"):
                # Global reports are generated separately
                continue
            if domain and not definition.get("per_domain", True):
                # Skip reports marked as not applicable to per-domain runs
                continue

            name = definition["name"]
            query = definition["query"].replace("{bucket}", bucket)
            query = query.replace("FROM logs", "FROM logs_view")

            # Apply top_n limit for tables (performance-friendly), if configured
            top_n = definition.get("top_n")
            chart_type = definition.get("chart", "line")
            if top_n and chart_type == "table":
                try:
                    n = int(top_n)
                    if "LIMIT" not in query.upper():
                        query = f"{query}\nLIMIT {n}"
                except (TypeError, ValueError):
                    # A malformed top_n is ignored rather than aborting the run.
                    pass

            cur.execute(query)
            rows = cur.fetchall()
            headers = [c[0] for c in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": chart_type,
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            # Optional presentation metadata passed straight through to the
            # frontend. Keys are copied in a fixed order so reports.json
            # stays stable between runs.
            for key in (
                "icon",
                "bucket",
                "buckets",
                "bucket_label",
                "color",
                "colors",
                "windows_supported",
                "window_default",
                "group_others_threshold",
                "exclude_values",
                "top_n",
                "stacked",
                "palette",
            ):
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)
    finally:
        # The original never closed the connection; always release it.
        conn.close()

    _save_json(out_dir / "reports.json", report_list)
    if domain:
        typer.echo(f"Generated {interval} reports for {domain}")
    else:
        typer.echo(f"Generated {interval} reports")
|
|
|
|
|
|
def _generate_all_domains(interval: str) -> None:
    """Generate reports for each unique domain."""
    # One full per-domain run of the given interval for every host seen.
    domains = _get_domains()
    for host in domains:
        _generate_interval(interval, host)
|
|
|
|
|
|
def _generate_root_index() -> None:
    """Render the top-level index listing all intervals and domains."""
    _copy_icons()

    # Only advertise intervals whose output directory actually exists.
    intervals = sorted(
        name for name in INTERVAL_FORMATS if (OUTPUT_DIR / name).is_dir()
    )

    domains_root = OUTPUT_DIR / "domains"
    if domains_root.is_dir():
        domains = sorted(p.name for p in domains_root.iterdir() if p.is_dir())
    else:
        domains = []

    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
    page = env.get_template("index.html").render(intervals=intervals, domains=domains)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_path = OUTPUT_DIR / "index.html"
    out_path.write_text(page)
    typer.echo(f"Generated root index at {out_path}")
|
|
|
|
|
|
def _generate_global() -> None:
    """Generate reports that do not depend on an interval.

    Also records run metadata (timestamp + duration) via ``_write_stats``.
    Output lands under ``output/global``.
    """
    cfg = _load_config()
    if not cfg:
        typer.echo("No report definitions found")
        return

    start_time = time.time()
    generated_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    _copy_icons()

    out_dir = OUTPUT_DIR / "global"
    out_dir.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        report_list = []
        for definition in cfg:
            # A report is "global" when it has no {bucket} token or is
            # explicitly flagged; interval-aware reports are handled elsewhere.
            if "{bucket}" in definition["query"] and not definition.get("global"):
                continue

            name = definition["name"]
            query = definition["query"]

            # Apply top_n limit for tables (performance-friendly), if configured
            top_n = definition.get("top_n")
            chart_type = definition.get("chart", "line")
            if top_n and chart_type == "table":
                try:
                    n = int(top_n)
                    if "LIMIT" not in query.upper():
                        query = f"{query}\nLIMIT {n}"
                except (TypeError, ValueError):
                    # A malformed top_n is ignored rather than aborting the run.
                    pass

            cur.execute(query)
            rows = cur.fetchall()
            headers = [c[0] for c in cur.description]
            data = [dict(zip(headers, row)) for row in rows]
            _save_json(out_dir / f"{name}.json", data)

            entry = {
                "name": name,
                "label": definition.get("label", name.title()),
                "chart": chart_type,
                "json": f"{name}.json",
                "html": f"{name}.html",
            }
            # Optional presentation metadata passed straight through to the
            # frontend. Keys are copied in a fixed order so reports.json
            # stays stable between runs.
            for key in (
                "icon",
                "bucket",
                "buckets",
                "bucket_label",
                "color",
                "colors",
                "windows_supported",
                "window_default",
                "group_others_threshold",
                "exclude_values",
                "top_n",
                "stacked",
                "palette",
            ):
                if key in definition:
                    entry[key] = definition[key]

            _render_snippet(entry, out_dir)
            report_list.append(entry)
    finally:
        # The original never closed the connection; always release it.
        conn.close()

    _save_json(out_dir / "reports.json", report_list)
    elapsed = round(time.time() - start_time, 2)
    _write_stats(generated_at, elapsed)
    typer.echo("Generated global reports")
|
|
|
|
|
|
def _generate_analysis() -> None:
    """Generate analysis JSON files consumed by the Analysis tab."""
    try:
        # Import lazily to avoid circulars and keep dependencies optional
        from scripts import analyze
    except Exception as exc:
        typer.echo(f"Failed to import analysis module: {exc}")
        return

    # Ensure output root and icons present for parity
    _copy_icons()

    # Each step writes JSON under output/analysis/. Failures are reported
    # but never abort the remaining steps (best-effort generation).
    steps = (
        ("check_missing_domains", lambda: analyze.check_missing_domains(json_output=True)),
        ("suggest_cache", lambda: analyze.suggest_cache(json_output=True)),
        ("detect_threats", analyze.detect_threats),
    )
    for label, run in steps:
        try:
            run()
        except Exception as exc:
            typer.echo(f"{label} failed: {exc}")

    typer.echo("Generated analysis JSON files")
|
|
|
|
|
|
@app.command()
def hourly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate hourly reports."""
    # --all-domains wins over a single --domain selection.
    if all_domains:
        _generate_all_domains("hourly")
        return
    _generate_interval("hourly", domain)
|
|
|
|
|
|
@app.command()
def daily(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate daily reports."""
    # --all-domains wins over a single --domain selection.
    if all_domains:
        _generate_all_domains("daily")
        return
    _generate_interval("daily", domain)
|
|
|
|
|
|
@app.command()
def weekly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate weekly reports."""
    # --all-domains wins over a single --domain selection.
    if all_domains:
        _generate_all_domains("weekly")
        return
    _generate_interval("weekly", domain)
|
|
|
|
|
|
@app.command()
def monthly(
    domain: Optional[str] = typer.Option(
        None, help="Generate reports for a specific domain"
    ),
    all_domains: bool = typer.Option(
        False, "--all-domains", help="Generate reports for each domain"
    ),
) -> None:
    """Generate monthly reports."""
    # --all-domains wins over a single --domain selection.
    if all_domains:
        _generate_all_domains("monthly")
        return
    _generate_interval("monthly", domain)
|
|
|
|
|
|
@app.command("global")
|
|
def global_reports() -> None:
|
|
"""Generate global reports."""
|
|
_generate_global()
|
|
|
|
|
|
@app.command()
def analysis() -> None:
    """Generate analysis JSON files for the Analysis tab.

    Delegates to ``_generate_analysis``, which runs each analysis step
    best-effort and reports failures without aborting.
    """
    _generate_analysis()
|
|
|
|
|
|
@app.command()
def index() -> None:
    """Generate the root index page linking all reports.

    Delegates to ``_generate_root_index``, which writes ``output/index.html``.
    """
    _generate_root_index()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app()
|