fabcal/fabcal/calendar_client.py

235 lines
7.3 KiB
Python

import asyncio
import logging
import os
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from typing import Dict, List, NamedTuple, Optional

import aiohttp
import pytz
import recurring_ical_events
import yaml
from asyncache import cached
from cachetools import TTLCache, FIFOCache
from icalendar import Calendar

from fabcal.models import CalendarEvent


class ConfiguredCalendar(NamedTuple):
    """
    One calendar source as declared in the configuration file.

    Only ``name`` and ``url`` are required; every other field defaults to ``None``, which means
    "not configured".
    """

    # name of the calendar
    name: str
    # URL the iCalendar data is downloaded from
    url: str
    # color assigned to events that don't carry a color themselves
    default_color: Optional[str] = None
    # if set, replaces the SUMMARY of every event of this calendar
    overwrite_summary: Optional[str] = None
    # if set, replaces the DESCRIPTION of every event of this calendar
    overwrite_description: Optional[str] = None
    # if set, replaces the LOCATION of every event of this calendar
    overwrite_location: Optional[str] = None
class CalendarClient:
    """Downloads the iCalendar data published at a single URL."""

    def __init__(self, url: str):
        self.url = url

    async def get(self, session) -> str:
        """
        Fetch the calendar and return the response body as text.

        :param session: HTTP client session used to perform the GET request
        :return: the response body (iCalendar data) as a string
        :raises ValueError: if the server does not answer with content type ``text/calendar``

        Errors signalled by ``response.raise_for_status()`` are propagated unchanged.
        """
        async with session.get(self.url) as response:
            response.raise_for_status()

            # explicit check rather than an assert: assert statements are removed when Python runs
            # with -O, which would silently disable this validation
            content_type = response.content_type
            if content_type.lower() != "text/calendar":
                raise ValueError(
                    f"Unexpected content type {content_type!r} for calendar {self.url} "
                    "(expected 'text/calendar')"
                )

            return await response.text()
# this will be cached permanently, i.e., the server process needs to be restarted to apply config changes
# note that caching doesn't work at all with iterators (for obvious reasons), hence a list is returned
@cached(FIFOCache(1))
def read_calendars_from_config_file() -> List[ConfiguredCalendar]:
    """
    Read the configured calendars from ``config.yml`` in the current working directory.

    Every entry of the top-level ``calendars`` list is passed as keyword arguments to
    :class:`ConfiguredCalendar`, i.e., the entries' keys must match its field names.

    :return: configured calendars, in the order they appear in the file
    """
    # read the file as UTF-8 explicitly instead of relying on the platform's locale encoding, which
    # would garble non-ASCII values (e.g., umlauts) on systems whose default is not UTF-8
    with open("config.yml", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    return [ConfiguredCalendar(**calendar) for calendar in data["calendars"]]
class CombinedCalendarClient:
    """Fetches all configured calendars and merges their events into one calendar."""

    def __init__(self, configured_calendars: List[ConfiguredCalendar]):
        self.configured_calendars = configured_calendars

    async def fetch_calendars(self) -> Dict[ConfiguredCalendar, str]:
        """
        Download all configured calendars concurrently.

        Calendars whose request fails with an HTTP error response are logged and skipped.

        :return: mapping of configured calendar to its raw iCalendar data, in order of completion
        """
        async with aiohttp.ClientSession(headers={
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
        }) as session:
            # need to make sure we return the calendar and the gathered values together
            async def fetch_one(calendar):
                logging.debug("Fetching calendar %s", calendar.name)
                return calendar, await CalendarClient(calendar.url).get(session)

            rv = {}

            pending = [fetch_one(calendar) for calendar in self.configured_calendars]

            for next_result in asyncio.as_completed(pending):
                try:
                    cal, response = await next_result
                except aiohttp.ClientResponseError:
                    logging.exception("Failed to fetch data for calendar, skipping")
                else:
                    rv[cal] = response

        return rv

    @staticmethod
    def combine_calendars(data: Dict[ConfiguredCalendar, str]) -> Calendar:
        """
        Merge the events of all given calendars into a single new calendar.

        Per source calendar, the configured overwrites (summary, description, location) are applied to
        every event, and events without a color receive the calendar's default color, if there is one.

        :param data: mapping of configured calendar to its raw iCalendar data
        :return: new calendar containing the (modified) events of all source calendars
        """
        combined_calendar = Calendar()
        combined_calendar.add("prodid", "-//FabCal//NONSGML//EN")
        combined_calendar.add("version", "2.0")
        combined_calendar.add("x-wr-calname", "FabLab Altmühlfranken e.V.")

        # TODO: normalize timezones of calendar events

        for configured_calendar, ical_str in data.items():
            cal = Calendar.from_ical(ical_str)

            # check for a calendar color (e.g., Nextcloud has such a feature)
            # events that don't have a color assigned will be assigned this color unless a color was configured
            default_color = configured_calendar.default_color

            if not default_color:
                try:
                    # note: properties are looked up like dictionary keys, getattr doesn't work
                    default_color = cal["x-apple-calendar-color"]
                except KeyError:
                    default_color = None

            # if the user wishes to overwrite some data on every item in a calendar, we should do so
            overwrite_summary = configured_calendar.overwrite_summary
            overwrite_description = configured_calendar.overwrite_description
            overwrite_location = configured_calendar.overwrite_location

            # we don't copy anything but events from the source calendar
            for event in cal.walk("VEVENT"):
                # if no color has been configured in the event, we fall back to the default color
                # (only if there is one: writing None would add a bogus COLOR property to the event)
                if default_color is not None and "color" not in event:
                    event["color"] = default_color

                if overwrite_summary:
                    event["SUMMARY"] = overwrite_summary

                if overwrite_description:
                    event["DESCRIPTION"] = overwrite_description

                if overwrite_location:
                    event["LOCATION"] = overwrite_location

                combined_calendar.add_component(event)

        return combined_calendar

    async def fetch_and_combine_calendars(self) -> Calendar:
        """
        Download all configured calendars and merge them.

        :return: combined calendar, see :meth:`combine_calendars`
        """
        return self.combine_calendars(await self.fetch_calendars())
@cached(TTLCache(maxsize=100, ttl=int(os.environ.get("FABCAL_CACHE_EXPIRE", 120))))
async def get_data() -> Calendar:
    """
    Fetch and combine all calendars listed in the configuration file.

    The result is kept in a TTL cache; the time to live is read from the environment variable
    FABCAL_CACHE_EXPIRE and defaults to 120.

    :return: combined calendar
    """
    configured_calendars = read_calendars_from_config_file()
    return await CombinedCalendarClient(configured_calendars).fetch_and_combine_calendars()
def get_tzinfo():
    """
    Return the timezone with a fixed offset of one hour ahead of UTC.

    Note that this is a constant offset, i.e., it does not follow daylight saving time.

    :return: fixed UTC+01:00 timezone
    """
    utc_offset = timedelta(hours=1)
    return timezone(utc_offset)
def get_events_from_calendar(cal: Calendar) -> List[CalendarEvent]:
    """
    Generate list of events from calendar vevents.

    Expands recurring events for +- one year (365 days) around the current time.
    Start and end are converted to Europe/Berlin. Date-only values mark the event as an all-day event
    and are interpreted as Berlin local time: a date-only start becomes 00:00:00 of that date, a
    date-only end becomes 23:59:59.999999 of that date.

    Note that there is no validation, e.g., checking for values required (by other code in this application).

    :param cal: calendar to fetch events from
    :return: events, sorted by start
    """
    berlin = pytz.timezone("Europe/Berlin")

    def to_berlin(value, time_of_day):
        # returns the value as datetime in Europe/Berlin and whether it was a date-only value
        date_only = not isinstance(value, datetime)

        if date_only:
            # localize() picks the UTC offset valid on that very day (CET or CEST); a fixed UTC+1
            # offset would shift all-day events by one hour during daylight saving time
            # (pytz timezones must not be passed as tzinfo= directly, hence localize())
            return berlin.localize(datetime.combine(value, time_of_day)), date_only

        return value.astimezone(berlin), date_only

    def get_str(vevent, key: str):
        # string representation of a property, or None if the event doesn't have it
        value = vevent.get(key, None)
        return None if value is None else str(value)

    now = datetime.now()
    one_year = timedelta(days=365)

    events = []

    for vevent in recurring_ical_events.of(cal).between(now - one_year, now + one_year):
        # I'm pessimistic here. Prove me wrong!
        all_day_event = False

        start = vevent.get("DTSTART", None)
        if start is not None:
            start, date_only = to_berlin(start.dt, datetime.min.time())
            all_day_event = all_day_event or date_only

        # NOTE(review): RFC 5545 defines a date-only DTEND as exclusive, so this may end up one day
        # after the last day of the event -- confirm what consumers of CalendarEvent.end expect
        end = vevent.get("DTEND", None)
        if end is not None:
            end, date_only = to_berlin(end.dt, datetime.max.time())
            all_day_event = all_day_event or date_only

        events.append(
            CalendarEvent(
                start,
                end,
                all_day_event,
                get_str(vevent, "SUMMARY"),
                get_str(vevent, "DESCRIPTION"),
                get_str(vevent, "LOCATION"),
                get_str(vevent, "COLOR"),
                get_str(vevent, "UID"),
            )
        )

    events.sort(key=lambda e: e.start)

    return events
async def get_future_events():
    """
    Return all events that start today or later.

    The events are taken from the (cached) combined calendar and keep the order in which
    get_events_from_calendar returns them.

    :return: list of events whose start date is not in the past
    """
    events = get_events_from_calendar(await get_data())

    # event start times are converted to Europe/Berlin, so "today" must be determined in the same
    # timezone rather than in whatever timezone the server happens to run in
    today = datetime.now(pytz.timezone("Europe/Berlin")).date()

    return [event for event in events if event.start.date() >= today]
def group_by_date(events: List[CalendarEvent]):
    """
    Group events by the date of their start.

    Dates appear in the order in which they are first encountered in ``events``; the events of one
    date keep their relative order.

    :param events: events to group
    :return: ordered mapping of start date to the events starting on that date
    """
    events_by_day: OrderedDict[date, List[CalendarEvent]] = OrderedDict()

    for event in events:
        events_by_day.setdefault(event.start.date(), []).append(event)

    return events_by_day