import datetime
import io
import logging
import os
from typing import Optional
from urllib.parse import urlsplit

import requests_cache
from django.core.files import File

from main.models import MainHomePage, MainPersonPage

from .models import InstagramPost

# NOTE: getLogger() without a name returns the root logger.
logger = logging.getLogger()


class InstagramDownloadService:
    """Fetch recent Instagram media and store it as ``InstagramPost`` rows.

    Access tokens are read from the ``instagram_access`` field of the first
    ``MainHomePage`` and of every ``MainPersonPage``. For each distinct
    ``(name, access_token)`` pair the service queries the Instagram Graph
    endpoint, downloads the image of every post that is not stored yet and
    saves a new ``InstagramPost``.

    HTTP requests go through a ``requests_cache.CachedSession``; the media
    listing request is issued with the cache disabled.
    """

    def __init__(self, app_id: int, app_secret: str):
        """Create the cached HTTP session and remember the app credentials.

        Args:
            app_id: Instagram application id (stored, not used by the
                methods in this class).
            app_secret: Instagram application secret (stored, not used by
                the methods in this class).
        """
        self.session = requests_cache.CachedSession("instagram_cache")
        self.app_id = app_id
        self.app_secret = app_secret

    @staticmethod
    def _access_pairs(access_field) -> list[tuple[str, str]]:
        """Return ``(name, access_token)`` for each block in *access_field*.

        *access_field* must expose ``raw_data``, an iterable of mappings
        shaped like ``{"value": {"name": ..., "access_token": ...}}``
        (presumably a Wagtail StreamField value -- confirm against models).
        """
        return [
            (block["value"]["name"], block["value"]["access_token"])
            for block in access_field.raw_data
        ]

    def get_user_info_list(self) -> list[tuple[str, str]]:
        """Collect the distinct ``(name, access_token)`` pairs to process.

        Pairs from person pages come first, followed by pairs from the home
        page; duplicates are dropped while keeping first-seen order.

        Returns:
            A list of unique ``(name, access_token)`` tuples. Empty when no
            page defines any access block.
        """
        access_pairs: list[tuple[str, str]] = []
        for person_page in MainPersonPage.objects.all():
            access_pairs += self._access_pairs(person_page.instagram_access)

        # ``first()`` returns None when no home page exists yet.
        homepage = MainHomePage.objects.first()
        if homepage is not None:
            access_pairs += self._access_pairs(homepage.instagram_access)

        # dict.fromkeys de-duplicates and, unlike a set, keeps a stable order.
        return list(dict.fromkeys(access_pairs))

    def download_remote_image(self, image_url) -> tuple[str, Optional[File]]:
        """Download *image_url* and wrap its content in a Django ``File``.

        Args:
            image_url: Absolute URL of the image to fetch.

        Returns:
            ``(filename, file)`` on success, where *filename* is the last
            path segment of the URL (query string excluded). ``("", None)``
            when the request fails; the failure is logged as a warning.
        """
        try:
            response = self.session.get(image_url)
            response.raise_for_status()
        except Exception as exc:
            # Best effort: a single broken image must not abort the import.
            logger.warning(
                "Error getting Instagram image at %s: %s", image_url, exc
            )
            return "", None

        # Take the basename of the URL *path* only, so that query parameters
        # appended to the URL do not leak into the filename.
        filename = os.path.basename(urlsplit(image_url).path)
        return filename, File(io.BytesIO(response.content))

    def get_user_data(self, access_token: str) -> dict:
        """Return the ``id`` and ``username`` of the token's owner.

        Args:
            access_token: Instagram access token of the user.

        Returns:
            The decoded JSON body of the ``/me`` endpoint.

        Raises:
            Whatever ``raise_for_status()`` raises for a non-success HTTP
            status.
        """
        user_data = self.session.get(
            f"https://graph.instagram.com/v16.0/me?access_token={access_token}"
            "&fields=id,username"
        )
        user_data.raise_for_status()
        return user_data.json()

    def get_recent_media(self, user_data: dict, access_token: str) -> list[dict]:
        """Fetch the media listing for the user described by *user_data*.

        The request bypasses the session cache so new posts are seen.

        Args:
            user_data: Mapping containing at least the user's ``id``.
            access_token: Instagram access token of the user.

        Returns:
            The list under the ``data`` key of the response, or an empty
            list when the request is not successful (logged as a warning)
            or the response carries no ``data`` key.
        """
        with self.session.cache_disabled():
            recent_media = self.session.get(
                f"https://graph.instagram.com/v16.0/{user_data['id']}/media?access_token="
                f"{access_token}&fields=id,timestamp,caption,media_type,permalink,"
                "media_url,thumbnail_url"
            )

        if not recent_media.ok:
            logger.warning(
                "Error getting media for user %s: %s",
                user_data["id"],
                recent_media.status_code,
            )
            return []

        logger.debug("Parsing Instagram feed: %s", recent_media)
        return recent_media.json().get("data", [])

    def parse_media_for_user(self, name: str, access_token: str) -> None:
        """Store every not-yet-known post of one user as ``InstagramPost``.

        Posts whose ``remote_id`` already exists are skipped, as are posts
        without a usable image URL or whose image cannot be downloaded.

        Args:
            name: Display name saved as ``author_name`` on each post.
            access_token: Instagram access token of the user.

        Raises:
            Whatever ``get_user_data`` raises when the user lookup fails.
        """
        user_data = self.get_user_data(access_token)
        recent_media = self.get_recent_media(user_data, access_token)

        for media_data in recent_media:
            remote_id = media_data["id"]

            # Don't recreate existing posts
            if InstagramPost.objects.filter(remote_id=remote_id).exists():
                logger.info(
                    "Skipping Instagram post ID %s, already exists", remote_id
                )
                continue

            # ``thumbnail_url`` is requested from the API; when present it is
            # preferred over ``media_url`` so that a still image is stored
            # (NOTE(review): assumes ``image`` should hold an image rather
            # than a video file -- confirm against the model/templates).
            image_url = media_data.get("thumbnail_url") or media_data.get(
                "media_url"
            )
            if not image_url:
                logger.warning(
                    "Skipping Instagram post ID %s, no image URL in response",
                    remote_id,
                )
                continue

            filename, image_file = self.download_remote_image(image_url)
            if image_file is None:
                # download_remote_image already logged the cause.
                logger.warning(
                    "Skipping Instagram post ID %s, image download failed",
                    remote_id,
                )
                continue

            post = InstagramPost(
                remote_id=remote_id,
                author_name=name,
                author_username=user_data["username"],
                timestamp=datetime.datetime.strptime(
                    media_data["timestamp"],
                    "%Y-%m-%dT%H:%M:%S%z",
                ),
                # Tolerate responses that carry no ``caption`` key.
                caption=media_data.get("caption", ""),
                url=media_data["permalink"],
            )
            post.image.save(
                filename,
                image_file,
                False,  # Don't save yet
            )
            post.save()
            logger.info(
                "Saved Instagram post ID %s",
                post.remote_id,
            )

    def perform_update(self) -> None:
        """Import new posts for every configured Instagram account."""
        for name, access_token in self.get_user_info_list():
            self.parse_media_for_user(name, access_token)