Add ability to download multiple resolutions per title

Closes #26
This commit is contained in:
rlaphoenix 2023-03-26 19:28:46 +01:00
parent 71cf2b4016
commit 33a9c307f3
3 changed files with 111 additions and 44 deletions

View File

@ -52,7 +52,7 @@ from devine.core.titles import Movie, Song, Title_T
from devine.core.titles.episode import Episode from devine.core.titles.episode import Episode
from devine.core.tracks import Audio, Subtitle, Video from devine.core.tracks import Audio, Subtitle, Video
from devine.core.utilities import get_binary_path, is_close_match, time_elapsed_since from devine.core.utilities import get_binary_path, is_close_match, time_elapsed_since
from devine.core.utils.click_types import LANGUAGE_RANGE, QUALITY, SEASON_RANGE, ContextData from devine.core.utils.click_types import LANGUAGE_RANGE, SEASON_RANGE, ContextData, QUALITY_LIST
from devine.core.utils.collections import merge_dict from devine.core.utils.collections import merge_dict
from devine.core.utils.subprocess import ffprobe from devine.core.utils.subprocess import ffprobe
from devine.core.vaults import Vaults from devine.core.vaults import Vaults
@ -69,8 +69,8 @@ class dl:
)) ))
@click.option("-p", "--profile", type=str, default=None, @click.option("-p", "--profile", type=str, default=None,
help="Profile to use for Credentials and Cookies (if available). Overrides profile set by config.") help="Profile to use for Credentials and Cookies (if available). Overrides profile set by config.")
@click.option("-q", "--quality", type=QUALITY, default=None, @click.option("-q", "--quality", type=QUALITY_LIST, default=[],
help="Download Resolution, defaults to best available.") help="Download Resolution(s), defaults to the best available resolution.")
@click.option("-v", "--vcodec", type=click.Choice(Video.Codec, case_sensitive=False), @click.option("-v", "--vcodec", type=click.Choice(Video.Codec, case_sensitive=False),
default=Video.Codec.AVC, default=Video.Codec.AVC,
help="Video Codec to download, defaults to H.264.") help="Video Codec to download, defaults to H.264.")
@ -246,7 +246,7 @@ class dl:
def result( def result(
self, self,
service: Service, service: Service,
quality: Optional[int], quality: list[int],
vcodec: Video.Codec, vcodec: Video.Codec,
acodec: Optional[Audio.Codec], acodec: Optional[Audio.Codec],
vbitrate: int, vbitrate: int,
@ -356,23 +356,44 @@ class dl:
if isinstance(title, (Movie, Episode)): if isinstance(title, (Movie, Episode)):
# filter video tracks # filter video tracks
title.tracks.select_video(lambda x: x.codec == vcodec) title.tracks.select_video(lambda x: x.codec == vcodec)
if not title.tracks.videos:
self.log.error(f"There's no {vcodec.name} Video Track...")
sys.exit(1)
title.tracks.select_video(lambda x: x.range == range_) title.tracks.select_video(lambda x: x.range == range_)
if not title.tracks.videos:
self.log.error(f"There's no {range_.name} Video Track...")
sys.exit(1)
if vbitrate: if vbitrate:
title.tracks.select_video(lambda x: x.bitrate and x.bitrate // 1000 == vbitrate) title.tracks.select_video(lambda x: x.bitrate and x.bitrate // 1000 == vbitrate)
if not title.tracks.videos: if not title.tracks.videos:
self.log.error(f"There's no {vbitrate}kbps Video Track...") self.log.error(f"There's no {vbitrate}kbps Video Track...")
sys.exit(1) sys.exit(1)
if quality:
title.tracks.with_resolution(quality)
if not title.tracks.videos:
self.log.error(f"There's no {quality}p {vcodec.name} ({range_.name}) Video Track...")
sys.exit(1)
video_language = v_lang or lang video_languages = v_lang or lang
if video_language and "all" not in video_language: if video_languages and "all" not in video_languages:
title.tracks.videos = title.tracks.select_per_language(title.tracks.videos, video_language) title.tracks.videos = title.tracks.by_language(title.tracks.videos, video_languages)
if not title.tracks.videos: if not title.tracks.videos:
self.log.error(f"There's no {video_language} Video Track...") self.log.error(f"There's no {video_languages} Video Track...")
sys.exit(1)
if quality:
title.tracks.by_resolutions(quality, per_resolution=1)
missing_resolutions = []
for resolution in quality:
if any(video.height == resolution for video in title.tracks.videos):
continue
if any(int(video.width * (9 / 16)) == resolution for video in title.tracks.videos):
continue
missing_resolutions.append(resolution)
if missing_resolutions:
res_list = ""
if len(missing_resolutions) > 1:
res_list = (", ".join([f"{x}p" for x in missing_resolutions[:-1]])) + " or "
res_list = f"{res_list}{missing_resolutions[-1]}p"
plural = "s" if len(missing_resolutions) > 1 else ""
self.log.error(f"There's no {res_list} Video Track{plural}...")
sys.exit(1) sys.exit(1)
# filter subtitle tracks # filter subtitle tracks
@ -402,7 +423,7 @@ class dl:
self.log.error(f"There's no {channels} Audio Track...") self.log.error(f"There's no {channels} Audio Track...")
sys.exit(1) sys.exit(1)
if lang and "all" not in lang: if lang and "all" not in lang:
title.tracks.audio = title.tracks.select_per_language(title.tracks.audio, lang) title.tracks.audio = title.tracks.by_language(title.tracks.audio, lang, per_language=1)
if not title.tracks.audio: if not title.tracks.audio:
if all(x.descriptor == Video.Descriptor.M3U for x in title.tracks.videos): if all(x.descriptor == Video.Descriptor.M3U for x in title.tracks.videos):
self.log.warning(f"There's no {lang} Audio Tracks, " self.log.warning(f"There's no {lang} Audio Tracks, "
@ -569,7 +590,17 @@ class dl:
# we don't want to fill up the log with "Repacked x track" # we don't want to fill up the log with "Repacked x track"
self.log.info("Repacked one or more tracks with FFMPEG") self.log.info("Repacked one or more tracks with FFMPEG")
final_path = self.mux_tracks(title, not no_folder, not no_source) for track in list(title.tracks.videos):
title.tracks.videos = [track]
final_path = self.mux_tracks(
title,
season_folder=not no_folder,
add_source=not no_source,
delete=False
)
for track in title.tracks:
track.delete()
title_dl_time = time_elapsed_since(dl_start_time) title_dl_time = time_elapsed_since(dl_start_time)
@ -853,7 +884,13 @@ class dl:
if callable(track.OnDownloaded): if callable(track.OnDownloaded):
track.OnDownloaded(track) track.OnDownloaded(track)
def mux_tracks(self, title: Title_T, season_folder: bool = True, add_source: bool = True) -> Path: def mux_tracks(
self,
title: Title_T,
season_folder: bool = True,
add_source: bool = True,
delete: bool = False
) -> Path:
"""Mux Tracks, Delete Pre-Mux files, and move to the final location.""" """Mux Tracks, Delete Pre-Mux files, and move to the final location."""
if isinstance(title, (Movie, Episode)): if isinstance(title, (Movie, Episode)):
multiplexing_progress = Progress( multiplexing_progress = Progress(
@ -874,7 +911,8 @@ class dl:
progress=partial( progress=partial(
multiplexing_progress.update, multiplexing_progress.update,
task_id=task task_id=task
) ),
delete=delete
) )
if return_code == 1: if return_code == 1:
self.log.warning("mkvmerge had at least one warning, will continue anyway...") self.log.warning("mkvmerge had at least one warning, will continue anyway...")

View File

@ -268,6 +268,36 @@ class Tracks:
videos_quality = [x for x in self.videos if int(x.width * (9 / 16)) == resolution] videos_quality = [x for x in self.videos if int(x.width * (9 / 16)) == resolution]
self.videos = videos_quality self.videos = videos_quality
def by_resolutions(self, resolutions: list[int], per_resolution: int = 0) -> None:
# Note: Do not merge these list comprehensions. They must be done separately so the results
# from the 16:9 canvas check is only used if there's no exact height resolution match.
selected = []
for resolution in resolutions:
matches = [ # exact matches
x
for x in self.videos
if x.height == resolution
]
if not matches:
matches = [ # 16:9 canvas matches
x
for x in self.videos
if int(x.width * (9 / 16)) == resolution
]
selected.extend(matches[:per_resolution or None])
self.videos = selected
@staticmethod
def by_language(tracks: list[TrackT], languages: list[str], per_language: int = 0) -> list[TrackT]:
selected = []
for language in languages:
selected.extend([
x
for x in tracks
if closest_supported_match(x.language, [language], LANGUAGE_MAX_DISTANCE)
][:per_language or None])
return selected
def export_chapters(self, to_file: Optional[Union[Path, str]] = None) -> str: def export_chapters(self, to_file: Optional[Union[Path, str]] = None) -> str:
"""Export all chapters in order to a string or file.""" """Export all chapters in order to a string or file."""
self.sort_chapters() self.sort_chapters()
@ -278,19 +308,6 @@ class Tracks:
to_file.write_text(data, encoding="utf8") to_file.write_text(data, encoding="utf8")
return data return data
@staticmethod
def select_per_language(tracks: list[TrackT], languages: list[str]) -> list[TrackT]:
"""
Enumerates and return the first Track per language.
You should sort the list so the wanted track is closer to the start of the list.
"""
tracks_ = []
for language in languages:
match = closest_supported_match(language, [str(x.language) for x in tracks], LANGUAGE_MAX_DISTANCE)
if match:
tracks_.append(next(x for x in tracks if str(x.language) == match))
return tracks_
def mux(self, title: str, delete: bool = True, progress: Optional[partial] = None) -> tuple[Path, int]: def mux(self, title: str, delete: bool = True, progress: Optional[partial] = None) -> tuple[Path, int]:
""" """
Multiplex all the Tracks into a Matroska Container file. Multiplex all the Tracks into a Matroska Container file.

View File

@ -94,22 +94,34 @@ class LanguageRange(click.ParamType):
return re.split(r"\s*[,;]\s*", value) return re.split(r"\s*[,;]\s*", value)
class Quality(click.ParamType): class QualityList(click.ParamType):
name = "quality" name = "quality_list"
def convert(self, value: str, param: Optional[click.Parameter] = None, ctx: Optional[click.Context] = None) -> int: def convert(
try: self,
return int(value.lower().rstrip("p")) value: Union[str, list[str]],
except TypeError: param: Optional[click.Parameter] = None,
self.fail( ctx: Optional[click.Context] = None
f"expected string for int() conversion, got {value!r} of type {type(value).__name__}", ) -> list[int]:
param, if not value:
ctx return []
) if not isinstance(value, list):
except ValueError: value = value.split(",")
self.fail(f"{value!r} is not a valid integer", param, ctx) resolutions = []
for resolution in value:
try:
resolutions.append(int(resolution.lower().rstrip("p")))
except TypeError:
self.fail(
f"Expected string for int() conversion, got {resolution!r} of type {type(resolution).__name__}",
param,
ctx
)
except ValueError:
self.fail(f"{resolution!r} is not a valid integer", param, ctx)
return resolutions
SEASON_RANGE = SeasonRange() SEASON_RANGE = SeasonRange()
LANGUAGE_RANGE = LanguageRange() LANGUAGE_RANGE = LanguageRange()
QUALITY = Quality() QUALITY_LIST = QualityList()