import hashlib
import json
import logging
from datetime import timedelta
from functools import partial

import arrow
from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import URLValidator, ValidationError
from django.db import models, transaction
from django.utils.html import escape
from django.utils.timezone import now

from .icalevents import icalevents
from .parser import process_event_list
from .tasks import update_calendar_source

logger = logging.getLogger(__name__)


def _convert_arrow_to_datetime(event):
    """Replace the `start` and `end` values of *event* with their `.datetime`.

    The event dict is modified in place and also returned.
    """
    event["start"] = event["start"].datetime
    event["end"] = event["end"].datetime
    return event


class EventsJSONField(models.JSONField):
    """
    JSONField for lists of events which converts `start` and `end` to
    datetime on load from DB.

    When an event's `location` passes Django's URL validation it is also
    exposed under the `url` key.
    """

    def from_db_value(self, value, expression, connection):
        value = super().from_db_value(value, expression, connection)
        if not value:
            return value

        url_validator = URLValidator()
        for event in value:
            event["start"] = arrow.get(event.get("start")).datetime
            event["end"] = arrow.get(event["end"]).datetime

            location = event.get("location")
            try:
                url_validator(location)
            except ValidationError:
                # Location is not a URL; leave the event without a derived url.
                continue
            event["url"] = location

        return value


class Calendar(models.Model):
    """Cached events of one remote iCal calendar, split into past and future."""

    # Number of upcoming events returned by current_events().
    CURRENT_NUM = 6

    url = models.URLField()
    event_hash = models.CharField(max_length=256, null=True)
    last_update = models.DateTimeField(null=True)
    past_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
    future_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)

    def current_events(self):
        """Return the first CURRENT_NUM future events, or [] if none are stored."""
        if self.future_events is None:
            return []
        return self.future_events[: self.CURRENT_NUM]

    def handle_event_list(self, event_list):
        """Store *event_list* if it differs from the last processed list.

        The list is fingerprinted with SHA-256 over its string form. The
        builtin hash() is salted per interpreter process for strings, so a
        value persisted from it would never match in another worker or after
        a restart, and every call would rewrite the stored events.
        """
        event_list_hash = hashlib.sha256(
            str(event_list).encode("utf-8")
        ).hexdigest()
        if event_list_hash == self.event_hash:
            return

        past, future = process_event_list(event_list)
        self.past_events = past
        self.future_events = future
        self.event_hash = event_list_hash
        self.last_update = arrow.utcnow().datetime
        self.save()

    def update_source(self):
        """Fetch events from the calendar URL and hand them to handle_event_list."""
        # Use a single reference instant so both window edges are consistent.
        reference = now()
        event_list = icalevents.events(
            url=self.url,
            start=reference - timedelta(days=30),
            end=reference
            + timedelta(days=365),  # Pull a year ahead due to "popular" demand.
        )
        self.handle_event_list(event_list)


class CalendarMixin(models.Model):
    """
    Mixin to be used in other models, like site settings, which adds relation
    to Calendar.
    """

    calendar_url = models.URLField(
        "URL kalendáře ve formátu iCal",
        blank=True,
        null=True,
        help_text="Kalendář se po uložení stránky aktualizuje na pozadí. U plnějších kalendářů to může trvat i desítky sekund.",
    )
    calendar = models.ForeignKey(
        Calendar, null=True, blank=True, on_delete=models.SET_NULL
    )

    class Meta:
        abstract = True

    def get_fullcalendar_data(self) -> str:
        """Return all stored events as an HTML-escaped JSON array string.

        Each event carries `allDay`, `start` and `end`; `title`, `url`,
        `location` and `description` are included only when the event has a
        non-empty value for them. Without a related calendar the result is
        the escaped JSON of an empty list, so the return value is always a
        string as annotated.
        """
        if self.calendar is None:
            return escape(json.dumps([]))

        events = (self.calendar.past_events or []) + (
            self.calendar.future_events or []
        )

        # (key in the stored event, key in the emitted JSON object)
        optional_keys = (
            ("summary", "title"),
            ("url", "url"),
            ("location", "location"),
            ("description", "description"),
        )

        calendar_format_events = []
        for event in events:
            parsed_event = {
                "allDay": event["all_day"],
                "start": event["start"].isoformat(),
                "end": event["end"].isoformat(),
            }
            for source_key, target_key in optional_keys:
                # .get(): optional keys may be absent, e.g. `url` is only
                # derived on load when the location is a valid URL.
                field_value = event.get(source_key)
                if field_value not in ("", None):
                    parsed_event[target_key] = field_value
            calendar_format_events.append(parsed_event)

        return escape(json.dumps(calendar_format_events))

    def save(self, *args, **kwargs):
        """Keep the related Calendar in sync with `calendar_url`, then save."""
        # create or update related Calendar
        if self.calendar_url:
            if not self.calendar or self.calendar.url != self.calendar_url:
                calendar = Calendar.objects.filter(url=self.calendar_url).first()
                if calendar:
                    self.calendar = calendar
                else:
                    self.calendar = Calendar.objects.create(url=self.calendar_url)

            # NOTE(review): the original indentation was lost; scheduling the
            # refresh on every save with a URL matches the field help text
            # ("updates in the background after saving") - confirm.
            transaction.on_commit(
                partial(update_calendar_source.delay, self.calendar.id)
            )

        # delete related Calendar when URL is cleared
        if not self.calendar_url and self.calendar:
            self.calendar = None

        super().save(*args, **kwargs)