Fix incremental log import #2

Merged
wagesj45 merged 1 commit from codex/fix-duplicate-entries-during-import into main 2025-07-17 23:27:50 -05:00

View file

@ -1,23 +1,25 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import sqlite3
import re import re
from pathlib import Path import sqlite3
from datetime import datetime
# Directory scanned for nginx access logs (current + rotated files).
LOG_DIR = "/var/log/nginx"
# SQLite database file the parsed entries are written into.
DB_FILE = "database/ngxstat.db"
# Matches "access.log" and numbered rotations such as "access.log.1".
LOG_FILE_PATTERN = re.compile(r"access\.log(\.\d+)?$")
# Custom nginx log line: combined-style fields plus a trailing cache-status
# token. NOTE(review): the second field is captured as "host" — presumably
# this log format puts the vhost where combined puts $remote_user; confirm
# against the nginx log_format directive.
LOG_FORMAT_REGEX = re.compile(
    r'(?P<ip>\S+) - (?P<host>\S+) \[(?P<time>.*?)\] "(?P<request>.*?)" '
    r'(?P<status>\d{3}) (?P<bytes_sent>\d+) "(?P<referer>.*?)" "(?P<user_agent>.*?)" (?P<cache_status>\S+)'
)
# Layout of nginx's $time_local, e.g. "17/Jul/2025:23:27:50 -0500";
# %z keeps the UTC offset so parsed datetimes are timezone-aware.
DATE_FMT = "%d/%b/%Y:%H:%M:%S %z"
# Ensure the database directory exists before sqlite3 tries to create the file.
os.makedirs("database", exist_ok=True)
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute(''' cursor.execute(
"""
CREATE TABLE IF NOT EXISTS logs ( CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
ip TEXT, ip TEXT,
@ -30,16 +32,29 @@ CREATE TABLE IF NOT EXISTS logs (
user_agent TEXT, user_agent TEXT,
cache_status TEXT cache_status TEXT
) )
''') """
)
# Speed up time-range queries; a no-op when the index already exists.
cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_time ON logs(time)")
conn.commit()

# Timestamp of the most recently inserted row, used as the incremental
# cutoff: only entries strictly newer than this are imported below.
last_dt = None
latest_row = cursor.execute(
    "SELECT time FROM logs ORDER BY id DESC LIMIT 1"
).fetchone()
if latest_row and latest_row[0]:
    try:
        last_dt = datetime.strptime(latest_row[0], DATE_FMT)
    except ValueError:
        # Stored timestamp is unparseable — fall back to a full import.
        last_dt = None
# Collect the log files and order them OLDEST FIRST. A plain sorted() orders
# names lexically ("access.log", "access.log.1", "access.log.10",
# "access.log.2", ...), which processes the newest file first; the last row
# inserted then comes from an older rotated file, so the next run's
# "last timestamp" cutoff is too old and newer entries get re-imported as
# duplicates. logrotate convention: a higher numeric suffix is an older file,
# and the unsuffixed "access.log" is the current (newest) one.
try:
    log_names = [f for f in os.listdir(LOG_DIR) if LOG_FILE_PATTERN.match(f)]
except FileNotFoundError:
    print(f"[ERROR] Log directory not found: {LOG_DIR}")
    exit(1)


def _age_rank(name):
    """Sort key: highest rotation number (oldest) first, bare access.log last."""
    m = re.search(r"\.(\d+)$", name)
    return -int(m.group(1)) if m else 0


log_files = [os.path.join(LOG_DIR, f) for f in sorted(log_names, key=_age_rank)]
@ -49,21 +64,36 @@ print(f"[INFO] Found {len(log_files)} log files.")
# Parse every discovered log file and insert entries newer than the cutoff.
inserted = 0
for log_file in log_files:
    print(f"[INFO] Parsing {log_file}...")
    with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
        for raw_line in fh:
            m = LOG_FORMAT_REGEX.match(raw_line.strip())
            if not m:
                # Line does not match the expected format — skip it.
                continue
            fields = m.groupdict()
            try:
                entry_dt = datetime.strptime(fields["time"], DATE_FMT)
            except ValueError:
                # Unparseable timestamp — cannot dedup reliably, skip.
                continue
            # Incremental cutoff: skip anything at or before the last
            # imported timestamp. NOTE(review): entries sharing the exact
            # second of the cutoff are dropped too — confirm this loss is
            # acceptable for 1-second-resolution logs.
            if last_dt is not None and entry_dt <= last_dt:
                continue
            cursor.execute(
                """
                INSERT INTO logs (
                    ip, host, time, request, status, bytes_sent,
                    referer, user_agent, cache_status
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    fields["ip"],
                    fields["host"],
                    fields["time"],
                    fields["request"],
                    int(fields["status"]),
                    int(fields["bytes_sent"]),
                    fields["referer"],
                    fields["user_agent"],
                    fields["cache_status"],
                ),
            )
            inserted += 1
# Single commit at the end keeps the import atomic and fast.
conn.commit()