From 1259a26b1461195086b736be2c809a8bb7ec04f1 Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Mon, 19 Feb 2024 18:14:50 +0000 Subject: [PATCH] Create and use new utility to get file extension from URLs/Paths Fixes #73 --- devine/core/downloaders/aria2c.py | 4 ++-- devine/core/downloaders/curl_impersonate.py | 3 ++- devine/core/downloaders/requests.py | 3 ++- devine/core/manifests/hls.py | 6 +++--- devine/core/utilities.py | 21 +++++++++++++++++++++ 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/devine/core/downloaders/aria2c.py b/devine/core/downloaders/aria2c.py index 0cea146..de4d1ef 100644 --- a/devine/core/downloaders/aria2c.py +++ b/devine/core/downloaders/aria2c.py @@ -18,7 +18,7 @@ from rich.text import Text from devine.core.config import config from devine.core.console import console from devine.core.constants import DOWNLOAD_CANCELLED -from devine.core.utilities import get_binary_path, get_free_port +from devine.core.utilities import get_binary_path, get_extension, get_free_port def rpc(caller: Callable, secret: str, method: str, params: Optional[list[Any]] = None) -> Any: @@ -107,7 +107,7 @@ def download( url_data: dict[str, Any] = url url_filename = filename.format( i=i, - ext=Path(url_data["url"]).suffix + ext=get_extension(url_data["url"]) ) url_text = url_data["url"] url_text += f"\n\tdir={output_dir}" diff --git a/devine/core/downloaders/curl_impersonate.py b/devine/core/downloaders/curl_impersonate.py index fc23c77..c227c31 100644 --- a/devine/core/downloaders/curl_impersonate.py +++ b/devine/core/downloaders/curl_impersonate.py @@ -11,6 +11,7 @@ from rich import filesize from devine.core.config import config from devine.core.constants import DOWNLOAD_CANCELLED +from devine.core.utilities import get_extension MAX_ATTEMPTS = 5 RETRY_WAIT = 2 @@ -208,7 +209,7 @@ def curl_impersonate( for i, url in enumerate(urls) for save_path in [output_dir / filename.format( i=i, - ext=Path((url["url"]) if isinstance(url, dict) else url).suffix + ext=get_extension(url["url"] if isinstance(url, dict) else url) )] ] diff --git a/devine/core/downloaders/requests.py b/devine/core/downloaders/requests.py index af6f100..296b7dd 100644 --- a/devine/core/downloaders/requests.py +++ b/devine/core/downloaders/requests.py @@ -10,6 +10,7 @@ from requests import Session from rich import filesize from devine.core.constants import DOWNLOAD_CANCELLED +from devine.core.utilities import get_extension MAX_ATTEMPTS = 5 RETRY_WAIT = 2 @@ -205,7 +206,7 @@ def requests( for i, url in enumerate(urls) for save_path in [output_dir / filename.format( i=i, - ext=Path((url["url"]) if isinstance(url, dict) else url).suffix + ext=get_extension(url["url"] if isinstance(url, dict) else url) )] ] diff --git a/devine/core/manifests/hls.py b/devine/core/manifests/hls.py index dd1fd66..580f924 100644 --- a/devine/core/manifests/hls.py +++ b/devine/core/manifests/hls.py @@ -24,7 +24,7 @@ from devine.core.downloaders import downloader from devine.core.downloaders import requests as requests_downloader from devine.core.drm import DRM_T, ClearKey, Widevine from devine.core.tracks import Audio, Subtitle, Tracks, Video -from devine.core.utilities import get_binary_path, is_close_match, try_ensure_utf8 +from devine.core.utilities import get_binary_path, get_extension, is_close_match, try_ensure_utf8 class HLS: @@ -292,7 +292,7 @@ class HLS: for i, segment in enumerate(segments): is_last_segment = (i + 1) == total_segments name_len = len(str(total_segments)) - segment_file_ext = Path(segment.uri).suffix + segment_file_ext = get_extension(segment.uri) segment_file_path = segment_save_dir / f"{str(i).zfill(name_len)}{segment_file_ext}" def merge(to: Path, via: list[Path], delete: bool = False, include_map_data: bool = False): @@ -336,7 +336,7 @@ class HLS: range_len = (last_segment_i - first_segment_i) + 1 segment_range = f"{str(first_segment_i).zfill(name_len)}-{str(last_segment_i).zfill(name_len)}" - merged_path = segment_save_dir / f"{segment_range}{Path(segments[last_segment_i].uri).suffix}" + merged_path = segment_save_dir / f"{segment_range}{get_extension(segments[last_segment_i].uri)}" decrypted_path = segment_save_dir / f"{merged_path.stem}_decrypted{merged_path.suffix}" files = [ diff --git a/devine/core/utilities.py b/devine/core/utilities.py index 5232a7a..3cf02bb 100644 --- a/devine/core/utilities.py +++ b/devine/core/utilities.py @@ -1,6 +1,7 @@ import ast import contextlib import importlib.util +import os import re import shutil import socket @@ -12,6 +13,7 @@ from datetime import datetime from pathlib import Path from types import ModuleType from typing import Optional, Sequence, Union +from urllib.parse import ParseResult, urlparse import chardet import requests @@ -256,6 +258,25 @@ def get_free_port() -> int: return s.getsockname()[1] +def get_extension(value: Union[str, Path, ParseResult]) -> Optional[str]: + """ + Get a URL or Path file extension/suffix. + + Note: The returned value will begin with `.`. + """ + if isinstance(value, ParseResult): + value_parsed = value + elif isinstance(value, (str, Path)): + value_parsed = urlparse(str(value)) + else: + raise TypeError(f"Expected {str}, {Path}, or {ParseResult}, got {type(value)}") + + if value_parsed.path: + ext = os.path.splitext(value_parsed.path)[1] + if ext and ext != ".": + return ext + + class FPS(ast.NodeVisitor): def visit_BinOp(self, node: ast.BinOp) -> float: if isinstance(node.op, ast.Div):