Add missing domain check command #33

Merged
wagesj45 merged 1 commit from codex/add-check_missing_domains-command into main 2025-07-19 02:01:14 -05:00
2 changed files with 124 additions and 1 deletions

View file

@ -18,7 +18,9 @@ from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set
import json
import typer
@ -110,5 +112,36 @@ def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by d
typer.echo(f"Cache hit ratio: {ratio:.2f}%")
@app.command("check-missing-domains")
def check_missing_domains(json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON")) -> None:
"""Show domains present in the database but absent from Nginx config."""
try:
from scripts.generate_reports import _get_domains as _db_domains
except Exception: # pragma: no cover - fallback if import fails
_db_domains = load_domains_from_db
if not isinstance(json_output, bool):
json_output = False
db_domains = set(_db_domains())
paths = nginx_config.discover_configs()
servers = nginx_config.parse_servers(paths)
config_domains: Set[str] = set()
for server in servers:
names = server.get("server_name", "")
for name in names.split():
if name:
config_domains.add(name)
missing = sorted(db_domains - config_domains)
if json_output:
typer.echo(json.dumps(missing))
else:
for d in missing:
typer.echo(d)
if __name__ == "__main__":
app()

90
tests/test_analyze.py Normal file
View file

@ -0,0 +1,90 @@
import sys
import json
import sqlite3
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(REPO_ROOT))
from scripts import analyze
from scripts import generate_reports as gr
def setup_db(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(path)
cur = conn.cursor()
cur.execute(
"""
CREATE TABLE logs (
id INTEGER PRIMARY KEY,
ip TEXT,
host TEXT,
time TEXT,
request TEXT,
status INTEGER,
bytes_sent INTEGER,
referer TEXT,
user_agent TEXT,
cache_status TEXT
)
"""
)
cur.execute(
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
"127.0.0.1",
"example.com",
"2024-01-01 10:00:00",
"GET / HTTP/1.1",
200,
100,
"-",
"curl",
"MISS",
),
)
cur.execute(
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
"127.0.0.1",
"missing.com",
"2024-01-01 11:00:00",
"GET / HTTP/1.1",
200,
100,
"-",
"curl",
"MISS",
),
)
conn.commit()
conn.close()
def test_check_missing_domains(tmp_path, monkeypatch, capsys):
db_path = tmp_path / "database" / "ngxstat.db"
setup_db(db_path)
conf = tmp_path / "nginx.conf"
conf.write_text(
"""
server {
listen 80;
server_name example.com;
}
"""
)
monkeypatch.setattr(analyze, "DB_PATH", db_path)
monkeypatch.setattr(gr, "DB_PATH", db_path)
monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])
analyze.check_missing_domains(json_output=False)
out = capsys.readouterr().out.strip().splitlines()
assert out == ["missing.com"]
analyze.check_missing_domains(json_output=True)
out_json = json.loads(capsys.readouterr().out.strip())
assert out_json == ["missing.com"]