Merge pull request #31 from wagesj45/codex/create-nginx_config-utility-module
Add nginx config parser utilities
This commit is contained in:
		
				commit
				
					
						9bd926ff06
					
				
			
		
					 2 changed files with 158 additions and 0 deletions
				
			
		
							
								
								
									
										88
									
								
								scripts/nginx_config.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										88
									
								
								scripts/nginx_config.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,88 @@ | ||||||
|  | #!/usr/bin/env python3 | ||||||
|  | """Utilities for discovering and parsing Nginx configuration files. | ||||||
|  | 
 | ||||||
|  | This module provides helper functions to locate Nginx configuration files and | ||||||
|  | extract key details from ``server`` blocks.  Typical usage:: | ||||||
|  | 
 | ||||||
|  |     from scripts.nginx_config import discover_configs, parse_servers | ||||||
|  | 
 | ||||||
|  |     files = discover_configs() | ||||||
|  |     servers = parse_servers(files) | ||||||
|  |     for s in servers: | ||||||
|  |         print(s.get("server_name"), s.get("listen")) | ||||||
|  | 
 | ||||||
|  | The functions intentionally tolerate missing or unreadable files and will simply | ||||||
|  | skip over them. | ||||||
|  | """ | ||||||
|  | 
 | ||||||
|  | import os | ||||||
|  | import re | ||||||
|  | from pathlib import Path | ||||||
|  | from typing import Dict, List, Set | ||||||
|  | 
 | ||||||
|  | DEFAULT_PATHS = [ | ||||||
|  |     "/etc/nginx/nginx.conf", | ||||||
|  |     "/usr/local/etc/nginx/nginx.conf", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
|  | INCLUDE_RE = re.compile(r"^\s*include\s+(.*?);", re.MULTILINE) | ||||||
|  | SERVER_RE = re.compile(r"server\s*{(.*?)}", re.DOTALL) | ||||||
|  | DIRECTIVE_RE = re.compile(r"^\s*(\S+)\s+(.*?);", re.MULTILINE) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def discover_configs() -> Set[Path]: | ||||||
|  |     """Return a set of all config files reachable from :data:`DEFAULT_PATHS`.""" | ||||||
|  | 
 | ||||||
|  |     found: Set[Path] = set() | ||||||
|  |     queue = [Path(p) for p in DEFAULT_PATHS] | ||||||
|  | 
 | ||||||
|  |     while queue: | ||||||
|  |         path = queue.pop() | ||||||
|  |         if path in found: | ||||||
|  |             continue | ||||||
|  |         if not path.exists(): | ||||||
|  |             continue | ||||||
|  |         try: | ||||||
|  |             text = path.read_text() | ||||||
|  |         except OSError: | ||||||
|  |             continue | ||||||
|  |         found.add(path) | ||||||
|  |         for pattern in INCLUDE_RE.findall(text): | ||||||
|  |             pattern = os.path.expanduser(pattern.strip()) | ||||||
|  |             for included in path.parent.glob(pattern): | ||||||
|  |                 if included.is_file() and included not in found: | ||||||
|  |                     queue.append(included) | ||||||
|  |     return found | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def parse_servers(paths: Set[Path]) -> List[Dict[str, str]]: | ||||||
|  |     """Parse ``server`` blocks from the given files. | ||||||
|  | 
 | ||||||
|  |     Parameters | ||||||
|  |     ---------- | ||||||
|  |     paths: | ||||||
|  |         Iterable of configuration file paths. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     servers: List[Dict[str, str]] = [] | ||||||
|  |     for p in paths: | ||||||
|  |         try: | ||||||
|  |             text = Path(p).read_text() | ||||||
|  |         except OSError: | ||||||
|  |             continue | ||||||
|  |         for block in SERVER_RE.findall(text): | ||||||
|  |             directives: Dict[str, List[str]] = {} | ||||||
|  |             for name, value in DIRECTIVE_RE.findall(block): | ||||||
|  |                 directives.setdefault(name, []).append(value.strip()) | ||||||
|  |             entry: Dict[str, str] = {} | ||||||
|  |             if "server_name" in directives: | ||||||
|  |                 entry["server_name"] = " ".join(directives["server_name"]) | ||||||
|  |             if "listen" in directives: | ||||||
|  |                 entry["listen"] = " ".join(directives["listen"]) | ||||||
|  |             if "proxy_cache" in directives: | ||||||
|  |                 entry["proxy_cache"] = " ".join(directives["proxy_cache"]) | ||||||
|  |             if "root" in directives: | ||||||
|  |                 entry["root"] = " ".join(directives["root"]) | ||||||
|  |             servers.append(entry) | ||||||
|  |     return servers | ||||||
|  | 
 | ||||||
							
								
								
									
										70
									
								
								tests/test_nginx_config.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										70
									
								
								tests/test_nginx_config.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,70 @@ | ||||||
|  | import sys | ||||||
|  | from pathlib import Path | ||||||
|  | import pytest | ||||||
|  | 
 | ||||||
|  | REPO_ROOT = Path(__file__).resolve().parents[1] | ||||||
|  | sys.path.append(str(REPO_ROOT)) | ||||||
|  | from scripts import nginx_config as nc | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_discover_configs(tmp_path, monkeypatch): | ||||||
|  |     root = tmp_path / "nginx" | ||||||
|  |     root.mkdir() | ||||||
|  |     conf_d = root / "conf.d" | ||||||
|  |     conf_d.mkdir() | ||||||
|  |     subdir = root / "sub" | ||||||
|  |     subdir.mkdir() | ||||||
|  | 
 | ||||||
|  |     main = root / "nginx.conf" | ||||||
|  |     site = conf_d / "site.conf" | ||||||
|  |     extra = root / "extra.conf" | ||||||
|  |     nested = subdir / "foo.conf" | ||||||
|  | 
 | ||||||
|  |     main.write_text("include conf.d/*.conf;\ninclude extra.conf;\n") | ||||||
|  |     site.write_text("# site config\n") | ||||||
|  |     extra.write_text("include sub/foo.conf;\n") | ||||||
|  |     nested.write_text("# nested config\n") | ||||||
|  | 
 | ||||||
|  |     monkeypatch.setattr(nc, "DEFAULT_PATHS", [str(main)]) | ||||||
|  |     found = nc.discover_configs() | ||||||
|  | 
 | ||||||
|  |     assert found == {main, site, extra, nested} | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_parse_servers(tmp_path): | ||||||
|  |     conf1 = tmp_path / "site.conf" | ||||||
|  |     conf2 = tmp_path / "other.conf" | ||||||
|  | 
 | ||||||
|  |     conf1.write_text( | ||||||
|  |         """ | ||||||
|  | server { | ||||||
|  |     listen 80; | ||||||
|  |     server_name example.com; | ||||||
|  |     root /srv/example; | ||||||
|  |     proxy_cache cache1; | ||||||
|  | } | ||||||
|  | """ | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     conf2.write_text( | ||||||
|  |         """ | ||||||
|  | server { | ||||||
|  |     listen 443 ssl; | ||||||
|  |     server_name example.org; | ||||||
|  | } | ||||||
|  | """ | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     servers = nc.parse_servers({conf1, conf2, tmp_path / "missing.conf"}) | ||||||
|  |     servers = sorted(servers, key=lambda s: s.get("server_name")) | ||||||
|  | 
 | ||||||
|  |     assert len(servers) == 2 | ||||||
|  |     assert servers[0]["server_name"] == "example.com" | ||||||
|  |     assert servers[0]["listen"] == "80" | ||||||
|  |     assert servers[0]["root"] == "/srv/example" | ||||||
|  |     assert servers[0]["proxy_cache"] == "cache1" | ||||||
|  | 
 | ||||||
|  |     assert servers[1]["server_name"] == "example.org" | ||||||
|  |     assert servers[1]["listen"] == "443 ssl" | ||||||
|  |     assert "proxy_cache" not in servers[1] | ||||||
|  | 
 | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue