From e7dc138c0f4b207d82f1c4a7d856e7f346c1b96f Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Mon, 15 May 2023 16:19:53 +0100 Subject: [PATCH] Improve readability and documentation of DASH's to_tracks function --- devine/core/manifests/dash.py | 366 +++++++++++++++++++++------------- 1 file changed, 226 insertions(+), 140 deletions(-) diff --git a/devine/core/manifests/dash.py b/devine/core/manifests/dash.py index 09cd7ad..8087616 100644 --- a/devine/core/manifests/dash.py +++ b/devine/core/manifests/dash.py @@ -19,6 +19,7 @@ from uuid import UUID import requests from langcodes import Language, tag_is_valid +from lxml.etree import Element from pywidevine.cdm import Cdm as WidevineCdm from pywidevine.pssh import PSSH from requests import Session @@ -87,12 +88,17 @@ class DASH: return cls(manifest, url) - def to_tracks(self, language: Union[str, Language], period_filter: Optional[Callable] = None) -> Tracks: + def to_tracks( + self, + language: Optional[Union[str, Language]] = None, + period_filter: Optional[Callable] = None + ) -> Tracks: """ - Convert an MPEG-DASH MPD (Media Presentation Description) document to Video, Audio and Subtitle Track objects. + Convert an MPEG-DASH document to Video, Audio and Subtitle Track objects. Parameters: - language: Language you expect the Primary Track to be in. + language: The Title's Original Recorded Language. It will also be used as a fallback + track language value if the manifest does not list language information. period_filter: Filter out period's within the manifest. All Track URLs will be a list of segment URLs. @@ -104,171 +110,107 @@ class DASH: continue for adaptation_set in period.findall("AdaptationSet"): - trick_mode = any( - x.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode" - for x in ( - adaptation_set.findall("EssentialProperty") + - adaptation_set.findall("SupplementalProperty") - ) - ) - if trick_mode: + if self.is_trick_mode(adaptation_set): # we don't want trick mode streams (they are only used for fast-forward/rewind) continue - descriptive = any( - (x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "descriptive") - for x in adaptation_set.findall("Accessibility") - ) or any( - (x.get("schemeIdUri"), x.get("value")) == ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1") - for x in adaptation_set.findall("Accessibility") - ) - forced = any( - x.get("schemeIdUri") == "urn:mpeg:dash:role:2011" - and x.get("value") in ("forced-subtitle", "forced_subtitle") - for x in adaptation_set.findall("Role") - ) - cc = any( - (x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption") - for x in adaptation_set.findall("Role") - ) - for rep in adaptation_set.findall("Representation"): - codecs = rep.get("codecs") or adaptation_set.get("codecs") + get = partial(self._get, adaptation_set=adaptation_set, representation=rep) + findall = partial(self._findall, adaptation_set=adaptation_set, representation=rep, both=True) - content_type = adaptation_set.get("contentType") or \ - adaptation_set.get("mimeType") or \ - rep.get("contentType") or \ - rep.get("mimeType") - if not content_type: - raise ValueError("No content type value could be found") - content_type = content_type.split("/")[0] + codecs = get("codecs") + content_type = get("contentType") + mime_type = get("mimeType") - if content_type.startswith("image"): - # we don't want what's likely thumbnails for the seekbar - continue - if content_type == "application": - # possibly application/mp4 which could be mp4-boxed subtitles + if not content_type and mime_type: + content_type = mime_type.split("/")[0] + if not content_type and not mime_type: + raise ValueError("Unable to determine the format of a Representation, cannot continue...") + + if mime_type == "application/mp4" or content_type == "application": + # likely mp4-boxed subtitles + # TODO: It may not actually be subtitles try: - Subtitle.Codec.from_mime(codecs) + real_codec = Subtitle.Codec.from_mime(codecs) content_type = "text" + mime_type = f"application/mp4; codecs='{real_codec.value.lower()}'" except ValueError: raise ValueError(f"Unsupported content type '{content_type}' with codecs of '{codecs}'") - if content_type == "text": - mime = adaptation_set.get("mimeType") - if mime and not mime.endswith("/mp4"): - codecs = mime.split("/")[1] + if content_type == "text" and mime_type and "/mp4" not in mime_type: + # mimeType likely specifies the subtitle codec better than `codecs` + codecs = mime_type.split("/")[1] - supplements = rep.findall("SupplementalProperty") + adaptation_set.findall("SupplementalProperty") - - joc = next(( - x.get("value") - for x in supplements - if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018" - ), None) - - if rep.get("id") is not None: - rep_id_lang = re.match(r"\w+_(\w+)=\d+", rep.get("id")) - if rep_id_lang: - rep_id_lang = rep_id_lang.group(1) - else: - rep_id_lang = None - - track_lang = DASH.get_language(rep.get("lang"), adaptation_set.get("lang"), rep_id_lang, language) - if not track_lang: - raise ValueError( - "One or more Tracks had no Language information. " - "The provided fallback language is not valid or is `None` or `und`." + if content_type == "video": + track_type = Video + track_codec = Video.Codec.from_codecs(codecs) + track_args = dict( + range_=self.get_video_range( + codecs, + findall("SupplementalProperty"), + findall("EssentialProperty") + ), + bitrate=get("bandwidth") or None, + width=get("width") or 0, + height=get("height") or 0, + fps=get("frameRate") or rep.find("SegmentBase", {}).get("timescale") or None ) + elif content_type == "audio": + track_type = Audio + track_codec = Audio.Codec.from_codecs(codecs) + track_args = dict( + bitrate=get("bandwidth") or None, + channels=next(iter( + rep.xpath("AudioChannelConfiguration/@value") + or adaptation_set.xpath("AudioChannelConfiguration/@value") + ), None), + joc=self.get_ddp_complexity_index(adaptation_set, rep), + descriptive=self.is_descriptive(adaptation_set) + ) + elif content_type == "text": + track_type = Subtitle + track_codec = Subtitle.Codec.from_codecs(codecs or "vtt") + track_args = dict( + forced=self.is_forced(adaptation_set), + cc=self.is_closed_caption(adaptation_set) + ) + elif content_type == "image": + # we don't want what's likely thumbnails for the seekbar + continue + else: + raise ValueError(f"Unknown Track Type '{content_type}'") + + track_lang = self.get_language(adaptation_set, rep, fallback=language) + if not track_lang: + msg = "Language information could not be derived from a Representation." + if language is None: + msg += " No fallback language was provided when calling DASH.to_tracks()." + elif not tag_is_valid((str(language) or "").strip()) or str(language).startswith("und"): + msg += f" The fallback language provided is also invalid: {language}" + raise ValueError(msg) # for some reason it's incredibly common for services to not provide # a good and actually unique track ID, sometimes because of the lang # dialect not being represented in the id, or the bitrate, or such. # this combines all of them as one and hashes it to keep it small(ish). - track_id = md5("{codec}-{lang}-{bitrate}-{base_url}-{extra}".format( + track_id = md5("{codec}-{lang}-{bitrate}-{base_url}-{ids}-{track_args}".format( codec=codecs, lang=track_lang, - bitrate=rep.get("bandwidth") or 0, # subs may not state bandwidth + bitrate=get("bitrate"), base_url=(rep.findtext("BaseURL") or "").split("?")[0], - extra=(adaptation_set.get("audioTrackId") or "") + (rep.get("id") or "") + - (period.get("id") or "") + ids=[get("audioTrackId"), get("id"), period.get("id")], + track_args=track_args ).encode()).hexdigest() - if content_type == "video": - track_type = Video - track_codec = Video.Codec.from_codecs(codecs) - elif content_type == "audio": - track_type = Audio - track_codec = Audio.Codec.from_codecs(codecs) - elif content_type == "text": - track_type = Subtitle - track_codec = Subtitle.Codec.from_codecs(codecs or "vtt") - else: - raise ValueError(f"Unknown Track Type '{content_type}'") - tracks.add(track_type( id_=track_id, url=(self.url, self.manifest, rep, adaptation_set, period), codec=track_codec, language=track_lang, - is_original_lang=not track_lang or not language or is_close_match(track_lang, [language]), + is_original_lang=language and is_close_match(track_lang, [language]), descriptor=Video.Descriptor.MPD, extra=(rep, adaptation_set), - # video track args - **(dict( - range_=( - Video.Range.DV - if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")) else - Video.Range.from_cicp( - primaries=next(( - int(x.get("value")) - for x in ( - adaptation_set.findall("SupplementalProperty") - + adaptation_set.findall("EssentialProperty") - ) - if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries" - ), 0), - transfer=next(( - int(x.get("value")) - for x in ( - adaptation_set.findall("SupplementalProperty") - + adaptation_set.findall("EssentialProperty") - ) - if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics" - ), 0), - matrix=next(( - int(x.get("value")) - for x in ( - adaptation_set.findall("SupplementalProperty") - + adaptation_set.findall("EssentialProperty") - ) - if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients" - ), 0) - ) - ), - bitrate=rep.get("bandwidth"), - width=int(rep.get("width") or 0) or adaptation_set.get("width"), - height=int(rep.get("height") or 0) or adaptation_set.get("height"), - fps=( - rep.get("frameRate") or - adaptation_set.get("frameRate") or - ( - rep.find("SegmentBase").get("timescale") if - rep.find("SegmentBase") is not None else None - ) - ) - ) if track_type is Video else dict( - bitrate=rep.get("bandwidth"), - channels=next(iter( - rep.xpath("AudioChannelConfiguration/@value") - or adaptation_set.xpath("AudioChannelConfiguration/@value") - ), None), - joc=joc, - descriptive=descriptive - ) if track_type is Audio else dict( - forced=forced, - cc=cc - ) if track_type is Subtitle else {}) + **track_args )) # only get tracks from the first main-content period @@ -590,7 +532,76 @@ class DASH: save_dir.rmdir() @staticmethod - def get_language(*options: Any) -> Optional[Language]: + def _get( + item: str, + adaptation_set: Element, + representation: Optional[Element] = None + ) -> Optional[Any]: + """Helper to get a requested item from the Representation, otherwise from the AdaptationSet.""" + adaptation_set_item = adaptation_set.get(item) + if representation is None: + return adaptation_set_item + + representation_item = representation.get(item) + if representation_item is not None: + return representation_item + + return adaptation_set_item + + @staticmethod + def _findall( + item: str, + adaptation_set: Element, + representation: Optional[Element] = None, + both: bool = False + ) -> list[Any]: + """ + Helper to get all requested items from the Representation, otherwise from the AdaptationSet. + Optionally, you may pass both=True to keep both values (where available). + """ + adaptation_set_items = adaptation_set.findall(item) + if representation is None: + return adaptation_set_items + + representation_items = representation.findall(item) + + if both: + return representation_items + adaptation_set_items + + if representation_items: + return representation_items + + return adaptation_set_items + + @staticmethod + def get_language( + adaptation_set: Element, + representation: Optional[Element] = None, + fallback: Optional[Union[str, Language]] = None + ) -> Optional[Language]: + """ + Get Language (if any) from the AdaptationSet or Representation. + + A fallback language may be provided if no language information could be + retrieved. + """ + options = [] + + if representation is not None: + options.append(representation.get("lang")) + # derive language from somewhat common id string format + # the format is typically "{rep_id}_{lang}={bitrate}" or similar + rep_id = representation.get("id") + if rep_id: + m = re.match(r"\w+_(\w+)=\d+", rep_id) + if m: + options.append(m.group(1)) + + options.append(adaptation_set.get("lang")) + + if fallback: + options.append(fallback) + for option in options: option = (str(option) or "").strip() if not tag_is_valid(option) or option.startswith("und"): @@ -598,7 +609,82 @@ class DASH: return Language.get(option) @staticmethod - def get_drm(protections) -> list[Widevine]: + def get_video_range( + codecs: str, + all_supplemental_props: list[Element], + all_essential_props: list[Element] + ) -> Video.Range: + if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")): + return Video.Range.DV + + return Video.Range.from_cicp( + primaries=next(( + int(x.get("value")) + for x in all_supplemental_props + all_essential_props + if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries" + ), 0), + transfer=next(( + int(x.get("value")) + for x in all_supplemental_props + all_essential_props + if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics" + ), 0), + matrix=next(( + int(x.get("value")) + for x in all_supplemental_props + all_essential_props + if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients" + ), 0) + ) + + @staticmethod + def is_trick_mode(adaptation_set: Element) -> bool: + """Check if contents of Adaptation Set is a Trick-Mode stream.""" + essential_props = adaptation_set.findall("EssentialProperty") + supplemental_props = adaptation_set.findall("SupplementalProperty") + + return any( + prop.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode" + for prop in essential_props + supplemental_props + ) + + @staticmethod + def is_descriptive(adaptation_set: Element) -> bool: + """Check if contents of Adaptation Set is Descriptive.""" + return any( + (x.get("schemeIdUri"), x.get("value")) in ( + ("urn:mpeg:dash:role:2011", "descriptive"), + ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1") + ) + for x in adaptation_set.findall("Accessibility") + ) + + @staticmethod + def is_forced(adaptation_set: Element) -> bool: + """Check if contents of Adaptation Set is a Forced Subtitle.""" + return any( + x.get("schemeIdUri") == "urn:mpeg:dash:role:2011" + and x.get("value") in ("forced-subtitle", "forced_subtitle") + for x in adaptation_set.findall("Role") + ) + + @staticmethod + def is_closed_caption(adaptation_set: Element) -> bool: + """Check if contents of Adaptation Set is a Closed Caption Subtitle.""" + return any( + (x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption") + for x in adaptation_set.findall("Role") + ) + + @staticmethod + def get_ddp_complexity_index(adaptation_set: Element, representation: Optional[Element]) -> Optional[int]: + """Get the DD+ Complexity Index (if any) from the AdaptationSet or Representation.""" + return next(( + int(x.get("value")) + for x in DASH._findall("SupplementalProperty", adaptation_set, representation, both=True) + if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018" + ), None) + + @staticmethod + def get_drm(protections: list[Element]) -> list[Widevine]: drm = [] for protection in protections: