# services.py
import datetime
import io
import logging
import os
from urllib.parse import urlsplit

import requests_cache
from django.core.files import File

from main.models import MainHomePage, MainPersonPage

from .models import InstagramPost


# Named after this module (instead of being the root logger) so that records
# can be filtered and configured per module; by default they still propagate
# to the handlers attached to the root logger.
logger = logging.getLogger(__name__)


class InstagramDownloadService:
    """
    Fetch recent Instagram media and store it as ``InstagramPost`` objects.

    Account names and access tokens are read from the ``instagram_access``
    blocks of ``MainHomePage`` and of every ``MainPersonPage``. For each
    account the recent media list is requested from the Instagram Graph API
    and every entry that is not stored yet (matched on ``remote_id``) is saved
    together with its downloaded image.
    """

    # Base URL of the Graph API version used by every request of this class.
    API_ROOT = "https://graph.instagram.com/v16.0"

    # Seconds to wait for a remote server before a request is abandoned, so
    # that one stalled connection cannot block the whole update.
    REQUEST_TIMEOUT = 30

    def __init__(self, app_id: int, app_secret: str):
        """
        Create the service and its cached HTTP session.

        :param app_id: application id; stored on the instance, not read by
            any method of this class.
        :param app_secret: application secret; stored on the instance, not
            read by any method of this class.
        """
        self.session = requests_cache.CachedSession("instagram_cache")

        self.app_id = app_id
        self.app_secret = app_secret

    @staticmethod
    def _filename_from_url(url: str) -> str:
        """Return the last path segment of *url*, without query or fragment."""
        return os.path.basename(urlsplit(url).path)

    def get_user_info_list(self) -> list[tuple[str, str]]:
        """
        Collect the distinct ``(name, access_token)`` pairs configured on pages.

        :return: pairs taken from the first ``MainHomePage`` (when one exists)
            and from all ``MainPersonPage`` objects, in no particular order.
        """
        pages = list(MainPersonPage.objects.all())

        home_page = MainHomePage.objects.first()
        if home_page is not None:
            pages.append(home_page)

        # Built as a set so an account configured on several pages is only
        # processed once.
        unique_access = {
            (block["value"]["name"], block["value"]["access_token"])
            for page in pages
            for block in page.instagram_access.raw_data
        }

        return list(unique_access)

    def download_remote_image(self, image_url) -> "tuple[str, File | None]":
        """
        Download the file at *image_url*.

        :param image_url: URL of the file to fetch.
        :return: ``(filename, file)`` on success, where ``filename`` is the
            last path segment of the URL; ``("", None)`` when the request
            fails, in which case a warning is logged.
        """
        try:
            response = self.session.get(image_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except Exception as exc:
            logger.warning(
                "Error getting Instagram image at %s: %s",
                image_url, exc
            )
            return "", None

        # The query string is dropped so it does not end up in the stored
        # file name.
        return (
            self._filename_from_url(image_url),
            File(io.BytesIO(response.content)),
        )

    def get_user_data(self, access_token: str) -> dict:
        """
        Request the ``id`` and ``username`` of the account owning the token.

        :param access_token: access token sent with the request.
        :return: the decoded JSON body of the response.
        :raises Exception: whatever the session raises for a failed request,
            including the error raised for a non-success HTTP status.
        """
        user_data = self.session.get(
            f"{self.API_ROOT}/me",
            params={"access_token": access_token, "fields": "id,username"},
            timeout=self.REQUEST_TIMEOUT,
        )
        user_data.raise_for_status()

        return user_data.json()

    def get_recent_media(self, user_data: dict, access_token: str) -> list[dict]:
        """
        Request the media list of the account described by *user_data*.

        The request bypasses the session cache.

        :param user_data: mapping with at least an ``"id"`` key, as returned
            by :meth:`get_user_data`.
        :param access_token: access token sent with the request.
        :return: the ``"data"`` list of the response, or an empty list when
            the response status is not OK (a warning is logged) or the body
            has no ``"data"`` entry.
        """
        with self.session.cache_disabled():
            recent_media = self.session.get(
                f"{self.API_ROOT}/{user_data['id']}/media",
                params={
                    "access_token": access_token,
                    "fields": (
                        "id,timestamp,caption,media_type,permalink,"
                        "media_url,thumbnail_url"
                    ),
                },
                timeout=self.REQUEST_TIMEOUT,
            )

        if not recent_media.ok:
            logger.warning(
                "Error getting media for user %s: %s",
                user_data["id"],
                recent_media.status_code
            )

            return []

        logger.debug("Parsing Instagram feed: %s", recent_media)

        return recent_media.json().get("data", [])

    def parse_media_for_user(self, name: str, access_token: str) -> None:
        """
        Store every media entry of one account that is not saved yet.

        Failures are logged and skipped rather than raised, so one broken
        account or media entry does not stop the rest of the update.

        :param name: display name saved as ``author_name`` of new posts.
        :param access_token: access token of the account to read.
        """
        try:
            user_data = self.get_user_data(access_token)
            recent_media = self.get_recent_media(user_data, access_token)
        except Exception as exc:
            # Only the exception type is logged: the message of an HTTP error
            # may contain the request URL, which embeds the access token.
            logger.warning(
                "Error getting Instagram data for %s: %s",
                name,
                type(exc).__name__,
            )
            return

        for media_data in recent_media:
            remote_id = media_data["id"]

            # Don't recreate existing posts
            if InstagramPost.objects.filter(remote_id=remote_id).exists():
                logger.info(
                    "Skipping Instagram post ID %s, already exists",
                    remote_id
                )

                continue

            # For videos "media_url" is the video file; the still image is
            # delivered separately as "thumbnail_url". Either key can be
            # missing from the response.
            if media_data.get("media_type") == "VIDEO":
                image_url = media_data.get("thumbnail_url")
            else:
                image_url = media_data.get("media_url")

            if not image_url:
                logger.warning(
                    "Skipping Instagram post ID %s, no image URL available",
                    remote_id
                )

                continue

            filename, content = self.download_remote_image(image_url)

            if content is None:
                # The download error was already logged. The post is not
                # stored, so it is picked up again on the next update.
                continue

            post = InstagramPost(
                remote_id=remote_id,
                author_name=name,
                author_username=user_data["username"],
                timestamp=datetime.datetime.strptime(
                    media_data["timestamp"],
                    "%Y-%m-%dT%H:%M:%S%z",
                ),
                # The caption key is absent for media without a caption.
                caption=media_data.get("caption", ""),
                url=media_data["permalink"],
            )

            post.image.save(
                filename,
                content,
                False,  # Don't save yet
            )

            post.save()

            logger.info(
                "Saved Instagram post ID %s",
                post.remote_id,
            )

    def perform_update(self) -> None:
        """Fetch and store new media for every configured account."""
        for name, access_token in self.get_user_info_list():
            self.parse_media_for_user(name, access_token)