diff --git a/devine/commands/dl.py b/devine/commands/dl.py index 40f854e..9da5d13 100644 --- a/devine/commands/dl.py +++ b/devine/commands/dl.py @@ -52,7 +52,7 @@ from devine.core.titles import Movie, Song, Title_T from devine.core.titles.episode import Episode from devine.core.tracks import Audio, Subtitle, Video from devine.core.utilities import get_binary_path, is_close_match, time_elapsed_since -from devine.core.utils.click_types import LANGUAGE_RANGE, QUALITY, SEASON_RANGE, ContextData +from devine.core.utils.click_types import LANGUAGE_RANGE, SEASON_RANGE, ContextData, QUALITY_LIST from devine.core.utils.collections import merge_dict from devine.core.utils.subprocess import ffprobe from devine.core.vaults import Vaults @@ -69,8 +69,8 @@ class dl: )) @click.option("-p", "--profile", type=str, default=None, help="Profile to use for Credentials and Cookies (if available). Overrides profile set by config.") - @click.option("-q", "--quality", type=QUALITY, default=None, - help="Download Resolution, defaults to best available.") + @click.option("-q", "--quality", type=QUALITY_LIST, default=[], + help="Download Resolution(s), defaults to the best available resolution.") @click.option("-v", "--vcodec", type=click.Choice(Video.Codec, case_sensitive=False), default=Video.Codec.AVC, help="Video Codec to download, defaults to H.264.") @@ -246,7 +246,7 @@ class dl: def result( self, service: Service, - quality: Optional[int], + quality: list[int], vcodec: Video.Codec, acodec: Optional[Audio.Codec], vbitrate: int, @@ -356,23 +356,44 @@ class dl: if isinstance(title, (Movie, Episode)): # filter video tracks title.tracks.select_video(lambda x: x.codec == vcodec) + if not title.tracks.videos: + self.log.error(f"There's no {vcodec.name} Video Track...") + sys.exit(1) + title.tracks.select_video(lambda x: x.range == range_) + if not title.tracks.videos: + self.log.error(f"There's no {range_.name} Video Track...") + sys.exit(1) + if vbitrate: title.tracks.select_video(lambda x: x.bitrate and x.bitrate // 1000 == vbitrate) if not title.tracks.videos: self.log.error(f"There's no {vbitrate}kbps Video Track...") sys.exit(1) - if quality: - title.tracks.with_resolution(quality) - if not title.tracks.videos: - self.log.error(f"There's no {quality}p {vcodec.name} ({range_.name}) Video Track...") - sys.exit(1) - video_language = v_lang or lang - if video_language and "all" not in video_language: - title.tracks.videos = title.tracks.select_per_language(title.tracks.videos, video_language) + video_languages = v_lang or lang + if video_languages and "all" not in video_languages: + title.tracks.videos = title.tracks.by_language(title.tracks.videos, video_languages) if not title.tracks.videos: - self.log.error(f"There's no {video_language} Video Track...") + self.log.error(f"There's no {video_languages} Video Track...") + sys.exit(1) + + if quality: + title.tracks.by_resolutions(quality, per_resolution=1) + missing_resolutions = [] + for resolution in quality: + if any(video.height == resolution for video in title.tracks.videos): + continue + if any(int(video.width * (9 / 16)) == resolution for video in title.tracks.videos): + continue + missing_resolutions.append(resolution) + if missing_resolutions: + res_list = "" + if len(missing_resolutions) > 1: + res_list = (", ".join([f"{x}p" for x in missing_resolutions[:-1]])) + " or " + res_list = f"{res_list}{missing_resolutions[-1]}p" + plural = "s" if len(missing_resolutions) > 1 else "" + self.log.error(f"There's no {res_list} Video Track{plural}...") sys.exit(1) # filter subtitle tracks @@ -402,7 +423,7 @@ class dl: self.log.error(f"There's no {channels} Audio Track...") sys.exit(1) if lang and "all" not in lang: - title.tracks.audio = title.tracks.select_per_language(title.tracks.audio, lang) + title.tracks.audio = title.tracks.by_language(title.tracks.audio, lang, per_language=1) if not title.tracks.audio: if all(x.descriptor == Video.Descriptor.M3U for x in title.tracks.videos): self.log.warning(f"There's no {lang} Audio Tracks, " @@ -569,7 +590,17 @@ class dl: # we don't want to fill up the log with "Repacked x track" self.log.info("Repacked one or more tracks with FFMPEG") - final_path = self.mux_tracks(title, not no_folder, not no_source) + for track in list(title.tracks.videos): + title.tracks.videos = [track] + final_path = self.mux_tracks( + title, + season_folder=not no_folder, + add_source=not no_source, + delete=False + ) + + for track in title.tracks: + track.delete() title_dl_time = time_elapsed_since(dl_start_time) @@ -853,7 +884,13 @@ class dl: if callable(track.OnDownloaded): track.OnDownloaded(track) - def mux_tracks(self, title: Title_T, season_folder: bool = True, add_source: bool = True) -> Path: + def mux_tracks( + self, + title: Title_T, + season_folder: bool = True, + add_source: bool = True, + delete: bool = False + ) -> Path: """Mux Tracks, Delete Pre-Mux files, and move to the final location.""" if isinstance(title, (Movie, Episode)): multiplexing_progress = Progress( @@ -874,7 +911,8 @@ class dl: progress=partial( multiplexing_progress.update, task_id=task - ) + ), + delete=delete ) if return_code == 1: self.log.warning("mkvmerge had at least one warning, will continue anyway...") diff --git a/devine/core/tracks/tracks.py b/devine/core/tracks/tracks.py index 5134f35..f2cd64b 100644 --- a/devine/core/tracks/tracks.py +++ b/devine/core/tracks/tracks.py @@ -268,6 +268,36 @@ class Tracks: videos_quality = [x for x in self.videos if int(x.width * (9 / 16)) == resolution] self.videos = videos_quality + def by_resolutions(self, resolutions: list[int], per_resolution: int = 0) -> None: + # Note: Do not merge these list comprehensions. They must be done separately so the results + # from the 16:9 canvas check is only used if there's no exact height resolution match. + selected = [] + for resolution in resolutions: + matches = [ # exact matches + x + for x in self.videos + if x.height == resolution + ] + if not matches: + matches = [ # 16:9 canvas matches + x + for x in self.videos + if int(x.width * (9 / 16)) == resolution + ] + selected.extend(matches[:per_resolution or None]) + self.videos = selected + + @staticmethod + def by_language(tracks: list[TrackT], languages: list[str], per_language: int = 0) -> list[TrackT]: + selected = [] + for language in languages: + selected.extend([ + x + for x in tracks + if closest_supported_match(x.language, [language], LANGUAGE_MAX_DISTANCE) + ][:per_language or None]) + return selected + def export_chapters(self, to_file: Optional[Union[Path, str]] = None) -> str: """Export all chapters in order to a string or file.""" self.sort_chapters() @@ -278,19 +308,6 @@ class Tracks: to_file.write_text(data, encoding="utf8") return data - @staticmethod - def select_per_language(tracks: list[TrackT], languages: list[str]) -> list[TrackT]: - """ - Enumerates and return the first Track per language. - You should sort the list so the wanted track is closer to the start of the list. - """ - tracks_ = [] - for language in languages: - match = closest_supported_match(language, [str(x.language) for x in tracks], LANGUAGE_MAX_DISTANCE) - if match: - tracks_.append(next(x for x in tracks if str(x.language) == match)) - return tracks_ - def mux(self, title: str, delete: bool = True, progress: Optional[partial] = None) -> tuple[Path, int]: """ Multiplex all the Tracks into a Matroska Container file. diff --git a/devine/core/utils/click_types.py b/devine/core/utils/click_types.py index 5072f0f..fa607a3 100644 --- a/devine/core/utils/click_types.py +++ b/devine/core/utils/click_types.py @@ -94,22 +94,34 @@ class LanguageRange(click.ParamType): return re.split(r"\s*[,;]\s*", value) -class Quality(click.ParamType): - name = "quality" +class QualityList(click.ParamType): + name = "quality_list" - def convert(self, value: str, param: Optional[click.Parameter] = None, ctx: Optional[click.Context] = None) -> int: - try: - return int(value.lower().rstrip("p")) - except TypeError: - self.fail( - f"expected string for int() conversion, got {value!r} of type {type(value).__name__}", - param, - ctx - ) - except ValueError: - self.fail(f"{value!r} is not a valid integer", param, ctx) + def convert( + self, + value: Union[str, list[str]], + param: Optional[click.Parameter] = None, + ctx: Optional[click.Context] = None + ) -> list[int]: + if not value: + return [] + if not isinstance(value, list): + value = value.split(",") + resolutions = [] + for resolution in value: + try: + resolutions.append(int(resolution.lower().rstrip("p"))) + except TypeError: + self.fail( + f"Expected string for int() conversion, got {resolution!r} of type {type(resolution).__name__}", + param, + ctx + ) + except ValueError: + self.fail(f"{resolution!r} is not a valid integer", param, ctx) + return resolutions SEASON_RANGE = SeasonRange() LANGUAGE_RANGE = LanguageRange() -QUALITY = Quality() +QUALITY_LIST = QualityList()