fabcal/main.py

225 lines
5.7 KiB
Python

import base64
import locale
import os
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from typing import NamedTuple, List
import aiohttp
import babel.dates
import recurring_ical_events
from icalendar import Calendar, vText
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://fablab-altmuehlfranken.de",
"https://www.fablab-altmuehlfranken.de",
],
)
locale.setlocale(locale.LC_TIME, locale.getlocale())
def get_calendar_url():
url = os.environ["CALENDAR_URL"]
# convenience feature
url = url.replace("webcal://", "https://")
return url
def sanitize(data: str):
# sanitize input from random source
cal = Calendar.from_ical(data)
# name needs to be fixed
cal["X-WR-CALNAME"] = vText(b"FabLab-Termine")
return cal.to_ical()
# caching strings works better than caching calendar objects
@cache(expire=120)
async def get_data() -> str:
async with aiohttp.ClientSession() as session:
async with session.get(get_calendar_url()) as response:
response.raise_for_status()
assert response.content_type.lower() == "text/calendar"
return sanitize(await response.text())
class CalendarEvent(NamedTuple):
start: datetime
end: datetime
# just a convenience thing, we want to keep start/date as datetime and save the client from guessing this themselves
all_day_event: bool
summary: str
description: str
location: str
color: str
uid: str
def get_tzinfo():
return timezone(timedelta(hours=1))
def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
"""
Generate list of events from calendar vevents.
Expands recurring events for +- one year.
Note that there is no validation, e.g., checking for values required (by other code in this application).
:param cal: calendar to fetch events from
:return: events
"""
now = datetime.now()
td = timedelta(days=365)
past_year = now - td
next_year = now + td
events = []
for vevent in recurring_ical_events.of(cal).between(past_year, next_year):
# I'm pessimistic here. Prove me wrong!
all_day_event = False
start = vevent.get("DTSTART", None)
if start is not None:
start = start.dt
if not isinstance(start, datetime):
all_day_event = True
start = datetime.combine(start, datetime.min.time(), tzinfo=get_tzinfo())
end = vevent.get("DTEND", None)
if end is not None:
end = end.dt
if not isinstance(end, datetime):
all_day_event = True
end = datetime.combine(end, datetime.max.time(), tzinfo=get_tzinfo())
def get_str(key: str):
value = vevent.get(key, None)
if value is not None:
return str(value)
return value
summary = get_str("SUMMARY")
description = get_str("DESCRIPTION")
location = get_str("LOCATION")
color = get_str("COLOR")
uid = get_str("UID")
event = CalendarEvent(start, end, all_day_event, summary, description, location, color, uid)
events.append(event)
events.sort(key=lambda e: e.start)
return events
async def get_future_events():
cal = Calendar.from_ical(await get_data())
events = get_events_from_calendar_string(cal)
today = datetime.today().date()
future_events = []
for event in events:
if event.start.date() < today:
continue
future_events.append(event)
return future_events
@app.get("/events.ics")
async def ics():
return Response(
await get_data(),
headers={
"content-type": "text/calendar",
},
)
def group_by_date(events: List[CalendarEvent]):
grouped_events: OrderedDict[date, List[CalendarEvent]] = OrderedDict()
for event in events:
start_date = event.start.date()
if start_date not in grouped_events:
grouped_events[start_date] = []
grouped_events[start_date].append(event)
return grouped_events
@app.get("/embed-sidebar.html")
async def embed(request: Request, max_width: str = None):
# await asyncio.sleep(1)
events = await get_future_events()
grouped_events = list(group_by_date(events).items())
# couple of helpers
def localized_abbreviated_month(dt: datetime):
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
# couple of helpers
def localized_abbreviated_weekday(dt: datetime):
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
def base64_encode(s: str):
return base64.b64encode(s.encode()).decode()
return templates.TemplateResponse(
"embed-sidebar.html",
context={
"request": request,
"grouped_events": grouped_events,
"dir": dir,
"localized_abbreviated_month": localized_abbreviated_month,
"localized_abbreviated_weekday": localized_abbreviated_weekday,
"base64_encode": base64_encode,
"max_width": max_width,
},
)
@app.on_event("startup")
async def startup():
FastAPICache.init(InMemoryBackend())