Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • to/majak
  • b1242/majak
2 results
Select Git revision
Show changes
Showing
with 723 additions and 0 deletions
import logging
from django.core.management.base import BaseCommand
from ...models import Calendar
logger = logging.getLogger(__name__)
class Command(BaseCommand):
def handle(self, *args, **options):
self.stdout.write("Removing orphaned calendars...")
for cal in Calendar.objects.filter(
districtcalendarpage=None,
districtcenterpage=None,
districthomepage=None,
districtpersonpage=None,
elections2021calendarpage=None,
mainpersonpage=None,
senatcampaignhomepage=None,
uniwebhomepage=None,
uniwebcalendarpage=None,
):
try:
self.stdout.write(f"- {cal.id} | {cal.url}")
cal.delete()
except Exception as e:
logger.error("Calendar delete failed for %s", cal.url, exc_info=True)
self.stdout.write(" - failed")
self.stdout.write(str(e))
self.stdout.write("Updating calendars...")
for cal in Calendar.objects.all():
self.stdout.write(f"\n@ {cal.id} | {cal.url}")
try:
cal.update_source()
self.stdout.write("+ ok")
except Exception as e:
logger.error("Calendar update failed for %s", cal.url, exc_info=True)
self.stdout.write("- failed")
self.stdout.write(str(e))
self.stdout.write("\nUpdating calendars finished!")
# Generated by Django 3.0.6 on 2020-05-22 08:30
import django.core.serializers.json
from django.db import migrations, models
import calendar_utils.models
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name="Calendar",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("url", models.URLField(unique=True)),
("source", models.TextField(null=True)),
("last_update", models.DateTimeField(null=True)),
(
"past_events",
calendar_utils.models.EventsJSONField(
encoder=django.core.serializers.json.DjangoJSONEncoder,
null=True,
),
),
(
"future_events",
calendar_utils.models.EventsJSONField(
encoder=django.core.serializers.json.DjangoJSONEncoder,
null=True,
),
),
],
),
]
# Generated by Django 3.0.6 on 2020-05-23 00:43
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("calendar_utils", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name="calendar",
name="url",
field=models.URLField(),
),
]
# Generated by Django 4.0.4 on 2022-05-05 10:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("calendar_utils", "0002_auto_20200523_0243"),
]
operations = [
migrations.RemoveField(
model_name="calendar",
name="source",
),
migrations.AddField(
model_name="calendar",
name="event_hash",
field=models.CharField(max_length=256, null=True),
),
]
# Generated by Django 4.0.4 on 2022-05-05 10:28
from datetime import date, timedelta
import arrow
from django.db import migrations
from calendar_utils.icalevents import icalevents
from calendar_utils.parser import process_event_list
def update_event_lists(apps, schema):
Calendar = apps.get_model("calendar_utils", "Calendar")
for calendar in Calendar.objects.all():
try:
event_list = icalevents.events(
url=calendar.url,
start=date.today() - timedelta(days=30),
end=date.today() + timedelta(days=60),
)
except ValueError:
print("Could not parse calendar from {}".format(calendar.url))
event_list_hash = str(hash(str(event_list)))
past, future = process_event_list(event_list)
calendar.past_events = past
calendar.future_events = list(reversed(future))
calendar.event_hash = event_list_hash
calendar.last_update = arrow.utcnow().datetime
calendar.save()
class Migration(migrations.Migration):
dependencies = [
("calendar_utils", "0003_remove_calendar_source_calendar_event_hash"),
]
operations = [
migrations.RunPython(update_event_lists, reverse_code=migrations.RunPython.noop)
]
import json
import logging
from datetime import timedelta
from functools import partial
import arrow
from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import URLValidator, ValidationError
from django.db import models, transaction
from django.utils.html import escape
from django.utils.timezone import now
from .icalevents import icalevents
from .parser import process_event_list
from .tasks import update_calendar_source
logger = logging.getLogger(__name__)
def _convert_arrow_to_datetime(event):
event["start"] = event["start"].datetime
event["end"] = event["end"].datetime
return event
class EventsJSONField(models.JSONField):
"""
JSONField for lists of events which converts `begin` and `end` to datetime
on load from DB.
"""
def from_db_value(self, value, expression, connection):
value = super().from_db_value(value, expression, connection)
urlValidator = URLValidator()
if value:
for event in value:
event["start"] = arrow.get(event.get("start")).datetime
event["end"] = arrow.get(event["end"]).datetime
try:
urlValidator(event.get("location"))
event["url"] = event.get("location")
except ValidationError:
pass
return value
class Calendar(models.Model):
CURRENT_NUM = 6
url = models.URLField()
event_hash = models.CharField(max_length=256, null=True)
last_update = models.DateTimeField(null=True)
past_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
future_events = EventsJSONField(encoder=DjangoJSONEncoder, null=True)
def current_events(self):
if self.future_events is not None:
return self.future_events[: self.CURRENT_NUM]
else:
return []
def handle_event_list(self, event_list):
event_list_hash = str(hash(str(event_list)))
if event_list_hash != self.event_hash:
past, future = process_event_list(event_list)
self.past_events = past
self.future_events = future
self.event_hash = event_list_hash
self.last_update = arrow.utcnow().datetime
self.save()
def update_source(self):
event_list = icalevents.events(
url=self.url,
start=now() - timedelta(days=30),
end=now()
+ timedelta(days=365), # Pull a year ahead due to "popular" demand.
)
self.handle_event_list(event_list)
class CalendarMixin(models.Model):
"""
Mixin to be used in other models, like site settings, which adds relation
to Calendar.
"""
calendar_url = models.URLField(
"URL kalendáře ve formátu iCal",
blank=True,
null=True,
help_text="Kalendář se po uložení stránky aktualizuje na pozadí. U plnějších kalendářů to může trvat i desítky sekund.",
)
calendar = models.ForeignKey(
Calendar, null=True, blank=True, on_delete=models.SET_NULL
)
class Meta:
abstract = True
def get_fullcalendar_data(self) -> str:
calendar_format_events = []
if self.calendar is None:
return []
for event in (
self.calendar.past_events if self.calendar.past_events is not None else []
) + (
self.calendar.future_events
if self.calendar.future_events is not None
else []
):
parsed_event = {
"allDay": event["all_day"],
"start": event["start"].isoformat(),
"end": event["end"].isoformat(),
}
if event["summary"] not in ("", None):
parsed_event["title"] = event["summary"]
if event["url"] not in ("", None):
parsed_event["url"] = event["url"]
if event["location"] not in ("", None):
parsed_event["location"] = event["location"]
if event["description"] not in ("", None):
parsed_event["description"] = event["description"]
calendar_format_events.append(parsed_event)
return escape(json.dumps(calendar_format_events))
def save(self, *args, **kwargs):
# create or update related Calendar
if self.calendar_url:
if not self.calendar or self.calendar.url != self.calendar_url:
calendar = Calendar.objects.filter(url=self.calendar_url).first()
if calendar:
self.calendar = calendar
else:
self.calendar = Calendar.objects.create(url=self.calendar_url)
transaction.on_commit(
partial(update_calendar_source.delay, self.calendar.id)
)
# delete related Calendar when URL is cleared
if not self.calendar_url and self.calendar:
self.calendar = None
super().save(*args, **kwargs)
from operator import itemgetter
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo
import arrow
import bleach
import nh3
from django.conf import settings
from django.utils.timezone import is_naive
if TYPE_CHECKING:
from .icalevents.icalparser import Event
EVENT_KEYS = ("start", "end", "all_day", "summary", "description", "location", "url")
def split_event_dict_list(event_list: "list[dict]") -> tuple[list[dict], list[dict]]:
"""Splits events and returns list of past events and future events."""
singularity = arrow.utcnow().shift(hours=-2)
past = [ev for ev in event_list if ev["end"] < singularity]
future = list(reversed([ev for ev in event_list if ev["end"] > singularity]))
return past, future
def set_event_description(event: "Event") -> "Event":
"""Clears even description from unwanted tags."""
description: str = event.description or ""
event.description = bleach.clean(description, tags=["a", "br"], strip=True)
return event
def set_event_duration(event: "Event") -> "Event":
"""Sets duration for event."""
if event.all_day:
event.duration = "celý den"
return event
delta = event.end - event.start
if delta.days < 1:
begin = arrow.get(event.start).to(settings.TIME_ZONE).format("H:mm")
end = arrow.get(event.end).to(settings.TIME_ZONE).format("H:mm")
event.duration = f"{begin} - {end}"
else:
begin = arrow.get(event.start).to(settings.TIME_ZONE).format("H:mm")
end = arrow.get(event.end).to(settings.TIME_ZONE).format("H:mm (D.M.)")
event.duration = f"{begin} - {end}"
return event
def set_event_timezone(event: "Event") -> "Event":
"""Sets default project timezone for event if missing."""
if is_naive(event.start) or is_naive(event.end):
event.start = event.start.replace(tzinfo=ZoneInfo(settings.TIME_ZONE))
event.end = event.end.replace(tzinfo=ZoneInfo(settings.TIME_ZONE))
return event
def process_event(event: "Event") -> dict:
"""Processes single event for use in Majak"""
event = set_event_timezone(event)
event = set_event_duration(event)
event = set_event_description(event)
event.description = nh3.clean(
event.description,
tags={"h1", "h2", "h3", "h4", "h5", "h6", "a", "em", "p", "b", "strong", "br"},
)
# for event in sorted(cal.events, key=attrgetter("start"), reverse=True): TODO check
return {key: getattr(event, key) for key in EVENT_KEYS}
def process_event_list(event_list: "list[Event]") -> tuple[list[dict], list[dict]]:
"""Parses iCalendar source and returns events as list of dicts. Returns
tuple of past and future events.
"""
processed_event_list = list(map(process_event, event_list))
processed_event_list = sorted(
processed_event_list, key=itemgetter("start"), reverse=True
)
return split_event_dict_list(processed_event_list)
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task
def update_calendar_source(calendar_id):
from .models import Calendar # noqa circular import
cal = Calendar.objects.get(id=calendar_id)
cal.update_source()
from django import template
register = template.Library()
def event_list(calendar, full_list):
"""
Outputs a list of events, in case the calendar is on calendar page, it will print all future events,
otherwise will print only a fraction
"""
return calendar.future_events if full_list else calendar.current_events
register.filter("event_list", event_list)
from django.apps import AppConfig
class CzechInspirationalConfig(AppConfig):
name = "czech_inspirational"
# Generated by Django 3.1.7 on 2021-03-22 21:37
import django.db.models.deletion
import wagtailmetadata.models
from django.db import migrations, models
import shared.models
class Migration(migrations.Migration):
initial = True
dependencies = [
("wagtailcore", "0060_fix_workflow_unique_constraint"),
("wagtailimages", "0023_add_choose_permissions"),
]
operations = [
migrations.CreateModel(
name="CzechInspirationalHomePage",
fields=[
(
"page_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="wagtailcore.page",
),
),
(
"matomo_id",
models.IntegerField(
blank=True,
null=True,
verbose_name="Matomo ID pro sledování návštěvnosti",
),
),
(
"search_image",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtailimages.image",
verbose_name="Search image",
),
),
],
options={
"verbose_name": "Česko inspirativní",
},
bases=(
"wagtailcore.page",
wagtailmetadata.models.WagtailImageMetadataMixin,
models.Model,
),
),
migrations.CreateModel(
name="CzechInspirationalDownloadPage",
fields=[
(
"page_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="wagtailcore.page",
),
),
(
"search_image",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtailimages.image",
verbose_name="Search image",
),
),
],
options={
"verbose_name": "Download",
},
bases=(
"wagtailcore.page",
shared.models.SubpageMixin,
wagtailmetadata.models.WagtailImageMetadataMixin,
models.Model,
),
),
migrations.CreateModel(
name="CzechInspirationalChaptersPage",
fields=[
(
"page_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="wagtailcore.page",
),
),
(
"search_image",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtailimages.image",
verbose_name="Search image",
),
),
],
options={
"verbose_name": "Přehled kapitol",
},
bases=(
"wagtailcore.page",
shared.models.SubpageMixin,
wagtailmetadata.models.WagtailImageMetadataMixin,
models.Model,
),
),
migrations.CreateModel(
name="CzechInspirationalChapterPage",
fields=[
(
"page_ptr",
models.OneToOneField(
auto_created=True,
on_delete=django.db.models.deletion.CASCADE,
parent_link=True,
primary_key=True,
serialize=False,
to="wagtailcore.page",
),
),
(
"search_image",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtailimages.image",
verbose_name="Search image",
),
),
],
options={
"verbose_name": "Kapitola",
},
bases=(
"wagtailcore.page",
shared.models.SubpageMixin,
wagtailmetadata.models.WagtailImageMetadataMixin,
models.Model,
),
),
]
# Generated by Django 3.1.7 on 2021-03-23 01:03
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("czech_inspirational", "0001_initial"),
]
operations = [
migrations.AddField(
model_name="czechinspirationalchapterpage",
name="number",
field=models.IntegerField(default=0, verbose_name="číslo kapitoly"),
),
]
# Generated by Django 3.1.7 on 2021-03-23 02:12
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("wagtaildocs", "0012_uploadeddocument"),
("czech_inspirational", "0002_czechinspirationalchapterpage_number"),
]
operations = [
migrations.AddField(
model_name="czechinspirationaldownloadpage",
name="book_file",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtaildocs.document",
),
),
]
# Generated by Django 3.1.7 on 2021-03-23 02:52
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("wagtaildocs", "0012_uploadeddocument"),
("czech_inspirational", "0003_czechinspirationaldownloadpage_book_file"),
]
operations = [
migrations.AddField(
model_name="czechinspirationalhomepage",
name="buy_book_url",
field=models.URLField(
blank=True, null=True, verbose_name="URL pro nákup knihy"
),
),
migrations.AlterField(
model_name="czechinspirationaldownloadpage",
name="book_file",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="wagtaildocs.document",
verbose_name="ebook",
),
),
]
# Generated by Django 3.1.7 on 2021-03-23 04:36
import django.db.models.deletion
import wagtail.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("wagtailimages", "0023_add_choose_permissions"),
("czech_inspirational", "0004_auto_20210323_0352"),
]
operations = [
migrations.AddField(
model_name="czechinspirationalchapterpage",
name="author",
field=models.CharField(
blank=True, max_length=250, null=True, verbose_name="autor"
),
),
migrations.AddField(
model_name="czechinspirationalchapterpage",
name="image",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.PROTECT,
to="wagtailimages.image",
verbose_name="obrázek",
),
),
migrations.AddField(
model_name="czechinspirationalchapterpage",
name="text",
field=wagtail.fields.RichTextField(blank=True, verbose_name="text"),
),
]