diff options
| author | A Farzat <a@farzat.xyz> | 2025-10-09 18:10:00 +0300 |
|---|---|---|
| committer | A Farzat <a@farzat.xyz> | 2025-10-09 19:51:11 +0300 |
| commit | 2a2af44eacf0e962853dba0b6ecae19fe18f9ea3 (patch) | |
| tree | 093f8bf465af54711dce6eb5f8eb97f75cb3718f | |
| parent | 9fb20553d9f9e21c012f730a728c33d368e09bf2 (diff) | |
| download | csca5028-2a2af44eacf0e962853dba0b6ecae19fe18f9ea3.tar.gz csca5028-2a2af44eacf0e962853dba0b6ecae19fe18f9ea3.zip | |
Add the ability to use the YouTube API
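This is to circumvent YouTube blocking of web-scraping requests; scraping remains the fallback when no API key is set or the API call fails.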
| -rw-r--r-- | components/extractor/obtain_vid_info.py | 18 | ||||
| -rwxr-xr-x | data_analyser/__main__.py | 9 | ||||
| -rw-r--r-- | data_analyser/utils.py | 19 | ||||
| -rw-r--r-- | requirements.txt | 3 | ||||
| -rw-r--r-- | tests/obtain_vid_info.py | 6 | ||||
| -rw-r--r-- | tests/utils/vid_url_to_html.py | 2 |
6 files changed, 42 insertions, 15 deletions
diff --git a/components/extractor/obtain_vid_info.py b/components/extractor/obtain_vid_info.py index bfbe861..45e2bdc 100644 --- a/components/extractor/obtain_vid_info.py +++ b/components/extractor/obtain_vid_info.py @@ -1,9 +1,25 @@ +from sys import stderr +from traceback import print_exc from urllib.request import urlopen from bs4 import BeautifulSoup from isodate import parse_duration # type: ignore +from requests import get -def obtain_vid_duration(url: str, html: str = '') -> int: +def obtain_vid_duration(url: str, vid_id: str, html: str='', api_key: str='') -> int: + if api_key: + try: + data = get("https://www.googleapis.com/youtube/v3/videos", params={ + 'part': "contentDetails", + 'id': vid_id[9:], + 'key': api_key, + }).json() + duration_str = data['items'][0]['contentDetails']['duration'] + print(vid_id[9:], duration_str) + return int(parse_duration(duration_str).total_seconds()) + except: + print("Web scraping will be used due to an error with the following id:", vid_id, file=stderr) + print_exc() html = html or urlopen(url).read().decode('utf-8') soup = BeautifulSoup(html, 'html.parser') diff --git a/data_analyser/__main__.py b/data_analyser/__main__.py index 8b4a984..ad85c09 100755 --- a/data_analyser/__main__.py +++ b/data_analyser/__main__.py @@ -1,10 +1,15 @@ #!/usr/bin/env python +from os import getenv from time import sleep -from .utils import analyse_collection + +from dotenv import load_dotenv from components.database import subscriptions +from .utils import analyse_collection + +load_dotenv('.env') while True: - analyse_collection(subscriptions) + analyse_collection(subscriptions, getenv("YOUTUBE_API_KEY") or '') sleep(30) diff --git a/data_analyser/utils.py b/data_analyser/utils.py index 95dc2fe..921015d 100644 --- a/data_analyser/utils.py +++ b/data_analyser/utils.py @@ -1,31 +1,34 @@ +from traceback import print_exc + +from pymongo.collection import Collection + from components.subscriptions.main import Subscription from 
components.subscriptions.typing import SubsDict from components.videos import VideoTuple from components.extractor.obtain_vid_info import obtain_vid_duration -from pymongo.collection import Collection - -def analyse_video(vid_tuple: VideoTuple) -> VideoTuple: +def analyse_video(vid_tuple: VideoTuple, api_key: str='') -> VideoTuple: try: - duration = obtain_vid_duration(vid_tuple.link) + duration = obtain_vid_duration(vid_tuple.link, vid_tuple.id, api_key=api_key) except: + print_exc() duration = -2 return vid_tuple._replace(analysed=True, duration=duration) -def analyse_subscription(sub: Subscription) -> bool: +def analyse_subscription(sub: Subscription, api_key: str='') -> bool: updated = False for i, vid in enumerate(sub.videos): if not vid.analysed: - sub.videos[i] = analyse_video(vid) + sub.videos[i] = analyse_video(vid, api_key) updated = True return updated -def analyse_collection(subs_collection: Collection[SubsDict]) -> int: +def analyse_collection(subs_collection: Collection[SubsDict], api_key: str='') -> int: num_updated = 0 for sub_dict in subs_collection.find(): sub = Subscription(**sub_dict) sub._collection = subs_collection - if analyse_subscription(sub): + if analyse_subscription(sub, api_key): sub.update_videos() num_updated += 1 return num_updated diff --git a/requirements.txt b/requirements.txt index 53fab07..d37bcfb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,11 +22,14 @@ pathspec==0.12.1 pymongo==4.14.0 python-dotenv==1.1.1 pytz==2025.2 +requests==2.32.5 schedule==1.2.2 sentinels==1.1.1 sgmllib3k==1.0.0 soupsieve==2.8 types-Flask-Cors==6.0.0.20250809 +types-requests==2.32.4.20250913 typing_extensions==4.14.1 +urllib3==2.5.0 Werkzeug==3.1.3 yt-dlp==2025.9.26 diff --git a/tests/obtain_vid_info.py b/tests/obtain_vid_info.py index 701104d..2bdf330 100644 --- a/tests/obtain_vid_info.py +++ b/tests/obtain_vid_info.py @@ -6,12 +6,12 @@ from .utils.vid_url_to_html import get_vid_html_from_url class TestObtainVidInfo(TestCase): def 
test_obtain_vid_duration_from_shorts(self) -> None: url = "https://www.youtube.com/shorts/iD1Z7ccGyhk" - self.assertEqual(60, obtain_vid_duration(url, html=get_vid_html_from_url(url))) + self.assertEqual(60, obtain_vid_duration(url, '', html=get_vid_html_from_url(url))) def test_obtain_vid_duration_from_videos(self) -> None: url = "https://www.youtube.com/watch?v=WI4U1SVIO3I" - self.assertEqual(8*60+11, obtain_vid_duration(url, html=get_vid_html_from_url(url))) + self.assertEqual(8*60+11, obtain_vid_duration(url, '', html=get_vid_html_from_url(url))) def test_obtain_vid_duration_from_videos_with_params(self) -> None: url = "https://www.youtube.com/watch?v=k7RM-ot2NWY&list=PLZHQObOWTQDPD3MizzM2xVFitgF8hE_ab&index=2&pp=iAQB" - self.assertEqual(9*60+59, obtain_vid_duration(url, html=get_vid_html_from_url(url))) + self.assertEqual(9*60+59, obtain_vid_duration(url, '', html=get_vid_html_from_url(url))) diff --git a/tests/utils/vid_url_to_html.py b/tests/utils/vid_url_to_html.py index 60f6ccc..91fcb40 100644 --- a/tests/utils/vid_url_to_html.py +++ b/tests/utils/vid_url_to_html.py @@ -18,5 +18,5 @@ def get_vid_html_from_url(url: str) -> str: with open(f'tests/data/video@{extract_vid_id(url)}.html', 'r') as file: return file.read() -def obtain_vid_duration(url:str, html: str = '') -> int: +def obtain_vid_duration(url:str, vid_id:str, html:str='', api_key:str='') -> int: return get_random_vid_duration(url) |
