Improve readability and documentation of DASH's to_tracks function

This commit is contained in:
rlaphoenix 2023-05-15 16:19:53 +01:00
parent e079febe79
commit e7dc138c0f
1 changed files with 226 additions and 140 deletions

View File

@ -19,6 +19,7 @@ from uuid import UUID
import requests
from langcodes import Language, tag_is_valid
from lxml.etree import Element
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.pssh import PSSH
from requests import Session
@ -87,12 +88,17 @@ class DASH:
return cls(manifest, url)
def to_tracks(self, language: Union[str, Language], period_filter: Optional[Callable] = None) -> Tracks:
def to_tracks(
self,
language: Optional[Union[str, Language]] = None,
period_filter: Optional[Callable] = None
) -> Tracks:
"""
Convert an MPEG-DASH MPD (Media Presentation Description) document to Video, Audio and Subtitle Track objects.
Convert an MPEG-DASH document to Video, Audio and Subtitle Track objects.
Parameters:
language: Language you expect the Primary Track to be in.
language: The Title's Original Recorded Language. It will also be used as a fallback
track language value if the manifest does not list language information.
period_filter: Filter out period's within the manifest.
All Track URLs will be a list of segment URLs.
@ -104,171 +110,107 @@ class DASH:
continue
for adaptation_set in period.findall("AdaptationSet"):
trick_mode = any(
x.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode"
for x in (
adaptation_set.findall("EssentialProperty") +
adaptation_set.findall("SupplementalProperty")
)
)
if trick_mode:
if self.is_trick_mode(adaptation_set):
# we don't want trick mode streams (they are only used for fast-forward/rewind)
continue
descriptive = any(
(x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "descriptive")
for x in adaptation_set.findall("Accessibility")
) or any(
(x.get("schemeIdUri"), x.get("value")) == ("urn:tva:metadata:cs:AudioPurposeCS:2007", "1")
for x in adaptation_set.findall("Accessibility")
)
forced = any(
x.get("schemeIdUri") == "urn:mpeg:dash:role:2011"
and x.get("value") in ("forced-subtitle", "forced_subtitle")
for x in adaptation_set.findall("Role")
)
cc = any(
(x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption")
for x in adaptation_set.findall("Role")
)
for rep in adaptation_set.findall("Representation"):
codecs = rep.get("codecs") or adaptation_set.get("codecs")
get = partial(self._get, adaptation_set=adaptation_set, representation=rep)
findall = partial(self._findall, adaptation_set=adaptation_set, representation=rep, both=True)
content_type = adaptation_set.get("contentType") or \
adaptation_set.get("mimeType") or \
rep.get("contentType") or \
rep.get("mimeType")
if not content_type:
raise ValueError("No content type value could be found")
content_type = content_type.split("/")[0]
codecs = get("codecs")
content_type = get("contentType")
mime_type = get("mimeType")
if content_type.startswith("image"):
# we don't want what's likely thumbnails for the seekbar
continue
if content_type == "application":
# possibly application/mp4 which could be mp4-boxed subtitles
if not content_type and mime_type:
content_type = mime_type.split("/")[0]
if not content_type and not mime_type:
raise ValueError("Unable to determine the format of a Representation, cannot continue...")
if mime_type == "application/mp4" or content_type == "application":
# likely mp4-boxed subtitles
# TODO: It may not actually be subtitles
try:
Subtitle.Codec.from_mime(codecs)
real_codec = Subtitle.Codec.from_mime(codecs)
content_type = "text"
mime_type = f"application/mp4; codecs='{real_codec.value.lower()}'"
except ValueError:
raise ValueError(f"Unsupported content type '{content_type}' with codecs of '{codecs}'")
if content_type == "text":
mime = adaptation_set.get("mimeType")
if mime and not mime.endswith("/mp4"):
codecs = mime.split("/")[1]
if content_type == "text" and mime_type and "/mp4" not in mime_type:
# mimeType likely specifies the subtitle codec better than `codecs`
codecs = mime_type.split("/")[1]
supplements = rep.findall("SupplementalProperty") + adaptation_set.findall("SupplementalProperty")
joc = next((
x.get("value")
for x in supplements
if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018"
), None)
if rep.get("id") is not None:
rep_id_lang = re.match(r"\w+_(\w+)=\d+", rep.get("id"))
if rep_id_lang:
rep_id_lang = rep_id_lang.group(1)
else:
rep_id_lang = None
track_lang = DASH.get_language(rep.get("lang"), adaptation_set.get("lang"), rep_id_lang, language)
if not track_lang:
raise ValueError(
"One or more Tracks had no Language information. "
"The provided fallback language is not valid or is `None` or `und`."
if content_type == "video":
track_type = Video
track_codec = Video.Codec.from_codecs(codecs)
track_args = dict(
range_=self.get_video_range(
codecs,
findall("SupplementalProperty"),
findall("EssentialProperty")
),
bitrate=get("bandwidth") or None,
width=get("width") or 0,
height=get("height") or 0,
fps=get("frameRate") or rep.find("SegmentBase", {}).get("timescale") or None
)
elif content_type == "audio":
track_type = Audio
track_codec = Audio.Codec.from_codecs(codecs)
track_args = dict(
bitrate=get("bandwidth") or None,
channels=next(iter(
rep.xpath("AudioChannelConfiguration/@value")
or adaptation_set.xpath("AudioChannelConfiguration/@value")
), None),
joc=self.get_ddp_complexity_index(adaptation_set, rep),
descriptive=self.is_descriptive(adaptation_set)
)
elif content_type == "text":
track_type = Subtitle
track_codec = Subtitle.Codec.from_codecs(codecs or "vtt")
track_args = dict(
forced=self.is_forced(adaptation_set),
cc=self.is_closed_caption(adaptation_set)
)
elif content_type == "image":
# we don't want what's likely thumbnails for the seekbar
continue
else:
raise ValueError(f"Unknown Track Type '{content_type}'")
track_lang = self.get_language(adaptation_set, rep, fallback=language)
if not track_lang:
msg = "Language information could not be derived from a Representation."
if language is None:
msg += " No fallback language was provided when calling DASH.to_tracks()."
elif not tag_is_valid((str(language) or "").strip()) or str(language).startswith("und"):
msg += f" The fallback language provided is also invalid: {language}"
raise ValueError(msg)
# for some reason it's incredibly common for services to not provide
# a good and actually unique track ID, sometimes because of the lang
# dialect not being represented in the id, or the bitrate, or such.
# this combines all of them as one and hashes it to keep it small(ish).
track_id = md5("{codec}-{lang}-{bitrate}-{base_url}-{extra}".format(
track_id = md5("{codec}-{lang}-{bitrate}-{base_url}-{ids}-{track_args}".format(
codec=codecs,
lang=track_lang,
bitrate=rep.get("bandwidth") or 0, # subs may not state bandwidth
bitrate=get("bitrate"),
base_url=(rep.findtext("BaseURL") or "").split("?")[0],
extra=(adaptation_set.get("audioTrackId") or "") + (rep.get("id") or "") +
(period.get("id") or "")
ids=[get("audioTrackId"), get("id"), period.get("id")],
track_args=track_args
).encode()).hexdigest()
if content_type == "video":
track_type = Video
track_codec = Video.Codec.from_codecs(codecs)
elif content_type == "audio":
track_type = Audio
track_codec = Audio.Codec.from_codecs(codecs)
elif content_type == "text":
track_type = Subtitle
track_codec = Subtitle.Codec.from_codecs(codecs or "vtt")
else:
raise ValueError(f"Unknown Track Type '{content_type}'")
tracks.add(track_type(
id_=track_id,
url=(self.url, self.manifest, rep, adaptation_set, period),
codec=track_codec,
language=track_lang,
is_original_lang=not track_lang or not language or is_close_match(track_lang, [language]),
is_original_lang=language and is_close_match(track_lang, [language]),
descriptor=Video.Descriptor.MPD,
extra=(rep, adaptation_set),
# video track args
**(dict(
range_=(
Video.Range.DV
if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")) else
Video.Range.from_cicp(
primaries=next((
int(x.get("value"))
for x in (
adaptation_set.findall("SupplementalProperty")
+ adaptation_set.findall("EssentialProperty")
)
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries"
), 0),
transfer=next((
int(x.get("value"))
for x in (
adaptation_set.findall("SupplementalProperty")
+ adaptation_set.findall("EssentialProperty")
)
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics"
), 0),
matrix=next((
int(x.get("value"))
for x in (
adaptation_set.findall("SupplementalProperty")
+ adaptation_set.findall("EssentialProperty")
)
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients"
), 0)
)
),
bitrate=rep.get("bandwidth"),
width=int(rep.get("width") or 0) or adaptation_set.get("width"),
height=int(rep.get("height") or 0) or adaptation_set.get("height"),
fps=(
rep.get("frameRate") or
adaptation_set.get("frameRate") or
(
rep.find("SegmentBase").get("timescale") if
rep.find("SegmentBase") is not None else None
)
)
) if track_type is Video else dict(
bitrate=rep.get("bandwidth"),
channels=next(iter(
rep.xpath("AudioChannelConfiguration/@value")
or adaptation_set.xpath("AudioChannelConfiguration/@value")
), None),
joc=joc,
descriptive=descriptive
) if track_type is Audio else dict(
forced=forced,
cc=cc
) if track_type is Subtitle else {})
**track_args
))
# only get tracks from the first main-content period
@ -590,7 +532,76 @@ class DASH:
save_dir.rmdir()
@staticmethod
def get_language(*options: Any) -> Optional[Language]:
def _get(
item: str,
adaptation_set: Element,
representation: Optional[Element] = None
) -> Optional[Any]:
"""Helper to get a requested item from the Representation, otherwise from the AdaptationSet."""
adaptation_set_item = adaptation_set.get(item)
if representation is None:
return adaptation_set_item
representation_item = representation.get(item)
if representation_item is not None:
return representation_item
return adaptation_set_item
@staticmethod
def _findall(
item: str,
adaptation_set: Element,
representation: Optional[Element] = None,
both: bool = False
) -> list[Any]:
"""
Helper to get all requested items from the Representation, otherwise from the AdaptationSet.
Optionally, you may pass both=True to keep both values (where available).
"""
adaptation_set_items = adaptation_set.findall(item)
if representation is None:
return adaptation_set_items
representation_items = representation.findall(item)
if both:
return representation_items + adaptation_set_items
if representation_items:
return representation_items
return adaptation_set_items
@staticmethod
def get_language(
adaptation_set: Element,
representation: Optional[Element] = None,
fallback: Optional[Union[str, Language]] = None
) -> Optional[Language]:
"""
Get Language (if any) from the AdaptationSet or Representation.
A fallback language may be provided if no language information could be
retrieved.
"""
options = []
if representation is not None:
options.append(representation.get("lang"))
# derive language from somewhat common id string format
# the format is typically "{rep_id}_{lang}={bitrate}" or similar
rep_id = representation.get("id")
if rep_id:
m = re.match(r"\w+_(\w+)=\d+", rep_id)
if m:
options.append(m.group(1))
options.append(adaptation_set.get("lang"))
if fallback:
options.append(fallback)
for option in options:
option = (str(option) or "").strip()
if not tag_is_valid(option) or option.startswith("und"):
@ -598,7 +609,82 @@ class DASH:
return Language.get(option)
@staticmethod
def get_drm(protections) -> list[Widevine]:
def get_video_range(
codecs: str,
all_supplemental_props: list[Element],
all_essential_props: list[Element]
) -> Video.Range:
if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")):
return Video.Range.DV
return Video.Range.from_cicp(
primaries=next((
int(x.get("value"))
for x in all_supplemental_props + all_essential_props
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:ColourPrimaries"
), 0),
transfer=next((
int(x.get("value"))
for x in all_supplemental_props + all_essential_props
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:TransferCharacteristics"
), 0),
matrix=next((
int(x.get("value"))
for x in all_supplemental_props + all_essential_props
if x.get("schemeIdUri") == "urn:mpeg:mpegB:cicp:MatrixCoefficients"
), 0)
)
@staticmethod
def is_trick_mode(adaptation_set: Element) -> bool:
"""Check if contents of Adaptation Set is a Trick-Mode stream."""
essential_props = adaptation_set.findall("EssentialProperty")
supplemental_props = adaptation_set.findall("SupplementalProperty")
return any(
prop.get("schemeIdUri") == "http://dashif.org/guidelines/trickmode"
for prop in essential_props + supplemental_props
)
@staticmethod
def is_descriptive(adaptation_set: Element) -> bool:
"""Check if contents of Adaptation Set is Descriptive."""
return any(
(x.get("schemeIdUri"), x.get("value")) in (
("urn:mpeg:dash:role:2011", "descriptive"),
("urn:tva:metadata:cs:AudioPurposeCS:2007", "1")
)
for x in adaptation_set.findall("Accessibility")
)
@staticmethod
def is_forced(adaptation_set: Element) -> bool:
"""Check if contents of Adaptation Set is a Forced Subtitle."""
return any(
x.get("schemeIdUri") == "urn:mpeg:dash:role:2011"
and x.get("value") in ("forced-subtitle", "forced_subtitle")
for x in adaptation_set.findall("Role")
)
@staticmethod
def is_closed_caption(adaptation_set: Element) -> bool:
"""Check if contents of Adaptation Set is a Closed Caption Subtitle."""
return any(
(x.get("schemeIdUri"), x.get("value")) == ("urn:mpeg:dash:role:2011", "caption")
for x in adaptation_set.findall("Role")
)
@staticmethod
def get_ddp_complexity_index(adaptation_set: Element, representation: Optional[Element]) -> Optional[int]:
"""Get the DD+ Complexity Index (if any) from the AdaptationSet or Representation."""
return next((
int(x.get("value"))
for x in DASH._findall("SupplementalProperty", adaptation_set, representation, both=True)
if x.get("schemeIdUri") == "tag:dolby.com,2018:dash:EC3_ExtensionComplexityIndex:2018"
), None)
@staticmethod
def get_drm(protections: list[Element]) -> list[Widevine]:
drm = []
for protection in protections: