Skip to content
Snippets Groups Projects
models.py 4.92 KiB
import hashlib
import json
import logging
from datetime import timedelta
from functools import partial

import arrow
from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import URLValidator, ValidationError
from django.db import models, transaction
from django.utils.html import escape
from django.utils.timezone import now

from .icalevents import icalevents
from .parser import process_event_list
from .tasks import update_calendar_source

logger = logging.getLogger(__name__)


def _convert_arrow_to_datetime(event):
    """
    Replace the `start` and `end` values of `event` with their `.datetime`
    attribute.

    The mapping is modified in place and the same object is returned.
    """
    for key in ("start", "end"):
        event[key] = event[key].datetime
    return event


class EventsJSONField(models.JSONField):
    """
    JSONField for lists of events which converts `start` and `end` to datetime
    on load from DB.

    When an event's `location` is a valid URL, it is additionally exposed
    under the `url` key.
    """

    def from_db_value(self, value, expression, connection):
        """
        Decode the stored JSON and post-process every event in place.

        An empty or NULL value is returned as decoded. Otherwise each event
        gets its `start` and `end` parsed with `arrow.get` and replaced by
        datetime objects.
        """
        value = super().from_db_value(value, expression, connection)
        if not value:
            return value

        # One validator instance is enough for the whole list.
        validate_url = URLValidator()
        for event in value:
            event["start"] = arrow.get(event.get("start")).datetime
            event["end"] = arrow.get(event["end"]).datetime

            location = event.get("location")
            # Only a string can be a URL. Skipping everything else (typically
            # a missing location) avoids handing None to the validator, whose
            # reaction to non-string input is not a reliable ValidationError.
            if not isinstance(location, str):
                continue

            try:
                validate_url(location)
            except ValidationError:
                # A location that is not a URL is expected; `url` stays as is.
                continue
            event["url"] = location

        return value


class Calendar(models.Model):
    """
    Cached events of a single remote calendar identified by `url`.

    Events fetched from the source are split into `past_events` and
    `future_events`; `event_hash` stores a digest of the last processed
    event list so unchanged data is not processed again.
    """

    # Number of upcoming events returned by `current_events`.
    CURRENT_NUM = 6

    url = models.URLField()
    event_hash = models.CharField(max_length=256, null=True)
    last_update = models.DateTimeField(null=True)
    past_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
    future_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)

    def current_events(self):
        """Return at most `CURRENT_NUM` leading items of `future_events`."""
        if self.future_events is None:
            return []
        return self.future_events[: self.CURRENT_NUM]

    def handle_event_list(self, event_list):
        """
        Store `event_list` if it differs from the last processed one.

        `last_update` is refreshed and the model saved on every call, whether
        or not the events changed.
        """
        # The built-in hash() of a string is salted per process, so its value
        # cannot be compared with one persisted by another worker or an
        # earlier run. A SHA-256 digest is stable across processes.
        # NOTE(review): stability still relies on str(event_list) being
        # deterministic for equal data - confirm the event objects' repr.
        event_list_hash = hashlib.sha256(
            str(event_list).encode("utf-8")
        ).hexdigest()

        if event_list_hash != self.event_hash:
            past, future = process_event_list(event_list)
            self.past_events = past
            self.future_events = future
            self.event_hash = event_list_hash

        self.last_update = arrow.utcnow().datetime
        self.save()

    def update_source(self):
        """Fetch events from `url` and hand them to `handle_event_list`."""
        # Sample the clock once so both window bounds share one reference.
        current_time = now()
        event_list = icalevents.events(
            url=self.url,
            start=current_time - timedelta(days=30),
            # Pull a year ahead due to "popular" demand.
            end=current_time + timedelta(days=365),
        )
        self.handle_event_list(event_list)


class CalendarMixin(models.Model):
    """
    Mixin to be used in other models, like site settings, which adds relation
    to Calendar.

    Saving the model keeps the `calendar` relation in sync with
    `calendar_url` and schedules a background refresh of the calendar.
    """

    calendar_url = models.URLField(
        "URL kalendáře ve formátu iCal",
        blank=True,
        null=True,
        help_text="Kalendář se po uložení stránky aktualizuje na pozadí. U plnějších kalendářů to může trvat i desítky sekund.",
    )
    calendar = models.ForeignKey(
        Calendar, null=True, blank=True, on_delete=models.SET_NULL
    )

    class Meta:
        abstract = True

    def get_fullcalendar_data(self) -> str:
        """
        Return past and future events as an HTML-escaped JSON array string.

        Each item carries `allDay`, `start` and `end` (ISO 8601), plus
        `title`, `url`, `location` and `description` when the event has a
        non-empty value for them. Without a related calendar the result is
        the JSON of an empty array, so the return type is always a string.
        """
        # (key in the stored event, key in the emitted item)
        optional_keys = (
            ("summary", "title"),
            ("url", "url"),
            ("location", "location"),
            ("description", "description"),
        )

        calendar = self.calendar
        events = []
        if calendar is not None:
            events = (calendar.past_events or []) + (calendar.future_events or [])

        calendar_format_events = []
        for event in events:
            parsed_event = {
                "allDay": event["all_day"],
                "start": event["start"].isoformat(),
                "end": event["end"].isoformat(),
            }

            # Optional keys may be missing altogether (e.g. `url` is only set
            # for events whose location is a URL), so read them leniently.
            for source_key, target_key in optional_keys:
                value = event.get(source_key)
                if value not in ("", None):
                    parsed_event[target_key] = value

            calendar_format_events.append(parsed_event)

        return escape(json.dumps(calendar_format_events))

    def save(self, *args, **kwargs):
        """
        Synchronise the `calendar` relation with `calendar_url`, then save.

        With a URL set, the matching Calendar is reused or created and a
        refresh task is queued to run once the transaction commits. With the
        URL cleared, the relation is dropped.
        """
        if self.calendar_url:
            # create or update related Calendar
            if not self.calendar or self.calendar.url != self.calendar_url:
                calendar = Calendar.objects.filter(url=self.calendar_url).first()
                if calendar is None:
                    calendar = Calendar.objects.create(url=self.calendar_url)
                self.calendar = calendar

            transaction.on_commit(
                partial(update_calendar_source.delay, self.calendar.id)
            )
        elif self.calendar:
            # unlink the related Calendar when URL is cleared (the Calendar
            # row itself is not deleted)
            self.calendar = None

        super().save(*args, **kwargs)