Skip to content
Snippets Groups Projects
Select Git revision
  • f5529bc1a1a9044b7a8109ea0a9eabf09e6dcf50
  • test default protected
  • master protected
  • feat/custom-css
  • feat/redesign-improvements-10
  • feat/redesign-improvements-8
  • feat/redesign-fixes-3
  • feat/pirstan-changes
  • feat/separate-import-thread
  • feat/dary-improvements
  • features/add-pdf-page
  • features/add-typed-table
  • features/fix-broken-calendar-categories
  • features/add-embed-to-articles
  • features/create-mastodon-feed-block
  • features/add-custom-numbering-for-candidates
  • features/add-timeline
  • features/create-wordcloud-from-article-page
  • features/create-collapsible-extra-legal-info
  • features/extend-hero-banner
  • features/add-link-to-images
21 results

services.py

Blame
  • services.py 3.14 KiB
    import io
    import logging
    import os
    from itertools import islice
    from urllib.parse import urlsplit

    import instaloader
    import requests
    from django.conf import settings
    from django.core.files import File

    from main.models import MainHomePage, MainPersonPage

    from .models import InstagramPost
    
    # Module-scoped logger: records carry this module's dotted name, so they can
    # be filtered or routed separately, and still propagate to the root handlers.
    logger = logging.getLogger(__name__)
    
    
    class InstagramDownloadService:
        """Copy Instagram posts of the configured accounts into ``InstagramPost``.

        The accounts come from the ``instagram_access`` blocks of the first
        ``MainHomePage`` plus the ``instagram_username`` of every
        ``MainPersonPage``. ``perform_update`` is the entry point.
        """

        # How many posts yielded by ``profile.get_posts()`` are examined per
        # account in a single run.
        MAX_POSTS_PER_USER = 64

        # Captions longer than this many characters are cut and suffixed with "...".
        CAPTION_MAX_LENGTH = 255

        # Seconds to wait for the image host; requests has no default timeout and
        # would otherwise be able to block the whole update indefinitely.
        IMAGE_DOWNLOAD_TIMEOUT = 30

        @staticmethod
        def _unique_usernames(candidates) -> list[str]:
            """Return the non-empty usernames of *candidates*, first occurrence only.

            ``None`` and empty strings are dropped; the original order is kept.
            """
            seen = set()
            unique = []

            for username in candidates:
                if not username or username in seen:
                    continue

                seen.add(username)
                unique.append(username)

            return unique

        @staticmethod
        def _shorten_caption(caption: "str | None", limit: int = CAPTION_MAX_LENGTH) -> str:
            """Return *caption* cut to *limit* characters plus ``"..."`` when longer.

            A missing caption (``None``) is returned as an empty string.
            """
            if not caption:
                return ""

            if len(caption) > limit:
                # NOTE(review): the result can be up to ``limit + 3`` characters
                # long; confirm that InstagramPost.caption accepts that length.
                return caption[:limit] + "..."

            return caption

        @staticmethod
        def _filename_from_url(image_url: str) -> str:
            """Return the last path segment of *image_url*, without query or fragment."""
            return os.path.basename(urlsplit(image_url).path)

        def get_usernames(self) -> list[str]:
            """Collect the Instagram usernames whose posts should be downloaded.

            Returns:
                Usernames from the home page's ``instagram_access`` blocks first,
                followed by person-page usernames, without duplicates or empty
                values.
            """
            candidates = []

            home_page = MainHomePage.objects.first()

            if home_page is None:
                logger.warning(
                    "No MainHomePage exists, using only person page Instagram usernames."
                )
            else:
                candidates.extend(
                    block["value"]["username"]
                    for block in home_page.instagram_access.raw_data
                )

            candidates.extend(
                person_page.instagram_username
                for person_page in MainPersonPage.objects.all()
            )

            return self._unique_usernames(candidates)

        def download_remote_image(self, image_url) -> "tuple[str, File | None]":
            """Download *image_url* into memory.

            Returns:
                ``(filename, file)`` on success, where ``filename`` is the last
                path segment of the URL and ``file`` wraps the response body.
                ``("", None)`` when the request fails; the failure is logged as
                a warning rather than raised.
            """
            try:
                response = requests.get(image_url, timeout=self.IMAGE_DOWNLOAD_TIMEOUT)
                response.raise_for_status()
            except requests.RequestException as exc:
                logger.warning("Error getting Instagram image at %s: %s", image_url, exc)
                return "", None

            return self._filename_from_url(image_url), File(io.BytesIO(response.content))

        def parse_media_for_user(self, username: str) -> None:
            """Store not-yet-saved image posts of *username* as ``InstagramPost`` rows.

            At most ``MAX_POSTS_PER_USER`` posts are examined. Videos, posts whose
            shortcode is already stored and posts whose image cannot be downloaded
            are skipped.
            """
            loader = instaloader.Instaloader()

            # Use the configured session only when both settings are provided.
            if settings.INSTAGRAM_SESSION and settings.INSTAGRAM_USERNAME:
                loader.load_session(settings.INSTAGRAM_USERNAME, settings.INSTAGRAM_SESSION)

            profile = instaloader.Profile.from_username(loader.context, username)

            # islice stops pulling from the post iterator once the limit is hit.
            for remote_post in islice(profile.get_posts(), self.MAX_POSTS_PER_USER):
                shortcode = remote_post.shortcode

                if remote_post.is_video:
                    logger.info("Instagram post ID %s is a video, skipping.", shortcode)
                    continue

                # Don't recreate existing posts
                if InstagramPost.objects.filter(remote_id=shortcode).exists():
                    logger.info(
                        "Skipping Instagram post ID %s, already exists",
                        shortcode,
                    )
                    continue

                image_name, image_file = self.download_remote_image(remote_post.url)

                if image_file is None:
                    # The warning was already logged by download_remote_image.
                    # Nothing is stored, so the post is not seen as existing and
                    # a later run can try it again.
                    continue

                local_post_instance = InstagramPost(
                    remote_id=shortcode,
                    author_name=profile.full_name,
                    author_username=profile.username,
                    timestamp=remote_post.date_local,
                    caption=self._shorten_caption(remote_post.caption),
                    url=f"https://instagram.com/p/{shortcode}",
                )

                local_post_instance.image.save(
                    image_name,
                    image_file,
                    False,  # Don't save yet
                )

                local_post_instance.save()

                logger.info(
                    "Saved Instagram post ID %s",
                    shortcode,
                )

        def perform_update(self) -> None:
            """Run ``parse_media_for_user`` for every username from ``get_usernames``."""
            for username in self.get_usernames():
                self.parse_media_for_user(username)