-
Alexa Valentová authoredAlexa Valentová authored
models.py 4.92 KiB
import json
import logging
from datetime import timedelta
from functools import partial
import arrow
from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import URLValidator, ValidationError
from django.db import models, transaction
from django.utils.html import escape
from django.utils.timezone import now
from .icalevents import icalevents
from .parser import process_event_list
from .tasks import update_calendar_source
logger = logging.getLogger(__name__)
def _convert_arrow_to_datetime(event):
event["start"] = event["start"].datetime
event["end"] = event["end"].datetime
return event
class EventsJSONField(models.JSONField):
"""
JSONField for lists of events which converts `begin` and `end` to datetime
on load from DB.
"""
def from_db_value(self, value, expression, connection):
value = super().from_db_value(value, expression, connection)
urlValidator = URLValidator()
if value:
for event in value:
event["start"] = arrow.get(event.get("start")).datetime
event["end"] = arrow.get(event["end"]).datetime
try:
urlValidator(event.get("location"))
event["url"] = event.get("location")
except ValidationError:
pass
return value
class Calendar(models.Model):
CURRENT_NUM = 6
url = models.URLField()
event_hash = models.CharField(max_length=256, null=True)
last_update = models.DateTimeField(null=True)
past_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
future_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
def current_events(self):
if self.future_events is not None:
return self.future_events[: self.CURRENT_NUM]
else:
return []
def handle_event_list(self, event_list):
event_list_hash = str(hash(str(event_list)))
if event_list_hash != self.event_hash:
past, future = process_event_list(event_list)
self.past_events = past
self.future_events = future
self.event_hash = event_list_hash
self.last_update = arrow.utcnow().datetime
self.save()
def update_source(self):
event_list = icalevents.events(
url=self.url,
start=now() - timedelta(days=30),
end=now()
+ timedelta(days=365), # Pull a year ahead due to "popular" demand.
)
self.handle_event_list(event_list)
class CalendarMixin(models.Model):
"""
Mixin to be used in other models, like site settings, which adds relation
to Calendar.
"""
calendar_url = models.URLField(
"URL kalendáře ve formátu iCal",
blank=True,
null=True,
help_text="Kalendář se po uložení stránky aktualizuje na pozadí. U plnějších kalendářů to může trvat i desítky sekund.",
)
calendar = models.ForeignKey(
Calendar, null=True, blank=True, on_delete=models.SET_NULL
)
class Meta:
abstract = True
def get_fullcalendar_data(self) -> str:
calendar_format_events = []
if self.calendar is None:
return []
for event in (
self.calendar.past_events if self.calendar.past_events is not None else []
) + (
self.calendar.future_events
if self.calendar.future_events is not None
else []
):
parsed_event = {
"allDay": event["all_day"],
"start": event["start"].isoformat(),
"end": event["end"].isoformat(),
}
if event["summary"] not in ("", None):
parsed_event["title"] = event["summary"]
if event["url"] not in ("", None):
parsed_event["url"] = event["url"]
if event["location"] not in ("", None):
parsed_event["location"] = event["location"]
if event["description"] not in ("", None):
parsed_event["description"] = event["description"]
calendar_format_events.append(parsed_event)
return escape(json.dumps(calendar_format_events))
def save(self, *args, **kwargs):
# create or update related Calendar
if self.calendar_url:
if not self.calendar or self.calendar.url != self.calendar_url:
calendar = Calendar.objects.filter(url=self.calendar_url).first()
if calendar:
self.calendar = calendar
else:
self.calendar = Calendar.objects.create(url=self.calendar_url)
transaction.on_commit(
partial(update_calendar_source.delay, self.calendar.id)
)
# delete related Calendar when URL is cleared
if not self.calendar_url and self.calendar:
self.calendar = None
super().save(*args, **kwargs)