diff --git a/scripts/analyze.py b/scripts/analyze.py index 7c4c141..9f49978 100644 --- a/scripts/analyze.py +++ b/scripts/analyze.py @@ -155,10 +155,9 @@ def check_missing_domains( typer.echo(d) -@app.command("suggest-cache") def suggest_cache( - threshold: int = typer.Option(10, help="Minimum number of MISS entries to report"), - json_output: bool = typer.Option(False, "--json", help="Output results as JSON"), + threshold: int = 10, + json_output: bool = False, ) -> None: """Suggest domain/path pairs that could benefit from caching. @@ -191,7 +190,7 @@ def suggest_cache( HAVING miss_count >= ? ORDER BY miss_count DESC """, - (threshold,), + (int(threshold),), ) rows = [r for r in cur.fetchall() if r[0] in no_cache] @@ -211,11 +210,18 @@ def suggest_cache( for item in result: typer.echo(f"{item['host']} {item['path']} {item['misses']}") +@app.command("suggest-cache") +def suggest_cache_cli( + threshold: int = typer.Option(10, help="Minimum number of MISS entries to report"), + json_output: bool = typer.Option(False, "--json", help="Output results as JSON"), +) -> None: + """CLI wrapper for suggest_cache.""" + suggest_cache(threshold=threshold, json_output=json_output) + -@app.command("detect-threats") def detect_threats( - hours: int = typer.Option(1, help="Number of recent hours to analyze"), - ip_threshold: int = typer.Option(100, help="Requests from a single IP to flag"), + hours: int = 1, + ip_threshold: int = 100, ) -> None: """Detect potential security threats from recent logs.""" @@ -231,8 +237,8 @@ def detect_threats( max_dt = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S") recent_end = max_dt - recent_start = recent_end - timedelta(hours=hours) - prev_start = recent_start - timedelta(hours=hours) + recent_start = recent_end - timedelta(hours=int(hours)) + prev_start = recent_start - timedelta(hours=int(hours)) prev_end = recent_start fmt = "%Y-%m-%d %H:%M:%S" @@ -339,6 +345,14 @@ def detect_threats( out_path.write_text(json.dumps(report, indent=2)) typer.echo(json.dumps(report)) +@app.command("detect-threats") +def detect_threats_cli( + hours: int = typer.Option(1, help="Number of recent hours to analyze"), + ip_threshold: int = typer.Option(100, help="Requests from a single IP to flag"), +) -> None: + """CLI wrapper for detect_threats.""" + detect_threats(hours=hours, ip_threshold=ip_threshold) + if __name__ == "__main__": app()