Move towards a more object-oriented API
This commit is contained in:
		@@ -10,7 +10,7 @@ import recurring_ical_events
 | 
				
			|||||||
import yaml
 | 
					import yaml
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from asyncache import cached
 | 
					from asyncache import cached
 | 
				
			||||||
from cachetools import TTLCache
 | 
					from cachetools import TTLCache, LRUCache
 | 
				
			||||||
from icalendar import Calendar
 | 
					from icalendar import Calendar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from fabcal.models import CalendarEvent
 | 
					from fabcal.models import CalendarEvent
 | 
				
			||||||
@@ -22,8 +22,20 @@ class ConfiguredCalendar(NamedTuple):
 | 
				
			|||||||
    default_color: str = None
 | 
					    default_color: str = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# this is not ideal, since it will read the file (at most) every 2 minutes, but there is no good way to cache the
 | 
					class CalendarClient:
 | 
				
			||||||
# settings, e.g., by storing it in the FastAPI object
 | 
					    def __init__(self, url: str):
 | 
				
			||||||
 | 
					        self.url = url
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def get(self, session):
 | 
				
			||||||
 | 
					        async with session.get(self.url) as response:
 | 
				
			||||||
 | 
					            response.raise_for_status()
 | 
				
			||||||
 | 
					            assert response.content_type.lower() == "text/calendar"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            return await response.text()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# this will be cached permanently, i.e., the server process needs to be restarted to apply config changes
 | 
				
			||||||
 | 
					@cached(LRUCache(100))
 | 
				
			||||||
def read_calendars_from_config_file():
 | 
					def read_calendars_from_config_file():
 | 
				
			||||||
    with open("config.yml") as f:
 | 
					    with open("config.yml") as f:
 | 
				
			||||||
        data = yaml.safe_load(f)
 | 
					        data = yaml.safe_load(f)
 | 
				
			||||||
@@ -32,55 +44,61 @@ def read_calendars_from_config_file():
 | 
				
			|||||||
            yield ConfiguredCalendar(**calendar)
 | 
					            yield ConfiguredCalendar(**calendar)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class CombinedCalendarClient:
 | 
				
			||||||
 | 
					    def __init__(self, configured_calendars: List[ConfiguredCalendar]):
 | 
				
			||||||
 | 
					        # make sure it's a list since read_calendars_from_config_file() returns an iterator
 | 
				
			||||||
 | 
					        self.configured_calendars = list(configured_calendars)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def fetch_calendars(self) -> Dict[ConfiguredCalendar, str]:
 | 
				
			||||||
 | 
					        async with aiohttp.ClientSession() as session:
 | 
				
			||||||
 | 
					            calendar_clients = [CalendarClient(calendar.url) for calendar in self.configured_calendars]
 | 
				
			||||||
 | 
					            responses = await asyncio.gather(*[calendar.get(session) for calendar in calendar_clients])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return dict(zip(self.configured_calendars, responses))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @staticmethod
 | 
				
			||||||
 | 
					    def combine_calendars(data: Dict[ConfiguredCalendar, str]) -> Calendar:
 | 
				
			||||||
 | 
					        combined_calendar = Calendar()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        combined_calendar.add("prodid", "-//FabCal//NONSGML//EN")
 | 
				
			||||||
 | 
					        combined_calendar.add("version", "2.0")
 | 
				
			||||||
 | 
					        combined_calendar.add("x-wr-calname", "FabLab Altmühlfranken e.V.")
 | 
				
			||||||
 | 
					        # TODO: normalize timezones of calendar events
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for configured_calendar, ical_str in data.items():
 | 
				
			||||||
 | 
					            cal = Calendar.from_ical(ical_str)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # check for a calendar color (e.g., Nextcloud has such a feature)
 | 
				
			||||||
 | 
					            # events that don't have a color assigned will be assigned this color unless a color was configured
 | 
				
			||||||
 | 
					            default_color = configured_calendar.default_color
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if not default_color:
 | 
				
			||||||
 | 
					                try:
 | 
				
			||||||
 | 
					                    # note: for some reason, getattr doesn't work
 | 
				
			||||||
 | 
					                    default_color = cal["x-apple-calendar-color"]
 | 
				
			||||||
 | 
					                except KeyError:
 | 
				
			||||||
 | 
					                    default_color = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # we don't copy anything but events from the
 | 
				
			||||||
 | 
					            for event in cal.walk("VEVENT"):
 | 
				
			||||||
 | 
					                # if no color has been configured in the event, we
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if "color" not in event:
 | 
				
			||||||
 | 
					                    event["color"] = default_color
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                combined_calendar.add_component(event)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return combined_calendar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def fetch_and_combine_calendars(self) -> Calendar:
 | 
				
			||||||
 | 
					        return self.combine_calendars(await self.fetch_calendars())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@cached(TTLCache(maxsize=100, ttl=int(os.environ.get("FABCAL_CACHE_EXPIRE", 120))))
 | 
					@cached(TTLCache(maxsize=100, ttl=int(os.environ.get("FABCAL_CACHE_EXPIRE", 120))))
 | 
				
			||||||
async def get_data() -> Dict[ConfiguredCalendar, str]:
 | 
					async def get_data() -> Calendar:
 | 
				
			||||||
    calendars = list(read_calendars_from_config_file())
 | 
					    client = CombinedCalendarClient(read_calendars_from_config_file())
 | 
				
			||||||
 | 
					    combined_calendar = await client.fetch_and_combine_calendars()
 | 
				
			||||||
    async with aiohttp.ClientSession() as session:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        async def get(url: str) -> str:
 | 
					 | 
				
			||||||
            async with session.get(url) as response:
 | 
					 | 
				
			||||||
                response.raise_for_status()
 | 
					 | 
				
			||||||
                assert response.content_type.lower() == "text/calendar"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                return await response.text()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        responses = await asyncio.gather(*[get(calendar.url) for calendar in calendars])
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    return dict(zip(calendars, responses))
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def combined_calendar(data: Dict[ConfiguredCalendar, str]) -> Calendar:
 | 
					 | 
				
			||||||
    combined_calendar = Calendar()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    combined_calendar.add("prodid", "-//FabCal//NONSGML//EN")
 | 
					 | 
				
			||||||
    combined_calendar.add("version", "2.0")
 | 
					 | 
				
			||||||
    combined_calendar.add("x-wr-calname", "FabLab Altmühlfranken e.V.")
 | 
					 | 
				
			||||||
    # TODO: normalize timezones of calendar events
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    for configured_calendar, ical_str in data.items():
 | 
					 | 
				
			||||||
        cal = Calendar.from_ical(ical_str)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # check for a calendar color (e.g., Nextcloud has such a feature)
 | 
					 | 
				
			||||||
        # events that don't have a color assigned will be assigned this color unless a color was configured
 | 
					 | 
				
			||||||
        default_color = configured_calendar.default_color
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if not default_color:
 | 
					 | 
				
			||||||
            try:
 | 
					 | 
				
			||||||
                # note: for some reason, getattr doesn't work
 | 
					 | 
				
			||||||
                default_color = cal["x-apple-calendar-color"]
 | 
					 | 
				
			||||||
            except KeyError:
 | 
					 | 
				
			||||||
                default_color = None
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # we don't copy anything but events from the
 | 
					 | 
				
			||||||
        for event in cal.walk("VEVENT"):
 | 
					 | 
				
			||||||
            # if no color has been configured in the event, we
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            if "color" not in event:
 | 
					 | 
				
			||||||
                event["color"] = default_color
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            combined_calendar.add_component(event)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    return combined_calendar
 | 
					    return combined_calendar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -88,7 +106,7 @@ def get_tzinfo():
 | 
				
			|||||||
    return timezone(timedelta(hours=1))
 | 
					    return timezone(timedelta(hours=1))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
 | 
					def get_events_from_calendar(cal: Calendar) -> List[CalendarEvent]:
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    Generate list of events from calendar vevents.
 | 
					    Generate list of events from calendar vevents.
 | 
				
			||||||
    Expands recurring events for +- one year.
 | 
					    Expands recurring events for +- one year.
 | 
				
			||||||
@@ -147,9 +165,7 @@ def get_events_from_calendar_string(cal: Calendar) -> List[CalendarEvent]:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def get_future_events():
 | 
					async def get_future_events():
 | 
				
			||||||
    cal = combined_calendar(await get_data())
 | 
					    events = get_events_from_calendar(await get_data())
 | 
				
			||||||
 | 
					 | 
				
			||||||
    events = get_events_from_calendar_string(cal)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    today = datetime.today().date()
 | 
					    today = datetime.today().date()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,7 +1,7 @@
 | 
				
			|||||||
from fastapi import APIRouter
 | 
					from fastapi import APIRouter
 | 
				
			||||||
from starlette.responses import Response
 | 
					from starlette.responses import Response
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from fabcal.calendar_client import combined_calendar, get_data
 | 
					from fabcal.calendar_client import get_data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
router = APIRouter()
 | 
					router = APIRouter()
 | 
				
			||||||
@@ -10,7 +10,7 @@ router = APIRouter()
 | 
				
			|||||||
@router.get("/events.ics")
 | 
					@router.get("/events.ics")
 | 
				
			||||||
async def events_ics():
 | 
					async def events_ics():
 | 
				
			||||||
    return Response(
 | 
					    return Response(
 | 
				
			||||||
        combined_calendar(await get_data()).to_ical(True),
 | 
					        (await get_data()).to_ical(True),
 | 
				
			||||||
        headers={
 | 
					        headers={
 | 
				
			||||||
            "content-type": "text/calendar",
 | 
					            "content-type": "text/calendar",
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user