aboutsummaryrefslogtreecommitdiff
path: root/components/subscriptions/main.py
blob: af9d891019936fac7a6772db1582920baa5ed042 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from dataclasses import dataclass, field, asdict
from datetime import datetime, UTC
from sys import stderr
from typing import TypedDict, List, cast, Dict, Any
from bson.objectid import ObjectId
from feedparser import parse # type: ignore
from pymongo.collection import Collection
from pymongo.results import InsertOneResult, UpdateResult
from schedule import Job, Scheduler
from components.database import subscriptions
from components.subscriptions.typing import SubsDict
from components.videos import VideoTuple

# Module-wide scheduler: every Subscription keeps a reference to it (set in
# __post_init__) and registers its periodic fetch job on it in initialise_job().
default_scheduler = Scheduler()

@dataclass
class Subscription:
    """A feed subscription that is fetched periodically and stored in MongoDB.

    Fields:
        _id: Identifier of the subscription; used as the ``_id`` filter when
            updating the stored document.
        link: Location handed to ``feedparser.parse`` on every fetch.
        time_between_fetches: Interval between scheduled fetches, in minutes.
        last_fetch: Time of the last successful fetch (timezone-aware UTC);
            defaults to the "never" sentinel.
        last_video_update: Greatest ``updated`` value seen among the stored
            videos; defaults to the "never" sentinel.
        videos: Videos collected so far. Entries that are not ``VideoTuple``
            instances are converted in ``__post_init__``.
        subscribers: Ids of the subscribers of this feed.
    """

    # Sentinel meaning "has never happened". It carries no annotation on
    # purpose, so the dataclass machinery does not treat it as a field.
    _NEVER = datetime.min.replace(tzinfo=UTC)

    _id: str
    link: str
    time_between_fetches: int
    last_fetch: datetime = _NEVER
    last_video_update: datetime = _NEVER
    videos: List[VideoTuple] = field(default_factory=list)
    subscribers: List[ObjectId] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Attach the shared collection/scheduler and normalise ``videos``."""
        self._collection: Collection[SubsDict] = subscriptions
        self._scheduler: Scheduler = default_scheduler
        # Only the first element is inspected: the list is assumed to be
        # homogeneous (either all VideoTuple or all plain sequences).
        if self.videos and not isinstance(self.videos[0], VideoTuple):
            self.videos = [VideoTuple._make(vid) for vid in self.videos]

    def initialise_job(self) -> None:
        """Register a job that calls ``fetch`` every ``time_between_fetches`` minutes.

        When the subscription has been fetched before, the first run is moved
        by ``last_fetch - now`` (a negative offset for a past fetch), so the
        time already elapsed since that fetch counts towards the interval.
        """
        self._job: Job = self._scheduler.every(self.time_between_fetches).minutes.do(self.fetch)
        if self.last_fetch > self._NEVER:
            self._job.next_run += self.last_fetch - datetime.now(tz=UTC)

    def fetch(self) -> None:
        """Download the feed, merge its entries into ``videos`` and persist.

        Entries published after ``last_video_update`` are appended; entries
        only updated after it replace the stored video with the same ``id``.
        ``last_fetch`` is always written back; ``videos`` and
        ``last_video_update`` are written only when something newer was seen.
        If parsing raises, the error is reported on stderr and nothing is
        changed.
        """
        try:
            rss = parse(self.link)
        except Exception as e:
            # Best-effort: a failing feed must not take the scheduler down.
            print("Ran into an exception while fetching", f"{self._id}:", e, file=stderr)
            return
        for vid in map(VideoTuple.from_rss_entry, rss.entries):
            if vid.published > self.last_video_update:
                self.videos.append(vid)
            elif vid.updated > self.last_video_update:
                # Known video with a newer revision: replace the first match.
                for i, old_vid in enumerate(self.videos):
                    if vid.id == old_vid.id:
                        self.videos[i] = vid
                        break
        # `default` covers a subscription that still has no videos (for
        # example an empty feed); a bare max() would raise ValueError there.
        last_video_update = max(
            (vid.updated for vid in self.videos),
            default=self.last_video_update,
        )
        self.last_fetch = datetime.now(tz=UTC)
        if last_video_update > self.last_video_update:
            print("Updating", self._id, end=", ")
            print("New vid count:", len(self.videos))
            self.last_video_update = last_video_update
            self.update_fetch(videos=True)
        else:
            self.update_fetch()
        print("Fetched", self._id, "at", self.last_fetch)

    def asdict(self) -> SubsDict:
        """Return the dataclass fields as a dict typed as ``SubsDict``.

        Attributes set outside the field list (collection, scheduler, job)
        are not dataclass fields and are therefore not included.
        """
        # Inside a method the bare name resolves to dataclasses.asdict,
        # not to this method.
        return cast(SubsDict, asdict(self))

    def insert(self) -> InsertOneResult:
        """Insert this subscription into the collection as a new document."""
        return self._collection.insert_one(self.asdict())

    def update_fetch(self, videos: bool=False) -> UpdateResult:
        """Write the fetch state back to the stored document.

        Args:
            videos: When true, also store ``videos`` and
                ``last_video_update``; otherwise only ``last_fetch``.

        Returns:
            The result of the ``update_one`` call on the collection.
        """
        updated_values: Dict[str, Any] = {"last_fetch": self.last_fetch}
        if videos:
            updated_values["videos"] = self.videos
            updated_values["last_video_update"] = self.last_video_update
        return self._collection.update_one(
            {"_id": self._id},
            {"$set": updated_values},
        )