226 lines
5.7 KiB
Python
226 lines
5.7 KiB
Python
import base64
|
|
import locale
|
|
import os
|
|
|
|
from collections import OrderedDict
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import List, NamedTuple
|
|
|
|
|
|
import aiohttp
|
|
import babel.dates
|
|
import recurring_ical_events
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.requests import Request
|
|
from fastapi.responses import Response
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi_cache import FastAPICache
|
|
from fastapi_cache.backends.inmemory import InMemoryBackend
|
|
from fastapi_cache.decorator import cache
|
|
from icalendar import Calendar, vText
|
|
|
|
|
|
app = FastAPI()
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[
|
|
"https://fablab-altmuehlfranken.de",
|
|
"https://www.fablab-altmuehlfranken.de",
|
|
],
|
|
)
|
|
|
|
|
|
locale.setlocale(locale.LC_TIME, locale.getlocale())
|
|
|
|
|
|
def get_calendar_url():
|
|
url = os.environ["CALENDAR_URL"]
|
|
|
|
# convenience feature
|
|
url = url.replace("webcal://", "https://")
|
|
|
|
return url
|
|
|
|
|
|
def sanitize(data: str):
|
|
# sanitize input from random source
|
|
cal = Calendar.from_ical(data)
|
|
|
|
# name needs to be fixed
|
|
cal["X-WR-CALNAME"] = vText(b"FabLab-Termine")
|
|
|
|
return cal.to_ical()
|
|
|
|
|
|
# caching strings works better than caching calendar objects
|
|
@cache(expire=120)
|
|
async def get_data() -> str:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(get_calendar_url()) as response:
|
|
response.raise_for_status()
|
|
assert response.content_type.lower() == "text/calendar"
|
|
|
|
return sanitize(await response.text())
|
|
|
|
|
|
class CalendarEvent(NamedTuple):
|
|
start: datetime
|
|
end: datetime
|
|
# just a convenience thing, we want to keep start/date as datetime and save the client from guessing this themselves
|
|
all_day_event: bool
|
|
summary: str
|
|
description: str
|
|
location: str
|
|
color: str
|
|
uid: str
|
|
|
|
|
|
def get_tzinfo():
|
|
return timezone(timedelta(hours=1))
|
|
|
|
|
|
def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
|
|
"""
|
|
Generate list of events from calendar vevents.
|
|
Expands recurring events for +- one year.
|
|
Note that there is no validation, e.g., checking for values required (by other code in this application).
|
|
:param cal: calendar to fetch events from
|
|
:return: events
|
|
"""
|
|
|
|
now = datetime.now()
|
|
td = timedelta(days=365)
|
|
past_year = now - td
|
|
next_year = now + td
|
|
|
|
events = []
|
|
|
|
for vevent in recurring_ical_events.of(cal).between(past_year, next_year):
|
|
# I'm pessimistic here. Prove me wrong!
|
|
all_day_event = False
|
|
|
|
start = vevent.get("DTSTART", None)
|
|
if start is not None:
|
|
start = start.dt
|
|
|
|
if not isinstance(start, datetime):
|
|
all_day_event = True
|
|
start = datetime.combine(start, datetime.min.time(), tzinfo=get_tzinfo())
|
|
|
|
end = vevent.get("DTEND", None)
|
|
if end is not None:
|
|
end = end.dt
|
|
|
|
if not isinstance(end, datetime):
|
|
all_day_event = True
|
|
end = datetime.combine(end, datetime.max.time(), tzinfo=get_tzinfo())
|
|
|
|
def get_str(key: str):
|
|
value = vevent.get(key, None)
|
|
|
|
if value is not None:
|
|
return str(value)
|
|
|
|
return value
|
|
|
|
summary = get_str("SUMMARY")
|
|
description = get_str("DESCRIPTION")
|
|
location = get_str("LOCATION")
|
|
color = get_str("COLOR")
|
|
uid = get_str("UID")
|
|
|
|
event = CalendarEvent(start, end, all_day_event, summary, description, location, color, uid)
|
|
events.append(event)
|
|
|
|
events.sort(key=lambda e: e.start)
|
|
|
|
return events
|
|
|
|
|
|
async def get_future_events():
|
|
cal = Calendar.from_ical(await get_data())
|
|
|
|
events = get_events_from_calendar_string(cal)
|
|
|
|
today = datetime.today().date()
|
|
|
|
future_events = []
|
|
|
|
for event in events:
|
|
if event.start.date() < today:
|
|
continue
|
|
|
|
future_events.append(event)
|
|
|
|
return future_events
|
|
|
|
|
|
@app.get("/events.ics")
|
|
async def ics():
|
|
return Response(
|
|
await get_data(),
|
|
headers={
|
|
"content-type": "text/calendar",
|
|
},
|
|
)
|
|
|
|
|
|
def group_by_date(events: List[CalendarEvent]):
|
|
grouped_events: OrderedDict[date, List[CalendarEvent]] = OrderedDict()
|
|
|
|
for event in events:
|
|
start_date = event.start.date()
|
|
|
|
if start_date not in grouped_events:
|
|
grouped_events[start_date] = []
|
|
|
|
grouped_events[start_date].append(event)
|
|
|
|
return grouped_events
|
|
|
|
|
|
@app.get("/embed-sidebar.html")
|
|
async def embed(request: Request, max_width: str = None):
|
|
# await asyncio.sleep(1)
|
|
|
|
events = await get_future_events()
|
|
|
|
grouped_events = list(group_by_date(events).items())
|
|
|
|
# couple of helpers
|
|
def localized_abbreviated_month(dt: datetime):
|
|
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
|
|
|
|
# couple of helpers
|
|
def localized_abbreviated_weekday(dt: datetime):
|
|
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
|
|
|
|
def base64_encode(s: str):
|
|
return base64.b64encode(s.encode()).decode()
|
|
|
|
return templates.TemplateResponse(
|
|
"embed-sidebar.html",
|
|
context={
|
|
"request": request,
|
|
"grouped_events": grouped_events,
|
|
"dir": dir,
|
|
"localized_abbreviated_month": localized_abbreviated_month,
|
|
"localized_abbreviated_weekday": localized_abbreviated_weekday,
|
|
"base64_encode": base64_encode,
|
|
"max_width": max_width,
|
|
},
|
|
)
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup():
|
|
FastAPICache.init(InMemoryBackend())
|