diff --git a/devine/commands/search.py b/devine/commands/search.py new file mode 100644 index 0000000..9f0b344 --- /dev/null +++ b/devine/commands/search.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +import logging +import re +import sys +from typing import Any, Optional + +import click +import yaml +from rich.padding import Padding +from rich.rule import Rule +from rich.tree import Tree + +from devine.commands.dl import dl +from devine.core.config import config +from devine.core.console import console +from devine.core.constants import context_settings +from devine.core.proxies import Basic, Hola, NordVPN +from devine.core.service import Service +from devine.core.services import Services +from devine.core.utilities import get_binary_path +from devine.core.utils.click_types import ContextData +from devine.core.utils.collections import merge_dict + + +@click.command( + short_help="Search for titles from a Service.", + cls=Services, + context_settings=dict( + **context_settings, + token_normalize_func=Services.get_tag + )) +@click.option("-p", "--profile", type=str, default=None, + help="Profile to use for Credentials and Cookies (if available).") +@click.option("--proxy", type=str, default=None, + help="Proxy URI to use. If a 2-letter country is provided, it will try get a proxy from the config.") +@click.option("--no-proxy", is_flag=True, default=False, + help="Force disable all proxy use.") +@click.pass_context +def search( + ctx: click.Context, + no_proxy: bool, + profile: Optional[str] = None, + proxy: Optional[str] = None +): + if not ctx.invoked_subcommand: + raise ValueError("A subcommand to invoke was not specified, the main code cannot continue.") + + log = logging.getLogger("search") + + service = Services.get_tag(ctx.invoked_subcommand) + profile = profile + + if profile: + log.info(f"Using profile: '{profile}'") + + with console.status("Loading Service Config...", spinner="dots"): + service_config_path = Services.get_path(service) / config.filenames.config + if service_config_path.exists(): + service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8")) + log.info("Service Config loaded") + else: + service_config = {} + merge_dict(config.services.get(service), service_config) + + proxy_providers = [] + if no_proxy: + ctx.params["proxy"] = None + else: + with console.status("Loading Proxy Providers...", spinner="dots"): + if config.proxy_providers.get("basic"): + proxy_providers.append(Basic(**config.proxy_providers["basic"])) + if config.proxy_providers.get("nordvpn"): + proxy_providers.append(NordVPN(**config.proxy_providers["nordvpn"])) + if get_binary_path("hola-proxy"): + proxy_providers.append(Hola()) + for proxy_provider in proxy_providers: + log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}") + + if proxy: + requested_provider = None + if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE): + # requesting proxy from a specific proxy provider + requested_provider, proxy = proxy.split(":", maxsplit=1) + if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE): + proxy = proxy.lower() + with console.status(f"Getting a Proxy to {proxy}...", spinner="dots"): + if requested_provider: + proxy_provider = next(( + x + for x in proxy_providers + if x.__class__.__name__.lower() == requested_provider + ), None) + if not proxy_provider: + log.error(f"The proxy provider '{requested_provider}' was not recognised.") + sys.exit(1) + proxy_uri = proxy_provider.get_proxy(proxy) + if not proxy_uri: + log.error(f"The proxy provider {requested_provider} had no proxy for {proxy}") + sys.exit(1) + proxy = ctx.params["proxy"] = proxy_uri + log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}") + else: + for proxy_provider in proxy_providers: + proxy_uri = proxy_provider.get_proxy(proxy) + if proxy_uri: + proxy = ctx.params["proxy"] = proxy_uri + log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}") + break + else: + log.info(f"Using explicit Proxy: {proxy}") + + ctx.obj = ContextData( + config=service_config, + cdm=None, + proxy_providers=proxy_providers, + profile=profile + ) + + +@search.result_callback() +def result(service: Service, profile: Optional[str] = None, **_: Any) -> None: + log = logging.getLogger("search") + + service_tag = service.__class__.__name__ + + with console.status("Authenticating with Service...", spinner="dots"): + cookies = dl.get_cookie_jar(service_tag, profile) + credential = dl.get_credentials(service_tag, profile) + service.authenticate(cookies, credential) + if cookies or credential: + log.info("Authenticated with Service") + + search_results = Tree("Search Results", hide_root=True) + with console.status("Searching...", spinner="dots"): + for result in service.search(): + result_text = f"[bold text]{result.title}[/]" + if result.url: + result_text = f"[link={result.url}]{result_text}[/link]" + if result.label: + result_text += f" [pink]{result.label}[/]" + if result.description: + result_text += f"\n[text2]{result.description}[/]" + result_text += f"\n[bright_black]id: {result.id}[/]" + search_results.add(result_text + "\n") + + # update cookies + cookie_file = dl.get_cookie_path(service_tag, profile) + if cookie_file: + dl.save_cookies(cookie_file, service.session.cookies) + + console.print(Padding( + Rule(f"[rule.text]{len(search_results.children)} Search Results"), + (1, 2) + )) + + if search_results.children: + console.print(Padding( + search_results, + (0, 5) + )) + else: + console.print(Padding( + "[bold text]No matches[/]\n[bright_black]Please check spelling and search again....[/]", + (0, 5) + )) diff --git a/devine/core/search_result.py b/devine/core/search_result.py new file mode 100644 index 0000000..916cc2f --- /dev/null +++ b/devine/core/search_result.py @@ -0,0 +1,44 @@ +from typing import Optional, Union + + +class SearchResult: + def __init__( + self, + id_: Union[str, int], + title: str, + description: Optional[str] = None, + label: Optional[str] = None, + url: Optional[str] = None + ): + """ + A Search Result for any support Title Type. + + Parameters: + id_: The search result's Title ID. + title: The primary display text, e.g., the Title's Name. + description: The secondary display text, e.g., the Title's Description or + further title information. + label: The tertiary display text. This will typically be used to display + an informative label or tag to the result. E.g., "unavailable", the + title's price tag, region, etc. + url: A hyperlink to the search result or title's page. + """ + if not isinstance(id_, (str, int)): + raise TypeError(f"Expected id_ to be a {str} or {int}, not {type(id_)}") + if not isinstance(title, str): + raise TypeError(f"Expected title to be a {str}, not {type(title)}") + if not isinstance(description, (str, type(None))): + raise TypeError(f"Expected description to be a {str}, not {type(description)}") + if not isinstance(label, (str, type(None))): + raise TypeError(f"Expected label to be a {str}, not {type(label)}") + if not isinstance(url, (str, type(None))): + raise TypeError(f"Expected url to be a {str}, not {type(url)}") + + self.id = id_ + self.title = title + self.description = description + self.label = label + self.url = url + + +__all__ = ("SearchResult",) diff --git a/devine/core/service.py b/devine/core/service.py index 423b408..40480d5 100644 --- a/devine/core/service.py +++ b/devine/core/service.py @@ -1,6 +1,7 @@ import base64 import logging from abc import ABCMeta, abstractmethod +from collections.abc import Generator from http.cookiejar import CookieJar from typing import Optional, Union from urllib.parse import urlparse @@ -16,6 +17,7 @@ from devine.core.config import config from devine.core.console import console from devine.core.constants import AnyTrack from devine.core.credential import Credential +from devine.core.search_result import SearchResult from devine.core.titles import Title_T, Titles_T from devine.core.tracks import Chapters, Tracks from devine.core.utilities import get_ip_info @@ -123,6 +125,17 @@ class Service(metaclass=ABCMeta): raise TypeError(f"Expected cookies to be a {CookieJar}, not {cookies!r}.") self.session.cookies.update(cookies) + def search(self) -> Generator[SearchResult, None, None]: + """ + Search by query for titles from the Service. + + The query must be taken as a CLI argument by the Service class. + Ideally just re-use the title ID argument (i.e. self.title). + + Search results will be displayed in the order yielded. + """ + raise NotImplementedError(f"Search functionality has not been implemented by {self.__class__.__name__}") + def get_widevine_service_certificate(self, *, challenge: bytes, title: Title_T, track: AnyTrack) \ -> Union[bytes, str]: """