Add missing domain check command #33

Merged
wagesj45 merged 1 commit from codex/add-check_missing_domains-command into main 2025-07-19 02:01:14 -05:00
2 changed files with 124 additions and 1 deletions

View file

@ -18,7 +18,9 @@ from __future__ import annotations
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Optional, Set
import json
import typer import typer
@ -110,5 +112,36 @@ def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by d
typer.echo(f"Cache hit ratio: {ratio:.2f}%") typer.echo(f"Cache hit ratio: {ratio:.2f}%")
@app.command("check-missing-domains")
def check_missing_domains(json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON")) -> None:
"""Show domains present in the database but absent from Nginx config."""
try:
from scripts.generate_reports import _get_domains as _db_domains
except Exception: # pragma: no cover - fallback if import fails
_db_domains = load_domains_from_db
if not isinstance(json_output, bool):
json_output = False
db_domains = set(_db_domains())
paths = nginx_config.discover_configs()
servers = nginx_config.parse_servers(paths)
config_domains: Set[str] = set()
for server in servers:
names = server.get("server_name", "")
for name in names.split():
if name:
config_domains.add(name)
missing = sorted(db_domains - config_domains)
if json_output:
typer.echo(json.dumps(missing))
else:
for d in missing:
typer.echo(d)
if __name__ == "__main__": if __name__ == "__main__":
app() app()

90
tests/test_analyze.py Normal file
View file

@ -0,0 +1,90 @@
import sys
import json
import sqlite3
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(REPO_ROOT))
from scripts import analyze
from scripts import generate_reports as gr
def setup_db(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(path)
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE logs (
id INTEGER PRIMARY KEY,
ip TEXT,
host TEXT,
time TEXT,
request TEXT,
status INTEGER,
bytes_sent INTEGER,
referer TEXT,
user_agent TEXT,
cache_status TEXT
)
"""
)
cur.execute(
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
"127.0.0.1",
"example.com",
"2024-01-01 10:00:00",
"GET / HTTP/1.1",
200,
100,
"-",
"curl",
"MISS",
),
)
cur.execute(
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
"127.0.0.1",
"missing.com",
"2024-01-01 11:00:00",
"GET / HTTP/1.1",
200,
100,
"-",
"curl",
"MISS",
),
)
conn.commit()
conn.close()
def test_check_missing_domains(tmp_path, monkeypatch, capsys):
db_path = tmp_path / "database" / "ngxstat.db"
setup_db(db_path)
conf = tmp_path / "nginx.conf"
conf.write_text(
"""
server {
listen 80;
server_name example.com;
}
"""
)
monkeypatch.setattr(analyze, "DB_PATH", db_path)
monkeypatch.setattr(gr, "DB_PATH", db_path)
monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])
analyze.check_missing_domains(json_output=False)
out = capsys.readouterr().out.strip().splitlines()
assert out == ["missing.com"]
analyze.check_missing_domains(json_output=True)
out_json = json.loads(capsys.readouterr().out.strip())
assert out_json == ["missing.com"]