#!/usr/bin/env python3
"""Sync EMAIL alarms from CalDAV calendars into a local SQLite reminder table.

For every calendar (auto-discovered via PROPFIND, or listed explicitly in
``CALENDAR_URLS``) the script asks the server which events exist, downloads
the ones whose ``getlastmodified`` value changed since the previous run,
expands their VALARM/EMAIL triggers inside the look-ahead window and stores
them in the ``reminders`` table. Nothing in this file sends mail; it only
fills the table (``sent_flag`` defaults to 0).
"""
import logging
import os
import sqlite3
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone, date
from urllib.parse import urljoin

import requests
from dateutil.rrule import rrulestr
from icalendar import Calendar

# ─── Configuration from environment ─────────────────────────────────────────────
CALDAV_BASE = os.getenv("CALDAV_BASE", "https://example.com/")
USERNAME = os.getenv("CALDAV_USERNAME", "your-username")
PASSWORD = os.getenv("CALDAV_PASSWORD", "your-password")
# Optional comma-separated override: e.g.
# CALENDAR_URLS="https://…/personal/,https://…/work/"
CALENDAR_URLS_ENV = os.getenv("CALENDAR_URLS", "")
FETCH_WINDOW_DAYS = int(os.getenv("FETCH_WINDOW_DAYS", "90"))
DB_PATH = os.getenv("DB_PATH", "/data/sync_state.db")
# Seconds to wait on each HTTP call; without a timeout `requests` blocks forever
# on a stalled connection.
REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "30"))

# ─── Logging setup ───────────────────────────────────────────────────────────────
logging.basicConfig(
    # upper() so that LOG_LEVEL=debug is accepted as well as LOG_LEVEL=DEBUG
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)

# ─── WebDAV / CalDAV request bodies ─────────────────────────────────────────────
# NOTE(review): both bodies were blank in the previous revision (the XML looks
# to have been stripped). They are reconstructed here from RFC 4918 / RFC 4791;
# confirm against the target server.
_DAV_NS = {"d": "DAV:", "c": "urn:ietf:params:xml:ns:caldav"}

_PROPFIND_BODY = """<?xml version="1.0" encoding="utf-8"?>
<d:propfind xmlns:d="DAV:">
  <d:prop>
    <d:resourcetype/>
    <d:displayname/>
  </d:prop>
</d:propfind>
"""

_REPORT_BODY = """<?xml version="1.0" encoding="utf-8"?>
<c:calendar-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:caldav">
  <d:prop>
    <d:getlastmodified/>
  </d:prop>
  <c:filter>
    <c:comp-filter name="VCALENDAR">
      <c:comp-filter name="VEVENT"/>
    </c:comp-filter>
  </c:filter>
</c:calendar-query>
"""


# ─── Database initialization ────────────────────────────────────────────────────
def init_db():
    """Open the SQLite database at ``DB_PATH`` and create the tables if missing.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for closing it.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema cannot
            be created. The connection is closed before re-raising.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        # One row per event resource: last seen getlastmodified value.
        c.execute("""
            CREATE TABLE IF NOT EXISTS events (
                uid TEXT PRIMARY KEY,
                lastmod TEXT
            )
        """)
        # One row per (event, trigger time); the UNIQUE constraint lets sync()
        # re-insert with INSERT OR IGNORE without creating duplicates.
        c.execute("""
            CREATE TABLE IF NOT EXISTS reminders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_uid TEXT,
                trigger_dt TEXT,
                summary TEXT,
                description TEXT,
                sent_flag INTEGER DEFAULT 0,
                UNIQUE(event_uid, trigger_dt)
            )
        """)
        conn.commit()
    except sqlite3.Error:
        conn.close()
        raise
    return conn


# ─── Calendar discovery ─────────────────────────────────────────────────────────
def discover_calendars():
    """Return the list of calendar collection URLs to sync.

    If ``CALENDAR_URLS`` is set, its comma-separated entries are returned
    as-is. Otherwise a Depth:1 PROPFIND is issued against
    ``remote.php/dav/calendars/<USERNAME>/`` below ``CALDAV_BASE`` and every
    child collection is returned, excluding the root itself and any child
    whose reported resourcetype is not a CalDAV calendar.

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
        xml.etree.ElementTree.ParseError: if the response is not valid XML.
    """
    if CALENDAR_URLS_ENV:
        urls = [u.strip() for u in CALENDAR_URLS_ENV.split(",") if u.strip()]
        logger.info("Using override: %s", urls)
        return urls

    # autodiscover via PROPFIND
    root_coll = urljoin(CALDAV_BASE, f"remote.php/dav/calendars/{USERNAME}/")
    headers = {"Depth": "1", "Content-Type": "application/xml"}
    resp = requests.request(
        "PROPFIND", root_coll,
        auth=(USERNAME, PASSWORD),
        headers=headers,
        data=_PROPFIND_BODY,
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    # Parse the raw bytes so the XML declaration, not requests' charset guess,
    # decides the encoding.
    tree = ET.fromstring(resp.content)

    cals = []
    for r in tree.findall(".//d:response", _DAV_NS):
        href = (r.findtext("d:href", namespaces=_DAV_NS) or "").strip()
        if not href:
            continue
        # skip the root itself
        if href.rstrip("/").endswith(f"calendars/{USERNAME}"):
            continue
        # Skip children that report a resourcetype without <c:calendar/>
        # (scheduling inbox/outbox and similar). Entries that report no
        # resourcetype at all are kept, as before.
        rtype = r.find(".//d:resourcetype", _DAV_NS)
        if rtype is not None and rtype.find("c:calendar", _DAV_NS) is None:
            continue
        cals.append(urljoin(CALDAV_BASE, href))
    logger.info("Discovered calendars: %s", cals)
    return cals


# ─── Fetch report XML for one calendar ───────────────────────────────────────────
def fetch_report_xml(caldav_url):
    """Run a calendar-query REPORT on *caldav_url* and return the response text.

    The query asks for the ``getlastmodified`` property of every VEVENT
    resource in the collection.

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
    """
    resp = requests.request(
        "REPORT", caldav_url,
        auth=(USERNAME, PASSWORD),
        headers={"Depth": "1", "Content-Type": "application/xml"},
        data=_REPORT_BODY,
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.text


# ─── Parse the REPORT response ─────────────────────────────────────────────────
def parse_report(xml_text):
    """Yield ``(href, lastmod)`` for each ``<d:response>`` in a multistatus body.

    ``lastmod`` is the text of ``<d:getlastmodified>``, or ``None`` when the
    server did not supply one. Responses without an href are skipped.

    Raises:
        xml.etree.ElementTree.ParseError: if *xml_text* is not valid XML.
    """
    root = ET.fromstring(xml_text)
    for resp in root.findall(".//d:response", _DAV_NS):
        href = (resp.findtext("d:href", namespaces=_DAV_NS) or "").strip()
        if not href:
            continue
        lastmod = resp.findtext(".//d:getlastmodified", namespaces=_DAV_NS)
        # Normalize a missing or empty element to None.
        yield href, (lastmod.strip() if lastmod and lastmod.strip() else None)


# ─── Download a single .ics by href ─────────────────────────────────────────────
def download_ics(href):
    """GET the resource at *href* (resolved against ``CALDAV_BASE``) as bytes.

    Raises:
        requests.RequestException: on network failure or a non-2xx response.
    """
    url = urljoin(CALDAV_BASE, href)
    r = requests.get(url, auth=(USERNAME, PASSWORD), timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    return r.content


# ─── Expand VEVENT alarms, handling RRULE and timezones ────────────────────────
def _as_aware(value):
    """Return *value* as a timezone-aware datetime, keeping its own zone.

    Plain dates become midnight UTC and naive datetimes are assumed to be UTC.
    """
    if not isinstance(value, datetime):
        return datetime.combine(value, datetime.min.time(), tzinfo=timezone.utc)
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _decode_trigger(alarm, uid):
    """Decode an alarm's TRIGGER to a timedelta or an aware UTC datetime.

    Returns ``None`` (and logs a warning) when the TRIGGER is missing, cannot
    be decoded, or decodes to an unexpected type.
    """
    try:
        trig = alarm.decoded("TRIGGER")
    except (KeyError, ValueError) as exc:
        logger.warning("Skipping alarm with unusable TRIGGER on %s: %s", uid, exc)
        return None
    if isinstance(trig, timedelta):
        return trig
    # datetime is a subclass of date, so this covers both absolute forms.
    if isinstance(trig, date):
        return _as_aware(trig).astimezone(timezone.utc)
    logger.warning("Skipping alarm with unsupported TRIGGER %r on %s", trig, uid)
    return None


def expand_event_alarms(ics_bytes, window_days=FETCH_WINDOW_DAYS):
    """Expand the EMAIL alarms of every VEVENT in *ics_bytes*.

    Args:
        ics_bytes: raw iCalendar data.
        window_days: how far ahead of "now" recurring events are expanded.

    Returns:
        A list of ``(uid, trigger_iso, summary, description)`` tuples, one per
        distinct ``(uid, trigger time)``. Trigger times are ISO-8601 in UTC.
        Non-recurring events that already started are ignored.

    Raises:
        KeyError: if a VEVENT with EMAIL alarms has no DTSTART.
        ValueError: if the calendar data or an RRULE cannot be parsed.
    """
    cal = Calendar.from_ical(ics_bytes)
    now = datetime.now(timezone.utc)
    horizon = now + timedelta(days=window_days)
    reminders = []
    seen = set()

    for comp in cal.walk():
        if comp.name != "VEVENT":
            continue

        # VALARM blocks with EMAIL action
        alarms = [
            a for a in comp.subcomponents
            if a.name == "VALARM" and str(a.get("ACTION", "")).upper() == "EMAIL"
        ]
        if not alarms:
            continue

        uid = str(comp.get("UID"))
        summary = str(comp.get("SUMMARY", "(no title)"))
        description = str(comp.get("DESCRIPTION", ""))

        # Keep DTSTART in its own zone for recurrence expansion so a local
        # wall-clock time follows daylight-saving changes; only the resulting
        # occurrences are converted to UTC.
        start_local = _as_aware(comp.decoded("DTSTART"))
        dtstart = start_local.astimezone(timezone.utc)

        # Recurrence expansion
        rrule_prop = comp.get("RRULE")
        if rrule_prop:
            rule = rrulestr(rrule_prop.to_ical().decode(), dtstart=start_local)
            occs = [
                occ.astimezone(timezone.utc)
                for occ in rule.between(now, horizon, inc=True)
            ]
        else:
            occs = [dtstart] if dtstart >= now else []
        if not occs:
            continue

        # NOTE(review): relative triggers are always applied to the occurrence
        # start; TRIGGER;RELATED=END and EXDATE are not handled here.
        triggers = [
            t for t in (_decode_trigger(a, uid) for a in alarms) if t is not None
        ]
        for occ in occs:
            for trig in triggers:
                trigger_dt = occ + trig if isinstance(trig, timedelta) else trig
                key = (uid, trigger_dt.isoformat())
                # An absolute trigger repeats for every occurrence; emit it once.
                if key in seen:
                    continue
                seen.add(key)
                reminders.append(key + (summary, description))
    return reminders


# ─── Main sync logic ────────────────────────────────────────────────────────────
def _sync_event(conn, href, lastmod):
    """Download and store one event if its ``lastmod`` differs from the DB."""
    uid = os.path.basename(href).rsplit(".ics", 1)[0]
    if not uid:
        # href ends in "/" — a collection, not an event resource.
        return

    c = conn.cursor()
    # check state
    c.execute("SELECT lastmod FROM events WHERE uid=?", (uid,))
    row = c.fetchone()
    # A missing lastmod gives no way to prove the copy is current, so refetch.
    if row is not None and lastmod is not None and row[0] == lastmod:
        logger.debug("Skipping up-to-date %s", uid)
        return

    logger.info("Fetching event %s", uid)
    ics = download_ics(href)
    # NOTE(review): reminders from an earlier revision of the event are not
    # removed here; only new (event_uid, trigger_dt) pairs are added.
    c.executemany("""
        INSERT OR IGNORE INTO reminders
            (event_uid, trigger_dt, summary, description)
        VALUES (?,?,?,?)
    """, expand_event_alarms(ics))
    # update lastmod
    c.execute("""
        INSERT INTO events(uid, lastmod)
        VALUES (?,?)
        ON CONFLICT(uid) DO UPDATE SET lastmod=excluded.lastmod
    """, (uid, lastmod))
    conn.commit()


def sync():
    """Sync every calendar once, then close the database.

    Failures are logged and isolated: a calendar whose REPORT fails is
    skipped, and an event that cannot be downloaded or parsed is rolled back
    and skipped without affecting the remaining events of that calendar.
    Errors from database setup or calendar discovery propagate to the caller.
    """
    conn = init_db()
    try:
        calendars = discover_calendars()
        logger.info("Calendars to sync: %s", calendars)

        for cal_url in calendars:
            # Broad catches are deliberate: this is the job's top-level
            # boundary and one bad calendar/event must not stop the rest.
            try:
                entries = list(parse_report(fetch_report_xml(cal_url)))
            except Exception as e:
                logger.exception("Error syncing %s: %s", cal_url, e)
                continue

            for href, lastmod in entries:
                try:
                    _sync_event(conn, href, lastmod)
                except Exception as e:
                    conn.rollback()
                    logger.exception(
                        "Error syncing event %s in %s: %s", href, cal_url, e
                    )
    finally:
        conn.close()


if __name__ == "__main__":
    sync()