summaryrefslogtreecommitdiff
path: root/tests/analyser.py
blob: acb4d446d84bcf905c331091c1630b7c9e76080f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from hashlib import md5
from typing import Any, Dict
from unittest import TestCase
from unittest.mock import MagicMock, patch

from mongomock import MongoClient
from pymongo.collection import Collection

from components.subscriptions.main import Subscription
from components.subscriptions.typing import SubsDict
from components.videos import VideoTuple
from components.ytdlp import obtain_vid_info
from data_analyser.utils import analyse_video, analyse_subscription, analyse_collection

def url_based_extract_info(url: str, download: Any=None) -> Dict[str, Any]:
    hash = md5(url.encode()).hexdigest()
    hash_int = int(hash[:4], 16)
    seconds = hash_int %  60
    minutes = hash_int // 60
    hours   = minutes  // 60
    minutes = minutes  %  60
    if hours:
        duration_string = "%d:%02d:%02d" % (hours, minutes, seconds)
    elif minutes:
        duration_string = "%d:%02d" % (minutes, seconds)
    else:
        duration_string = "%d" % (seconds, )
    return {
        "duration_string": duration_string,
        "other_key": "other_value",
        "other key": "other value",
    }

class TestAnalyser(TestCase):
    def setUp(self) -> None:
        self.client: MongoClient[Any] = MongoClient(tz_aware=True)
        self.collection: Collection[SubsDict] = self.client.db.collection

        self.sub1 = Subscription(
            _id="yt:channel:hlgI3UHCOnwUGzWzbJ3H5w",
            link=r"tests/data/feed@ytnnews24@001.xml",
            time_between_fetches=1,
        )
        self.sub1._collection = self.collection
        self.sub1.insert()
        self.sub1.fetch()

        self.mock_ydl_class = patch('components.ytdlp.YoutubeDL').start()
        self.mock_ydl_instance = MagicMock()
        self.mock_ydl_class.return_value.__enter__.return_value = self.mock_ydl_instance
        self.mock_ydl_instance.extract_info.side_effect = url_based_extract_info
        self.addCleanup(patch.stopall)

    def test_obtain_vid_info(self) -> None:
        result = obtain_vid_info(self.sub1.videos[0].link)
        expected_duration = url_based_extract_info(self.sub1.videos[0].link)["duration_string"]
        self.assertEqual(result, {"duration_string": expected_duration})

    def test_obtain_vid_info_with_exception(self) -> None:
        self.mock_ydl_instance.extract_info.side_effect = Exception("Network error")

        result = obtain_vid_info(self.sub1.videos[0].link)
        expected_keys = {"duration_string"}
        actual_keys = set(result.keys())
        self.assertEqual(expected_keys, actual_keys,
                         "Expected keys %s, got %s." % (expected_keys, actual_keys))
        self.assertRegex(result["duration_string"], "^-1:[0-5][0-9]$")

    def test_analyse_video(self) -> None:
        modified_vid = analyse_video(self.sub1.videos[1])

        expected_duration = url_based_extract_info(self.sub1.videos[1].link)["duration_string"]
        self.assertIsInstance(modified_vid, VideoTuple)
        self.assertEqual(modified_vid.duration_string, expected_duration)
        self.assertTrue(modified_vid.analysed)

    def test_analyse_video_with_exception(self) -> None:
        self.mock_ydl_instance.extract_info.side_effect = Exception("Network error")

        modified_vid = analyse_video(self.sub1.videos[0])
        self.assertIsInstance(modified_vid, VideoTuple)
        self.assertRegex(modified_vid.duration_string, "^-1:[0-5][0-9]$")
        self.assertTrue(modified_vid.analysed)

    def test_analyse_subscription(self) -> None:
        self.assertTrue(analyse_subscription(self.sub1))
        for vid in self.sub1.videos:
            expected_duration = url_based_extract_info(vid.link)["duration_string"]
            self.assertEqual(vid.duration_string, expected_duration)
            self.assertTrue(vid.analysed)

    def test_analyse_subscription_with_further_fetch(self) -> None:
        self.assertTrue(analyse_subscription(self.sub1))
        for vid in self.sub1.videos:
            expected_duration = url_based_extract_info(vid.link)["duration_string"]
            self.assertEqual(vid.duration_string, expected_duration)
            self.assertTrue(vid.analysed)
        self.sub1.link = r"tests/data/feed@ytnnews24@002.xml"
        self.sub1.fetch()
        self.assertTrue(analyse_subscription(self.sub1))
        for vid in self.sub1.videos:
            expected_duration = url_based_extract_info(vid.link)["duration_string"]
            self.assertEqual(vid.duration_string, expected_duration)
            self.assertTrue(vid.analysed)

    def test_analyse_subscription_without_further_fetch(self) -> None:
        self.assertTrue(analyse_subscription(self.sub1))
        for vid in self.sub1.videos:
            expected_duration = url_based_extract_info(vid.link)["duration_string"]
            self.assertEqual(vid.duration_string, expected_duration)
            self.assertTrue(vid.analysed)
        self.assertFalse(analyse_subscription(self.sub1))

    def test_analyse_collection(self) -> None:
        self.assertEqual(analyse_collection(self.collection), 1)
        for sub_dict in self.collection.find():
            for vid in map(VideoTuple._make, sub_dict["videos"]):
                expected_duration = url_based_extract_info(vid.link)["duration_string"]
                self.assertEqual(vid.duration_string, expected_duration)
                self.assertTrue(vid.analysed)
        self.sub2 = Subscription(
            _id="yt:channel:7YOGHUfC1Tb6E4pudI9STA",
            link=r"tests/data/feed@mentaloutlaw@001.xml",
            time_between_fetches=1,
        )
        self.sub2._collection = self.collection
        self.sub2.insert()
        self.sub2.fetch()
        self.assertEqual(analyse_collection(self.collection), 1)
        for sub_dict in self.collection.find():
            sub = Subscription(**sub_dict)
            for vid in map(VideoTuple._make, sub_dict["videos"]):
                expected_duration = url_based_extract_info(vid.link)["duration_string"]
                self.assertEqual(vid.duration_string, expected_duration)
                self.assertTrue(vid.analysed)

    def tearDown(self) -> None:
        self.client.close()