Skip to content
Snippets Groups Projects
Commit 222761b1 authored by OndraRehounek's avatar OndraRehounek Committed by jan.bednarik
Browse files

Refactor jekyll import to Class

parent 657e3a7c
Branches
No related tags found
2 merge requests!442Release,!432Feature/majak imports
...@@ -3,7 +3,7 @@ from django.contrib.messages import ERROR, WARNING ...@@ -3,7 +3,7 @@ from django.contrib.messages import ERROR, WARNING
from wagtail.admin.forms import WagtailAdminPageForm from wagtail.admin.forms import WagtailAdminPageForm
from wagtail.core.models.collections import Collection from wagtail.core.models.collections import Collection
from .jekyll_import import perform_import from .jekyll_import import JekyllArticleImporter
class JekyllImportForm(WagtailAdminPageForm): class JekyllImportForm(WagtailAdminPageForm):
...@@ -72,13 +72,14 @@ class JekyllImportForm(WagtailAdminPageForm): ...@@ -72,13 +72,14 @@ class JekyllImportForm(WagtailAdminPageForm):
return cleaned_data return cleaned_data
def handle_import(self): def handle_import(self):
import_message_list = perform_import( import_message_list = JekyllArticleImporter(
article_parent_page=self.instance, article_parent_page=self.instance,
collection=self.cleaned_data["collection"], collection=self.cleaned_data["collection"],
url=self.cleaned_data["jekyll_repo_url"], url=self.cleaned_data["jekyll_repo_url"],
dry_run=self.cleaned_data["dry_run"], dry_run=self.cleaned_data["dry_run"],
use_git=self.cleaned_data["use_git"], use_git=self.cleaned_data["use_git"],
) ).perform_import()
self.instance.import_message_list = import_message_list self.instance.import_message_list = import_message_list
return import_message_list return import_message_list
......
import logging
import os import os
import re import re
import urllib import urllib
...@@ -23,187 +24,76 @@ from wagtail.core.rich_text import RichText ...@@ -23,187 +24,76 @@ from wagtail.core.rich_text import RichText
from wagtail.images.models import Image from wagtail.images.models import Image
from yaml.scanner import ScannerError from yaml.scanner import ScannerError
logger = logging.getLogger(__name__)
# from django.utils.dateparse import parse_date TODO enable date check again... # from django.utils.dateparse import parse_date TODO enable date check again...
# Wagtail to portrebuje https://docs.wagtail.io/en/stable/extending/rich_text_internals.html#data-format image_params = (
markdown.serializers.HTML_EMPTY.add("embed") {}
) # filled on JekyllArticleImported init and used in markdown overwrites
message_list = []
# Plain format pro perex
def unmark_element(element, stream=None):
if stream is None:
stream = StringIO()
if element.text:
stream.write(element.text)
for sub in element:
unmark_element(sub, stream)
if element.tail:
stream.write(element.tail)
return stream.getvalue()
Markdown.output_formats["plain"] = unmark_element
plain_md = Markdown(output_format="plain")
plain_md.stripTopLevelTags = False
params = {}
class ImgProcessor(InlineProcessor):
def handleMatch(self, m, data):
el = ET.Element("embed")
el.attrib["embedtype"] = "image"
el.attrib["alt"] = m.group(1)
el.attrib["format"] = "left"
collection = get_collection()
# TODO FIX REPO_NAME
parsed_image_path = get_parsed_file_path(m.group(2))
image_obj = get_or_create_image(
params["path"], parsed_image_path, collection=collection, repo_name=""
)
if not image_obj:
message_list.append(
{
"level": WARNING,
"text": "Nenalezen obrázek {}".format(params["path"]),
}
)
return None, m.start(0), m.end(0)
el.attrib["id"] = str(image_obj.pk)
return el, m.start(0), m.end(0)
def get_parsed_file_path(path: str):
if "{{" in path:
try:
parsed_path = path.split("{{")[1].split("|")[0].split("'")[1]
except IndexError:
parsed_path = path.split("{{")[1].split("|")[0].split('"')[1]
return parsed_path
else:
return path
class ImgExtension(Extension):
def extendMarkdown(self, md):
IMG_PATTERN = r"!\[(.*?)\]\((.*?)\)"
md.inlinePatterns.register(ImgProcessor(IMG_PATTERN, md), "img", 175)
html_md = Markdown(extensions=[ImgExtension()])
def get_perex(text):
text = re.split(r"^\s*$", text.strip(), flags=re.MULTILINE)[0]
return plain_md.convert(text)
POSTS_DIR = "_posts" POSTS_DIR = "_posts"
# TITLE_SUFFIX = " - Piráti České Budějovice"
# ------------------------------- Misc helper functions -------------------------------
def get_site_config(path) -> dict:
with open(os.path.join(path, "_config.yml")) as f:
config = yaml.safe_load(f.read())
return config
def import_post(path, file_path, parent, title_suffix, dry_run, repo_name):
from district.models import DistrictArticlePage
with open(os.path.join(path, file_path), "rt") as f:
r = re.split(r"^---\s*$", f.read(), maxsplit=3, flags=re.MULTILINE)
try:
meta = yaml.safe_load(r[1])
except (ScannerError, ValueError):
message_list.append(
{"level": ERROR, "text": "Nevalidní yaml pro {}".format(path)}
)
return None, False
md = r[2]
html = html_md.convert(md)
# meta_date = meta["date"]
# article_date = meta_date if isinstance(meta_date, date) else meta["date"].split()[0]
if DistrictArticlePage.objects.filter(title=meta["title"]).exists():
for article in DistrictArticlePage.objects.filter(title=meta["title"]):
# if article.date == parse_date(meta["date"].split()[0]):
msg = "Článek již existuje: %s" % article
stdout.write(msg)
message_list.append({"level": INFO, "text": msg})
return article, False def clone_repo(url: str) -> (str, str):
"""
article = DistrictArticlePage() Naclonuje repo do tmp s využitím gitu a vrátí cestu k němu.
Pokud URL končí lomítkem, odebereme ho, a vezmeme jako název repozitáře
# article.text = html string za posledním lomítkem jako název repa. To použijeme i pro promazání
article.content = [("text", RichText(html))] takového adresáře, pokud už existuje.
article.perex = get_perex(md) """
path = "/tmp/"
if meta.get("date", None): if url.endswith("/"):
meta_date = meta["date"] url = url[:-1]
if isinstance(meta_date, date): repo_name = url.split("/")[-1]
article.date = meta_date repo_path = os.path.join(path, repo_name)
else:
parsed_date = meta["date"].split()[0]
article.date = parsed_date[0:10] if parsed_date else timezone.now().date()
else:
article.date = timezone.now().date()
article.title = meta["title"]
article.author = meta.get("author", "Česká pirátská strana")
article.seo_title = article.title + title_suffix os.chdir(path)
article.search_description = meta.get("description", "") if os.path.exists(repo_path):
os.chdir(repo_path)
os.system("git pull --depth 1")
return repo_path, repo_name
# for tag in meta['tags']: os.system("git clone --depth 1 {}".format(url))
# article.tags.add(tag)
collection = get_collection() return repo_path, repo_name
if meta.get("image", None):
article.image = get_or_create_image(
path, meta["image"], collection=collection, repo_name=repo_name
)
if dry_run:
return article, True
try:
parent.add_child(instance=article)
stdout.write("Creating article: %s" % article)
rev = article.save_revision()
if meta.get("published", True):
rev.publish()
except Exception as e:
message_list.append(
{
"level": WARNING if dry_run else ERROR,
"text": "Nelze uložit článek {}: {}".format(article.title, str(e)),
}
)
return article, False
return article, True def download_repo_as_zip(url: str) -> (str, str):
"""
Stáhne .zip repa, extrahuje a vrátí cestu k extrahovanému repu.
Hodně nešikovné je, že extrahovaná složka má ještě suffix "-gh-pages"
a to nevím, jestli platí vždy... regex taky pro název repa také není optimální,
ale ve finále nehraje moc roli, pokud vrátí cokoliv použitelného pro file name.
"""
path = "/tmp/"
repo_name = re.search("pirati-web/(.*)/archive/", url).group(1)
zip_path = "{}{}.zip".format(path, repo_name)
if os.path.exists(zip_path):
os.remove(zip_path)
def get_collection(): urllib.request.urlretrieve(url, zip_path)
return params["kolekce"]
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(path)
def get_path_and_repo_name(url: str, use_git: bool) -> (str, str): return os.path.join(path, "{}-gh-pages".format(repo_name)), repo_name
if use_git:
return clone_repo(url)
else:
return download_repo_as_zip(url)
def get_or_create_image(path, file_path, collection, repo_name) -> Image or None: def get_or_create_image(
path: str, file_path: str, collection, repo_name: str
) -> Image or None:
"""
Funkce, která se snaží najít a vrátit Wagtail Image.
Nejdříve hledá v existujících podle cesty...
Pak zkusí najít soubor fyzicky na disku...
Pak zkusí ještě assets/img adresář...
Pak zkusí stáhnout image z https://a.pirati.cz...
Pak se na to vykašle...
"""
file_path = file_path.lstrip("/") file_path = file_path.lstrip("/")
if Image.objects.filter(title=file_path).exists(): if Image.objects.filter(title=file_path).exists():
return Image.objects.filter(title=file_path).first() return Image.objects.filter(title=file_path).first()
else: else:
...@@ -223,7 +113,7 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None ...@@ -223,7 +113,7 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None
return image return image
except FileNotFoundError: except FileNotFoundError:
img_name = file_path.split("/")[-1] img_name = file_path.split("/")[-1]
img_assets_folder = repo_name.split(".")[0] # TODO make as form field img_assets_folder = repo_name.split(".")[0]
img_url = "https://a.pirati.cz/{}/img/{}".format( img_url = "https://a.pirati.cz/{}/img/{}".format(
img_assets_folder, file_path.split("#")[0] img_assets_folder, file_path.split("#")[0]
) )
...@@ -231,11 +121,16 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None ...@@ -231,11 +121,16 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None
try: try:
urllib.request.urlretrieve(img_url, img_path) urllib.request.urlretrieve(img_url, img_path)
except (HTTPError, UnicodeEncodeError, InvalidURL, IsADirectoryError): except (HTTPError, UnicodeEncodeError, InvalidURL, IsADirectoryError):
message_list.append( # FIXME praha má např. https://a.pirati.cz/praha/img/posts/srovnani.png
{ # ale dává to 'https://a.pirati.cz//img/assets/img/posts/srovnani.png'
"level": WARNING,
"text": "Nelze stáhout obrázek {}".format(img_url), logger.warning(
} "Nedohledán obrázek při importu článků",
extra={
"file_path": file_path,
"img_name": img_name,
"img_url": img_url,
},
) )
return None return None
...@@ -245,174 +140,320 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None ...@@ -245,174 +140,320 @@ def get_or_create_image(path, file_path, collection, repo_name) -> Image or None
return image return image
def get_path_and_repo_name(url: str, use_git: bool) -> (str, str):
"""
Vrací cestu a název repozitáře podle toho zíksané různými způsoby,
podle toho jestli se jedná o odkaz na zip nebo na git.
"""
if use_git:
return clone_repo(url)
else:
return download_repo_as_zip(url)
def get_site_config(path) -> dict:
"""
Vrací config Jekyll repa jako dict.
"""
with open(os.path.join(path, "_config.yml")) as f:
config = yaml.safe_load(f.read())
return config
def get_title_from_site_config(site_config: dict) -> str: def get_title_from_site_config(site_config: dict) -> str:
if "title" in site_config: if "title" in site_config:
return " - " + site_config.get("title", "") return " - " + site_config.get("title", "")
return "" return ""
def clone_repo(url: str) -> (str, str): def unmark_element(element, stream=None):
""" """
Naclonuje repo do tmp s využitím gitu a vrátí cestu k němu. Očišťuje element (perex) od ostatních značek
Pokud URL končí lomítkem, odebereme ho, a vezmeme jako název repozitáře
string za posledním lomítkem jako název repa. To použijeme i pro promazání
takového adresáře, pokud už existuje.
""" """
path = "/tmp/" if stream is None:
if url.endswith("/"): stream = StringIO()
url = url[:-1] if element.text:
repo_name = url.split("/")[-1] stream.write(element.text)
repo_path = os.path.join(path, repo_name) for sub in element:
unmark_element(sub, stream)
if element.tail:
stream.write(element.tail)
return stream.getvalue()
os.chdir(path)
if os.path.exists(repo_path):
os.chdir(repo_path)
os.system("git pull --depth 1")
return repo_path, repo_name
os.system("git clone --depth 1 {}".format(url)) # ------------------- Setup markdown extensions and settings -----------------------
return repo_path, repo_name
class ImgProcessor(InlineProcessor):
def handleMatch(self, m, data):
el = ET.Element("embed")
el.attrib["embedtype"] = "image"
el.attrib["alt"] = m.group(1)
el.attrib["format"] = "left"
def download_repo_as_zip(url: str) -> (str, str): parsed_image_path = JekyllArticleImporter.get_parsed_file_path(m.group(2))
""" image_obj = get_or_create_image(
Stáhne .zip repa, extrahuje a vrátí cestu k extrahovanému repu. path=image_params["path"],
Hodně nešikovné je, že extrahovaná složka má ještě suffix "-gh-pages" file_path=parsed_image_path,
a to nevím, jestli platí vždy... regex taky pro název repa také není optimální, collection=image_params["collection"],
ale ve finále nehraje moc roli, pokud vrátí cokoliv použitelného pro file name. repo_name=image_params["repo_name"],
""" )
path = "/tmp/"
repo_name = re.search("pirati-web/(.*)/archive/", url).group(1)
zip_path = "{}{}.zip".format(path, repo_name)
if os.path.exists(zip_path): if not image_obj:
os.remove(zip_path) return None, m.start(0), m.end(0)
urllib.request.urlretrieve(url, zip_path) el.attrib["id"] = str(image_obj.pk)
return el, m.start(0), m.end(0)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(path)
return os.path.join(path, "{}-gh-pages".format(repo_name)), repo_name class ImgExtension(Extension):
def extendMarkdown(self, md):
IMG_PATTERN = r"!\[(.*?)\]\((.*?)\)"
md.inlinePatterns.register(ImgProcessor(IMG_PATTERN, md), "img", 175)
def perform_import( # Wagtail to portrebuje
article_parent_page, collection, url: str, dry_run: bool, use_git: bool # https://docs.wagtail.io/en/stable/extending/rich_text_internals.html#data-format
) -> "List[dict]": markdown.serializers.HTML_EMPTY.add("embed")
"""
Přijímá parent page pro články, kolekci pro obrázky, url pro stažení (zip nebo git Markdown.output_formats["plain"] = unmark_element
repo, boolean jestli jde o testovací běh a boolean, zda použít git (anebo zip)). plain_md = Markdown(output_format="plain")
Vrací list dict pro requests messages (klíče level, text). plain_md.stripTopLevelTags = False
"""
success_counter = 0 html_md = Markdown(extensions=[ImgExtension()])
params = {}
params["kolekce"] = collection
site = article_parent_page.get_site()
# ------------------------------- Importer class -------------------------------
params["path"], repo_name = get_path_and_repo_name(url=url, use_git=use_git)
path = params["path"]
site_config = get_site_config(path) class JekyllArticleImporter:
title_suffix = get_title_from_site_config(site_config) def __init__(
self, article_parent_page, collection, url: str, dry_run: bool, use_git: bool
articlepath = site_config.get("articlepath", "aktuality") ):
# Params
for fn in os.listdir(os.path.join(path, POSTS_DIR)): self.article_parent_page = article_parent_page
if os.path.isdir(os.path.join(path, POSTS_DIR, fn)): self.collection = collection
posts_sub_folder = os.path.join(path, POSTS_DIR, fn) self.dry_run = dry_run
for sub_fn in os.listdir(posts_sub_folder): self.use_git = use_git
file_path = os.path.join(posts_sub_folder, sub_fn) self.url = url
success_counter = article_parser(
articlepath, # Computed props
path, self.path, self.repo_name = get_path_and_repo_name(self.url, self.use_git)
sub_fn, self.site = article_parent_page.get_site()
file_path, self.site_config = get_site_config(self.path)
article_parent_page,
title_suffix, self.article_path = self.site_config.get("articlepath", "aktuality")
dry_run, self.title_suffix = get_title_from_site_config(self.site_config)
repo_name,
success_counter, # Counters
site, self.success_counter = 0
) self.exists_counter = 0
else: self.skipped_counter = 0
file_path = os.path.join(POSTS_DIR, fn) # self.image_warning_counter = 0 # TODO nějak vymyslet
success_counter = article_parser(
articlepath, self.message_list = [] # output for django.messages
path,
fn, # Filling global var for ImgParser
file_path, image_params["path"] = self.path
article_parent_page, image_params["collection"] = self.collection
title_suffix, image_params["repo_name"] = self.repo_name
dry_run,
repo_name, def create_django_messages(self):
success_counter, """
site, Podle (aktuálních) hodnot counterů přidá do self.message_list
různé zprávy pro uživatele.
"""
if self.success_counter:
base_msg = "Lze importovat" if self.dry_run else "Úspěšně naimportováno"
self.message_list.append(
{
"level": SUCCESS,
"text": "{} {} článků".format(base_msg, self.success_counter),
}
) )
if success_counter: if self.exists_counter:
base_msg = "Lze importovat" if dry_run else "Úspěšně naimportováno" self.message_list.append(
message_list.append( {
{"level": SUCCESS, "text": "{} {} článků".format(base_msg, success_counter)} "level": INFO,
) "text": "{} článků s tímto názvev již existuje".format(
self.exists_counter
),
}
)
return message_list if self.skipped_counter:
self.message_list.append(
{
def article_parser( "level": WARNING,
articlepath: str, "text": "Nelze importovat {} článků".format(self.skipped_counter),
path: str, }
fn,
file_path,
article_parent_page,
title_suffix,
dry_run,
repo_name,
success_counter,
site,
):
match = re.match(r"(\d*)-(\d*)-(\d*)-(.*)\.(.*)", fn)
if match:
y = match.group(1)
m = match.group(2)
d = match.group(3)
slug = match.group(4)
ext = match.group(5)
if ext == "md":
article, success = import_post(
path, file_path, article_parent_page, title_suffix, dry_run, repo_name
) )
if not success: @staticmethod
return success_counter def get_parsed_file_path(path: str):
"""
Získá cestu z proměnné v "{{ }}" závorkách
"""
if "{{" in path:
try:
parsed_path = path.split("{{")[1].split("|")[0].split("'")[1]
except IndexError:
parsed_path = path.split("{{")[1].split("|")[0].split('"')[1]
return parsed_path
else:
return path
if dry_run: @staticmethod
return success_counter + 1 def get_perex(text):
text = re.split(r"^\s*$", text.strip(), flags=re.MULTILINE)[0]
return plain_md.convert(text)
Redirect.objects.get_or_create( def import_post(self, file_path):
site=site, from district.models import DistrictArticlePage
old_path="/%s/%s/%s/%s/%s"
% (articlepath, y, m.zfill(2), d.zfill(2), slug), with open(os.path.join(self.path, file_path), "rt") as f:
defaults={"is_permanent": True, "redirect_page": article}, r = re.split(r"^---\s*$", f.read(), maxsplit=3, flags=re.MULTILINE)
try:
meta = yaml.safe_load(r[1].replace("\t", ""))
except (ScannerError, ValueError):
logger.warning(
"Nelze importovat článek - neparsovatelný YAML",
extra={"file_path": file_path},
) )
self.skipped_counter += 1
return None
# TODO handle redirects! PRAGUE X CB md = r[2]
# Redirect.objects.get_or_create( html = html_md.convert(md)
# site=site,
# old_path="/%s/%s.html"
# % (articlepath, slug),
# defaults={"is_permanent": True, "redirect_page": article},
# )
success_counter += 1 # meta_date = meta["date"]
# article_date = meta_date if isinstance(meta_date, date) else meta["date"].split()[0]
try:
title = meta["title"]
except TypeError:
logger.warning(
"Nelze importovat článek - YAML se neparsuje na dict",
extra={"article_meta": meta},
)
self.skipped_counter += 1
return None
if DistrictArticlePage.objects.filter(title=title).exists():
for article in DistrictArticlePage.objects.filter(title=meta["title"]):
# if article.date == parse_date(meta["date"].split()[0]):
msg = "Článek již existuje: %s" % article
stdout.write(msg)
# message_list.append({"level": INFO, "text": msg})
self.exists_counter += 1
return None
article = DistrictArticlePage()
# article.text = html
article.content = [("text", RichText(html))]
article.perex = self.get_perex(md)
if meta.get("date", None):
meta_date = meta["date"]
if isinstance(meta_date, date):
article.date = meta_date
else:
parsed_date = meta["date"].split()[0]
article.date = (
parsed_date[0:10] if parsed_date else timezone.now().date()
)
else: else:
msg = "ERROR: Nepodporovaná přípona souboru: %s" % ext article.date = timezone.now().date()
message_list.append({"level": ERROR, "text": msg})
stdout.write(msg) article.title = meta["title"]
else: article.author = meta.get("author", "Česká pirátská strana")
msg = "Skipped: %s" % fn
stdout.write(msg) article.seo_title = article.title + self.title_suffix
article.search_description = meta.get("description", "")
# for tag in meta['tags']:
# article.tags.add(tag)
if meta.get("image", None):
article.image = get_or_create_image(
self.path, meta["image"], self.collection, self.repo_name
)
if self.dry_run:
return article
if dry_run: try:
message_list.append({"level": WARNING, "text": msg}) self.article_parent_page.add_child(instance=article)
return success_counter stdout.write("Vytvářím článek: %s" % article)
rev = article.save_revision()
if meta.get("published", True):
rev.publish()
except Exception as e:
logger.warning(
"Nelze uložit importovaný článek",
extra={"article_title": article.title, "exception": e},
)
self.skipped_counter += 1
return article
self.success_counter += 1
return article
def perform_import(self) -> "List[dict]":
"""
Projde adresář článků a pokusí se zprocesovat Markdown do article.
Vrací list dict pro django messages (klíč levelu, text).
"""
for file_name in os.listdir(os.path.join(self.path, POSTS_DIR)):
# Případ podsložek (typicky po jednotlivých letech)
if os.path.isdir(os.path.join(self.path, POSTS_DIR, file_name)):
posts_sub_folder = os.path.join(self.path, POSTS_DIR, file_name)
for sub_file_name in os.listdir(posts_sub_folder):
file_path = os.path.join(posts_sub_folder, sub_file_name)
self.process_article(sub_file_name, file_path)
# Případ všech článků v jedné složce
else:
file_path = os.path.join(POSTS_DIR, file_name)
self.process_article(file_name, file_path)
self.create_django_messages()
return self.message_list
def process_article(self, file_name: str, file_path: str):
match = re.match(r"(\d*)-(\d*)-(\d*)-(.*)\.(.*)", file_name)
if match:
y = match.group(1)
m = match.group(2)
d = match.group(3)
slug = match.group(4)
ext = match.group(5)
if ext == "md":
article = self.import_post(file_path)
if article:
Redirect.objects.get_or_create(
site=self.site,
old_path="/%s/%s/%s/%s/%s"
% (self.article_path, y, m.zfill(2), d.zfill(2), slug),
defaults={"is_permanent": True, "redirect_page": article},
)
# TODO handle redirects! PRAGUE X CB
# Redirect.objects.get_or_create(
# site=site,
# old_path="/%s/%s.html"
# % (article_path, slug),
# defaults={"is_permanent": True, "redirect_page": article},
# )
else:
msg = "Nepodporovaná přípona souboru: %s" % ext
logger.warning(msg)
self.skipped_counter += 1
else:
msg = "Přeskočeno: %s" % file_name
logger.warning(msg)
self.skipped_counter += 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment