From 222761b145c3d3082fa023d68f57e3ba159e5031 Mon Sep 17 00:00:00 2001 From: OndraRehounek <ondra.rehounek@seznam.cz> Date: Fri, 25 Mar 2022 17:04:55 +0100 Subject: [PATCH] Refactor jekyll import to Class --- district/forms.py | 7 +- district/jekyll_import.py | 659 ++++++++++++++++++++------------------ 2 files changed, 354 insertions(+), 312 deletions(-) diff --git a/district/forms.py b/district/forms.py index 579e97e8..38a96195 100644 --- a/district/forms.py +++ b/district/forms.py @@ -3,7 +3,7 @@ from django.contrib.messages import ERROR, WARNING from wagtail.admin.forms import WagtailAdminPageForm from wagtail.core.models.collections import Collection -from .jekyll_import import perform_import +from .jekyll_import import JekyllArticleImporter class JekyllImportForm(WagtailAdminPageForm): @@ -72,13 +72,14 @@ class JekyllImportForm(WagtailAdminPageForm): return cleaned_data def handle_import(self): - import_message_list = perform_import( + import_message_list = JekyllArticleImporter( article_parent_page=self.instance, collection=self.cleaned_data["collection"], url=self.cleaned_data["jekyll_repo_url"], dry_run=self.cleaned_data["dry_run"], use_git=self.cleaned_data["use_git"], - ) + ).perform_import() + self.instance.import_message_list = import_message_list return import_message_list diff --git a/district/jekyll_import.py b/district/jekyll_import.py index d2e5d49d..123a4223 100644 --- a/district/jekyll_import.py +++ b/district/jekyll_import.py @@ -1,3 +1,4 @@ +import logging import os import re import urllib @@ -23,187 +24,76 @@ from wagtail.core.rich_text import RichText from wagtail.images.models import Image from yaml.scanner import ScannerError +logger = logging.getLogger(__name__) # from django.utils.dateparse import parse_date TODO enable date check again... -# Wagtail to portrebuje https://docs.wagtail.io/en/stable/extending/rich_text_internals.html#data-format -markdown.serializers.HTML_EMPTY.add("embed") - -message_list = [] - - -# Plain format pro perex -def unmark_element(element, stream=None): - if stream is None: - stream = StringIO() - if element.text: - stream.write(element.text) - for sub in element: - unmark_element(sub, stream) - if element.tail: - stream.write(element.tail) - return stream.getvalue() - - -Markdown.output_formats["plain"] = unmark_element -plain_md = Markdown(output_format="plain") -plain_md.stripTopLevelTags = False - -params = {} - - -class ImgProcessor(InlineProcessor): - def handleMatch(self, m, data): - el = ET.Element("embed") - el.attrib["embedtype"] = "image" - el.attrib["alt"] = m.group(1) - el.attrib["format"] = "left" - collection = get_collection() - # TODO FIX REPO_NAME - parsed_image_path = get_parsed_file_path(m.group(2)) - image_obj = get_or_create_image( - params["path"], parsed_image_path, collection=collection, repo_name="" - ) - if not image_obj: - message_list.append( - { - "level": WARNING, - "text": "Nenalezen obrázek {}".format(params["path"]), - } - ) - return None, m.start(0), m.end(0) - - el.attrib["id"] = str(image_obj.pk) - return el, m.start(0), m.end(0) - - -def get_parsed_file_path(path: str): - if "{{" in path: - try: - parsed_path = path.split("{{")[1].split("|")[0].split("'")[1] - except IndexError: - parsed_path = path.split("{{")[1].split("|")[0].split('"')[1] - return parsed_path - else: - return path - - -class ImgExtension(Extension): - def extendMarkdown(self, md): - IMG_PATTERN = r"!\[(.*?)\]\((.*?)\)" - md.inlinePatterns.register(ImgProcessor(IMG_PATTERN, md), "img", 175) - - -html_md = Markdown(extensions=[ImgExtension()]) - - -def get_perex(text): - text = re.split(r"^\s*$", text.strip(), flags=re.MULTILINE)[0] - return plain_md.convert(text) - - +image_params = ( + {} +) # filled on JekyllArticleImported init and used in markdown overwrites POSTS_DIR = "_posts" -# TITLE_SUFFIX = " - Piráti České Budějovice" +# ------------------------------- Misc helper functions ------------------------------- -def get_site_config(path) -> dict: - with open(os.path.join(path, "_config.yml")) as f: - config = yaml.safe_load(f.read()) - return config - - -def import_post(path, file_path, parent, title_suffix, dry_run, repo_name): - from district.models import DistrictArticlePage - - with open(os.path.join(path, file_path), "rt") as f: - r = re.split(r"^---\s*$", f.read(), maxsplit=3, flags=re.MULTILINE) - try: - meta = yaml.safe_load(r[1]) - except (ScannerError, ValueError): - message_list.append( - {"level": ERROR, "text": "Nevalidní yaml pro {}".format(path)} - ) - return None, False - - md = r[2] - html = html_md.convert(md) - - # meta_date = meta["date"] - # article_date = meta_date if isinstance(meta_date, date) else meta["date"].split()[0] - - if DistrictArticlePage.objects.filter(title=meta["title"]).exists(): - for article in DistrictArticlePage.objects.filter(title=meta["title"]): - # if article.date == parse_date(meta["date"].split()[0]): - msg = "Článek již existuje: %s" % article - stdout.write(msg) - message_list.append({"level": INFO, "text": msg}) - return article, False - - article = DistrictArticlePage() - - # article.text = html - article.content = [("text", RichText(html))] - article.perex = get_perex(md) - - if meta.get("date", None): - meta_date = meta["date"] - if isinstance(meta_date, date): - article.date = meta_date - else: - parsed_date = meta["date"].split()[0] - article.date = parsed_date[0:10] if parsed_date else timezone.now().date() - else: - article.date = timezone.now().date() - - article.title = meta["title"] - article.author = meta.get("author", "Česká pirátská strana") +def clone_repo(url: str) -> (str, str): + """ + Naclonuje repo do tmp s využitím gitu a vrátí cestu k němu. + Pokud URL končí lomítkem, odebereme ho, a vezmeme jako název repozitáře + string za posledním lomítkem jako název repa. To použijeme i pro promazání + takového adresáře, pokud už existuje. + """ + path = "/tmp/" + if url.endswith("/"): + url = url[:-1] + repo_name = url.split("/")[-1] + repo_path = os.path.join(path, repo_name) - article.seo_title = article.title + title_suffix - article.search_description = meta.get("description", "") + os.chdir(path) + if os.path.exists(repo_path): + os.chdir(repo_path) + os.system("git pull --depth 1") + return repo_path, repo_name - # for tag in meta['tags']: - # article.tags.add(tag) + os.system("git clone --depth 1 {}".format(url)) - collection = get_collection() - if meta.get("image", None): - article.image = get_or_create_image( - path, meta["image"], collection=collection, repo_name=repo_name - ) + return repo_path, repo_name - if dry_run: - return article, True - - try: - parent.add_child(instance=article) - stdout.write("Creating article: %s" % article) - rev = article.save_revision() - if meta.get("published", True): - rev.publish() - except Exception as e: - message_list.append( - { - "level": WARNING if dry_run else ERROR, - "text": "Nelze uložit článek {}: {}".format(article.title, str(e)), - } - ) - return article, False - return article, True +def download_repo_as_zip(url: str) -> (str, str): + """ + Stáhne .zip repa, extrahuje a vrátí cestu k extrahovanému repu. + Hodně nešikovné je, že extrahovaná složka má ještě suffix "-gh-pages" + a to nevím, jestli platí vždy... regex taky pro název repa také není optimální, + ale ve finále nehraje moc roli, pokud vrátí cokoliv použitelného pro file name. + """ + path = "/tmp/" + repo_name = re.search("pirati-web/(.*)/archive/", url).group(1) + zip_path = "{}{}.zip".format(path, repo_name) + if os.path.exists(zip_path): + os.remove(zip_path) -def get_collection(): - return params["kolekce"] + urllib.request.urlretrieve(url, zip_path) + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(path) -def get_path_and_repo_name(url: str, use_git: bool) -> (str, str): - if use_git: - return clone_repo(url) - else: - return download_repo_as_zip(url) + return os.path.join(path, "{}-gh-pages".format(repo_name)), repo_name -def get_or_create_image(path, file_path, collection, repo_name) -> Image or None: +def get_or_create_image( + path: str, file_path: str, collection, repo_name: str +) -> Image or None: + """ + Funkce, která se snaží najít a vrátit Wagtail Image. + Nejdříve hledá v existujících podle cesty... + Pak zkusí najít soubor fyzicky na disku... + Pak zkusí ještě assets/img adresář... + Pak zkusí stáhnout image z https://a.pirati.cz... + Pak se na to vykašle... + """ file_path = file_path.lstrip("/") + if Image.objects.filter(title=file_path).exists(): return Image.objects.filter(title=file_path).first() else: @@ -223,7 +113,7 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None return image except FileNotFoundError: img_name = file_path.split("/")[-1] - img_assets_folder = repo_name.split(".")[0] # TODO make as form field + img_assets_folder = repo_name.split(".")[0] img_url = "https://a.pirati.cz/{}/img/{}".format( img_assets_folder, file_path.split("#")[0] ) @@ -231,11 +121,16 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None try: urllib.request.urlretrieve(img_url, img_path) except (HTTPError, UnicodeEncodeError, InvalidURL, IsADirectoryError): - message_list.append( - { - "level": WARNING, - "text": "Nelze stáhout obrázek {}".format(img_url), - } + # FIXME praha má např. https://a.pirati.cz/praha/img/posts/srovnani.png + # ale dává to 'https://a.pirati.cz//img/assets/img/posts/srovnani.png' + + logger.warning( + "Nedohledán obrázek při importu článků", + extra={ + "file_path": file_path, + "img_name": img_name, + "img_url": img_url, + }, ) return None @@ -245,174 +140,320 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None return image +def get_path_and_repo_name(url: str, use_git: bool) -> (str, str): + """ + Vrací cestu a název repozitáře podle toho zíksané různými způsoby, + podle toho jestli se jedná o odkaz na zip nebo na git. + """ + if use_git: + return clone_repo(url) + else: + return download_repo_as_zip(url) + + +def get_site_config(path) -> dict: + """ + Vrací config Jekyll repa jako dict. + """ + with open(os.path.join(path, "_config.yml")) as f: + config = yaml.safe_load(f.read()) + return config + + def get_title_from_site_config(site_config: dict) -> str: if "title" in site_config: return " - " + site_config.get("title", "") return "" -def clone_repo(url: str) -> (str, str): +def unmark_element(element, stream=None): """ - Naclonuje repo do tmp s využitím gitu a vrátí cestu k němu. - Pokud URL končí lomítkem, odebereme ho, a vezmeme jako název repozitáře - string za posledním lomítkem jako název repa. To použijeme i pro promazání - takového adresáře, pokud už existuje. + Očišťuje element (perex) od ostatních značek """ - path = "/tmp/" - if url.endswith("/"): - url = url[:-1] - repo_name = url.split("/")[-1] - repo_path = os.path.join(path, repo_name) + if stream is None: + stream = StringIO() + if element.text: + stream.write(element.text) + for sub in element: + unmark_element(sub, stream) + if element.tail: + stream.write(element.tail) + return stream.getvalue() - os.chdir(path) - if os.path.exists(repo_path): - os.chdir(repo_path) - os.system("git pull --depth 1") - return repo_path, repo_name - os.system("git clone --depth 1 {}".format(url)) +# ------------------- Setup markdown extensions and settings ----------------------- - return repo_path, repo_name +class ImgProcessor(InlineProcessor): + def handleMatch(self, m, data): + el = ET.Element("embed") + el.attrib["embedtype"] = "image" + el.attrib["alt"] = m.group(1) + el.attrib["format"] = "left" -def download_repo_as_zip(url: str) -> (str, str): - """ - Stáhne .zip repa, extrahuje a vrátí cestu k extrahovanému repu. - Hodně nešikovné je, že extrahovaná složka má ještě suffix "-gh-pages" - a to nevím, jestli platí vždy... regex taky pro název repa také není optimální, - ale ve finále nehraje moc roli, pokud vrátí cokoliv použitelného pro file name. - """ - path = "/tmp/" - repo_name = re.search("pirati-web/(.*)/archive/", url).group(1) - zip_path = "{}{}.zip".format(path, repo_name) + parsed_image_path = JekyllArticleImporter.get_parsed_file_path(m.group(2)) + image_obj = get_or_create_image( + path=image_params["path"], + file_path=parsed_image_path, + collection=image_params["collection"], + repo_name=image_params["repo_name"], + ) - if os.path.exists(zip_path): - os.remove(zip_path) + if not image_obj: + return None, m.start(0), m.end(0) - urllib.request.urlretrieve(url, zip_path) + el.attrib["id"] = str(image_obj.pk) + return el, m.start(0), m.end(0) - with zipfile.ZipFile(zip_path, "r") as zip_ref: - zip_ref.extractall(path) - return os.path.join(path, "{}-gh-pages".format(repo_name)), repo_name +class ImgExtension(Extension): + def extendMarkdown(self, md): + IMG_PATTERN = r"!\[(.*?)\]\((.*?)\)" + md.inlinePatterns.register(ImgProcessor(IMG_PATTERN, md), "img", 175) -def perform_import( - article_parent_page, collection, url: str, dry_run: bool, use_git: bool -) -> "List[dict]": - """ - Přijímá parent page pro články, kolekci pro obrázky, url pro stažení (zip nebo git - repo, boolean jestli jde o testovací běh a boolean, zda použít git (anebo zip)). - Vrací list dict pro requests messages (klíče level, text). - """ - success_counter = 0 - - params["kolekce"] = collection - site = article_parent_page.get_site() - - params["path"], repo_name = get_path_and_repo_name(url=url, use_git=use_git) - path = params["path"] - site_config = get_site_config(path) - title_suffix = get_title_from_site_config(site_config) - - articlepath = site_config.get("articlepath", "aktuality") - - for fn in os.listdir(os.path.join(path, POSTS_DIR)): - if os.path.isdir(os.path.join(path, POSTS_DIR, fn)): - posts_sub_folder = os.path.join(path, POSTS_DIR, fn) - for sub_fn in os.listdir(posts_sub_folder): - file_path = os.path.join(posts_sub_folder, sub_fn) - success_counter = article_parser( - articlepath, - path, - sub_fn, - file_path, - article_parent_page, - title_suffix, - dry_run, - repo_name, - success_counter, - site, - ) - else: - file_path = os.path.join(POSTS_DIR, fn) - success_counter = article_parser( - articlepath, - path, - fn, - file_path, - article_parent_page, - title_suffix, - dry_run, - repo_name, - success_counter, - site, +# Wagtail to portrebuje +# https://docs.wagtail.io/en/stable/extending/rich_text_internals.html#data-format +markdown.serializers.HTML_EMPTY.add("embed") + +Markdown.output_formats["plain"] = unmark_element +plain_md = Markdown(output_format="plain") +plain_md.stripTopLevelTags = False + +html_md = Markdown(extensions=[ImgExtension()]) +params = {} + + +# ------------------------------- Importer class ------------------------------- + + +class JekyllArticleImporter: + def __init__( + self, article_parent_page, collection, url: str, dry_run: bool, use_git: bool + ): + # Params + self.article_parent_page = article_parent_page + self.collection = collection + self.dry_run = dry_run + self.use_git = use_git + self.url = url + + # Computed props + self.path, self.repo_name = get_path_and_repo_name(self.url, self.use_git) + self.site = article_parent_page.get_site() + self.site_config = get_site_config(self.path) + + self.article_path = self.site_config.get("articlepath", "aktuality") + self.title_suffix = get_title_from_site_config(self.site_config) + + # Counters + self.success_counter = 0 + self.exists_counter = 0 + self.skipped_counter = 0 + # self.image_warning_counter = 0 # TODO nějak vymyslet + + self.message_list = [] # output for django.messages + + # Filling global var for ImgParser + image_params["path"] = self.path + image_params["collection"] = self.collection + image_params["repo_name"] = self.repo_name + + def create_django_messages(self): + """ + Podle (aktuálních) hodnot counterů přidá do self.message_list + různé zprávy pro uživatele. + """ + if self.success_counter: + base_msg = "Lze importovat" if self.dry_run else "Úspěšně naimportováno" + self.message_list.append( + { + "level": SUCCESS, + "text": "{} {} článků".format(base_msg, self.success_counter), + } ) - if success_counter: - base_msg = "Lze importovat" if dry_run else "Úspěšně naimportováno" - message_list.append( - {"level": SUCCESS, "text": "{} {} článků".format(base_msg, success_counter)} - ) + if self.exists_counter: + self.message_list.append( + { + "level": INFO, + "text": "{} článků s tímto názvev již existuje".format( + self.exists_counter + ), + } + ) - return message_list - - -def article_parser( - articlepath: str, - path: str, - fn, - file_path, - article_parent_page, - title_suffix, - dry_run, - repo_name, - success_counter, - site, -): - match = re.match(r"(\d*)-(\d*)-(\d*)-(.*)\.(.*)", fn) - if match: - y = match.group(1) - m = match.group(2) - d = match.group(3) - slug = match.group(4) - ext = match.group(5) - - if ext == "md": - article, success = import_post( - path, file_path, article_parent_page, title_suffix, dry_run, repo_name + if self.skipped_counter: + self.message_list.append( + { + "level": WARNING, + "text": "Nelze importovat {} článků".format(self.skipped_counter), + } ) - if not success: - return success_counter + @staticmethod + def get_parsed_file_path(path: str): + """ + Získá cestu z proměnné v "{{ }}" závorkách + """ + if "{{" in path: + try: + parsed_path = path.split("{{")[1].split("|")[0].split("'")[1] + except IndexError: + parsed_path = path.split("{{")[1].split("|")[0].split('"')[1] + return parsed_path + else: + return path - if dry_run: - return success_counter + 1 + @staticmethod + def get_perex(text): + text = re.split(r"^\s*$", text.strip(), flags=re.MULTILINE)[0] + return plain_md.convert(text) - Redirect.objects.get_or_create( - site=site, - old_path="/%s/%s/%s/%s/%s" - % (articlepath, y, m.zfill(2), d.zfill(2), slug), - defaults={"is_permanent": True, "redirect_page": article}, + def import_post(self, file_path): + from district.models import DistrictArticlePage + + with open(os.path.join(self.path, file_path), "rt") as f: + r = re.split(r"^---\s*$", f.read(), maxsplit=3, flags=re.MULTILINE) + try: + meta = yaml.safe_load(r[1].replace("\t", "")) + except (ScannerError, ValueError): + logger.warning( + "Nelze importovat článek - neparsovatelný YAML", + extra={"file_path": file_path}, ) + self.skipped_counter += 1 + return None - # TODO handle redirects! PRAGUE X CB - # Redirect.objects.get_or_create( - # site=site, - # old_path="/%s/%s.html" - # % (articlepath, slug), - # defaults={"is_permanent": True, "redirect_page": article}, - # ) + md = r[2] + html = html_md.convert(md) - success_counter += 1 + # meta_date = meta["date"] + # article_date = meta_date if isinstance(meta_date, date) else meta["date"].split()[0] + try: + title = meta["title"] + except TypeError: + logger.warning( + "Nelze importovat článek - YAML se neparsuje na dict", + extra={"article_meta": meta}, + ) + self.skipped_counter += 1 + return None + + if DistrictArticlePage.objects.filter(title=title).exists(): + for article in DistrictArticlePage.objects.filter(title=meta["title"]): + # if article.date == parse_date(meta["date"].split()[0]): + msg = "Článek již existuje: %s" % article + stdout.write(msg) + # message_list.append({"level": INFO, "text": msg}) + self.exists_counter += 1 + + return None + + article = DistrictArticlePage() + + # article.text = html + article.content = [("text", RichText(html))] + article.perex = self.get_perex(md) + + if meta.get("date", None): + meta_date = meta["date"] + if isinstance(meta_date, date): + article.date = meta_date + else: + parsed_date = meta["date"].split()[0] + article.date = ( + parsed_date[0:10] if parsed_date else timezone.now().date() + ) else: - msg = "ERROR: Nepodporovaná přípona souboru: %s" % ext - message_list.append({"level": ERROR, "text": msg}) - stdout.write(msg) - else: - msg = "Skipped: %s" % fn - stdout.write(msg) + article.date = timezone.now().date() + + article.title = meta["title"] + article.author = meta.get("author", "Česká pirátská strana") + + article.seo_title = article.title + self.title_suffix + article.search_description = meta.get("description", "") + + # for tag in meta['tags']: + # article.tags.add(tag) + + if meta.get("image", None): + article.image = get_or_create_image( + self.path, meta["image"], self.collection, self.repo_name + ) + + if self.dry_run: + return article - if dry_run: - message_list.append({"level": WARNING, "text": msg}) - return success_counter + try: + self.article_parent_page.add_child(instance=article) + stdout.write("Vytvářím článek: %s" % article) + rev = article.save_revision() + if meta.get("published", True): + rev.publish() + except Exception as e: + logger.warning( + "Nelze uložit importovaný článek", + extra={"article_title": article.title, "exception": e}, + ) + self.skipped_counter += 1 + return article + + self.success_counter += 1 + return article + + def perform_import(self) -> "List[dict]": + """ + Projde adresář článků a pokusí se zprocesovat Markdown do article. + Vrací list dict pro django messages (klíč levelu, text). + """ + for file_name in os.listdir(os.path.join(self.path, POSTS_DIR)): + # Případ podsložek (typicky po jednotlivých letech) + if os.path.isdir(os.path.join(self.path, POSTS_DIR, file_name)): + posts_sub_folder = os.path.join(self.path, POSTS_DIR, file_name) + for sub_file_name in os.listdir(posts_sub_folder): + file_path = os.path.join(posts_sub_folder, sub_file_name) + self.process_article(sub_file_name, file_path) + # Případ všech článků v jedné složce + else: + file_path = os.path.join(POSTS_DIR, file_name) + self.process_article(file_name, file_path) + + self.create_django_messages() + return self.message_list + + def process_article(self, file_name: str, file_path: str): + match = re.match(r"(\d*)-(\d*)-(\d*)-(.*)\.(.*)", file_name) + if match: + y = match.group(1) + m = match.group(2) + d = match.group(3) + slug = match.group(4) + ext = match.group(5) + + if ext == "md": + article = self.import_post(file_path) + + if article: + Redirect.objects.get_or_create( + site=self.site, + old_path="/%s/%s/%s/%s/%s" + % (self.article_path, y, m.zfill(2), d.zfill(2), slug), + defaults={"is_permanent": True, "redirect_page": article}, + ) + + # TODO handle redirects! PRAGUE X CB + # Redirect.objects.get_or_create( + # site=site, + # old_path="/%s/%s.html" + # % (article_path, slug), + # defaults={"is_permanent": True, "redirect_page": article}, + # ) + else: + msg = "Nepodporovaná přípona souboru: %s" % ext + logger.warning(msg) + self.skipped_counter += 1 + else: + msg = "Přeskočeno: %s" % file_name + logger.warning(msg) + self.skipped_counter += 1 -- GitLab