Create and use new utility to get file extension from URLs/Paths

Fixes #73
This commit is contained in:
rlaphoenix 2024-02-19 18:14:50 +00:00
parent c826a702ab
commit 1259a26b14
5 changed files with 30 additions and 7 deletions

View File

@ -18,7 +18,7 @@ from rich.text import Text
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import DOWNLOAD_CANCELLED
from devine.core.utilities import get_binary_path, get_free_port
from devine.core.utilities import get_binary_path, get_extension, get_free_port
def rpc(caller: Callable, secret: str, method: str, params: Optional[list[Any]] = None) -> Any:
@ -107,7 +107,7 @@ def download(
url_data: dict[str, Any] = url
url_filename = filename.format(
i=i,
ext=Path(url_data["url"]).suffix
ext=get_extension(url_data["url"])
)
url_text = url_data["url"]
url_text += f"\n\tdir={output_dir}"

View File

@ -11,6 +11,7 @@ from rich import filesize
from devine.core.config import config
from devine.core.constants import DOWNLOAD_CANCELLED
from devine.core.utilities import get_extension
MAX_ATTEMPTS = 5
RETRY_WAIT = 2
@ -208,7 +209,7 @@ def curl_impersonate(
for i, url in enumerate(urls)
for save_path in [output_dir / filename.format(
i=i,
ext=Path((url["url"]) if isinstance(url, dict) else url).suffix
ext=get_extension(url["url"] if isinstance(url, dict) else url)
)]
]

View File

@ -10,6 +10,7 @@ from requests import Session
from rich import filesize
from devine.core.constants import DOWNLOAD_CANCELLED
from devine.core.utilities import get_extension
MAX_ATTEMPTS = 5
RETRY_WAIT = 2
@ -205,7 +206,7 @@ def requests(
for i, url in enumerate(urls)
for save_path in [output_dir / filename.format(
i=i,
ext=Path((url["url"]) if isinstance(url, dict) else url).suffix
ext=get_extension(url["url"] if isinstance(url, dict) else url)
)]
]

View File

@ -24,7 +24,7 @@ from devine.core.downloaders import downloader
from devine.core.downloaders import requests as requests_downloader
from devine.core.drm import DRM_T, ClearKey, Widevine
from devine.core.tracks import Audio, Subtitle, Tracks, Video
from devine.core.utilities import get_binary_path, is_close_match, try_ensure_utf8
from devine.core.utilities import get_binary_path, get_extension, is_close_match, try_ensure_utf8
class HLS:
@ -292,7 +292,7 @@ class HLS:
for i, segment in enumerate(segments):
is_last_segment = (i + 1) == total_segments
name_len = len(str(total_segments))
segment_file_ext = Path(segment.uri).suffix
segment_file_ext = get_extension(segment.uri)
segment_file_path = segment_save_dir / f"{str(i).zfill(name_len)}{segment_file_ext}"
def merge(to: Path, via: list[Path], delete: bool = False, include_map_data: bool = False):
@ -336,7 +336,7 @@ class HLS:
range_len = (last_segment_i - first_segment_i) + 1
segment_range = f"{str(first_segment_i).zfill(name_len)}-{str(last_segment_i).zfill(name_len)}"
merged_path = segment_save_dir / f"{segment_range}{Path(segments[last_segment_i].uri).suffix}"
merged_path = segment_save_dir / f"{segment_range}{get_extension(segments[last_segment_i].uri)}"
decrypted_path = segment_save_dir / f"{merged_path.stem}_decrypted{merged_path.suffix}"
files = [

View File

@ -1,6 +1,7 @@
import ast
import contextlib
import importlib.util
import os
import re
import shutil
import socket
@ -12,6 +13,7 @@ from datetime import datetime
from pathlib import Path
from types import ModuleType
from typing import Optional, Sequence, Union
from urllib.parse import ParseResult, urlparse
import chardet
import requests
@ -256,6 +258,25 @@ def get_free_port() -> int:
return s.getsockname()[1]
def get_extension(value: Union[str, Path, ParseResult]) -> Optional[str]:
"""
Get a URL or Path file extension/suffix.
Note: The returned value will begin with `.`.
"""
if isinstance(value, ParseResult):
value_parsed = value
elif isinstance(value, (str, Path)):
value_parsed = urlparse(str(value))
else:
raise TypeError(f"Expected {str}, {Path}, or {ParseResult}, got {type(value)}")
if value_parsed.path:
ext = os.path.splitext(value_parsed.path)[1]
if ext and ext != ".":
return ext
class FPS(ast.NodeVisitor):
def visit_BinOp(self, node: ast.BinOp) -> float:
if isinstance(node.op, ast.Div):