# services.py (4.19 KiB)
# Authored by Tomáš Valenta
import datetime
import io
import logging
import os
from urllib.parse import urlsplit

import requests_cache
from django.core.files import File

from main.models import MainHomePage, MainPersonPage

from .models import InstagramPost
# Module-scoped logger so records carry this module's name and can be
# filtered or configured separately from the root logger.
logger = logging.getLogger(__name__)
class InstagramDownloadService:
    """
    Download recent Instagram posts and store them as ``InstagramPost`` rows.

    Access tokens are read from the ``instagram_access`` blocks of the first
    ``MainHomePage`` and of every ``MainPersonPage``. For each token the
    account's recent media is requested from the Instagram Graph API and
    every item whose ``remote_id`` is not stored yet is saved together with
    its image.
    """

    # Base URL (including the API version) of the Instagram Graph API.
    API_BASE_URL = "https://graph.instagram.com/v16.0"

    # Fields requested for every media item.
    MEDIA_FIELDS = (
        "id,timestamp,caption,media_type,permalink,media_url,thumbnail_url"
    )

    def __init__(self, app_id: int, app_secret: str):
        """
        Create the service with a cached HTTP session.

        Args:
            app_id: Instagram application ID (stored, not used by the
                methods of this class).
            app_secret: Instagram application secret (stored, not used by
                the methods of this class).
        """
        self.session = requests_cache.CachedSession("instagram_cache")
        self.app_id = app_id
        self.app_secret = app_secret

    @staticmethod
    def _access_pairs(access_block) -> list[tuple[str, str]]:
        """Extract (name, access token) pairs from an ``instagram_access`` block."""
        return [
            (block["value"]["name"], block["value"]["access_token"])
            for block in access_block.raw_data
        ]

    @staticmethod
    def _filename_from_url(url: str) -> str:
        """Return the last path segment of *url*, ignoring query and fragment."""
        return os.path.basename(urlsplit(url).path)

    def get_user_info_list(self) -> list[tuple[str, str]]:
        """
        Collect the (name, access token) pairs configured on the site.

        Returns:
            Unique ``(name, access_token)`` tuples; entries from person
            pages come first, followed by those from the home page.
        """
        access_list = []

        for people_page in MainPersonPage.objects.all():
            access_list += self._access_pairs(people_page.instagram_access)

        homepage = MainHomePage.objects.first()
        # Without a home page there are simply no home page tokens.
        if homepage is not None:
            access_list += self._access_pairs(homepage.instagram_access)

        # Remove duplicates while keeping a deterministic order.
        return list(dict.fromkeys(access_list))

    def download_remote_image(self, image_url) -> "tuple[str, File | None]":
        """
        Download the image at *image_url*.

        Args:
            image_url: Absolute URL of the image.

        Returns:
            ``(file name, File)`` on success. On any error the problem is
            logged and ``("", None)`` is returned instead of raising.
        """
        try:
            response = self.session.get(image_url)
            response.raise_for_status()
        except Exception as exc:
            # Best effort: a failed download must not abort the whole run.
            logger.warning(
                "Error getting Instagram image at %s: %s",
                image_url, exc
            )
            return "", None

        # Take the name from the URL path only, so that a query string on
        # the URL does not end up in the stored file name.
        return (
            self._filename_from_url(image_url),
            File(io.BytesIO(response.content)),
        )

    def get_user_data(self, access_token: str) -> dict:
        """
        Fetch the ``id`` and ``username`` of the token's owner.

        Args:
            access_token: Instagram access token of the account.

        Returns:
            The decoded JSON response.

        Raises:
            Whatever ``raise_for_status()`` raises for an unsuccessful
            response.
        """
        user_data = self.session.get(
            f"{self.API_BASE_URL}/me",
            # Let the HTTP library encode the query string.
            params={"access_token": access_token, "fields": "id,username"},
        )
        user_data.raise_for_status()

        return user_data.json()

    def get_recent_media(self, user_data: dict, access_token: str) -> list[dict]:
        """
        Fetch the media list of a user, bypassing the session cache.

        Args:
            user_data: Mapping with at least the user's ``id``.
            access_token: Instagram access token of the account.

        Returns:
            The ``data`` list of the response, or an empty list when the
            request was not successful (the status code is logged).
        """
        # The feed changes over time, so it must not be served from cache.
        with self.session.cache_disabled():
            recent_media = self.session.get(
                f"{self.API_BASE_URL}/{user_data['id']}/media",
                params={
                    "access_token": access_token,
                    "fields": self.MEDIA_FIELDS,
                },
            )

        if not recent_media.ok:
            logger.warning(
                "Error getting media for user %s: %s",
                user_data["id"],
                recent_media.status_code
            )
            return []

        logger.debug("Parsing Instagram feed: %s", recent_media)

        return recent_media.json().get("data", [])

    def parse_media_for_user(self, name: str, access_token: str) -> None:
        """
        Store every not-yet-saved media item of one account.

        Items that already exist (same ``remote_id``), that have no usable
        image URL, or whose image cannot be downloaded are skipped and
        logged.

        Args:
            name: Display name saved as the post's ``author_name``.
            access_token: Instagram access token of the account.
        """
        user_data = self.get_user_data(access_token)
        recent_media_json = self.get_recent_media(user_data, access_token)

        if not recent_media_json:
            return

        for media_data in recent_media_json:
            # Don't recreate existing posts
            if InstagramPost.objects.filter(remote_id=media_data["id"]).exists():
                logger.info(
                    "Skipping Instagram post ID %s, already exists",
                    media_data["id"]
                )
                continue

            # For videos ``media_url`` is the video file; the still image
            # is provided as ``thumbnail_url``.
            if media_data.get("media_type") == "VIDEO":
                image_url = (
                    media_data.get("thumbnail_url")
                    or media_data.get("media_url")
                )
            else:
                image_url = media_data.get("media_url")

            if not image_url:
                logger.warning(
                    "Skipping Instagram post ID %s, no image URL",
                    media_data["id"]
                )
                continue

            filename, image_file = self.download_remote_image(image_url)

            if image_file is None:
                # The download error was already logged; the post does not
                # exist yet, so it is picked up again on the next run.
                continue

            post = InstagramPost(
                remote_id=media_data["id"],
                author_name=name,
                author_username=user_data["username"],
                timestamp=datetime.datetime.strptime(
                    media_data["timestamp"],
                    "%Y-%m-%dT%H:%M:%S%z",
                ),
                # The caption key is missing for posts without a caption.
                caption=media_data.get("caption", ""),
                url=media_data["permalink"],
            )

            post.image.save(
                filename,
                image_file,
                False,  # Don't save yet
            )

            post.save()

            logger.info(
                "Saved Instagram post ID %s",
                post.remote_id,
            )

    def perform_update(self) -> None:
        """Download new posts for every configured Instagram account."""
        for user_info in self.get_user_info_list():
            self.parse_media_for_user(*user_info)