fabcal/fabcal/calendar_client.py
Fabian Müller 70a069fe95 Support multiple calendars
This implied switching to a different caching approach.
2023-02-18 04:21:28 +01:00

179 lines
5.1 KiB
Python

import asyncio
import os
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from typing import Dict, List, NamedTuple, Optional

import aiohttp
import recurring_ical_events
import yaml
from asyncache import cached
from cachetools import TTLCache
from icalendar import Calendar

from fabcal.models import CalendarEvent
class ConfiguredCalendar(NamedTuple):
    """
    A calendar source as configured in ``config.yml``.

    Instances are built from the entries of the ``calendars`` list of the config file via keyword arguments, so
    the field names double as the accepted config keys. Being tuples, instances are hashable, which allows using
    them as dictionary keys.
    """

    # name of the calendar as given in the config file
    name: str
    # URL the iCalendar data is downloaded from
    url: str
    # color for events that don't define one themselves; None (the default) means no color was configured
    default_color: Optional[str] = None
# this is not ideal, since it will read the file every time the cached calendar data expires (by default, at most
# every 2 minutes), but there is no good way to cache the settings, e.g., by storing it in the FastAPI object
def read_calendars_from_config_file():
    """
    Lazily yield the calendars configured in ``config.yml``.

    The file is looked up relative to the current working directory and is expected to contain a top-level
    ``calendars`` list whose entries map onto the fields of ConfiguredCalendar.

    :return: generator of ConfiguredCalendar objects, in the order they appear in the file
    :raises OSError: if the config file cannot be opened
    :raises KeyError: if the file has no ``calendars`` key
    :raises TypeError: if an entry does not match the fields of ConfiguredCalendar
    """
    # YAML is UTF-8 by convention; don't depend on the platform's default encoding (names may contain umlauts)
    with open("config.yml", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # the file is fully parsed at this point, so it doesn't have to stay open while the consumer iterates
    for calendar in data["calendars"]:
        yield ConfiguredCalendar(**calendar)
@cached(TTLCache(maxsize=100, ttl=int(os.environ.get("FABCAL_CACHE_EXPIRE", 120))))
async def get_data() -> Dict[ConfiguredCalendar, str]:
    """
    Download the iCalendar data of all configured calendars.

    All calendars are requested concurrently over one shared HTTP session. The result is cached; the cache
    lifetime in seconds is read from the ``FABCAL_CACHE_EXPIRE`` environment variable (default: 120).

    :return: mapping of configured calendar to the raw iCalendar text downloaded from its URL
    :raises aiohttp.ClientResponseError: if a server responds with an error status, or with a content type other
        than ``text/calendar`` (raised as the subclass ``aiohttp.ContentTypeError``)
    """
    calendars = list(read_calendars_from_config_file())

    async with aiohttp.ClientSession() as session:

        async def get(url: str) -> str:
            async with session.get(url) as response:
                response.raise_for_status()

                # explicit check instead of an assert: asserts are stripped when Python runs with -O, which
                # would silently let non-calendar payloads through
                if response.content_type.lower() != "text/calendar":
                    raise aiohttp.ContentTypeError(
                        response.request_info,
                        response.history,
                        status=response.status,
                        message=f"expected content type text/calendar, got {response.content_type}",
                        headers=response.headers,
                    )

                return await response.text()

        responses = await asyncio.gather(*[get(calendar.url) for calendar in calendars])

    # gather() returns the results in the order of the awaitables passed in, so they line up with the calendars
    return dict(zip(calendars, responses))
def combined_calendar(data: Dict[ConfiguredCalendar, str]) -> Calendar:
    """
    Merge the events of all given calendars into a single, new calendar.

    Only VEVENT components are copied. Events that don't define a color get a fallback color, if there is one:
    the color configured for their calendar, otherwise the calendar's ``x-apple-calendar-color`` property.

    :param data: mapping of configured calendar to its raw iCalendar text
    :return: calendar containing the events of all given calendars
    """
    combined = Calendar()
    combined.add("prodid", "-//FabCal//NONSGML//EN")
    combined.add("version", "2.0")
    combined.add("x-wr-calname", "FabLab Altmühlfranken e.V.")

    # TODO: normalize timezones of calendar events

    for configured_calendar, ical_str in data.items():
        cal = Calendar.from_ical(ical_str)

        # a configured color takes precedence; otherwise check for a calendar color (e.g., Nextcloud has such a
        # feature) -- .get() returns None if the calendar doesn't define one
        default_color = configured_calendar.default_color or cal.get("x-apple-calendar-color")

        # we don't copy anything but events from the source calendar
        for event in cal.walk("VEVENT"):
            # if no color has been configured in the event, we assign the fallback color; without a fallback,
            # the event is left untouched instead of storing None as the value of a COLOR property
            if default_color is not None and "color" not in event:
                event["color"] = default_color

            combined.add_component(event)

    return combined
def get_tzinfo():
    """
    Return the timezone that is attached to date-only (all-day) event boundaries.

    NOTE(review): this is a fixed UTC+01:00 offset, so daylight saving time is not taken into account -- confirm
    that this is intended.

    :return: fixed-offset timezone, one hour ahead of UTC
    """
    utc_offset = timedelta(hours=1)
    return timezone(utc_offset)
def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
    """
    Generate list of events from calendar vevents.

    Expands recurring events for +- one year (365 days before and after the current local time). Date-only
    start/end values mark an event as all-day and are turned into datetimes in the timezone of get_tzinfo():
    the start is pinned to the beginning, the end to the last moment of the respective day.

    Note that there is no validation, e.g., checking for values required (by other code in this application).

    :param cal: calendar to fetch events from
    :return: events, sorted by start
    """

    def as_datetime(prop, day_time):
        """Return ``(value, date_only)`` for a DTSTART/DTEND property, which may be missing (None)."""
        if prop is None:
            return None, False

        value = prop.dt
        if isinstance(value, datetime):
            return value, False

        # plain dates carry no time: pin them to the given time of day in our timezone
        return datetime.combine(value, day_time, tzinfo=get_tzinfo()), True

    def get_str(vevent, key: str):
        """Return the property as str, or None if the event doesn't have it."""
        value = vevent.get(key, None)
        return None if value is None else str(value)

    def sort_key(event):
        """Return a start value that can be compared across naive and timezone-aware events."""
        start = event.start
        # floating (naive) and timezone-aware datetimes can't be compared with each other, which used to make
        # the sort fail with a TypeError for mixed calendars; naive starts are interpreted in our timezone for
        # ordering only, the event itself is left unchanged
        if isinstance(start, datetime) and start.utcoffset() is None:
            start = start.replace(tzinfo=get_tzinfo())
        return start

    now = datetime.now()
    td = timedelta(days=365)

    events = []

    for vevent in recurring_ical_events.of(cal).between(now - td, now + td):
        start, start_is_date = as_datetime(vevent.get("DTSTART", None), datetime.min.time())

        # NOTE(review): per RFC 5545, a date-only DTEND is exclusive, so pinning it to the end of that very day
        # presumably yields an end one day too late -- confirm against how consumers interpret the end
        end, end_is_date = as_datetime(vevent.get("DTEND", None), datetime.max.time())

        # an event only counts as all-day if at least one of its boundaries is a plain date
        all_day_event = start_is_date or end_is_date

        events.append(
            CalendarEvent(
                start,
                end,
                all_day_event,
                get_str(vevent, "SUMMARY"),
                get_str(vevent, "DESCRIPTION"),
                get_str(vevent, "LOCATION"),
                get_str(vevent, "COLOR"),
                get_str(vevent, "UID"),
            )
        )

    events.sort(key=sort_key)

    return events
async def get_future_events():
    """
    Fetch the calendar data and return the events that start today or later.

    :return: events whose start date is not before today's (local) date, in the order returned by
        get_events_from_calendar_string()
    """
    all_events = get_events_from_calendar_string(combined_calendar(await get_data()))
    today = date.today()

    # the comparison works on dates only: an event that started earlier today is still included
    return [event for event in all_events if event.start.date() >= today]
def group_by_date(events: List[CalendarEvent]):
    """
    Group events by the date of their start.

    :param events: events to group
    :return: ordered mapping of start date to the list of events starting on that date; dates appear in the
        order in which they are first encountered, and events keep their relative input order
    """
    by_date: OrderedDict[date, List[CalendarEvent]] = OrderedDict()

    for event in events:
        # setdefault creates the list for a date the first time that date is seen
        by_date.setdefault(event.start.date(), []).append(event)

    return by_date