caldav-email-reminders/sync.py
2025-05-01 17:04:01 -05:00

223 lines
No EOL
8.6 KiB
Python

#!/usr/bin/env python3
import os
import sqlite3
import logging
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone, date
from urllib.parse import urljoin
from icalendar import Calendar
from dateutil.rrule import rrulestr
# ─── Configuration from environment ─────────────────────────────────────────────
# Server root and credentials used for every CalDAV request.
CALDAV_BASE = os.environ.get("CALDAV_BASE", "https://example.com/")
USERNAME = os.environ.get("CALDAV_USERNAME", "your-username")
PASSWORD = os.environ.get("CALDAV_PASSWORD", "your-password")
# Optional comma-separated list of calendar collection URLs. When non-empty it
# replaces PROPFIND autodiscovery, e.g.
# CALENDAR_URLS="https://…/personal/,https://…/work/"
CALENDAR_URLS_ENV = os.environ.get("CALENDAR_URLS", "")
# Number of days ahead for which recurring events are expanded.
FETCH_WINDOW_DAYS = int(os.environ.get("FETCH_WINDOW_DAYS", "90"))
# Path of the SQLite file holding the sync state and the reminder queue.
DB_PATH = os.environ.get("DB_PATH", "/data/sync_state.db")
# ─── Logging setup ───────────────────────────────────────────────────────────────
# Root logging: the level is read from LOG_LEVEL (default INFO) and every
# record is rendered as "<timestamp> [<LEVEL>] <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=os.environ.get("LOG_LEVEL", "INFO"),
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# ─── Database initialization ────────────────────────────────────────────────────
def init_db():
    """Open the SQLite state database and make sure its schema exists.

    Two tables are created if they are missing:

    * ``events``    - one row per calendar resource (``uid``) with the last
      change token seen for it (``lastmod``).
    * ``reminders`` - one row per (event, trigger time) pair, with the text to
      send and a ``sent_flag`` that defaults to 0. The
      ``UNIQUE(event_uid, trigger_dt)`` constraint lets callers use
      ``INSERT OR IGNORE`` to avoid duplicates.

    Returns:
        An open ``sqlite3.Connection`` to ``DB_PATH``. The caller owns the
        connection and must close it.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS events (
            uid TEXT PRIMARY KEY,
            lastmod TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS reminders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            event_uid TEXT,
            trigger_dt TEXT,
            summary TEXT,
            description TEXT,
            sent_flag INTEGER DEFAULT 0,
            UNIQUE(event_uid, trigger_dt)
        )
        """,
    )
    db = sqlite3.connect(DB_PATH)
    cursor = db.cursor()
    for statement in schema:
        cursor.execute(statement)
    db.commit()
    return db
# ─── Calendar discovery ─────────────────────────────────────────────────────────
def discover_calendars(timeout=30):
    """Return the calendar collection URLs that should be synchronised.

    If ``CALENDAR_URLS_ENV`` is non-empty it is split on commas, blank entries
    are dropped, and the result is returned without contacting the server.

    Otherwise the collection ``remote.php/dav/calendars/<USERNAME>/`` under
    ``CALDAV_BASE`` is queried with a Depth-1 PROPFIND for
    ``DAV:resourcetype``. Only children whose resource type contains
    ``CALDAV:calendar`` are returned, so non-calendar children of the calendar
    home (for example scheduling inbox/outbox collections) are not later hit
    with a calendar REPORT.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A list of absolute calendar collection URLs (possibly empty).

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            response.
        xml.etree.ElementTree.ParseError: if the response is not valid XML.
    """
    if CALENDAR_URLS_ENV:
        urls = [u.strip() for u in CALENDAR_URLS_ENV.split(",") if u.strip()]
        logger.info(f"Using override: {urls}")
        return urls

    # Autodiscover via PROPFIND on the user's calendar home.
    root_coll = urljoin(CALDAV_BASE, f"remote.php/dav/calendars/{USERNAME}/")
    body = (
        '<?xml version="1.0"?>\n'
        '<d:propfind xmlns:d="DAV:">\n'
        "  <d:prop><d:resourcetype/></d:prop>\n"
        "</d:propfind>\n"
    )
    resp = requests.request(
        "PROPFIND",
        root_coll,
        auth=(USERNAME, PASSWORD),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=body,
        timeout=timeout,
    )
    resp.raise_for_status()

    # Parse the raw bytes so the XML parser honours the document's own
    # encoding declaration instead of a guessed text decoding.
    tree = ET.fromstring(resp.content)
    ns = {"d": "DAV:", "c": "urn:ietf:params:xml:ns:caldav"}
    cals = []
    for response in tree.findall(".//d:response", ns):
        href = (response.findtext("d:href", default="", namespaces=ns) or "").strip()
        if not href:
            # Malformed entry without a location; nothing to sync.
            continue
        # Skip the calendar home itself.
        if href.rstrip("/").endswith(f"calendars/{USERNAME}"):
            continue
        # Keep only real calendar collections.
        if response.find(".//d:resourcetype/c:calendar", ns) is None:
            logger.debug(f"Ignoring non-calendar resource {href}")
            continue
        cals.append(urljoin(CALDAV_BASE, href))
    logger.info(f"Discovered calendars: {cals}")
    return cals
# ─── Fetch report XML for one calendar ───────────────────────────────────────────
def fetch_report_xml(caldav_url, timeout=30):
    """Run a CalDAV ``calendar-query`` REPORT and return the raw XML reply.

    The query asks for ``DAV:getetag``, ``DAV:getlastmodified`` and
    ``CALDAV:calendar-data`` of every resource containing a VEVENT. The WebDAV
    elements (``prop``, ``getetag``, ``getlastmodified``) are emitted in the
    ``DAV:`` namespace and only the CalDAV elements in the CalDAV namespace;
    a server that follows RFC 4791 does not recognise ``prop`` when it is
    placed in the CalDAV namespace and then returns no properties.

    Args:
        caldav_url: Absolute URL of the calendar collection to query.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The multistatus response body as text.

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            response.
    """
    body = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">\n'
        "  <d:prop><d:getetag/><d:getlastmodified/><c:calendar-data/></d:prop>\n"
        '  <c:filter><c:comp-filter name="VCALENDAR">\n'
        '    <c:comp-filter name="VEVENT"/>\n'
        "  </c:comp-filter></c:filter>\n"
        "</c:calendar-query>\n"
    )
    resp = requests.request(
        "REPORT",
        caldav_url,
        auth=(USERNAME, PASSWORD),
        headers={"Depth": "1", "Content-Type": "application/xml; charset=utf-8"},
        # Send bytes that match the encoding declared in the XML prolog.
        data=body.encode("utf-8"),
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.text
# ─── Parse the REPORT response ─────────────────────────────────────────────────
def parse_report(xml_text):
    """Yield ``(href, lastmod)`` for each resource in a multistatus reply.

    ``lastmod`` is used by the caller only as an opaque change token. It is
    the text of ``DAV:getlastmodified`` when present and non-empty, otherwise
    the text of ``DAV:getetag``, otherwise ``None``.

    Responses without a ``DAV:href`` are skipped instead of raising
    ``AttributeError``, so one malformed entry does not abort the whole
    calendar.

    Args:
        xml_text: Body of a WebDAV multistatus response (``str`` or ``bytes``).

    Yields:
        Tuples ``(href, lastmod)``; ``lastmod`` may be ``None``.

    Raises:
        xml.etree.ElementTree.ParseError: if ``xml_text`` is not valid XML.
    """
    ns = {"d": "DAV:"}
    root = ET.fromstring(xml_text)
    for resp in root.findall(".//d:response", ns):
        href = (resp.findtext("d:href", default="", namespaces=ns) or "").strip()
        if not href:
            continue
        token = None
        # Prefer Last-Modified; fall back to the ETag, which changes on every
        # modification as well.
        for tag in (".//d:getlastmodified", ".//d:getetag"):
            text = resp.findtext(tag, namespaces=ns)
            if text and text.strip():
                token = text.strip()
                break
        yield href, token
# ─── Download a single .ics by href ─────────────────────────────────────────────
def download_ics(href, timeout=30):
    """Download one calendar resource and return its raw iCalendar bytes.

    Args:
        href: Resource location as reported by the server. It is resolved
            against ``CALDAV_BASE``, so both absolute URLs and server-relative
            paths work.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the sync indefinitely.

    Returns:
        The response body as ``bytes``.

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            response.
    """
    url = urljoin(CALDAV_BASE, href)
    response = requests.get(url, auth=(USERNAME, PASSWORD), timeout=timeout)
    response.raise_for_status()
    return response.content
# ─── Expand VEVENT alarms, handling RRULE and timezones ────────────────────────
def _as_aware(value):
    """Return a DATE or DATE-TIME value as an aware datetime.

    The original zone is kept when there is one. All-day dates become
    midnight UTC and floating (naive) times are taken to be UTC.
    """
    if not isinstance(value, datetime):
        value = datetime.combine(value, datetime.min.time())
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value


def expand_event_alarms(ics_bytes, window_days=FETCH_WINDOW_DAYS):
    """Compute the e-mail reminder times for every VEVENT in an iCalendar blob.

    For each VEVENT that has at least one ``VALARM`` with ``ACTION:EMAIL``:

    * a recurring event (``RRULE``) is expanded to the occurrences that fall
      between now and ``now + window_days``;
    * a non-recurring event contributes its start, provided it is not in the
      past.

    Each alarm's ``TRIGGER`` is then applied to each occurrence. A duration is
    added to the occurrence start; an absolute date-time is used as-is.

    The recurrence is expanded in the event's own time zone and converted to
    UTC afterwards, so an event at 09:00 local time stays at 09:00 local time
    across daylight-saving changes.

    Not handled: ``EXDATE``/``RDATE``, and ``TRIGGER;RELATED=END`` (duration
    triggers are always applied to the occurrence start).

    Args:
        ics_bytes: Raw iCalendar data.
        window_days: How many days ahead recurring events are expanded.

    Returns:
        A list of ``(uid, trigger_iso, summary, description)`` tuples, where
        ``trigger_iso`` is an ISO-8601 UTC timestamp. Each ``(uid, trigger)``
        pair appears at most once.
    """
    cal = Calendar.from_ical(ics_bytes)
    now = datetime.now(timezone.utc)
    horizon = now + timedelta(days=window_days)
    reminders = []
    seen = set()

    for comp in cal.walk("VEVENT"):
        # An event without a start cannot be scheduled; skip it rather than
        # letting a KeyError discard the other events in the same file.
        if "DTSTART" not in comp:
            continue

        uid = str(comp.get("UID"))
        start_local = _as_aware(comp.decoded("DTSTART"))
        dtstart = start_local.astimezone(timezone.utc)

        summary = str(comp.get("SUMMARY", "(no title)"))
        description = str(comp.get("DESCRIPTION", ""))

        # Only e-mail alarms that actually carry a trigger are usable.
        alarms = [
            a for a in comp.subcomponents
            if a.name == "VALARM" and a.get("ACTION") == "EMAIL" and "TRIGGER" in a
        ]
        if not alarms:
            continue

        if comp.get("RRULE"):
            rule_str = comp["RRULE"].to_ical().decode()
            # Expand from the zone-aware local start so wall-clock time is
            # preserved, then normalise each occurrence to UTC.
            rule = rrulestr(rule_str, dtstart=start_local)
            occs = [
                occ.astimezone(timezone.utc)
                for occ in rule.between(now, horizon, inc=True)
            ]
        else:
            occs = [dtstart] if dtstart >= now else []

        for occ in occs:
            for alarm in alarms:
                trig = alarm.decoded("TRIGGER")
                if isinstance(trig, timedelta):
                    # Relative trigger: offset from the occurrence start.
                    trigger_dt = occ + trig
                elif isinstance(trig, date):
                    # Absolute trigger: already decoded to a date/datetime.
                    trigger_dt = _as_aware(trig).astimezone(timezone.utc)
                else:
                    continue
                trigger_iso = trigger_dt.isoformat()
                # An absolute trigger repeats for every occurrence; emit it once.
                if (uid, trigger_iso) in seen:
                    continue
                seen.add((uid, trigger_iso))
                reminders.append((uid, trigger_iso, summary, description))
    return reminders
# ─── Main sync logic ────────────────────────────────────────────────────────────
def _sync_event(conn, href, lastmod):
    """Import the reminders of one calendar resource if it has changed.

    The stored change token and the new reminders are written in a single
    transaction, so a failure leaves no half-imported event behind.
    """
    uid = os.path.basename(href).rsplit(".ics", 1)[0]
    row = conn.execute("SELECT lastmod FROM events WHERE uid=?", (uid,)).fetchone()
    # A missing token (None) can never prove the event is unchanged, so such
    # events are always re-imported.
    if lastmod is not None and row and row[0] == lastmod:
        logger.debug("Skipping up-to-date %s", uid)
        return

    logger.info("Fetching event %s", uid)
    reminders = expand_event_alarms(download_ics(href))
    try:
        conn.executemany(
            """
            INSERT OR IGNORE INTO reminders
                (event_uid, trigger_dt, summary, description)
            VALUES (?,?,?,?)
            """,
            reminders,
        )
        conn.execute(
            """
            INSERT INTO events(uid, lastmod)
            VALUES (?,?)
            ON CONFLICT(uid) DO UPDATE SET lastmod=excluded.lastmod
            """,
            (uid, lastmod),
        )
        conn.commit()
    except Exception:
        conn.rollback()
        raise


def sync():
    """Run one synchronisation pass over all calendars.

    For every calendar returned by ``discover_calendars`` the REPORT listing
    is fetched and each resource whose change token differs from the stored
    one is downloaded, expanded into reminders and recorded in the database.

    Failures are isolated: an error while listing a calendar skips only that
    calendar, and an error in one event skips only that event. Both are logged
    with a traceback. The database connection is closed even if calendar
    discovery itself raises.

    NOTE(review): an event is re-expanded only when its change token changes,
    so occurrences of a recurring event that lie beyond the fetch window at
    import time are not picked up by later runs. Reminders of a modified
    event are added with INSERT OR IGNORE; previously stored rows for the old
    times are not removed. Confirm whether that is intended.
    """
    conn = init_db()
    try:
        calendars = discover_calendars()
        logger.info("Calendars to sync: %s", calendars)
        for cal_url in calendars:
            try:
                report = fetch_report_xml(cal_url)
                # Materialise the listing so XML errors are reported against
                # the calendar rather than against its first event.
                entries = list(parse_report(report))
            except Exception as e:
                logger.exception("Error syncing %s: %s", cal_url, e)
                continue

            for href, lastmod in entries:
                try:
                    _sync_event(conn, href, lastmod)
                except Exception as e:
                    logger.exception(
                        "Error syncing event %s in %s: %s", href, cal_url, e
                    )
    finally:
        conn.close()
# Run a single synchronisation pass when the file is executed as a script;
# importing the module does not start a sync.
if __name__ == "__main__":
    sync()