fabcal/main.py

225 lines
5.7 KiB
Python
Raw Normal View History

2022-03-27 23:24:59 +02:00
import base64
import locale
import os
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from typing import NamedTuple, List
import aiohttp
import babel.dates
import recurring_ical_events
from icalendar import Calendar, vText
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://fablab-altmuehlfranken.de",
"https://www.fablab-altmuehlfranken.de",
],
)
locale.setlocale(locale.LC_TIME, locale.getlocale())
def get_calendar_url():
url = os.environ["CALENDAR_URL"]
# convenience feature
url = url.replace("webcal://", "https://")
return url
def sanitize(data: str):
# sanitize input from random source
cal = Calendar.from_ical(data)
# name needs to be fixed
cal["X-WR-CALNAME"] = vText(b"FabLab-Termine")
return cal.to_ical()
# caching strings works better than caching calendar objects
@cache(expire=120)
async def get_data() -> str:
async with aiohttp.ClientSession() as session:
async with session.get(get_calendar_url()) as response:
response.raise_for_status()
assert response.content_type.lower() == "text/calendar"
return sanitize(await response.text())
class CalendarEvent(NamedTuple):
start: datetime
end: datetime
# just a convenience thing, we want to keep start/date as datetime and save the client from guessing this themselves
all_day_event: bool
summary: str
description: str
location: str
color: str
uid: str
def get_tzinfo():
return timezone(timedelta(hours=1))
def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
"""
Generate list of events from calendar vevents.
Expands recurring events for +- one year.
Note that there is no validation, e.g., checking for values required (by other code in this application).
:param cal: calendar to fetch events from
:return: events
"""
now = datetime.now()
td = timedelta(days=365)
past_year = now - td
next_year = now + td
events = []
for vevent in recurring_ical_events.of(cal).between(past_year, next_year):
# I'm pessimistic here. Prove me wrong!
all_day_event = False
start = vevent.get("DTSTART", None)
if start is not None:
start = start.dt
if not isinstance(start, datetime):
all_day_event = True
start = datetime.combine(start, datetime.min.time(), tzinfo=get_tzinfo())
end = vevent.get("DTEND", None)
if end is not None:
end = end.dt
if not isinstance(end, datetime):
all_day_event = True
end = datetime.combine(end, datetime.max.time(), tzinfo=get_tzinfo())
def get_str(key: str):
value = vevent.get(key, None)
if value is not None:
return str(value)
return value
summary = get_str("SUMMARY")
description = get_str("DESCRIPTION")
location = get_str("LOCATION")
color = get_str("COLOR")
uid = get_str("UID")
event = CalendarEvent(start, end, all_day_event, summary, description, location, color, uid)
events.append(event)
events.sort(key=lambda e: e.start)
return events
async def get_future_events():
cal = Calendar.from_ical(await get_data())
events = get_events_from_calendar_string(cal)
today = datetime.today().date()
future_events = []
for event in events:
if event.start.date() <= today:
continue
future_events.append(event)
return future_events
@app.get("/events.ics")
async def ics():
return Response(
await get_data(),
headers={
"content-type": "text/calendar",
},
)
def group_by_date(events: List[CalendarEvent]):
grouped_events: OrderedDict[date, List[CalendarEvent]] = OrderedDict()
for event in events:
start_date = event.start.date()
if start_date not in grouped_events:
grouped_events[start_date] = []
grouped_events[start_date].append(event)
return grouped_events
@app.get("/embed-sidebar.html")
async def embed(request: Request, max_width: str = None):
# await asyncio.sleep(1)
events = await get_future_events()
grouped_events = list(group_by_date(events).items())
# couple of helpers
def localized_abbreviated_month(dt: datetime):
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
# couple of helpers
def localized_abbreviated_weekday(dt: datetime):
return babel.dates.format_datetime(dt, format="%b", locale="de_DE")
def base64_encode(s: str):
return base64.b64encode(s.encode()).decode()
return templates.TemplateResponse(
"embed-sidebar.html",
context={
"request": request,
"grouped_events": grouped_events,
"dir": dir,
"localized_abbreviated_month": localized_abbreviated_month,
"localized_abbreviated_weekday": localized_abbreviated_weekday,
"base64_encode": base64_encode,
"max_width": max_width,
},
)
@app.on_event("startup")
async def startup():
FastAPICache.init(InMemoryBackend())