#!/usr/bin/env python3
"""Utility helpers for ad-hoc log analysis.

This module exposes small helper functions to inspect the ``ngxstat`` SQLite
database. The intent is to allow quick queries from the command line or other
scripts without rewriting SQL each time.

Examples
--------
To list all domains present in the database::

    python scripts/analyze.py domains

The CLI is powered by :mod:`typer` and currently offers a handful of
commands. More analysis routines can be added over time.
"""

from __future__ import annotations

import json
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Set

import typer

from scripts import nginx_config

DB_PATH = Path("database/ngxstat.db")
ANALYSIS_DIR = Path("output/analysis")

app = typer.Typer(help="Ad-hoc statistics queries")
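
# The queries in this module assume a ``logs`` table shaped roughly like the
# sketch below. The schema is inferred from the columns referenced here, not
# taken from the ingestion code, so treat it as a hypothetical reference:
#
#     CREATE TABLE logs (
#         host TEXT,          -- virtual host that served the request
#         ip TEXT,            -- client address
#         time TEXT,          -- "YYYY-MM-DD HH:MM:SS" timestamp
#         request TEXT,       -- e.g. "GET /index.html HTTP/1.1"
#         status INTEGER,     -- HTTP response status code
#         cache_status TEXT,  -- e.g. "HIT" or "MISS"
#         user_agent TEXT
#     )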


def _connect() -> sqlite3.Connection:
    """Return a new SQLite connection to :data:`DB_PATH`."""
    return sqlite3.connect(DB_PATH)


def load_domains_from_db() -> List[str]:
    """Return a sorted list of distinct domains from the ``logs`` table."""
    conn = _connect()
    cur = conn.cursor()
    cur.execute("SELECT DISTINCT host FROM logs ORDER BY host")
    domains = [row[0] for row in cur.fetchall()]
    conn.close()
    return domains


def get_hit_count(domain: Optional[str] = None) -> int:
    """Return total request count.

    Parameters
    ----------
    domain:
        Optional domain to filter on. If ``None`` the count includes all logs.
    """
    conn = _connect()
    cur = conn.cursor()
    if domain:
        cur.execute("SELECT COUNT(*) FROM logs WHERE host = ?", (domain,))
    else:
        cur.execute("SELECT COUNT(*) FROM logs")
    count = cur.fetchone()[0] or 0
    conn.close()
    return count


def get_cache_ratio(domain: Optional[str] = None) -> float:
    """Return the fraction of requests served from cache, between 0 and 1."""
    conn = _connect()
    cur = conn.cursor()
    if domain:
        cur.execute(
            "SELECT SUM(CASE WHEN cache_status = 'HIT' THEN 1 ELSE 0 END) * 1.0 / "
            "COUNT(*) FROM logs WHERE host = ?",
            (domain,),
        )
    else:
        cur.execute(
            "SELECT SUM(CASE WHEN cache_status = 'HIT' THEN 1 ELSE 0 END) * 1.0 / "
            "COUNT(*) FROM logs"
        )
    result = cur.fetchone()[0]
    conn.close()
    return float(result or 0.0)
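
# A minimal sketch of calling the helpers from another script (assuming the
# repository root is on ``sys.path``; the domain is a placeholder):
#
#     from scripts.analyze import get_cache_ratio, get_hit_count
#
#     total = get_hit_count()
#     ratio = get_cache_ratio("example.com")
#     print(f"{total} requests overall, {ratio:.2%} cached for example.com")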


@app.command()
def domains() -> None:
    """Print the list of domains discovered in the database."""
    for d in load_domains_from_db():
        typer.echo(d)


@app.command()
def hits(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None:
    """Show request count."""
    count = get_hit_count(domain)
    if domain:
        typer.echo(f"{domain}: {count} hits")
    else:
        typer.echo(f"Total hits: {count}")


@app.command("cache-ratio")
def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by domain")) -> None:
    """Display cache hit ratio as a percentage."""
    ratio = get_cache_ratio(domain) * 100
    if domain:
        typer.echo(f"{domain}: {ratio:.2f}% cached")
    else:
        typer.echo(f"Cache hit ratio: {ratio:.2f}%")


@app.command("check-missing-domains")
def check_missing_domains(
    json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON"),
) -> None:
    """Show domains present in the database but absent from the Nginx config."""
    try:
        from scripts.generate_reports import _get_domains as _db_domains
    except Exception:  # pragma: no cover - fall back if the import fails
        _db_domains = load_domains_from_db

    db_domains = set(_db_domains())

    paths = nginx_config.discover_configs()
    servers = nginx_config.parse_servers(paths)
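    # ``parse_servers`` is assumed to return one dict per ``server`` block,
    # keyed by directive name; a hypothetical entry might look like:
    #     {"server_name": "example.com www.example.com", "proxy_cache": "my_zone"}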
    config_domains: Set[str] = set()
    for server in servers:
        names = server.get("server_name", "")
        for name in names.split():
            if name:
                config_domains.add(name)

    missing = sorted(db_domains - config_domains)

    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
    out_path = ANALYSIS_DIR / "missing_domains.json"
    out_path.write_text(json.dumps(missing, indent=2))

    if json_output:
        typer.echo(json.dumps(missing))
    else:
        for d in missing:
            typer.echo(d)


@app.command("suggest-cache")
def suggest_cache(
    threshold: int = typer.Option(10, help="Minimum number of MISS entries to report"),
    json_output: bool = typer.Option(False, "--json", help="Output results as JSON"),
) -> None:
    """Suggest domain/path pairs that could benefit from caching.

    Paths with at least ``threshold`` ``MISS`` entries are shown for domains
    whose server blocks lack a ``proxy_cache`` directive.
    """
    # Discover domains without an explicit proxy_cache directive
    paths = nginx_config.discover_configs()
    servers = nginx_config.parse_servers(paths)
    no_cache: Set[str] = set()
    for server in servers:
        if "proxy_cache" in server:
            continue
        for name in server.get("server_name", "").split():
            if name:
                no_cache.add(name)

    conn = _connect()
    cur = conn.cursor()
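    # The substr/instr expression below carves the request path out of the
    # request line. For request = "GET /index.html HTTP/1.1", the first space
    # is at offset 4 and " HTTP" starts at offset 16 (SQLite strings are
    # 1-indexed), so substr(request, 5, 11) yields "/index.html".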
    cur.execute(
        """
        SELECT host,
               substr(request, instr(request, ' ') + 1,
                      instr(request, ' HTTP') - instr(request, ' ') - 1) AS path,
               COUNT(*) AS miss_count
        FROM logs
        WHERE cache_status = 'MISS'
        GROUP BY host, path
        HAVING miss_count >= ?
        ORDER BY miss_count DESC
        """,
        (threshold,),
    )

    rows = [r for r in cur.fetchall() if r[0] in no_cache]
    conn.close()

    result = [
        {"host": host, "path": path, "misses": count} for host, path, count in rows
    ]

    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
    out_path = ANALYSIS_DIR / "cache_suggestions.json"
    out_path.write_text(json.dumps(result, indent=2))

    if json_output:
        typer.echo(json.dumps(result))
    else:
        for item in result:
            typer.echo(f"{item['host']} {item['path']} {item['misses']}")


@app.command("detect-threats")
def detect_threats(
    hours: int = typer.Option(1, help="Number of recent hours to analyze"),
    ip_threshold: int = typer.Option(100, help="Requests from a single IP to flag"),
) -> None:
    """Detect potential security threats from recent logs."""
    conn = _connect()
    cur = conn.cursor()

    cur.execute("SELECT MAX(time) FROM logs")
    row = cur.fetchone()
    if not row or not row[0]:
        typer.echo("No logs found")
        conn.close()
        return

    max_dt = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")
    recent_end = max_dt
    recent_start = recent_end - timedelta(hours=hours)
    prev_start = recent_start - timedelta(hours=hours)
    prev_end = recent_start

    fmt = "%Y-%m-%d %H:%M:%S"
    recent_start_s = recent_start.strftime(fmt)
    recent_end_s = recent_end.strftime(fmt)
    prev_start_s = prev_start.strftime(fmt)
    prev_end_s = prev_end.strftime(fmt)
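
    # The two comparison windows are back-to-back and of equal length,
    # anchored at the newest timestamp in the database rather than the wall
    # clock, so a stale database still analyzes its own most recent activity.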
    cur.execute(
        """
        SELECT host,
               SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
               COUNT(*) AS total
        FROM logs
        WHERE time >= ? AND time < ?
        GROUP BY host
        """,
        (recent_start_s, recent_end_s),
    )
    recent_rows = {r[0]: (r[1], r[2]) for r in cur.fetchall()}

    cur.execute(
        """
        SELECT host,
               SUM(CASE WHEN status >= 400 THEN 1 ELSE 0 END) AS errors,
               COUNT(*) AS total
        FROM logs
        WHERE time >= ? AND time < ?
        GROUP BY host
        """,
        (prev_start_s, prev_end_s),
    )
    prev_rows = {r[0]: (r[1], r[2]) for r in cur.fetchall()}
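
    # Flag a host when its recent error rate is both non-trivial (>= 10%) and
    # at least double the rate of the previous window; the thresholds are
    # heuristics and may need tuning for a given traffic profile.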
    error_spikes = []
    for host in set(recent_rows) | set(prev_rows):
        r_err, r_total = recent_rows.get(host, (0, 0))
        p_err, p_total = prev_rows.get(host, (0, 0))
        r_rate = r_err * 100.0 / r_total if r_total else 0.0
        p_rate = p_err * 100.0 / p_total if p_total else 0.0
        if r_rate >= 10 and r_rate >= p_rate * 2:
            error_spikes.append(
                {
                    "host": host,
                    "recent_error_rate": round(r_rate, 2),
                    "previous_error_rate": round(p_rate, 2),
                }
            )

    cur.execute(
        """
        SELECT DISTINCT user_agent FROM logs
        WHERE time >= ? AND time < ?
        """,
        (prev_start_s, prev_end_s),
    )
    prev_agents = {r[0] for r in cur.fetchall()}

    cur.execute(
        """
        SELECT user_agent, COUNT(*) AS c
        FROM logs
        WHERE time >= ? AND time < ?
        GROUP BY user_agent
        HAVING c >= 10
        """,
        (recent_start_s, recent_end_s),
    )
    suspicious_agents = [
        {"user_agent": ua, "requests": cnt}
        for ua, cnt in cur.fetchall()
        if ua not in prev_agents
    ]

    cur.execute(
        """
        SELECT ip, COUNT(*) AS c
        FROM logs
        WHERE time >= ? AND time < ?
        GROUP BY ip
        HAVING c >= ?
        ORDER BY c DESC
        """,
        (recent_start_s, recent_end_s, ip_threshold),
    )
    high_ip_requests = [{"ip": ip, "requests": cnt} for ip, cnt in cur.fetchall()]

    conn.close()

    report = {
        "time_range": {
            "recent_start": recent_start_s,
            "recent_end": recent_end_s,
            "previous_start": prev_start_s,
            "previous_end": prev_end_s,
        },
        "error_spikes": error_spikes,
        "suspicious_agents": suspicious_agents,
        "high_ip_requests": high_ip_requests,
    }

    ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)
    out_path = ANALYSIS_DIR / "threat_report.json"
    out_path.write_text(json.dumps(report, indent=2))
    typer.echo(json.dumps(report))
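
# Typical invocations (assuming they are run from the repository root):
#
#     python scripts/analyze.py hits --domain example.com
#     python scripts/analyze.py suggest-cache --threshold 25 --json
#     python scripts/analyze.py detect-threats --hours 6 --ip-threshold 200
#
# "example.com" and the numeric values are placeholders.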


if __name__ == "__main__":
    app()