import logging
import os
from datetime import timedelta
from typing import TYPE_CHECKING, Optional
from urllib import request
from urllib.parse import urlparse

from django.core.files import File
from django.utils import timezone
from tweepy import Client
from tweepy.errors import BadRequest

from main.models import MainHomePage, MainPersonPage

from .models import Tweet

if TYPE_CHECKING:
    from tweepy import Media
    from tweepy import Tweet as TweetResponse
    from tweepy import User

logger = logging.getLogger()


class TweetDownloadService:
    """
    Service class that updates stored tweets from the Twitter API.

    Tweets are fetched for the accounts configured on the first MainHomePage
    and for every MainPersonPage that has a Twitter username set.
    """

    client: Client
    days_back: int

    def __init__(self, bearer_token, days_back=1):
        """
        Create the API client.

        :param bearer_token: Twitter API bearer token; must be truthy.
        :param days_back: how many days into the past tweets are requested.
        :raises RuntimeError: when no bearer token is given.
        """
        if not bearer_token:
            raise RuntimeError("Twitter bearer token not set, cannot update tweets")

        self.client = Client(bearer_token=bearer_token)
        self.days_back = days_back

    @staticmethod
    def download_remote_image(image_url) -> tuple[str, Optional[File]]:
        """
        Download an image into a temporary file.

        :param image_url: URL of the image to download.
        :return: ``(file_name, file)`` on success, ``("", None)`` when the
            download fails for any reason (the error is logged as a warning).
            The returned file is open; the caller is responsible for closing it.
        """
        try:
            temp_path, _headers = request.urlretrieve(image_url)
        except Exception as exc:  # best effort: a missing image must not stop the update
            logger.warning(exc)
            return "", None

        # Use only the path component so a query string or fragment
        # does not end up in the stored file name.
        file_name = os.path.basename(urlparse(image_url).path)
        return file_name, File(open(temp_path, "rb"))

    @staticmethod
    def get_existing_tweet_id_list() -> list[int]:
        """
        Return the ``twitter_id`` values of all tweets already stored.

        NOTE(review): the value returned is the ``values_list`` queryset, not a
        real list, and the IDs are compared against strings by the caller, so
        the ``list[int]`` annotation looks inaccurate - confirm against the model.
        It might be worth limiting this to a recent time period.
        """
        return Tweet.objects.values_list("twitter_id", flat=True)

    @staticmethod
    def get_tweet_media_url(media_key, media_list):
        """
        Return the URL of the media item with the given key.

        :return: the ``url`` of the first matching item, or ``None`` when no
            item in ``media_list`` has that key.
        """
        return next((m.url for m in media_list if m.media_key == media_key), None)

    def get_tweets_response(
        self, user_id
    ) -> tuple[list["TweetResponse"], list["Media"]]:
        """
        Return the tweets of the given Twitter user together with their media.

        Retweets are excluded and only tweets newer than ``days_back`` days are
        requested. A single page of at most 100 tweets is fetched.

        :return: ``(tweets, media)``; both are empty lists when the API
            returned nothing.
        """
        tweets_response = self.client.get_users_tweets(
            user_id,
            exclude=["retweets"],
            expansions=[
                "author_id",
                "attachments.media_keys",
                "entities.mentions.username",
            ],
            max_results=100,
            # get_tweet_media_url reads ``url`` from each media object
            media_fields=["url"],
            start_time=timezone.now() - timedelta(days=self.days_back),
            tweet_fields=["author_id", "created_at"],
            user_fields=["name", "username"],
        )

        return tweets_response.data or [], tweets_response.includes.get("media", [])

    def get_user_list_data(self) -> list["User"]:
        """
        Return Twitter user data for every configured username.

        Usernames come from the first MainHomePage and from all MainPersonPage
        records with a Twitter username. Usernames that the API rejects or
        returns no data for are logged and skipped.
        """
        person_usernames = (
            MainPersonPage.objects.filter(twitter_username__isnull=False)
            .values_list("twitter_username", flat=True)
            .distinct()
        )

        homepage = MainHomePage.objects.first()
        if homepage is None:
            logger.warning("No MainHomePage found, using person page usernames only")
            homepage_usernames = []
        else:
            homepage_usernames = [
                item["value"] for item in homepage.twitter_usernames.raw_data
            ]

        # A set removes duplicates; empty values are dropped and the result is
        # sorted so the processing order is deterministic.
        usernames = sorted(
            {name for name in (*person_usernames, *homepage_usernames) if name}
        )

        user_data_list = []
        for username in usernames:
            try:
                user_data = self.get_user_response(username)
            except BadRequest:
                user_data = None

            if user_data is None:
                logger.error(
                    "Cannot download tweets for the username",
                    extra={"username": username},
                )
                continue

            user_data_list.append(user_data)

        return user_data_list

    def get_user_response(self, username) -> "User":
        """
        Return information about the given Twitter user.

        :return: the ``data`` part of the API response.
        """
        user_response = self.client.get_user(
            username=username,
            user_fields=["profile_image_url"],  # id, name, username enabled by default
        )
        return user_response.data

    def _save_remote_image(self, field_file, image_url) -> bool:
        """
        Download ``image_url`` and store it into ``field_file``.

        The owning model instance is not saved, so this is safe to use before
        ``bulk_create``.

        :return: ``True`` when the image was stored, ``False`` when the
            download failed and the field was left untouched.
        """
        file_name, content = self.download_remote_image(image_url)
        if content is None:
            return False

        try:
            # third argument False: do not save the model instance here
            field_file.save(file_name, content, False)
        finally:
            # the storage has its own copy now; release the temp file handle
            content.close()
        return True

    def perform_update(self) -> list[Tweet]:
        """
        Run the whole download of tweets from the API into the DB.

        Tweets whose ID is already stored are skipped, as are tweets seen
        earlier in the same run (e.g. when two configured usernames resolve to
        the same account).

        :return: the result of ``bulk_create`` for the newly created tweets.
        """
        # Build the set once; IDs are compared as strings.
        known_ids = {str(twitter_id) for twitter_id in self.get_existing_tweet_id_list()}
        tweets_to_save = []

        for user_data in self.get_user_list_data():
            tweet_resp_list, media_list = self.get_tweets_response(user_id=user_data.id)

            for tweet_response in tweet_resp_list:
                tweet_id = str(tweet_response.id)
                if tweet_id in known_ids:
                    continue
                known_ids.add(tweet_id)

                # extract the data from the responses
                tweet = Tweet(
                    author_name=user_data.name,
                    author_username=user_data.username,
                    # keep only the text in front of the first t.co link
                    text=tweet_response.text.split("https://t.co")[0],
                    twitter_id=tweet_id,
                )

                # store the Twitter account image into media
                self._save_remote_image(tweet.author_img, user_data.profile_image_url)

                # try to find an image for the tweet
                if tweet_response.attachments:
                    self.try_find_image_for_tweet(tweet, tweet_response, media_list)

                # queue for saving
                tweets_to_save.append(tweet)

        return Tweet.objects.bulk_create(tweets_to_save)

    def try_find_image_for_tweet(
        self, tweet: Tweet, tweet_response: "TweetResponse", media_list: list["Media"]
    ):
        """
        Attach the first media item of the tweet as its image, when available.

        Nothing happens when the tweet has no media keys, when the media item
        is missing from ``media_list`` or has no URL, or when the download fails.
        """
        tweet_media_keys = tweet_response.attachments.get("media_keys", [])
        if not tweet_media_keys:
            return

        img_url = self.get_tweet_media_url(tweet_media_keys[0], media_list)
        if not img_url:  # the image is not always present in media_list
            return

        self._save_remote_image(tweet.image, img_url)