Add check_missing_domains command and tests
This commit is contained in:
parent
4c80860b20
commit
1a6e836631
2 changed files with 124 additions and 1 deletions
|
@ -18,7 +18,9 @@ from __future__ import annotations
|
|||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Set
|
||||
|
||||
import json
|
||||
|
||||
import typer
|
||||
|
||||
|
@ -110,5 +112,36 @@ def cache_ratio_cmd(domain: Optional[str] = typer.Option(None, help="Filter by d
|
|||
typer.echo(f"Cache hit ratio: {ratio:.2f}%")
|
||||
|
||||
|
||||
@app.command("check-missing-domains")
|
||||
def check_missing_domains(json_output: bool = typer.Option(False, "--json", help="Output missing domains as JSON")) -> None:
|
||||
"""Show domains present in the database but absent from Nginx config."""
|
||||
try:
|
||||
from scripts.generate_reports import _get_domains as _db_domains
|
||||
except Exception: # pragma: no cover - fallback if import fails
|
||||
_db_domains = load_domains_from_db
|
||||
|
||||
if not isinstance(json_output, bool):
|
||||
json_output = False
|
||||
|
||||
db_domains = set(_db_domains())
|
||||
|
||||
paths = nginx_config.discover_configs()
|
||||
servers = nginx_config.parse_servers(paths)
|
||||
config_domains: Set[str] = set()
|
||||
for server in servers:
|
||||
names = server.get("server_name", "")
|
||||
for name in names.split():
|
||||
if name:
|
||||
config_domains.add(name)
|
||||
|
||||
missing = sorted(db_domains - config_domains)
|
||||
|
||||
if json_output:
|
||||
typer.echo(json.dumps(missing))
|
||||
else:
|
||||
for d in missing:
|
||||
typer.echo(d)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
|
|
90
tests/test_analyze.py
Normal file
90
tests/test_analyze.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
import sys
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.append(str(REPO_ROOT))
|
||||
from scripts import analyze
|
||||
from scripts import generate_reports as gr
|
||||
|
||||
|
||||
def setup_db(path: Path) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE logs (
|
||||
id INTEGER PRIMARY KEY,
|
||||
ip TEXT,
|
||||
host TEXT,
|
||||
time TEXT,
|
||||
request TEXT,
|
||||
status INTEGER,
|
||||
bytes_sent INTEGER,
|
||||
referer TEXT,
|
||||
user_agent TEXT,
|
||||
cache_status TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
cur.execute(
|
||||
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"127.0.0.1",
|
||||
"example.com",
|
||||
"2024-01-01 10:00:00",
|
||||
"GET / HTTP/1.1",
|
||||
200,
|
||||
100,
|
||||
"-",
|
||||
"curl",
|
||||
"MISS",
|
||||
),
|
||||
)
|
||||
cur.execute(
|
||||
"INSERT INTO logs (ip, host, time, request, status, bytes_sent, referer, user_agent, cache_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"127.0.0.1",
|
||||
"missing.com",
|
||||
"2024-01-01 11:00:00",
|
||||
"GET / HTTP/1.1",
|
||||
200,
|
||||
100,
|
||||
"-",
|
||||
"curl",
|
||||
"MISS",
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_check_missing_domains(tmp_path, monkeypatch, capsys):
|
||||
db_path = tmp_path / "database" / "ngxstat.db"
|
||||
setup_db(db_path)
|
||||
|
||||
conf = tmp_path / "nginx.conf"
|
||||
conf.write_text(
|
||||
"""
|
||||
server {
|
||||
listen 80;
|
||||
server_name example.com;
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
monkeypatch.setattr(analyze, "DB_PATH", db_path)
|
||||
monkeypatch.setattr(gr, "DB_PATH", db_path)
|
||||
monkeypatch.setattr(analyze.nginx_config, "DEFAULT_PATHS", [str(conf)])
|
||||
|
||||
analyze.check_missing_domains(json_output=False)
|
||||
out = capsys.readouterr().out.strip().splitlines()
|
||||
assert out == ["missing.com"]
|
||||
|
||||
analyze.check_missing_domains(json_output=True)
|
||||
out_json = json.loads(capsys.readouterr().out.strip())
|
||||
assert out_json == ["missing.com"]
|
Loading…
Add table
Add a link
Reference in a new issue