diff --git a/devine/commands/dl.py b/devine/commands/dl.py index 1994566..4c5bc8b 100644 --- a/devine/commands/dl.py +++ b/devine/commands/dl.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import html import logging import math @@ -22,6 +23,7 @@ from typing import Any, Callable, Optional import click import jsonpickle +import pycaption import yaml from pymediainfo import MediaInfo from pywidevine.cdm import Cdm as WidevineCdm @@ -30,15 +32,17 @@ from pywidevine.remotecdm import RemoteCdm from tqdm import tqdm from devine.core.config import config -from devine.core.constants import DRM_SORT_MAP, LOG_FORMATTER, AnyTrack, context_settings +from devine.core.constants import LOG_FORMATTER, AnyTrack, context_settings from devine.core.credential import Credential +from devine.core.downloaders import aria2c from devine.core.drm import DRM_T, Widevine +from devine.core.manifests import DASH, HLS from devine.core.proxies import Basic, Hola, NordVPN from devine.core.service import Service from devine.core.services import Services from devine.core.titles import Movie, Song, Title_T from devine.core.titles.episode import Episode -from devine.core.tracks import Audio, Video +from devine.core.tracks import Audio, Subtitle, Video from devine.core.utilities import get_binary_path, is_close_match from devine.core.utils.click_types import LANGUAGE_RANGE, QUALITY, SEASON_RANGE, ContextData from devine.core.utils.collections import merge_dict @@ -417,50 +421,6 @@ class dl: if list_: continue # only wanted to see what tracks were available and chosen - # Prepare Track DRM (if any) - for track in title.tracks: - if not track.drm and isinstance(track, (Video, Audio)): - # service might not list DRM in manifest, get from stream data - try: - track.drm = [Widevine.from_track(track, service.session)] - except Widevine.Exceptions.PSSHNotFound: - # it might not have Widevine DRM, or might not have found the PSSH - self.log.warning("No Widevine PSSH was found for this track, is it DRM free?") - if track.drm: - # choose first-available DRM in order of Enum value - track.drm = next(iter(sorted(track.drm, key=lambda x: DRM_SORT_MAP.index(x.__class__.__name__)))) - if isinstance(track.drm, Widevine): - # Get Widevine Content Keys now, this must be done in main thread due to SQLite objects - self.log.info(f"Getting {track.drm.__class__.__name__} Keys for: {track}") - self.prepare_drm( - drm=track.drm, - licence=partial( - service.get_widevine_license, - title=title, - track=track - ), - certificate=partial( - service.get_widevine_service_certificate, - title=title, - track=track - ), - cdm_only=cdm_only, - vaults_only=vaults_only - ) - - if export: - keys = {} - if export.is_file(): - keys = jsonpickle.loads(export.read_text(encoding="utf8")) - if str(title) not in keys: - keys[str(title)] = {} - keys[str(title)][str(track)] = { - kid: key - for kid, key in track.drm.content_keys.items() - if kid in track.drm.kids - } - export.write_text(jsonpickle.dumps(keys, indent=4), encoding="utf8") - if skip_dl: self.log.info("Skipping Download...") else: @@ -472,7 +432,17 @@ class dl: self.download_track, service=service, track=track, - title=title + title=title, + prepare_drm=partial( + self.prepare_drm, + track=track, + title=title, + certificate=service.get_widevine_service_certificate, + licence=service.get_widevine_license, + cdm_only=cdm_only, + vaults_only=vaults_only, + export=export + ) ) for track in title.tracks )): @@ -507,11 +477,97 @@ class dl: self.log.info("Processed all titles!") + def 
prepare_drm( + self, + drm: DRM_T, + track: AnyTrack, + title: Title_T, + certificate: Callable, + licence: Callable, + cdm_only: bool = False, + vaults_only: bool = False, + export: Optional[Path] = None + ): + """ + Prepare the DRM by getting decryption data like KIDs, Keys, and such. + The DRM object should be ready for decryption once this function ends. + """ + if not drm: + return + + if isinstance(drm, Widevine): + self.log.info(f"Licensing Content Keys using Widevine for {drm.pssh.dumps()}") + + for kid in drm.kids: + if kid in drm.content_keys: + continue + + if not cdm_only: + content_key, vault_used = self.vaults.get_key(kid) + if content_key: + drm.content_keys[kid] = content_key + self.log.info(f" + {kid.hex}:{content_key} ({vault_used})") + add_count = self.vaults.add_key(kid, content_key, excluding=vault_used) + self.log.info(f" + Cached to {add_count}/{len(self.vaults) - 1} Vaults") + elif vaults_only: + self.log.error(f" - No Content Key found in any Vault for {kid.hex}") + sys.exit(1) + + if kid not in drm.content_keys and not vaults_only: + from_vaults = drm.content_keys.copy() + + try: + drm.get_content_keys( + cdm=self.cdm, + licence=licence, + certificate=certificate + ) + except ValueError as e: + self.log.error(str(e)) + sys.exit(1) + + for kid_, key in drm.content_keys.items(): + msg = f" + {kid_.hex}:{key}" + if kid_ == kid: + msg += " *" + if key == "0" * 32: + msg += " (Unusable!)" + self.log.info(msg) + + drm.content_keys = { + kid_: key + for kid_, key in drm.content_keys.items() + if key and key.count("0") != len(key) + } + + # The CDM keys may have returned blank content keys for KIDs we got from vaults. + # So we re-add the keys from vaults earlier overwriting blanks or removed KIDs data. + drm.content_keys.update(from_vaults) + + cached_keys = self.vaults.add_keys(drm.content_keys) + self.log.info(f" + Newly added to {cached_keys}/{len(drm.content_keys)} Vaults") + + if kid not in drm.content_keys: + self.log.error(f" - No usable key was returned for {kid.hex}, cannot continue") + sys.exit(1) + + if export: + keys = {} + if export.is_file(): + keys = jsonpickle.loads(export.read_text(encoding="utf8")) + if str(title) not in keys: + keys[str(title)] = {} + if str(track) not in keys[str(title)]: + keys[str(title)][str(track)] = {} + keys[str(title)][str(track)].update(drm.content_keys) + export.write_text(jsonpickle.dumps(keys, indent=4), encoding="utf8") + def download_track( self, service: Service, track: AnyTrack, - title: Title_T + title: Title_T, + prepare_drm: Callable ): time.sleep(1) if self.DL_POOL_STOP.is_set(): @@ -523,17 +579,108 @@ class dl: proxy = None self.log.info(f"Downloading: {track}") - track.download(config.directories.temp, headers=service.session.headers, proxy=proxy) + + if config.directories.temp.is_file(): + self.log.error(f"Temp Directory '{config.directories.temp}' must be a Directory, not a file") + sys.exit(1) + + config.directories.temp.mkdir(parents=True, exist_ok=True) + + save_path = config.directories.temp / f"{track.__class__.__name__}_{track.id}.mp4" + if isinstance(track, Subtitle): + save_path = save_path.with_suffix(f".{track.codec.extension}") + + if track.descriptor != track.Descriptor.URL: + save_dir = save_path.with_name(save_path.name + "_segments") + else: + save_dir = save_path.parent + + # Delete any pre-existing temp files matching this track. + # We can't re-use or continue downloading these tracks as they do not use a + # lock file. Or at least the majority don't. 
Even if they did I've encountered + # corruptions caused by sudden interruptions to the lock file. + for existing_file in config.directories.temp.glob(f"{save_path.stem}.*{save_path.suffix}"): + # e.g., foo.decrypted.mp4, foo.repack.mp4, and such + existing_file.unlink() + if save_dir.exists() and save_dir.name.endswith("_segments"): + shutil.rmtree(save_dir) + + if track.descriptor == track.Descriptor.M3U: + HLS.download_track( + track=track, + save_dir=save_dir, + session=service.session, + proxy=proxy, + license_widevine=prepare_drm + ) + elif track.descriptor == track.Descriptor.MPD: + DASH.download_track( + track=track, + save_dir=save_dir, + session=service.session, + proxy=proxy, + license_widevine=prepare_drm + ) + + # no else-if as DASH may convert the track to URL descriptor + if track.descriptor == track.Descriptor.URL: + asyncio.run(aria2c( + track.url, + save_path, + service.session.headers, + proxy if track.needs_proxy else None + )) + track.path = save_path + + if not track.drm and isinstance(track, (Video, Audio)): + try: + track.drm = [Widevine.from_track(track, service.session)] + except Widevine.Exceptions.PSSHNotFound: + # it might not have Widevine DRM, or might not have found the PSSH + self.log.warning("No Widevine PSSH was found for this track, is it DRM free?") + + if track.drm: + drm = track.drm[0] # just use the first supported DRM system for now + if isinstance(drm, Widevine): + # license and grab content keys + prepare_drm(drm) + drm.decrypt(track) + if callable(track.OnDecrypted): + track.OnDecrypted(track) + else: + with open(save_path, "wb") as f: + for file in save_dir.iterdir(): + f.write(file.read_bytes()) + file.unlink() + save_dir.rmdir() + track.path = save_path + + if track.path.stat().st_size <= 3: # Empty UTF-8 BOM == 3 bytes + raise IOError( + "Download failed, the downloaded file is empty. " + f"This {'was' if track.needs_proxy else 'was not'} downloaded with a proxy." + + ( + " Perhaps you need to set `needs_proxy` as True to use the proxy for this track." + if not track.needs_proxy else "" + ) + ) + + if ( + isinstance(track, Subtitle) and + track.codec not in (Subtitle.Codec.SubRip, Subtitle.Codec.SubStationAlphav4) + ): + caption_set = track.parse(track.path.read_bytes(), track.codec) + track.merge_same_cues(caption_set) + srt = pycaption.SRTWriter().write(caption_set) + # NOW sometimes has this, when it isn't, causing mux problems + srt = srt.replace("MULTI-LANGUAGE SRT\n", "") + track.path.write_text(srt, encoding="utf8") + track.codec = Subtitle.Codec.SubRip + track.move(track.path.with_suffix(".srt")) + if callable(track.OnDownloaded): track.OnDownloaded(track) - if track.drm: - self.log.info(f"Decrypting file with {track.drm.__class__.__name__} DRM...") - track.drm.decrypt(track) - self.log.info(" + Decrypted") - if callable(track.OnDecrypted): - track.OnDecrypted(track) - if track.needs_repack: self.log.info("Repackaging stream with FFMPEG (fix malformed streams)") track.repackage() @@ -574,81 +721,6 @@ class dl: sys.exit(1) self.log.info(" + No EIA-CC Captions...") - def prepare_drm( - self, - drm: DRM_T, - certificate: Callable, - licence: Callable, - cdm_only: bool = False, - vaults_only: bool = False - ) -> None: - """ - Prepare the DRM by getting decryption data like KIDs, Keys, and such. - The DRM object should be ready for decryption once this function ends. 
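Both the original prepare_drm being removed here and its replacement earlier in this diff follow the same key-acquisition order: consult the local key vaults first, fall back to a CDM licence request, drop blank or all-zero keys from the response, let previously vaulted keys win, and cache anything new back to the vaults. A minimal sketch of that control flow, using hypothetical vaults / license_cdm objects rather than devine's real classes (the real method additionally logs each key and honours the export path):

    from typing import Callable

    def resolve_content_keys(
        drm,                    # Widevine-like object exposing .kids and .content_keys
        vaults,                 # hypothetical store: get_key(kid) -> (key, vault), add_keys(dict)
        license_cdm: Callable,  # hypothetical callback that fills drm.content_keys via the CDM
        cdm_only: bool = False,
        vaults_only: bool = False,
    ) -> None:
        for kid in drm.kids:
            if kid in drm.content_keys:
                continue

            if not cdm_only:
                key, vault = vaults.get_key(kid)  # cheap path: a key someone already cached
                if key:
                    drm.content_keys[kid] = key
                    continue
            if vaults_only:
                raise KeyError(f"No vaulted content key for {kid} and CDM use is disabled")

            from_vaults = drm.content_keys.copy()
            license_cdm(drm)  # expensive path: service certificate + licence via the CDM

            # discard blank or all-zero keys, then re-apply the vaulted ones over any gaps
            drm.content_keys = {
                k: v for k, v in drm.content_keys.items() if v and set(v) != {"0"}
            }
            drm.content_keys.update(from_vaults)
            vaults.add_keys(drm.content_keys)  # cache anything new for the next run

            if kid not in drm.content_keys:
                raise KeyError(f"No usable content key was returned for {kid}")
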
- """ - if not drm: - return - - if isinstance(drm, Widevine): - self.log.info(f"PSSH: {drm.pssh.dumps()}") - self.log.info("KIDs:") - for kid in drm.kids: - self.log.info(f" + {kid.hex}") - - for kid in drm.kids: - if kid in drm.content_keys: - continue - - if not cdm_only: - content_key, vault_used = self.vaults.get_key(kid) - if content_key: - drm.content_keys[kid] = content_key - self.log.info(f"Content Key: {kid.hex}:{content_key} ({vault_used})") - add_count = self.vaults.add_key(kid, content_key, excluding=vault_used) - self.log.info(f" + Cached to {add_count}/{len(self.vaults) - 1} Vaults") - elif vaults_only: - self.log.error(f" - No Content Key found in vaults for {kid.hex}") - sys.exit(1) - - if kid not in drm.content_keys and not vaults_only: - from_vaults = drm.content_keys.copy() - - try: - drm.get_content_keys( - cdm=self.cdm, - licence=licence, - certificate=certificate - ) - except ValueError as e: - self.log.error(str(e)) - sys.exit(1) - - self.log.info("Content Keys:") - for kid_, key in drm.content_keys.items(): - msg = f" + {kid_.hex}:{key}" - if kid_ == kid: - msg += " *" - if key == "0" * 32: - msg += " [Unusable!]" - self.log.info(msg) - - drm.content_keys = { - kid_: key - for kid_, key in drm.content_keys.items() - if key and key.count("0") != len(key) - } - - # The CDM keys may have returned blank content keys for KIDs we got from vaults. - # So we re-add the keys from vaults earlier overwriting blanks or removed KIDs data. - drm.content_keys.update(from_vaults) - - cached_keys = self.vaults.add_keys(drm.content_keys) - self.log.info(f" + Newly added to {cached_keys}/{len(drm.content_keys)} Vaults") - - if kid not in drm.content_keys: - self.log.error(f" - No Content Key with the KID ({kid.hex}) was returned...") - sys.exit(1) - def mux_tracks(self, title: Title_T, season_folder: bool = True, add_source: bool = True) -> None: """Mux Tracks, Delete Pre-Mux files, and move to the final location.""" self.log.info("Muxing Tracks into a Matroska Container") diff --git a/devine/core/drm/widevine.py b/devine/core/drm/widevine.py index 5020d03..0496416 100644 --- a/devine/core/drm/widevine.py +++ b/devine/core/drm/widevine.py @@ -205,7 +205,7 @@ class Widevine: ] ]), "--temp_dir", config.directories.temp - ]) + ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: raise subprocess.SubprocessError(f"Failed to Decrypt! 
Shaka Packager Error: {e}") track.swap(decrypted_path) diff --git a/devine/core/manifests/dash.py b/devine/core/manifests/dash.py index 04bdc79..f235bb1 100644 --- a/devine/core/manifests/dash.py +++ b/devine/core/manifests/dash.py @@ -1,10 +1,14 @@ from __future__ import annotations +import asyncio import base64 +import logging import math import re +import sys from copy import copy from hashlib import md5 +from pathlib import Path from typing import Any, Callable, Optional, Union from urllib.parse import urljoin, urlparse from uuid import UUID @@ -15,6 +19,8 @@ from pywidevine.cdm import Cdm as WidevineCdm from pywidevine.pssh import PSSH from requests import Session +from devine.core.constants import AnyTrack +from devine.core.downloaders import aria2c from devine.core.drm import Widevine from devine.core.tracks import Audio, Subtitle, Tracks, Video from devine.core.utilities import is_close_match @@ -92,12 +98,7 @@ class DASH: if callable(period_filter) and period_filter(period): continue - period_base_url = period.findtext("BaseURL") or self.manifest.findtext("BaseURL") - if not period_base_url or not re.match("^https?://", period_base_url, re.IGNORECASE): - period_base_url = urljoin(self.url, period_base_url) - for adaptation_set in period.findall("AdaptationSet"): - # flags trick_mode = any( x.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode" for x in ( @@ -105,6 +106,10 @@ class DASH: adaptation_set.findall("SupplementalProperty") ) ) + if trick_mode: + # we don't want trick mode streams (they are only used for fast-forward/rewind) + continue + descriptive = any( (x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "descriptive") for x in adaptation_set.findall("Accessibility") @@ -121,12 +126,8 @@ class DASH: for x in adaptation_set.findall("Role") ) - if trick_mode: - # we don't want trick mode streams (they are only used for fast-forward/rewind) - continue - for rep in adaptation_set.findall("Representation"): - supplements = rep.findall("SupplementalProperty") + adaptation_set.findall("SupplementalProperty") + codecs = rep.get("codecs") or adaptation_set.get("codecs") content_type = adaptation_set.get("contentType") or \ adaptation_set.get("mimeType") or \ @@ -136,8 +137,6 @@ class DASH: raise ValueError("No content type value could be found") content_type = content_type.split("/")[0] - codecs = rep.get("codecs") or adaptation_set.get("codecs") - if content_type.startswith("image"): # we don't want what's likely thumbnails for the seekbar continue @@ -154,6 +153,8 @@ class DASH: if mime and not mime.endswith("/mp4"): codecs = mime.split("/")[1] + supplements = rep.findall("SupplementalProperty") + adaptation_set.findall("SupplementalProperty") + joc = next(( x.get("value") for x in supplements @@ -167,18 +168,6 @@ class DASH: "The provided fallback language is not valid or is `None` or `und`." 
) - drm = DASH.get_drm(rep.findall("ContentProtection") + adaptation_set.findall("ContentProtection")) - - # from here we need to calculate the Segment Template and compute a final list of URLs - - segment_urls = DASH.get_segment_urls( - representation=rep, - period_duration=period.get("duration") or self.manifest.get("mediaPresentationDuration"), - fallback_segment_template=adaptation_set.find("SegmentTemplate"), - fallback_base_url=period_base_url, - fallback_query=urlparse(self.url).query - ) - # for some reason it's incredibly common for services to not provide # a good and actually unique track ID, sometimes because of the lang # dialect not being represented in the id, or the bitrate, or such. @@ -206,7 +195,7 @@ class DASH: tracks.add(track_type( id_=track_id, - url=segment_urls, + url=(self.url, rep, adaptation_set, period), codec=track_codec, language=track_lang, is_original_lang=not track_lang or not language or is_close_match(track_lang, [language]), @@ -254,8 +243,7 @@ class DASH: rep.find("SegmentBase").get("timescale") if rep.find("SegmentBase") is not None else None ) - ), - drm=drm + ) ) if track_type is Video else dict( bitrate=rep.get("bandwidth"), channels=next(iter( @@ -263,8 +251,7 @@ class DASH: or adaptation_set.xpath("AudioChannelConfiguration/@value") ), None), joc=joc, - descriptive=descriptive, - drm=drm + descriptive=descriptive ) if track_type is Audio else dict( forced=forced, cc=cc @@ -276,6 +263,241 @@ class DASH: return tracks + @staticmethod + def download_track( + track: AnyTrack, + save_dir: Path, + session: Optional[Session] = None, + proxy: Optional[str] = None, + license_widevine: Optional[Callable] = None + ): + if not session: + session = Session() + elif not isinstance(session, Session): + raise TypeError(f"Expected session to be a {Session}, not {session!r}") + + if not track.needs_proxy and proxy: + proxy = None + + if proxy: + session.proxies.update({ + "all": proxy + }) + + log = logging.getLogger("DASH") + + manifest_url, representation, adaptation_set, period = track.url + + drm = DASH.get_drm( + representation.findall("ContentProtection") + + adaptation_set.findall("ContentProtection") + ) + if drm: + drm = drm[0] # just use the first supported DRM system for now + if isinstance(drm, Widevine): + # license and grab content keys + if not license_widevine: + raise ValueError("license_widevine func must be supplied to use Widevine DRM") + license_widevine(drm) + else: + drm = None + + segment_urls: list[str] = [] + manifest = load_xml(session.get(manifest_url).text) + manifest_url_query = urlparse(manifest_url).query + + period_base_url = period.findtext("BaseURL") or manifest.findtext("BaseURL") + if not period_base_url or not re.match("^https?://", period_base_url, re.IGNORECASE): + period_base_url = urljoin(manifest_url, period_base_url) + period_duration = period.get("duration") or manifest.get("mediaPresentationDuration") + + base_url = representation.findtext("BaseURL") or period_base_url + + segment_template = representation.find("SegmentTemplate") + if segment_template is None: + segment_template = adaptation_set.find("SegmentTemplate") + + segment_base = representation.find("SegmentBase") + if segment_base is None: + segment_base = adaptation_set.find("SegmentBase") + + segment_list = representation.find("SegmentList") + if segment_list is None: + segment_list = adaptation_set.find("SegmentList") + + if segment_template is not None: + segment_template = copy(segment_template) + start_number = int(segment_template.get("startNumber") or 
1) + segment_timeline = segment_template.find("SegmentTimeline") + + for item in ("initialization", "media"): + value = segment_template.get(item) + if not value: + continue + if not re.match("^https?://", value, re.IGNORECASE): + if not base_url: + raise ValueError("Resolved Segment URL is not absolute, and no Base URL is available.") + value = urljoin(base_url, value) + if not urlparse(value).query and manifest_url_query: + value += f"?{manifest_url_query}" + segment_template.set(item, value) + + if segment_timeline is not None: + seg_time_list = [] + current_time = 0 + for s in segment_timeline.findall("S"): + if s.get("t"): + current_time = int(s.get("t")) + for _ in range(1 + (int(s.get("r") or 0))): + seg_time_list.append(current_time) + current_time += int(s.get("d")) + seg_num_list = list(range(start_number, len(seg_time_list) + start_number)) + segment_urls += [ + DASH.replace_fields( + segment_template.get("media"), + Bandwidth=representation.get("bandwidth"), + Number=n, + RepresentationID=representation.get("id"), + Time=t + ) + for t, n in zip(seg_time_list, seg_num_list) + ] + else: + if not period_duration: + raise ValueError("Duration of the Period was unable to be determined.") + period_duration = DASH.pt_to_sec(period_duration) + segment_duration = float(segment_template.get("duration")) + segment_timescale = float(segment_template.get("timescale") or 1) + + total_segments = math.ceil(period_duration / (segment_duration / segment_timescale)) + segment_urls += [ + DASH.replace_fields( + segment_template.get("media"), + Bandwidth=representation.get("bandwidth"), + Number=s, + RepresentationID=representation.get("id"), + Time=s + ) + for s in range(start_number, start_number + total_segments) + ] + + init_data = None + init_url = segment_template.get("initialization") + if init_url: + res = session.get(DASH.replace_fields( + init_url, + Bandwidth=representation.get("bandwidth"), + RepresentationID=representation.get("id") + )) + res.raise_for_status() + init_data = res.content + + for i, segment_url in enumerate(segment_urls): + segment_filename = str(i).zfill(len(str(len(segment_urls)))) + segment_save_path = (save_dir / segment_filename).with_suffix(".mp4") + + asyncio.run(aria2c( + segment_url, + segment_save_path, + session.headers, + proxy + )) + # TODO: More like `segment.path`, but this will do for now + # Needed for the drm.decrypt() call couple lines down + track.path = segment_save_path + + if isinstance(track, Audio) or init_data: + with open(track.path, "rb+") as f: + segment_data = f.read() + if isinstance(track, Audio): + # fix audio decryption on ATVP by fixing the sample description index + # TODO: Is this in mpeg data, or init data? 
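The re.sub that follows (repeated in the SegmentList branch and in the HLS downloader below) is easiest to read as a byte-level patch of the MP4 tfhd (track fragment header) box: for a tfhd with version 0, flags 0x02001A (which appears to mean default-base-is-moof plus sample-description-index, default-sample-duration and default-sample-size present) and track_ID 1, it rewrites a sample_description_index of 2 back to 1. A small illustration against a synthetic tfhd payload, not real service data:

    import re

    # Synthetic tfhd contents: box type, version 0 + flags 0x02001a, track_ID = 1,
    # sample_description_index = 2 (the value the substitution rewrites to 1).
    tfhd = b"tfhd" + b"\x00\x02\x00\x1a" + b"\x00\x00\x00\x01" + b"\x00\x00\x00\x02"

    patched = re.sub(
        b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02",
        b"\\g<1>\x01",
        tfhd
    )
    assert patched == b"tfhd" + b"\x00\x02\x00\x1a" + b"\x00\x00\x00\x01" + b"\x00\x00\x00\x01"
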
+ segment_data = re.sub( + b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", + b"\\g<1>\x01", + segment_data + ) + # prepend the init data to be able to decrypt + if init_data: + f.seek(0) + f.write(init_data) + f.write(segment_data) + + if drm: + # TODO: What if the manifest does not mention DRM, but has DRM + drm.decrypt(track) + if callable(track.OnDecrypted): + track.OnDecrypted(track) + elif segment_list is not None: + base_media_url = urljoin(period_base_url, base_url) + if any(x.get("media") is not None for x in segment_list.findall("SegmentURL")): + # at least one segment has no URL specified, it uses the base url and ranges + track.url = base_media_url + track.descriptor = track.Descriptor.URL + track.drm = [drm] if drm else [] + else: + init_data = None + initialization = segment_list.find("Initialization") + if initialization: + source_url = initialization.get("sourceURL") + if source_url is None: + source_url = base_media_url + + res = session.get(source_url) + res.raise_for_status() + init_data = res.content + + for i, segment_url in enumerate(segment_list.findall("SegmentURL")): + segment_filename = str(i).zfill(len(str(len(segment_urls)))) + segment_save_path = (save_dir / segment_filename).with_suffix(".mp4") + + media_url = segment_url.get("media") + if media_url is None: + media_url = base_media_url + + asyncio.run(aria2c( + media_url, + segment_save_path, + session.headers, + proxy, + byte_range=segment_url.get("mediaRange") + )) + # TODO: More like `segment.path`, but this will do for now + # Needed for the drm.decrypt() call couple lines down + track.path = segment_save_path + + if isinstance(track, Audio) or init_data: + with open(track.path, "rb+") as f: + segment_data = f.read() + if isinstance(track, Audio): + # fix audio decryption on ATVP by fixing the sample description index + # TODO: Is this in mpeg data, or init data? 
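Stepping back to the SegmentTimeline handling in the SegmentTemplate branch above: each S element contributes 1 + r segments of duration d, starting at t when given or continuing from the previous end, and the resulting media times are zipped with the running segment numbers to fill the $Time$ and $Number$ template fields. A self-contained rework of that expansion with made-up timeline values:

    # Hypothetical timeline: <S t="0" d="6000" r="2"/><S d="3000"/>, startNumber = 1
    entries = [{"t": 0, "d": 6000, "r": 2}, {"d": 3000}]
    start_number = 1

    times, current = [], 0
    for s in entries:
        if "t" in s:
            current = s["t"]
        for _ in range(1 + s.get("r", 0)):
            times.append(current)
            current += s["d"]

    numbers = range(start_number, start_number + len(times))
    print(list(zip(numbers, times)))  # [(1, 0), (2, 6000), (3, 12000), (4, 18000)]
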
+ segment_data = re.sub( + b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", + b"\\g<1>\x01", + segment_data + ) + # prepend the init data to be able to decrypt + if init_data: + f.seek(0) + f.write(init_data) + f.write(segment_data) + + if drm: + # TODO: What if the manifest does not mention DRM, but has DRM + drm.decrypt(track) + if callable(track.OnDecrypted): + track.OnDecrypted(track) + elif segment_base is not None or base_url: + # SegmentBase more or less boils down to defined ByteRanges + # So, we don't care, just download the full file + track.url = urljoin(period_base_url, base_url) + track.descriptor = track.Descriptor.URL + track.drm = [drm] if drm else [] + else: + log.error("Could not find a way to get segments from this MPD manifest.") + sys.exit(1) + @staticmethod def get_language(*options: Any) -> Optional[Language]: for option in options: @@ -285,8 +507,9 @@ class DASH: return Language.get(option) @staticmethod - def get_drm(protections) -> Optional[list[Widevine]]: + def get_drm(protections) -> list[Widevine]: drm = [] + for protection in protections: # TODO: Add checks for PlayReady, FairPlay, maybe more urn = (protection.get("schemeIdUri") or "").lower() @@ -319,9 +542,6 @@ class DASH: kid=kid )) - if not drm: - drm = None - return drm @staticmethod @@ -350,91 +570,5 @@ class DASH: url = url.replace(m.group(), f"{value:{m.group(1)}}") return url - @staticmethod - def get_segment_urls( - representation, - period_duration: str, - fallback_segment_template, - fallback_base_url: Optional[str] = None, - fallback_query: Optional[str] = None - ) -> list[str]: - segment_urls: list[str] = [] - if representation.find("SegmentTemplate") is not None: - segment_template = representation.find("SegmentTemplate") - else: - segment_template = fallback_segment_template - base_url = representation.findtext("BaseURL") or fallback_base_url - - if segment_template is None: - # We could implement SegmentBase, but it's basically a list of Byte Range's to download - # So just return the Base URL as a segment, why give the downloader extra effort - return [urljoin(fallback_base_url, base_url)] - - segment_template = copy(segment_template) - start_number = int(segment_template.get("startNumber") or 1) - segment_timeline = segment_template.find("SegmentTimeline") - - for item in ("initialization", "media"): - value = segment_template.get(item) - if not value: - continue - if not re.match("^https?://", value, re.IGNORECASE): - if not base_url: - raise ValueError("Resolved Segment URL is not absolute, and no Base URL is available.") - value = urljoin(base_url, value) - if not urlparse(value).query and fallback_query: - value += f"?{fallback_query}" - segment_template.set(item, value) - - initialization = segment_template.get("initialization") - if initialization: - segment_urls.append(DASH.replace_fields( - initialization, - Bandwidth=representation.get("bandwidth"), - RepresentationID=representation.get("id") - )) - - if segment_timeline is not None: - seg_time_list = [] - current_time = 0 - for s in segment_timeline.findall("S"): - if s.get("t"): - current_time = int(s.get("t")) - for _ in range(1 + (int(s.get("r") or 0))): - seg_time_list.append(current_time) - current_time += int(s.get("d")) - seg_num_list = list(range(start_number, len(seg_time_list) + start_number)) - segment_urls += [ - DASH.replace_fields( - segment_template.get("media"), - Bandwidth=representation.get("bandwidth"), - Number=n, - RepresentationID=representation.get("id"), - Time=t - ) - for t, n in 
zip(seg_time_list, seg_num_list) - ] - else: - if not period_duration: - raise ValueError("Duration of the Period was unable to be determined.") - period_duration = DASH.pt_to_sec(period_duration) - - segment_duration = ( - float(segment_template.get("duration")) / float(segment_template.get("timescale") or 1) - ) - total_segments = math.ceil(period_duration / segment_duration) - segment_urls += [ - DASH.replace_fields( - segment_template.get("media"), - Bandwidth=representation.get("bandwidth"), - Number=s, - RepresentationID=representation.get("id"), - Time=s - ) - for s in range(start_number, start_number + total_segments) - ] - - return segment_urls - __ALL__ = (DASH,) diff --git a/devine/core/manifests/hls.py b/devine/core/manifests/hls.py index 35cd322..8698d6e 100644 --- a/devine/core/manifests/hls.py +++ b/devine/core/manifests/hls.py @@ -1,8 +1,12 @@ from __future__ import annotations +import asyncio +import logging import re +import sys from hashlib import md5 -from typing import Any, Optional, Union +from pathlib import Path +from typing import Any, Callable, Optional, Union import m3u8 import requests @@ -11,7 +15,10 @@ from m3u8 import M3U8 from pywidevine.cdm import Cdm as WidevineCdm from pywidevine.pssh import PSSH from requests import Session +from tqdm import tqdm +from devine.core.constants import AnyTrack +from devine.core.downloaders import aria2c from devine.core.drm import DRM_T, ClearKey, Widevine from devine.core.tracks import Audio, Subtitle, Tracks, Video from devine.core.utilities import is_close_match @@ -68,14 +75,12 @@ class HLS: return cls(master) - def to_tracks(self, language: Union[str, Language], **args: Any) -> Tracks: + def to_tracks(self, language: Union[str, Language]) -> Tracks: """ Convert a Variant Playlist M3U(8) document to Video, Audio and Subtitle Track objects. Parameters: language: Language you expect the Primary Track to be in. - args: You may pass any arbitrary named header to be passed to all requests made within - this method. All Track objects' URL will be to another M3U(8) document. However, these documents will be Invariant Playlists and contain the list of segments URIs among other metadata. @@ -95,20 +100,6 @@ class HLS: audio_codec = Audio.Codec.from_codecs(playlist.stream_info.codecs) audio_codecs_by_group_id[audio_group] = audio_codec - if session_drm: - drm = session_drm - else: - # keys may be in the invariant playlist instead, annoying... - res = self.session.get(url, **args) - if not res.ok: - raise requests.ConnectionError( - "Failed to request an invariant M3U(8) document.", - response=res - ) - - invariant_playlist = m3u8.loads(res.text, url) - drm = HLS.get_drm(invariant_playlist.keys) - try: # TODO: Any better way to figure out the primary track type? Video.Codec.from_codecs(playlist.stream_info.codecs) @@ -125,7 +116,7 @@ class HLS: is_original_lang=True, # TODO: All we can do is assume Yes bitrate=playlist.stream_info.average_bandwidth or playlist.stream_info.bandwidth, descriptor=Video.Descriptor.M3U, - drm=drm, + drm=session_drm, extra=playlist, # video track args **(dict( @@ -147,23 +138,6 @@ class HLS: if not re.match("^https?://", url): url = media.base_uri + url - if media.type == "AUDIO": - if session_drm: - drm = session_drm - else: - # keys may be in the invariant playlist instead, annoying... 
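The branch removed below existed because EXT-X-KEY tags normally live in the invariant (media) playlist rather than in the multivariant document, so to_tracks had to fetch every media playlist just to learn its keys; the new code defers that work to HLS.download_track. A rough, self-contained probe of what such a playlist exposes through the m3u8 library, using made-up playlist text (the KEYFORMAT UUID is the standard Widevine system ID):

    import m3u8

    # Stand-in invariant (media) playlist; real services carry their EXT-X-KEY tags here.
    playlist_text = "\n".join([
        "#EXTM3U",
        "#EXT-X-VERSION:6",
        "#EXT-X-TARGETDURATION:6",
        '#EXT-X-KEY:METHOD=SAMPLE-AES,URI="data:text/plain;base64,AAAA",KEYFORMAT="urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed"',
        "#EXTINF:6.0,",
        "seg_00000.m4s",
        "#EXT-X-ENDLIST",
    ])

    playlist = m3u8.loads(playlist_text, uri="https://example.com/audio_en/index.m3u8")
    for key in playlist.keys:  # may be [None] for clear streams
        if key:
            print(key.method, key.keyformat, key.uri)
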
- res = self.session.get(url, **args) - if not res.ok: - raise requests.ConnectionError( - "Failed to request an invariant M3U(8) document.", - response=res - ) - - invariant_playlist = m3u8.loads(res.text, url) - drm = HLS.get_drm(invariant_playlist.keys) - else: - drm = None - joc = 0 if media.type == "AUDIO": track_type = Audio @@ -182,7 +156,7 @@ class HLS: language=media.language or language, # HLS media may not have language info, fallback if needed is_original_lang=language and is_close_match(media.language, [language]), descriptor=Audio.Descriptor.M3U, - drm=drm, + drm=session_drm if media.type == "AUDIO" else None, extra=media, # audio track args **(dict( @@ -198,6 +172,129 @@ class HLS: return tracks + @staticmethod + def download_track( + track: AnyTrack, + save_dir: Path, + session: Optional[Session] = None, + proxy: Optional[str] = None, + license_widevine: Optional[Callable] = None + ) -> None: + if not session: + session = Session() + elif not isinstance(session, Session): + raise TypeError(f"Expected session to be a {Session}, not {session!r}") + + if not track.needs_proxy and proxy: + proxy = None + + if proxy: + session.proxies.update({ + "all": proxy + }) + + log = logging.getLogger("HLS") + + master = m3u8.loads( + # should be an invariant m3u8 playlist URI + session.get(track.url).text, + uri=track.url + ) + + if not master.segments: + log.error("Track's HLS playlist has no segments, expecting an invariant M3U8 playlist.") + sys.exit(1) + + init_data = None + last_segment_key: tuple[Optional[Union[ClearKey, Widevine]], Optional[m3u8.Key]] = (None, None) + + for i, segment in enumerate(tqdm(master.segments, unit="segments")): + segment_filename = str(i).zfill(len(str(len(master.segments)))) + segment_save_path = (save_dir / segment_filename).with_suffix(".mp4") + + if segment.key and last_segment_key[1] != segment.key: + # try: + # drm = HLS.get_drm([segment.key]) + # except NotImplementedError: + # drm = None # never mind, try with master.keys + # if not drm and master.keys: + # # TODO: segment might have multiple keys but m3u8 only grabs the last! + # drm = HLS.get_drm(master.keys) + try: + drm = HLS.get_drm( + # TODO: We append master.keys because m3u8 class only puts the last EXT-X-KEY + # to the segment.key property, not supporting multi-drm scenarios. + # By re-adding every single EXT-X-KEY found, we can at least try to get + # a suitable key. However, it may not match the right segment/timeframe! + # It will try to use the first key provided where possible. + keys=[segment.key] + master.keys, + proxy=proxy + ) + except NotImplementedError as e: + log.error(str(e)) + sys.exit(1) + else: + if drm: + drm = drm[0] # just use the first supported DRM system for now + log.debug("Got segment key, %s", drm) + if isinstance(drm, Widevine): + # license and grab content keys + if not license_widevine: + raise ValueError("license_widevine func must be supplied to use Widevine DRM") + license_widevine(drm) + last_segment_key = (drm, segment.key) + + if callable(track.OnSegmentFilter) and track.OnSegmentFilter(segment): + continue + + if segment.init_section and (not init_data or segment.discontinuity): + # Only use the init data if there's no init data yet (e.g., start of file) + # or if EXT-X-DISCONTINUITY is reached at the same time as EXT-X-MAP. + # Even if a new EXT-X-MAP is supplied, it may just be duplicate and would + # be unnecessary and slow to re-download the init data each time. 
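One aside on this loop before the init-section code continues: segment_filename = str(i).zfill(len(str(len(master.segments)))) (the DASH branches do the same) pads every index to the width of the largest one, so lexicographic order matches numeric order and a name-sorted merge stitches the segments back together correctly. A tiny runnable check of that property; note that the merge in dl.download_track iterates save_dir.iterdir() directly, whose order is not guaranteed, so the padding only pays off when the names are actually sorted, as the removed Track.download used to do:

    from pathlib import Path
    from tempfile import TemporaryDirectory

    segment_count = 1250
    width = len(str(segment_count))  # 4 -> "0000", "0001", ..., "1249"

    with TemporaryDirectory() as tmp:
        save_dir = Path(tmp)
        for i in range(segment_count):
            (save_dir / f"{str(i).zfill(width)}.mp4").touch()

        names = sorted(p.name for p in save_dir.iterdir())
        assert names == [f"{str(i).zfill(width)}.mp4" for i in range(segment_count)]
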
+ if not segment.init_section.uri.startswith(segment.init_section.base_uri): + segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri + + log.debug("Got new init segment, %s", segment.init_section.uri) + res = session.get(segment.init_section.uri) + res.raise_for_status() + init_data = res.content + + if not segment.uri.startswith(segment.base_uri): + segment.uri = segment.base_uri + segment.uri + + asyncio.run(aria2c( + segment.uri, + segment_save_path, + session.headers, + proxy + )) + # TODO: More like `segment.path`, but this will do for now + # Needed for the drm.decrypt() call couple lines down + track.path = segment_save_path + + if isinstance(track, Audio) or init_data: + with open(track.path, "rb+") as f: + segment_data = f.read() + if isinstance(track, Audio): + # fix audio decryption on ATVP by fixing the sample description index + # TODO: Is this in mpeg data, or init data? + segment_data = re.sub( + b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", + b"\\g<1>\x01", + segment_data + ) + # prepend the init data to be able to decrypt + if init_data: + f.seek(0) + f.write(init_data) + f.write(segment_data) + + if last_segment_key[0]: + last_segment_key[0].decrypt(track) + if callable(track.OnDecrypted): + track.OnDecrypted(track) + @staticmethod def get_drm( keys: list[Union[m3u8.model.SessionKey, m3u8.model.Key]], diff --git a/devine/core/tracks/subtitle.py b/devine/core/tracks/subtitle.py index c49c1b1..040b112 100644 --- a/devine/core/tracks/subtitle.py +++ b/devine/core/tracks/subtitle.py @@ -4,7 +4,6 @@ import subprocess from collections import defaultdict from enum import Enum from io import BytesIO -from pathlib import Path from typing import Any, Iterable, Optional import pycaption @@ -380,19 +379,6 @@ class Subtitle(Track): ) sub.save() - def download(self, *args, **kwargs) -> Path: - save_path = super().download(*args, **kwargs) - if self.codec not in (Subtitle.Codec.SubRip, Subtitle.Codec.SubStationAlphav4): - caption_set = self.parse(save_path.read_bytes(), self.codec) - self.merge_same_cues(caption_set) - srt = pycaption.SRTWriter().write(caption_set) - # NowTV sometimes has this, when it isn't, causing mux problems - srt = srt.replace("MULTI-LANGUAGE SRT\n", "") - save_path.write_text(srt, encoding="utf8") - self.codec = Subtitle.Codec.SubRip - self.move(self.path.with_suffix(".srt")) - return save_path - def __str__(self) -> str: return " | ".join(filter(bool, [ "SUB", diff --git a/devine/core/tracks/track.py b/devine/core/tracks/track.py index 7cb1486..8cde667 100644 --- a/devine/core/tracks/track.py +++ b/devine/core/tracks/track.py @@ -1,21 +1,17 @@ from __future__ import annotations -import asyncio -import logging import re import shutil import subprocess from enum import Enum from pathlib import Path from typing import Any, Callable, Iterable, Optional, Union -from urllib.parse import urljoin import m3u8 import requests from langcodes import Language from devine.core.constants import TERRITORY_MAP -from devine.core.downloaders import aria2c from devine.core.drm import DRM_T from devine.core.utilities import get_binary_path @@ -133,136 +129,6 @@ class Track: # we only want the first chunk return chunk - def download(self, out: Path, name_template: str = "{type}_{id}", headers: Optional[dict] = None, - proxy: Optional[str] = None) -> Path: - """ - Download the Track and apply any necessary post-edits like Subtitle conversion. - - Parameters: - out: Output Directory Path for the downloaded track. 
- name_template: Override the default filename template. - Must contain both `{type}` and `{id}` variables. - headers: Headers to use when downloading. - proxy: Proxy to use when downloading. - - Returns: - Where the file was saved, as a Path object. - """ - if out.is_file(): - raise ValueError("Path must be to a directory and not a file") - - log = logging.getLogger("download") - - out.mkdir(parents=True, exist_ok=True) - - file_name = name_template.format( - type=self.__class__.__name__, - id=self.id - ) - - # we must use .mp4 on tracks: - # - as shaka-packager expects mp4 input and mp4 output - # - and mkvtoolnix would try to parse the file in raw-bitstream - save_path = (out / file_name).with_suffix(".mp4") - if self.__class__.__name__ == "Subtitle": - save_path = save_path.with_suffix(f".{self.codec.extension}") - - # these would be files like .decrypted, .repack and such. - # we cannot trust that these files were not interrupted while writing to disc - # lets just delete them before re-attempting a download - for existing_file in save_path.parent.glob(f"{save_path.stem}.*{save_path.suffix}"): - existing_file.unlink() - save_path.with_suffix(".srt").unlink(missing_ok=True) - - if self.descriptor == self.Descriptor.M3U: - master = m3u8.loads( - requests.get( - self.url, - headers=headers, - proxies={"all": proxy} if self.needs_proxy and proxy else None - ).text, - uri=self.url - ) - - if not master.segments: - raise ValueError("Track URI (an M3U8) has no segments...") - - if all(segment.uri == master.segments[0].uri for segment in master.segments): - # all segments use the same file, presumably an EXT-X-BYTERANGE M3U (FUNI) - # TODO: This might be a risky way to deal with these kinds of Playlists - # What if there's an init section, or one segment is reusing a byte-range - segment = master.segments[0] - if not re.match("^https?://", segment.uri): - segment.uri = urljoin(segment.base_uri, segment.uri) - self.url = segment.uri - self.descriptor = self.Descriptor.URL - else: - has_init = False - segments = [] - for segment in master.segments: - # merge base uri with uri where needed in both normal and init segments - if not re.match("^https?://", segment.uri): - segment.uri = segment.base_uri + segment.uri - if segment.init_section and not re.match("^https?://", segment.init_section.uri): - segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri - - if segment.discontinuity: - has_init = False - - # skip segments we don't want to download (e.g., bumpers, dub cards) - if callable(self.OnSegmentFilter) and self.OnSegmentFilter(segment): - continue - - if segment.init_section and not has_init: - segments.append(segment.init_section.uri) - has_init = True - segments.append(segment.uri) - self.url = list(dict.fromkeys(segments)) - - is_segmented = isinstance(self.url, list) and len(self.url) > 1 - segments_dir = save_path.with_name(save_path.name + "_segments") - - attempts = 1 - while True: - try: - asyncio.run(aria2c( - self.url, - [save_path, segments_dir][is_segmented], - headers, - proxy if self.needs_proxy else None - )) - break - except subprocess.CalledProcessError: - log.info(f" - Download attempt {attempts} failed, {['retrying', 'stopping'][attempts == 3]}...") - if attempts == 3: - raise - attempts += 1 - - if is_segmented: - # merge the segments together - with open(save_path, "wb") as f: - for file in sorted(segments_dir.iterdir()): - data = file.read_bytes() - # fix audio decryption - data = 
re.sub(b"(tfhd\x00\x02\x00\x1a\x00\x00\x00\x01\x00\x00\x00)\x02", b"\\g<1>\x01", data) - f.write(data) - file.unlink() # delete, we don't need it anymore - segments_dir.rmdir() - - self.path = save_path - - if self.path.stat().st_size <= 3: # Empty UTF-8 BOM == 3 bytes - raise IOError( - "Download failed, the downloaded file is empty. " - f"This {'was' if self.needs_proxy else 'was not'} downloaded with a proxy." + - ( - " Perhaps you need to set `needs_proxy` as True to use the proxy for this track." - if not self.needs_proxy else "" - ) - ) - - return self.path - def delete(self) -> None: if self.path: self.path.unlink()
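Finally, the subtitle post-processing that used to live in the removed Subtitle.download override now happens in dl.download_track (see the dl.py hunk above); it boils down to a pycaption read/convert/write round-trip plus stripping the "MULTI-LANGUAGE SRT" header that some services (the comments mention NOW / NowTV) prepend. A runnable sketch with a stand-in WebVTT cue; the real code parses whatever codec the track actually declares:

    import pycaption

    vtt = "\n".join([
        "WEBVTT",
        "",
        "00:00:01.000 --> 00:00:03.000",
        "Hello there.",
        "",
        "00:00:04.000 --> 00:00:06.000",
        "General Kenobi.",
    ])

    caption_set = pycaption.WebVTTReader().read(vtt)
    srt = pycaption.SRTWriter().write(caption_set)
    srt = srt.replace("MULTI-LANGUAGE SRT\n", "")  # harmless if the header is absent
    print(srt)
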