import logging import os import random import re import string import urllib import xml.etree.ElementTree as ET import zipfile from datetime import date from http.client import InvalidURL from io import StringIO from typing import List from urllib.error import HTTPError import markdown.serializers import yaml from django.core.files.images import ImageFile from django.utils import timezone from markdown import Markdown from markdown.extensions import Extension from markdown.inlinepatterns import InlineProcessor from wagtail.contrib.redirects.models import Redirect from wagtail.core.models import Page from wagtail.core.models.collections import Collection from wagtail.core.rich_text import RichText from wagtail.images.models import Image from yaml.scanner import ScannerError logger = logging.getLogger(__name__) # from django.utils.dateparse import parse_date # pro hledani krome title i podle data image_params = {} # filled on JekyllArticleImporter init and used globally POSTS_DIR = "_posts" # ------------------------------- Misc helper functions ------------------------------- def clone_repo(url: str) -> (str, str): """ Naclonuje repo do tmp s využitím gitu a vrátí cestu k němu. Pokud URL končí lomítkem, odebereme ho, a vezmeme jako název repozitáře string za posledním lomítkem jako název repa. To použijeme i pro promazání takového adresáře, pokud už existuje. """ path = "/tmp/" if url.endswith("/"): url = url[:-1] repo_name = url.split("/")[-1] repo_path = os.path.join(path, repo_name) os.chdir(path) if os.path.exists(repo_path): os.chdir(repo_path) os.system("git pull --depth 1") return repo_path, repo_name os.system("git clone --depth 1 {}".format(url)) return repo_path, repo_name def download_repo_as_zip(url: str) -> (str, str): """ Stáhne .zip repa, extrahuje a vrátí cestu k extrahovanému repu. Hodně nešikovné je, že extrahovaná složka má ještě suffix "-gh-pages" a to nevím, jestli platí vždy... regex taky pro název repa také není optimální, ale ve finále nehraje moc roli, pokud vrátí cokoliv použitelného pro file name. """ path = "/tmp/" repo_name = re.search("pirati-web/(.*)/archive/", url).group(1) zip_path = "{}{}.zip".format(path, repo_name) if os.path.exists(zip_path): os.remove(zip_path) urllib.request.urlretrieve(url, zip_path) with zipfile.ZipFile(zip_path, "r") as zip_ref: zip_ref.extractall(path) # zdá se, že někdy je -gh-pages, někdy -master... gh_pages_path = os.path.join(path, "{}-gh-pages".format(repo_name)) gh_pages_path_exists = os.path.exists(gh_pages_path) master_path = os.path.join(path, "{}-master".format(repo_name)) master_path_exists = os.path.exists(master_path) if gh_pages_path_exists: return gh_pages_path, repo_name if master_path_exists: return master_path, repo_name else: raise NotImplementedError("Tento zip nedokážeme zpracovat.") def get_or_create_image( path: str, file_path: str, collection, repo_name: str ) -> Image or None: """ Funkce, která se snaží najít a vrátit Wagtail Image. Nejdříve hledá v existujících podle cesty, resp. title... Pak zkusí najít soubor fyzicky na disku... Pak zkusí ještě assets/img adresář... Pak zkusí ještě assets/img/posts adresář... Pak zkusí stáhnout image z https://a.pirati.cz... Pak zkusí přidat do https://a.pirati.cz "posts"... Pak se na to vykašle... Šla by určitě rozsekat a začistit, ale vzhledem k tomu, že je to one-timer, tak jsme to pouze dotáhli do stavu, kdy to schroupne co nejvíce případů. """ file_path = file_path.lstrip("/") if Image.objects.filter(title=file_path).exists(): return Image.objects.filter(title=file_path).first(), "" try: file = ImageFile(open(os.path.join(path, file_path), "rb"), name=file_path) image = Image(title=file_path, file=file, collection=collection) image.save() return image, "" except FileNotFoundError: pass # cesta pomocí file_path neexisuje, jdeme dál try: file = ImageFile( open(os.path.join(path, "assets/img", file_path), "rb"), name=file_path, ) image = Image(title=file_path, file=file, collection=collection) image.save() return image, "" except FileNotFoundError: pass # cesta s vložením "assets/img" před file_path neexisuje, jdeme dál try: file = ImageFile( open(os.path.join(path, "assets/img/posts", file_path), "rb"), name=file_path, ) image = Image(title=file_path, file=file, collection=collection) image.save() return image, "" except FileNotFoundError: pass # ani cesta "assets/img/posts" nefunguje, jdeme zkusit assets server a.pirati.cz fallback_name = ( "".join(random.choice(string.ascii_lowercase) for _ in range(10)) + ".jpg" ) # někdy je název obrzau spojený s poznámkou apod., připravíme si fallback name img_name = file_path.split("/")[-1] or fallback_name img_assets_folder = repo_name.split(".")[0] # např. "praha" z praha.pirati.cz img_url = "https://a.pirati.cz/{}/img/{}".format( img_assets_folder, file_path.split("#")[0] # cistime nazev od poznamek apod ) img_path = os.path.join(path, img_name) try: urllib.request.urlretrieve(img_url, img_path) except (HTTPError, UnicodeEncodeError, InvalidURL, IsADirectoryError): try: # druhý pokus s "posts" přidáno do URL (obvykle je ve file_path) img_url = "https://a.pirati.cz/{}/img/posts/{}".format( img_assets_folder, img_name.split()[0] ) urllib.request.urlretrieve(img_url, img_path) except (HTTPError, UnicodeEncodeError, InvalidURL, IsADirectoryError): msg = "Nedohledán obrázek při importu článků - ani na disku, ani na URL" log_message = "{}: cesta {}, URL {}\n".format(msg, file_path, img_url) logger.warning( log_message, extra={ "file_path": file_path, "img_name": img_name, "img_url": img_url, }, ) return None, log_message file = ImageFile(open(img_path, "rb"), name=img_path) image = Image(title=file_path, file=file, collection=collection) try: image.save() except Exception as e: msg = "Nelze uložit obrázek" logger.warning(msg, extra={"exc": e}) return None, msg return image, "" def get_path_and_repo_name(url: str, use_git: bool) -> (str, str): """ Vrací cestu a název repozitáře podle toho zíksané různými způsoby, podle toho jestli se jedná o odkaz na zip nebo na git. """ if use_git: return clone_repo(url) else: return download_repo_as_zip(url) def get_site_config(path) -> dict: """ Vrací config Jekyll repa jako dict. """ with open(os.path.join(path, "_config.yml")) as f: config = yaml.safe_load(f.read()) return config def get_title_from_site_config(site_config: dict) -> str: if "title" in site_config: return " - " + site_config.get("title", "") return "" def unmark_element(element, stream=None): """ Očišťuje element (perex) od ostatních značek """ if stream is None: stream = StringIO() if element.text: stream.write(element.text) for sub in element: unmark_element(sub, stream) if element.tail: stream.write(element.tail) return stream.getvalue() # ------------------- Setup markdown extensions and settings ----------------------- class ImgProcessor(InlineProcessor): def handleMatch(self, m, data): el = ET.Element("embed") el.attrib["embedtype"] = "image" el.attrib["alt"] = m.group(1) el.attrib["format"] = "left" parsed_image_path = JekyllArticleImporter.get_parsed_file_path(m.group(2)) image_obj, _ = get_or_create_image( path=image_params["path"], file_path=parsed_image_path, collection=image_params["collection"], repo_name=image_params["repo_name"], ) if not image_obj: return None, m.start(0), m.end(0) el.attrib["id"] = str(image_obj.pk) return el, m.start(0), m.end(0) class ImgExtension(Extension): def extendMarkdown(self, md): IMG_PATTERN = r"!\[(.*?)\]\((.*?)\)" md.inlinePatterns.register(ImgProcessor(IMG_PATTERN, md), "img", 175) # Wagtail to portrebuje # https://docs.wagtail.io/en/stable/extending/rich_text_internals.html#data-format markdown.serializers.HTML_EMPTY.add("embed") Markdown.output_formats["plain"] = unmark_element plain_md = Markdown(output_format="plain") plain_md.stripTopLevelTags = False html_md = Markdown(extensions=[ImgExtension()]) params = {} # ------------------------------- Importer class ------------------------------- class JekyllArticleImporter: def __init__( self, article_parent_page_id: int, collection_id: int, url: str, dry_run: bool, use_git: bool, parent_page_model, page_model, ): self.page_model = page_model # Params self.article_parent_page = parent_page_model.objects.get( id=article_parent_page_id ).specific # TODO test if specific should be included or not self.collection = Collection.objects.get(id=collection_id) self.dry_run = dry_run self.use_git = use_git self.url = url # Computed props self.path, self.repo_name = get_path_and_repo_name(self.url, self.use_git) self.site = self.article_parent_page.get_site() self.site_config = get_site_config(self.path) self.article_path = self.site_config.get("articlepath", None) self.permalink = self.site_config.get("permalink", None) self.title_suffix = get_title_from_site_config(self.site_config) # Counters self.success_counter = 0 self.exists_counter = 0 self.skipped_counter = 0 self.page_log = "" # output saved on page instance # Filling global var for ImgParser image_params["path"] = self.path image_params["collection"] = self.collection image_params["repo_name"] = self.repo_name def create_redirects(self, article, match): y = match.group(1) m = match.group(2) d = match.group(3) slug = match.group(4) if self.article_path: # asi jenom Ceske Budejovice Redirect.objects.get_or_create( site=self.site, old_path="/%s/%s/%s/%s/%s" % (self.article_path, y, m.zfill(2), d.zfill(2), slug), defaults={"is_permanent": True, "redirect_page": article}, ) elif self.permalink: Redirect.objects.get_or_create( site=self.site, old_path=self.permalink.replace(":title", slug), defaults={"is_permanent": True, "redirect_page": article}, ) def create_summary_log(self): """ Podle (aktuálních) hodnot counterů přidá do self.page_log různé zprávy pro uživatele. """ self.page_log += "==================================\n" if self.success_counter: base_msg = "Úspěšně otestováno" if self.dry_run else "Úspěšně naimportováno" self.page_log += "{} {} článků\n".format(base_msg, self.success_counter) if self.exists_counter: self.page_log += "z toho {} již existovalo\n".format(self.exists_counter) if self.skipped_counter: self.page_log += "NELZE importovat {} článků\n".format(self.skipped_counter) self.article_parent_page.last_import_log = self.page_log self.article_parent_page.save() @staticmethod def get_parsed_file_path(path: str): """ Získá cestu z proměnné v "{{ }}" závorkách """ if "{{" in path: try: parsed_path = path.split("{{")[1].split("|")[0].split("'")[1] except IndexError: parsed_path = path.split("{{")[1].split("|")[0].split('"')[1] return parsed_path else: return path @staticmethod def get_perex(text): text = re.split(r"^\s*$", text.strip(), flags=re.MULTILINE)[0] return plain_md.convert(text) @staticmethod def handle_meta_is_str(meta: str) -> dict: """ Snaží se vyřešit situaci, že meta se nenaparsovala na dict, ale na string, kde je sice klíč:hodnota, ale další klíč následuje za mezerou po předchozí hodnotě. Iteruju teda přes rozesekaný string přes dvojtečky, ale každá value kromě poslední (viz if idx == len(string_parts) - 2) má za poslední mezerou klíč pro další hodnotu. Takže položku z další iterace splitnu přes mezery, spojím všechny kromě poslední do value a tu přiřadím klíči z aktuální iterace. Poslední položka iterace už je samotná value, takže tam handluju jinak. Protože sahám na +1, tak hlídám číslo iterace pro přeskočení poslední (kde už je položka iterace pouze value bez key, takže jí beru v před-předposlední...). """ logger.info( "Meta se neparsuje na dict, ale na str - zkouším složitější parse", extra={"article_meta": meta}, ) string_parts = meta.split(":") meta_dict = {} for idx, part in enumerate(string_parts): if idx == len(string_parts) - 1: break key = part.split()[-1] if idx == len(string_parts) - 2: value = string_parts[idx + 1] else: value = " ".join(string_parts[idx + 1].split()[0:-1]) meta_dict.update({key: value}) return meta_dict def import_post(self, file_path): with open(os.path.join(self.path, file_path), "rt") as f: r = re.split(r"^---\s*$", f.read(), maxsplit=3, flags=re.MULTILINE) try: meta = yaml.safe_load(r[1].replace("\t", "")) except (ScannerError, ValueError): msg = "Nelze importovat článek - neparsovatelný YAML" logger.warning(msg, extra={"file_path": file_path}) self.page_log += "{} - {}\n".format(msg, file_path) self.skipped_counter += 1 return None md = r[2] html = html_md.convert(md) if isinstance(meta, str): # pokud se špatně naparsovalo meta (není dict) meta = self.handle_meta_is_str(meta) try: title = meta["title"] except TypeError: msg = "Nelze importovat článek - nepodařilo se získat title" logger.warning(msg, extra={"article_meta": meta}) self.page_log += "{} - {}\n".format(msg, meta) self.skipped_counter += 1 return None try: article = ( self.article_parent_page.get_descendants().get(title=title).specific ) self.exists_counter += 1 except (Page.DoesNotExist, Page.MultipleObjectsReturned): article = self.page_model() article.content = [("text", RichText(html))] article.perex = self.get_perex(md) or "..." if meta.get("date", None): meta_date = meta["date"] if isinstance(meta_date, date): article.date = meta_date else: parsed_date = meta["date"].split()[0] article.date = ( parsed_date[0:10] if parsed_date else timezone.now().date() ) else: article.date = timezone.now().date() article.title = meta["title"] article.author = meta.get("author", "Česká pirátská strana") article.seo_title = article.title + self.title_suffix article.search_description = meta.get("description", "") # for tag in meta['tags']: # article.tags.add(tag) if meta.get("image", None): article.image, log_message = get_or_create_image( self.path, meta["image"], self.collection, self.repo_name ) if log_message: self.page_log += "{}: {}".format(article.title, log_message) if self.dry_run: return article try: if not article.id: self.article_parent_page.add_child(instance=article) logger.info("Vytvářím článek: %s" % article) rev = article.save_revision() if meta.get("published", True): rev.publish() except Exception as e: msg = "Nelze uložit importovaný článek" logger.warning( msg, extra={"article_title": article.title, "exception": e}, ) self.page_log += "{} - {} - {}\n".format(msg, article.title, e) self.skipped_counter += 1 return article self.success_counter += 1 return article def perform_import(self) -> "List[dict]": """ Projde adresář článků a pokusí se zprocesovat Markdown do article. Vrací list dict pro django messages (klíč levelu, text). """ logger.info("Import započat") for file_name in os.listdir(os.path.join(self.path, POSTS_DIR)): # Případ podsložek (typicky po jednotlivých letech) if os.path.isdir(os.path.join(self.path, POSTS_DIR, file_name)): posts_sub_folder = os.path.join(self.path, POSTS_DIR, file_name) for sub_file_name in os.listdir(posts_sub_folder): file_path = os.path.join(posts_sub_folder, sub_file_name) self.process_article(sub_file_name, file_path) # Případ všech článků v jedné složce else: file_path = os.path.join(POSTS_DIR, file_name) self.process_article(file_name, file_path) self.create_summary_log() logger.info("Import dokončen") def process_article(self, file_name: str, file_path: str): match = re.match(r"(\d*)-(\d*)-(\d*)-(.*)\.(.*)", file_name) if match: ext = match.group(5) if ext == "md": article = self.import_post(file_path) if self.dry_run or not article: return article.save() # ujistím se, že mám "redirect_page" pro Redirect uloženou self.create_redirects(article, match) else: msg = "Nepodporovaná přípona souboru: %s" % ext logger.warning(msg) self.page_log += "{}\n".format(msg) self.skipped_counter += 1 else: msg = "Přeskočeno: %s" % file_name logger.warning(msg) self.page_log += "{}\n".format(msg) self.skipped_counter += 1