Decrypt HLS segments in groups based on the init data

This commit is contained in:
rlaphoenix 2023-05-17 08:00:58 +01:00
parent 55a86ac6c9
commit 30ab67343b
1 changed files with 206 additions and 170 deletions

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import logging
import re
import shutil
import sys
import time
from concurrent import futures
@ -10,7 +11,7 @@ from functools import partial
from hashlib import md5
from pathlib import Path
from queue import Queue
from threading import Event, Lock
from threading import Event
from typing import Any, Callable, Optional, Union
import m3u8
@ -214,48 +215,39 @@ class HLS:
log.error("Track's HLS playlist has no segments, expecting an invariant M3U8 playlist.")
sys.exit(1)
if track.drm:
session_drm = track.drm[0] # just use the first supported DRM system for now
if isinstance(session_drm, Widevine):
# license and grab content keys
if not license_widevine:
raise ValueError("license_widevine func must be supplied to use Widevine DRM")
license_widevine(session_drm)
else:
session_drm = None
init_data_groups = HLS.group_segments(master.segments, track, license_widevine, proxy, session)
if skip_event.is_set():
return
progress(total=len(master.segments))
range_offset = Queue(maxsize=1)
range_offset.put(0)
download_sizes = []
download_speed_window = 5
last_speed_refresh = time.time()
segment_key = Queue(maxsize=1)
segment_key.put((session_drm, None))
init_data = Queue(maxsize=1)
init_data.put(None)
range_offset = Queue(maxsize=1)
range_offset.put(0)
drm_lock = Lock()
with ThreadPoolExecutor(max_workers=16) as pool:
for g1, (init_data, segment_groups) in enumerate(init_data_groups):
for g2, (drm, segments) in enumerate(segment_groups):
if not segments:
continue
out_path_root = save_dir / f"group_{g1:{len(str(g1))}}_{g2:{len(str(g2))}}"
out_path_root.mkdir(parents=True, exist_ok=True)
group_save_path = (save_dir / out_path_root.name).with_suffix(".mp4")
for i, download in enumerate(futures.as_completed((
pool.submit(
HLS.download_segment,
segment=segment,
out_path=(save_dir / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"),
out_path=(out_path_root / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"),
track=track,
init_data=init_data,
segment_key=segment_key,
range_offset=range_offset,
drm_lock=drm_lock,
license_widevine=license_widevine,
session=session,
proxy=proxy,
stop_event=stop_event,
skip_event=skip_event
stop_event=stop_event
)
for n, segment in enumerate(master.segments)
for n, segment in enumerate(segments)
))):
try:
download_size = download.result()
@ -279,10 +271,6 @@ class HLS:
# it successfully downloaded, and it was not cancelled
progress(advance=1)
if download_size == -1: # skipped for --skip-dl
progress(downloaded="[yellow]SKIPPING")
continue
now = time.time()
time_since = now - last_speed_refresh
@ -296,10 +284,23 @@ class HLS:
last_speed_refresh = now
download_sizes.clear()
with open(save_path, "wb") as f:
for segment_file in sorted(save_dir.iterdir()):
with open(group_save_path, "wb") as f:
if init_data:
f.write(init_data)
for segment_file in sorted(out_path_root.iterdir()):
f.write(segment_file.read_bytes())
segment_file.unlink()
shutil.rmtree(out_path_root)
if drm:
drm.decrypt(group_save_path)
track.drm = None
if callable(track.OnDecrypted):
track.OnDecrypted(track)
with open(save_path, "wb") as f:
for group_file in sorted(save_dir.iterdir()):
f.write(group_file.read_bytes())
group_file.unlink()
progress(downloaded="Downloaded")
@ -311,43 +312,24 @@ class HLS:
segment: m3u8.Segment,
out_path: Path,
track: AnyTrack,
init_data: Queue,
segment_key: Queue,
range_offset: Queue,
drm_lock: Lock,
license_widevine: Optional[Callable] = None,
session: Optional[Session] = None,
proxy: Optional[str] = None,
stop_event: Optional[Event] = None,
skip_event: Optional[Event] = None
stop_event: Optional[Event] = None
) -> int:
"""
Download (and Decrypt) an HLS Media Segment.
Note: Make sure all Queue objects passed are appropriately initialized with
a starting value or this function may get permanently stuck.
Download an HLS Media Segment.
Parameters:
segment: The m3u8.Segment Object to Download.
out_path: Path to save the downloaded Segment file to.
track: The Track object of which this Segment is for. Currently used to fix an
invalid value in the TFHD box of Audio Tracks, for the OnSegmentFilter, and
for DRM-related operations like getting the Track ID and Decryption.
init_data: Queue for saving and loading the most recent init section data.
segment_key: Queue for saving and loading the most recent DRM object, and it's
adjacent Segment.Key object.
range_offset: Queue for saving and loading the most recent Segment Bytes Range.
drm_lock: Prevent more than one Download from doing anything DRM-related at the
same time. Make sure all calls to download_segment() use the same Lock object.
license_widevine: Function used to license Widevine DRM objects. It must be passed
if the Segment's DRM uses Widevine.
invalid value in the TFHD box of Audio Tracks, and for the Segment Filter.
range_offset: Queue for saving and loading the last-used Byte Range offset.
proxy: Proxy URI to use when downloading the Segment file.
session: Python-Requests Session used when requesting init data.
stop_event: Prematurely stop the Download from beginning. Useful if ran from
a Thread Pool. It will raise a KeyboardInterrupt if set.
skip_event: Prematurely stop the Download from beginning. It returns with a
file size of -1 directly after DRM licensing occurs, even if it's DRM-free.
This is mainly for `--skip-dl` to allow licensing without downloading.
Returns the file size of the downloaded Segment in bytes.
"""
@ -357,59 +339,6 @@ class HLS:
if callable(track.OnSegmentFilter) and track.OnSegmentFilter(segment):
return 0
# handle init section changes
newest_init_data = init_data.get()
try:
if segment.init_section and (not newest_init_data or segment.discontinuity):
# Only use the init data if there's no init data yet (e.g., start of file)
# or if EXT-X-DISCONTINUITY is reached at the same time as EXT-X-MAP.
# Even if a new EXT-X-MAP is supplied, it may just be duplicate and would
# be unnecessary and slow to re-download the init data each time.
if not segment.init_section.uri.startswith(segment.init_section.base_uri):
segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri
if segment.init_section.byterange:
byte_range = HLS.calculate_byte_range(segment.init_section.byterange)
_ = range_offset.get()
range_offset.put(byte_range.split("-")[0])
range_header = {
"Range": f"bytes={byte_range}"
}
else:
range_header = {}
res = session.get(segment.init_section.uri, headers=range_header)
res.raise_for_status()
newest_init_data = res.content
finally:
init_data.put(newest_init_data)
# handle segment key changes
with drm_lock:
newest_segment_key = segment_key.get()
try:
if segment.keys and newest_segment_key[1] != segment.keys:
drm = HLS.get_drm(
keys=segment.keys,
proxy=proxy
)
if drm:
track.drm = drm
# license and grab content keys
# TODO: What if we don't want to use the first DRM system?
drm = drm[0]
if isinstance(drm, Widevine):
track_kid = track.get_key_id(newest_init_data)
if not license_widevine:
raise ValueError("license_widevine func must be supplied to use Widevine DRM")
license_widevine(drm, track_kid=track_kid)
newest_segment_key = (drm, segment.keys)
finally:
segment_key.put(newest_segment_key)
if skip_event.is_set():
return -1
if not segment.uri.startswith(segment.base_uri):
segment.uri = segment.base_uri + segment.uri
@ -457,21 +386,6 @@ class HLS:
f.seek(0)
f.write(fixed_segment_data)
# prepend the init data to be able to decrypt
if newest_init_data:
with open(out_path, "rb+") as f:
segment_data = f.read()
f.seek(0)
f.write(newest_init_data)
f.write(segment_data)
# decrypt segment if encrypted
if newest_segment_key[0]:
newest_segment_key[0].decrypt(out_path)
track.drm = None
if callable(track.OnDecrypted):
track.OnDecrypted(track)
return download_size
@staticmethod
@ -482,14 +396,17 @@ class HLS:
"""
Convert HLS EXT-X-KEY data to initialized DRM objects.
You can supply key data for a single segment or for the entire manifest.
This lets you narrow the results down to each specific segment's DRM status.
Only EXT-X-KEY methods that are currently supported will be returned.
The rest will simply be ignored, unless none of them were supported at
which it will raise a NotImplementedError.
Returns an empty list if there were no supplied EXT-X-KEY data, or if all the
EXT-X-KEY's were of blank data. An empty list signals a DRM-free stream or segment.
If an EXT-X-KEY with the method `NONE` is passed, then an empty list will
be returned. EXT-X-KEY METHOD=NONE means from hence forth the playlist is
no longer encrypted, unless another EXT-X-KEY METHOD is set later.
Will raise a NotImplementedError if EXT-X-KEY data was supplied and none of them
were supported. A DRM-free track will never raise NotImplementedError.
Parameters:
keys: List of Segment Keys or Playlist Session Keys.
proxy: Proxy URI to use when downloading Clear-Key DRM URIs (e.g. AES-128).
"""
drm = []
unsupported_systems = []
@ -534,5 +451,124 @@ class HLS:
length, offset = parts
return f"{offset}-{offset + length - 1}"
@staticmethod
def group_segments(
segments: m3u8.SegmentList,
track: AnyTrack,
license_widevine: Optional[Callable] = None,
proxy: Optional[str] = None,
session: Optional[Session] = None
) -> list[tuple[Optional[bytes], list[tuple[Optional[DRM_T], list[m3u8.Segment]]]]]:
"""
Group Segments that can be decoded and decrypted with the same information.
It also initializes both the init data and DRM information.
Since HLS allows you to set, remove, or change Segment Keys and Init Data at any
point at a per-segment level, we need a way to know when we have a set of segments
ready to be initialized with the same data, and decrypted with the same key.
One way of doing this is by simply doing a linear comparison and blocking them into
lists, which is exactly what this is doing. It groups segments with matching init
data, then within that list, it groups segments with matching keys:
groups = [(
initData, [(
segmentKey, [
segment, ...
]
), ...]
), ...]
Both initData and segmentKey may be set to `None`. The segmentKey may be set to None
if no EXT-X-KEY has been encountered yet, or if an EXT-X-KEY of METHOD=NONE is
encountered. The initData may be None if the HLS manifest doesn't require or use
init data, e.g., transport stream manifests. It is not an error for either or even
both of them to be None.
Parameters:
segments: A list of Segments from an M3U8.
track: Track to retrieve the initial Session Keys from, and to store the
latest Keys in for Services to use.
license_widevine: Function used to license Widevine DRM objects. It must be passed
if the Segment's DRM uses Widevine.
proxy: Proxy URI to use when downloading Clear-Key DRM URIs (e.g. AES-128) and
when retrieving the Segment's init data.
session: Python-Requests Session used when retrieving the Segment's init data.
"""
current_init_data = None
current_segment_keys = None
current_segment_drm = None
init_data_group_i = 0
drm_key_group_i = 0
# the variant master playlist had session DRM data
if track.drm:
# TODO: What if we don't want to use the first DRM system?
session_drm = track.drm[0]
if isinstance(session_drm, Widevine):
# license and grab content keys
if not license_widevine:
raise ValueError("license_widevine func must be supplied to use Widevine DRM")
license_widevine(session_drm)
current_segment_drm = session_drm
init_data_groups: list[tuple[Optional[bytes], list[tuple[Optional[DRM_T], list[m3u8.Segment]]]]] = \
[(None, [(current_segment_drm, [])])]
for i, segment in enumerate(segments):
if segment.init_section and (current_init_data is None or segment.discontinuity):
# Only use the init data if there's no init data yet (i.e., start of file)
# or if EXT-X-DISCONTINUITY is reached at the same time as EXT-X-MAP.
# Even if a new EXT-X-MAP is supplied, it may just be duplicate and would
# be unnecessary and slow to re-download the init data each time.
if not segment.init_section.uri.startswith(segment.init_section.base_uri):
segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri
if segment.init_section.byterange:
byte_range = HLS.calculate_byte_range(segment.init_section.byterange)
range_header = {
"Range": f"bytes={byte_range}"
}
else:
range_header = {}
res = session.get(
url=segment.init_section.uri,
headers=range_header,
proxies={"all": proxy} if proxy else None
)
res.raise_for_status()
current_init_data = res.content
init_data_groups.append((current_init_data, [(current_segment_drm, [])]))
init_data_group_i += 1
drm_key_group_i = 0
if segment.keys and current_segment_keys != segment.keys:
current_segment_keys = segment.keys
drm = HLS.get_drm(
keys=current_segment_keys,
proxy=proxy
)
if drm:
track.drm = drm
# license and grab content keys
# TODO: What if we don't want to use the first DRM system?
drm = drm[0]
if isinstance(drm, Widevine):
track_kid = track.get_key_id(current_init_data)
if not license_widevine:
raise ValueError("license_widevine func must be supplied to use Widevine DRM")
license_widevine(drm, track_kid=track_kid)
if current_segment_drm != drm:
current_segment_drm = drm
init_data_groups[init_data_group_i][1].append((current_segment_drm, []))
drm_key_group_i += 1
init_data_groups[init_data_group_i][1][drm_key_group_i][1].append(segment)
return init_data_groups
__ALL__ = (HLS,)