Merge pull request #2 from wagesj45/codex/fix-duplicate-entries-during-import
Fix incremental log import
This commit is contained in:
		
				commit
				
					
						15770f5161
					
				
			
		
					 1 changed files with 47 additions and 17 deletions
				
			
		|  | @ -1,23 +1,25 @@ | ||||||
| #!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||||
| 
 | 
 | ||||||
| import os | import os | ||||||
| import sqlite3 |  | ||||||
| import re | import re | ||||||
| from pathlib import Path | import sqlite3 | ||||||
|  | from datetime import datetime | ||||||
| 
 | 
 | ||||||
| LOG_DIR = "/var/log/nginx" | LOG_DIR = "/var/log/nginx" | ||||||
| DB_FILE = "database/ngxstat.db" | DB_FILE = "database/ngxstat.db" | ||||||
| LOG_FILE_PATTERN = re.compile(r'access\.log(\.\d+)?$') | LOG_FILE_PATTERN = re.compile(r"access\.log(\.\d+)?$") | ||||||
| LOG_FORMAT_REGEX = re.compile( | LOG_FORMAT_REGEX = re.compile( | ||||||
|     r'(?P<ip>\S+) - (?P<host>\S+) \[(?P<time>.*?)\] "(?P<request>.*?)" ' |     r'(?P<ip>\S+) - (?P<host>\S+) \[(?P<time>.*?)\] "(?P<request>.*?)" ' | ||||||
|     r'(?P<status>\d{3}) (?P<bytes_sent>\d+) "(?P<referer>.*?)" "(?P<user_agent>.*?)" (?P<cache_status>\S+)' |     r'(?P<status>\d{3}) (?P<bytes_sent>\d+) "(?P<referer>.*?)" "(?P<user_agent>.*?)" (?P<cache_status>\S+)' | ||||||
| ) | ) | ||||||
|  | DATE_FMT = "%d/%b/%Y:%H:%M:%S %z" | ||||||
| 
 | 
 | ||||||
| os.makedirs("database", exist_ok=True) | os.makedirs("database", exist_ok=True) | ||||||
| conn = sqlite3.connect(DB_FILE) | conn = sqlite3.connect(DB_FILE) | ||||||
| cursor = conn.cursor() | cursor = conn.cursor() | ||||||
| 
 | 
 | ||||||
| cursor.execute(''' | cursor.execute( | ||||||
|  |     """ | ||||||
| CREATE TABLE IF NOT EXISTS logs ( | CREATE TABLE IF NOT EXISTS logs ( | ||||||
|     id INTEGER PRIMARY KEY, |     id INTEGER PRIMARY KEY, | ||||||
|     ip TEXT, |     ip TEXT, | ||||||
|  | @ -30,16 +32,29 @@ CREATE TABLE IF NOT EXISTS logs ( | ||||||
|     user_agent TEXT, |     user_agent TEXT, | ||||||
|     cache_status TEXT |     cache_status TEXT | ||||||
| ) | ) | ||||||
| ''') | """ | ||||||
|  | ) | ||||||
|  | cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_time ON logs(time)") | ||||||
| 
 | 
 | ||||||
| conn.commit() | conn.commit() | ||||||
| 
 | 
 | ||||||
|  | cursor.execute("SELECT time FROM logs ORDER BY id DESC LIMIT 1") | ||||||
|  | row = cursor.fetchone() | ||||||
|  | last_dt = None | ||||||
|  | if row and row[0]: | ||||||
|     try: |     try: | ||||||
|     log_files = sorted([ |         last_dt = datetime.strptime(row[0], DATE_FMT) | ||||||
|  |     except ValueError: | ||||||
|  |         last_dt = None | ||||||
|  | 
 | ||||||
|  | try: | ||||||
|  |     log_files = sorted( | ||||||
|  |         [ | ||||||
|             os.path.join(LOG_DIR, f) |             os.path.join(LOG_DIR, f) | ||||||
|             for f in os.listdir(LOG_DIR) |             for f in os.listdir(LOG_DIR) | ||||||
|             if LOG_FILE_PATTERN.match(f) |             if LOG_FILE_PATTERN.match(f) | ||||||
|     ]) |         ] | ||||||
|  |     ) | ||||||
| except FileNotFoundError: | except FileNotFoundError: | ||||||
|     print(f"[ERROR] Log directory not found: {LOG_DIR}") |     print(f"[ERROR] Log directory not found: {LOG_DIR}") | ||||||
|     exit(1) |     exit(1) | ||||||
|  | @ -49,21 +64,36 @@ print(f"[INFO] Found {len(log_files)} log files.") | ||||||
| inserted = 0 | inserted = 0 | ||||||
| for log_file in log_files: | for log_file in log_files: | ||||||
|     print(f"[INFO] Parsing {log_file}...") |     print(f"[INFO] Parsing {log_file}...") | ||||||
|     with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: |     with open(log_file, "r", encoding="utf-8", errors="ignore") as f: | ||||||
|         for line in f: |         for line in f: | ||||||
|             match = LOG_FORMAT_REGEX.match(line.strip()) |             match = LOG_FORMAT_REGEX.match(line.strip()) | ||||||
|             if match: |             if match: | ||||||
|                 data = match.groupdict() |                 data = match.groupdict() | ||||||
|                 cursor.execute(''' |                 try: | ||||||
|  |                     entry_dt = datetime.strptime(data["time"], DATE_FMT) | ||||||
|  |                 except ValueError: | ||||||
|  |                     continue | ||||||
|  |                 if last_dt and entry_dt <= last_dt: | ||||||
|  |                     continue | ||||||
|  |                 cursor.execute( | ||||||
|  |                     """ | ||||||
|                     INSERT INTO logs ( |                     INSERT INTO logs ( | ||||||
|                         ip, host, time, request, status, bytes_sent, |                         ip, host, time, request, status, bytes_sent, | ||||||
|                         referer, user_agent, cache_status |                         referer, user_agent, cache_status | ||||||
|                     ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) |                     ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | ||||||
|                 ''', ( |                     """, | ||||||
|                     data["ip"], data["host"], data["time"], data["request"], |                     ( | ||||||
|                     int(data["status"]), int(data["bytes_sent"]), |                         data["ip"], | ||||||
|                     data["referer"], data["user_agent"], data["cache_status"] |                         data["host"], | ||||||
|                 )) |                         data["time"], | ||||||
|  |                         data["request"], | ||||||
|  |                         int(data["status"]), | ||||||
|  |                         int(data["bytes_sent"]), | ||||||
|  |                         data["referer"], | ||||||
|  |                         data["user_agent"], | ||||||
|  |                         data["cache_status"], | ||||||
|  |                     ), | ||||||
|  |                 ) | ||||||
|                 inserted += 1 |                 inserted += 1 | ||||||
| 
 | 
 | ||||||
| conn.commit() | conn.commit() | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue