diff --git a/scripts/nginx_config.py b/scripts/nginx_config.py new file mode 100644 index 0000000..e2dc447 --- /dev/null +++ b/scripts/nginx_config.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +"""Utilities for discovering and parsing Nginx configuration files. + +This module provides helper functions to locate Nginx configuration files and +extract key details from ``server`` blocks. Typical usage:: + + from scripts.nginx_config import discover_configs, parse_servers + + files = discover_configs() + servers = parse_servers(files) + for s in servers: + print(s.get("server_name"), s.get("listen")) + +The functions intentionally tolerate missing or unreadable files and will simply +skip over them. +""" + +import os +import re +from pathlib import Path +from typing import Dict, List, Set + +DEFAULT_PATHS = [ + "/etc/nginx/nginx.conf", + "/usr/local/etc/nginx/nginx.conf", +] + +INCLUDE_RE = re.compile(r"^\s*include\s+(.*?);", re.MULTILINE) +SERVER_RE = re.compile(r"server\s*{(.*?)}", re.DOTALL) +DIRECTIVE_RE = re.compile(r"^\s*(\S+)\s+(.*?);", re.MULTILINE) + + +def discover_configs() -> Set[Path]: + """Return a set of all config files reachable from :data:`DEFAULT_PATHS`.""" + + found: Set[Path] = set() + queue = [Path(p) for p in DEFAULT_PATHS] + + while queue: + path = queue.pop() + if path in found: + continue + if not path.exists(): + continue + try: + text = path.read_text() + except OSError: + continue + found.add(path) + for pattern in INCLUDE_RE.findall(text): + pattern = os.path.expanduser(pattern.strip()) + for included in path.parent.glob(pattern): + if included.is_file() and included not in found: + queue.append(included) + return found + + +def parse_servers(paths: Set[Path]) -> List[Dict[str, str]]: + """Parse ``server`` blocks from the given files. + + Parameters + ---------- + paths: + Iterable of configuration file paths. + """ + + servers: List[Dict[str, str]] = [] + for p in paths: + try: + text = Path(p).read_text() + except OSError: + continue + for block in SERVER_RE.findall(text): + directives: Dict[str, List[str]] = {} + for name, value in DIRECTIVE_RE.findall(block): + directives.setdefault(name, []).append(value.strip()) + entry: Dict[str, str] = {} + if "server_name" in directives: + entry["server_name"] = " ".join(directives["server_name"]) + if "listen" in directives: + entry["listen"] = " ".join(directives["listen"]) + if "proxy_cache" in directives: + entry["proxy_cache"] = " ".join(directives["proxy_cache"]) + if "root" in directives: + entry["root"] = " ".join(directives["root"]) + servers.append(entry) + return servers + diff --git a/tests/test_nginx_config.py b/tests/test_nginx_config.py new file mode 100644 index 0000000..782c635 --- /dev/null +++ b/tests/test_nginx_config.py @@ -0,0 +1,70 @@ +import sys +from pathlib import Path +import pytest + +REPO_ROOT = Path(__file__).resolve().parents[1] +sys.path.append(str(REPO_ROOT)) +from scripts import nginx_config as nc + + +def test_discover_configs(tmp_path, monkeypatch): + root = tmp_path / "nginx" + root.mkdir() + conf_d = root / "conf.d" + conf_d.mkdir() + subdir = root / "sub" + subdir.mkdir() + + main = root / "nginx.conf" + site = conf_d / "site.conf" + extra = root / "extra.conf" + nested = subdir / "foo.conf" + + main.write_text("include conf.d/*.conf;\ninclude extra.conf;\n") + site.write_text("# site config\n") + extra.write_text("include sub/foo.conf;\n") + nested.write_text("# nested config\n") + + monkeypatch.setattr(nc, "DEFAULT_PATHS", [str(main)]) + found = nc.discover_configs() + + assert found == {main, site, extra, nested} + + +def test_parse_servers(tmp_path): + conf1 = tmp_path / "site.conf" + conf2 = tmp_path / "other.conf" + + conf1.write_text( + """ +server { + listen 80; + server_name example.com; + root /srv/example; + proxy_cache cache1; +} +""" + ) + + conf2.write_text( + """ +server { + listen 443 ssl; + server_name example.org; +} +""" + ) + + servers = nc.parse_servers({conf1, conf2, tmp_path / "missing.conf"}) + servers = sorted(servers, key=lambda s: s.get("server_name")) + + assert len(servers) == 2 + assert servers[0]["server_name"] == "example.com" + assert servers[0]["listen"] == "80" + assert servers[0]["root"] == "/srv/example" + assert servers[0]["proxy_cache"] == "cache1" + + assert servers[1]["server_name"] == "example.org" + assert servers[1]["listen"] == "443 ssl" + assert "proxy_cache" not in servers[1] +