235 lines
7.3 KiB
Python
235 lines
7.3 KiB
Python
import asyncio
import logging
import os

from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from typing import Dict, List, NamedTuple, Optional

import aiohttp
import pytz
import recurring_ical_events
import yaml

from asyncache import cached
from cachetools import TTLCache, FIFOCache
from icalendar import Calendar

from fabcal.models import CalendarEvent
|
|
|
|
|
|
class ConfiguredCalendar(NamedTuple):
    """
    One calendar entry from the configuration.

    Only ``name`` and ``url`` are required; all other fields default to None.
    Being a tuple, instances are hashable and can be used as dictionary keys.
    """

    # name of the calendar
    name: str
    # URL the calendar data is fetched from
    url: str
    # color assigned to events of this calendar that don't carry a color of their own
    default_color: Optional[str] = None
    # if set, these replace the respective property on every event of this calendar
    overwrite_summary: Optional[str] = None
    overwrite_description: Optional[str] = None
    overwrite_location: Optional[str] = None
|
|
|
|
|
|
class CalendarClient:
    """Fetches the raw iCalendar text served at a single URL."""

    def __init__(self, url: str):
        self.url = url

    async def get(self, session):
        """
        Download the calendar and return its body as text.

        :param session: object whose ``get(url)`` returns an async context manager yielding the response
        :return: decoded response body
        :raises aiohttp.ContentTypeError: if the response is not served as ``text/calendar``
        """
        async with session.get(self.url) as response:
            # propagates whatever the response raises for an error status
            response.raise_for_status()

            # explicit check instead of an assert: assertions are stripped when Python runs with -O
            # ContentTypeError derives from ClientResponseError, so handlers for the latter cover this case, too
            if response.content_type.lower() != "text/calendar":
                raise aiohttp.ContentTypeError(
                    response.request_info,
                    response.history,
                    status=response.status,
                    message=f"Unexpected content type {response.content_type!r}, expected 'text/calendar'",
                    headers=response.headers,
                )

            return await response.text()
|
|
|
|
|
|
# The result is cached permanently, i.e., the server process needs to be restarted to apply config changes.
# A list is returned on purpose: caching doesn't work with iterators, since a cached iterator would already be
# exhausted when it is handed out a second time.
@cached(FIFOCache(1))
def read_calendars_from_config_file() -> List[ConfiguredCalendar]:
    """
    Load the calendar definitions from ``config.yml`` (path relative to the current working directory).

    Every entry of the top-level ``calendars`` list is passed as keyword arguments to ``ConfiguredCalendar``.

    :return: configured calendars in the order they appear in the file
    """
    # explicit encoding so non-ASCII values are decoded the same way regardless of the platform's locale
    with open("config.yml", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    return [ConfiguredCalendar(**calendar) for calendar in data["calendars"]]
|
|
|
|
|
|
class CombinedCalendarClient:
    """Downloads all configured calendars and merges their events into a single calendar."""

    def __init__(self, configured_calendars: List[ConfiguredCalendar]):
        self.configured_calendars = configured_calendars

    async def fetch_calendars(self) -> Dict[ConfiguredCalendar, str]:
        """
        Fetch all configured calendars concurrently.

        A calendar whose download fails with an aiohttp client error or a timeout is logged and left out of the
        result, so one broken source does not prevent the remaining calendars from being served.

        :return: mapping of configured calendar to its iCalendar text, in configuration order
        """
        async with aiohttp.ClientSession(headers={
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:135.0) Gecko/20100101 Firefox/135.0",
        }) as session:
            # errors are handled per calendar so the log message can tell which calendar failed
            async def fetch(calendar):
                try:
                    return await CalendarClient(calendar.url).get(session)
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    logging.exception("Failed to fetch data for calendar %s, skipping", calendar.name)
                    return None

            # gather() returns the results in the order of its arguments, which keeps calendar and response paired
            responses = await asyncio.gather(*(fetch(calendar) for calendar in self.configured_calendars))

        return {
            calendar: response
            for calendar, response in zip(self.configured_calendars, responses)
            if response is not None
        }

    @staticmethod
    def combine_calendars(data: Dict[ConfiguredCalendar, str]) -> Calendar:
        """
        Merge the events of all given calendars into one new calendar.

        Events without a color receive the calendar's default color (if there is one), and the configured
        overwrite values replace the summary, description and location of every event of that calendar.

        :param data: mapping of configured calendar to its iCalendar text
        :return: new calendar containing the VEVENT components of all inputs
        """
        combined_calendar = Calendar()

        combined_calendar.add("prodid", "-//FabCal//NONSGML//EN")
        combined_calendar.add("version", "2.0")
        combined_calendar.add("x-wr-calname", "FabLab Altmühlfranken e.V.")
        # TODO: normalize timezones of calendar events

        for configured_calendar, ical_str in data.items():
            cal = Calendar.from_ical(ical_str)

            # a configured color takes precedence; otherwise, check for a calendar color (e.g., Nextcloud has
            # such a feature)
            # events that don't have a color assigned will be assigned this color
            default_color = configured_calendar.default_color or cal.get("x-apple-calendar-color")

            # if the user wishes to overwrite some data on every item in a calendar, we should do so
            overwrite_summary = configured_calendar.overwrite_summary
            overwrite_description = configured_calendar.overwrite_description
            overwrite_location = configured_calendar.overwrite_location

            # we don't copy anything but events from the source calendar
            for event in cal.walk("VEVENT"):
                # if no color has been configured in the event, we fall back to the default color
                # nothing is assigned when there is no default either, to avoid storing a None-valued property
                if default_color and "color" not in event:
                    event["color"] = default_color

                if overwrite_summary:
                    event["SUMMARY"] = overwrite_summary

                if overwrite_description:
                    event["DESCRIPTION"] = overwrite_description

                if overwrite_location:
                    event["LOCATION"] = overwrite_location

                combined_calendar.add_component(event)

        return combined_calendar

    async def fetch_and_combine_calendars(self) -> Calendar:
        """
        Fetch all configured calendars and merge whatever could be downloaded.

        :return: combined calendar
        """
        return self.combine_calendars(await self.fetch_calendars())
|
|
|
|
|
|
@cached(TTLCache(maxsize=100, ttl=int(os.environ.get("FABCAL_CACHE_EXPIRE", 120))))
async def get_data() -> Calendar:
    """
    Build the combined calendar from all configured calendars.

    The result is kept in a TTL cache; the time to live is read from the environment variable
    FABCAL_CACHE_EXPIRE when the module is loaded and defaults to 120.

    :return: combined calendar
    """
    configured_calendars = read_calendars_from_config_file()
    return await CombinedCalendarClient(configured_calendars).fetch_and_combine_calendars()
|
|
|
|
|
|
def get_tzinfo():
    """
    Provide the fixed-offset timezone UTC+01:00.

    The offset is constant, i.e., it does not follow daylight saving time.

    :return: timezone with an offset of one hour
    """
    utc_offset = timedelta(hours=1)
    return timezone(utc_offset)
|
|
|
|
|
|
def _vevent_str(vevent, key: str):
    """Return the property ``key`` of the vevent converted to ``str``, or None if the property is absent."""
    value = vevent.get(key, None)
    return None if value is None else str(value)


def _vevent_berlin_time(vevent, key: str, all_day_time):
    """
    Read the date/time property ``key`` of the vevent and express it in Europe/Berlin.

    :param vevent: event component to read from
    :param key: property name, e.g., DTSTART
    :param all_day_time: time of day to use when the property holds a plain date instead of a datetime
    :return: tuple of the converted datetime (None if the property is absent) and whether it was a plain date
    """
    prop = vevent.get(key, None)

    if prop is None:
        return None, False

    berlin = pytz.timezone("Europe/Berlin")
    value = prop.dt

    if isinstance(value, datetime):
        return value.astimezone(berlin), False

    # plain dates (all-day events) are interpreted as Berlin wall-clock time
    # localize() picks the UTC offset valid on that day, whereas attaching a fixed +01:00 offset and converting
    # afterwards shifts the value by one hour during daylight saving time (pushing the end past midnight)
    return berlin.localize(datetime.combine(value, all_day_time)), True


def get_events_from_calendar(cal: Calendar) -> List[CalendarEvent]:
    """
    Generate list of events from calendar vevents.
    Expands recurring events for +- one year.
    Start and end are converted to Europe/Berlin; an event counts as all-day if either holds a plain date.
    Note that there is no validation, e.g., checking for values required (by other code in this application).
    :param cal: calendar to fetch events from
    :return: events, sorted by start
    """

    now = datetime.now()
    td = timedelta(days=365)

    events = []

    for vevent in recurring_ical_events.of(cal).between(now - td, now + td):
        start, start_is_date = _vevent_berlin_time(vevent, "DTSTART", datetime.min.time())

        # NOTE(review): RFC 5545 defines DTEND as non-inclusive, so for all-day events this presumably yields
        # the end of the day after the last day of the event -- confirm that consumers expect this
        end, end_is_date = _vevent_berlin_time(vevent, "DTEND", datetime.max.time())

        events.append(
            CalendarEvent(
                start,
                end,
                start_is_date or end_is_date,
                _vevent_str(vevent, "SUMMARY"),
                _vevent_str(vevent, "DESCRIPTION"),
                _vevent_str(vevent, "LOCATION"),
                _vevent_str(vevent, "COLOR"),
                _vevent_str(vevent, "UID"),
            )
        )

    events.sort(key=lambda e: e.start)

    return events
|
|
|
|
|
|
async def get_future_events():
    """
    Return all events that start today or later, sorted by start.

    "Today" is evaluated in Europe/Berlin, the timezone the event start times are converted to, so the cutoff
    does not depend on the timezone the server happens to run in.

    :return: list of events from the combined calendar
    """
    events = get_events_from_calendar(await get_data())

    today = datetime.now(pytz.timezone("Europe/Berlin")).date()

    return [event for event in events if event.start.date() >= today]
|
|
|
|
|
|
def group_by_date(events: List[CalendarEvent]):
    """
    Bucket events by the calendar date of their start.

    :param events: events to group; the date is taken from ``event.start.date()``
    :return: ordered mapping of start date to the events starting on that date; dates appear in the order in
        which they are first encountered, events keep their relative input order
    """
    by_day: OrderedDict[date, List[CalendarEvent]] = OrderedDict()

    for item in events:
        # setdefault creates the bucket on first sight of a date and returns the existing one afterwards
        by_day.setdefault(item.start.date(), []).append(item)

    return by_day
|