From 30ab67343b25d2b588a7b7ed6d1959eb88998fe3 Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Wed, 17 May 2023 08:00:58 +0100 Subject: [PATCH] Decrypt HLS segments in groups based on the init data --- devine/core/manifests/hls.py | 376 +++++++++++++++++++---------------- 1 file changed, 206 insertions(+), 170 deletions(-) diff --git a/devine/core/manifests/hls.py b/devine/core/manifests/hls.py index 6e85f79..6acd27b 100644 --- a/devine/core/manifests/hls.py +++ b/devine/core/manifests/hls.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import re +import shutil import sys import time from concurrent import futures @@ -10,7 +11,7 @@ from functools import partial from hashlib import md5 from pathlib import Path from queue import Queue -from threading import Event, Lock +from threading import Event from typing import Any, Callable, Optional, Union import m3u8 @@ -214,92 +215,92 @@ class HLS: log.error("Track's HLS playlist has no segments, expecting an invariant M3U8 playlist.") sys.exit(1) - if track.drm: - session_drm = track.drm[0] # just use the first supported DRM system for now - if isinstance(session_drm, Widevine): - # license and grab content keys - if not license_widevine: - raise ValueError("license_widevine func must be supplied to use Widevine DRM") - license_widevine(session_drm) - else: - session_drm = None + init_data_groups = HLS.group_segments(master.segments, track, license_widevine, proxy, session) + + if skip_event.is_set(): + return progress(total=len(master.segments)) + range_offset = Queue(maxsize=1) + range_offset.put(0) download_sizes = [] download_speed_window = 5 last_speed_refresh = time.time() - segment_key = Queue(maxsize=1) - segment_key.put((session_drm, None)) - init_data = Queue(maxsize=1) - init_data.put(None) - range_offset = Queue(maxsize=1) - range_offset.put(0) - drm_lock = Lock() - with ThreadPoolExecutor(max_workers=16) as pool: - for i, download in enumerate(futures.as_completed(( - pool.submit( - HLS.download_segment, - segment=segment, - out_path=(save_dir / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"), - track=track, - init_data=init_data, - segment_key=segment_key, - range_offset=range_offset, - drm_lock=drm_lock, - license_widevine=license_widevine, - session=session, - proxy=proxy, - stop_event=stop_event, - skip_event=skip_event - ) - for n, segment in enumerate(master.segments) - ))): - try: - download_size = download.result() - except KeyboardInterrupt: - stop_event.set() # skip pending track downloads - progress(downloaded="[yellow]STOPPING") - pool.shutdown(wait=True, cancel_futures=True) - progress(downloaded="[yellow]STOPPED") - # tell dl that it was cancelled - # the pool is already shut down, so exiting loop is fine - raise - except Exception as e: - stop_event.set() # skip pending track downloads - progress(downloaded="[red]FAILING") - pool.shutdown(wait=True, cancel_futures=True) - progress(downloaded="[red]FAILED") - # tell dl that it failed - # the pool is already shut down, so exiting loop is fine - raise e - else: - # it successfully downloaded, and it was not cancelled - progress(advance=1) - - if download_size == -1: # skipped for --skip-dl - progress(downloaded="[yellow]SKIPPING") + for g1, (init_data, segment_groups) in enumerate(init_data_groups): + for g2, (drm, segments) in enumerate(segment_groups): + if not segments: continue + out_path_root = save_dir / f"group_{g1:{len(str(g1))}}_{g2:{len(str(g2))}}" + out_path_root.mkdir(parents=True, exist_ok=True) + group_save_path = (save_dir / out_path_root.name).with_suffix(".mp4") + for i, download in enumerate(futures.as_completed(( + pool.submit( + HLS.download_segment, + segment=segment, + out_path=(out_path_root / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"), + track=track, + range_offset=range_offset, + session=session, + proxy=proxy, + stop_event=stop_event + ) + for n, segment in enumerate(segments) + ))): + try: + download_size = download.result() + except KeyboardInterrupt: + stop_event.set() # skip pending track downloads + progress(downloaded="[yellow]STOPPING") + pool.shutdown(wait=True, cancel_futures=True) + progress(downloaded="[yellow]STOPPED") + # tell dl that it was cancelled + # the pool is already shut down, so exiting loop is fine + raise + except Exception as e: + stop_event.set() # skip pending track downloads + progress(downloaded="[red]FAILING") + pool.shutdown(wait=True, cancel_futures=True) + progress(downloaded="[red]FAILED") + # tell dl that it failed + # the pool is already shut down, so exiting loop is fine + raise e + else: + # it successfully downloaded, and it was not cancelled + progress(advance=1) - now = time.time() - time_since = now - last_speed_refresh + now = time.time() + time_since = now - last_speed_refresh - if download_size: # no size == skipped dl - download_sizes.append(download_size) + if download_size: # no size == skipped dl + download_sizes.append(download_size) - if download_sizes and (time_since > download_speed_window or i == len(master.segments)): - data_size = sum(download_sizes) - download_speed = data_size / (time_since or 1) - progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() + if download_sizes and (time_since > download_speed_window or i == len(master.segments)): + data_size = sum(download_sizes) + download_speed = data_size / (time_since or 1) + progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s") + last_speed_refresh = now + download_sizes.clear() + + with open(group_save_path, "wb") as f: + if init_data: + f.write(init_data) + for segment_file in sorted(out_path_root.iterdir()): + f.write(segment_file.read_bytes()) + shutil.rmtree(out_path_root) + + if drm: + drm.decrypt(group_save_path) + track.drm = None + if callable(track.OnDecrypted): + track.OnDecrypted(track) with open(save_path, "wb") as f: - for segment_file in sorted(save_dir.iterdir()): - f.write(segment_file.read_bytes()) - segment_file.unlink() + for group_file in sorted(save_dir.iterdir()): + f.write(group_file.read_bytes()) + group_file.unlink() progress(downloaded="Downloaded") @@ -311,43 +312,24 @@ class HLS: segment: m3u8.Segment, out_path: Path, track: AnyTrack, - init_data: Queue, - segment_key: Queue, range_offset: Queue, - drm_lock: Lock, - license_widevine: Optional[Callable] = None, session: Optional[Session] = None, proxy: Optional[str] = None, - stop_event: Optional[Event] = None, - skip_event: Optional[Event] = None + stop_event: Optional[Event] = None ) -> int: """ - Download (and Decrypt) an HLS Media Segment. - - Note: Make sure all Queue objects passed are appropriately initialized with - a starting value or this function may get permanently stuck. + Download an HLS Media Segment. Parameters: segment: The m3u8.Segment Object to Download. out_path: Path to save the downloaded Segment file to. track: The Track object of which this Segment is for. Currently used to fix an - invalid value in the TFHD box of Audio Tracks, for the OnSegmentFilter, and - for DRM-related operations like getting the Track ID and Decryption. - init_data: Queue for saving and loading the most recent init section data. - segment_key: Queue for saving and loading the most recent DRM object, and it's - adjacent Segment.Key object. - range_offset: Queue for saving and loading the most recent Segment Bytes Range. - drm_lock: Prevent more than one Download from doing anything DRM-related at the - same time. Make sure all calls to download_segment() use the same Lock object. - license_widevine: Function used to license Widevine DRM objects. It must be passed - if the Segment's DRM uses Widevine. + invalid value in the TFHD box of Audio Tracks, and for the Segment Filter. + range_offset: Queue for saving and loading the last-used Byte Range offset. proxy: Proxy URI to use when downloading the Segment file. session: Python-Requests Session used when requesting init data. stop_event: Prematurely stop the Download from beginning. Useful if ran from a Thread Pool. It will raise a KeyboardInterrupt if set. - skip_event: Prematurely stop the Download from beginning. It returns with a - file size of -1 directly after DRM licensing occurs, even if it's DRM-free. - This is mainly for `--skip-dl` to allow licensing without downloading. Returns the file size of the downloaded Segment in bytes. """ @@ -357,59 +339,6 @@ class HLS: if callable(track.OnSegmentFilter) and track.OnSegmentFilter(segment): return 0 - # handle init section changes - newest_init_data = init_data.get() - try: - if segment.init_section and (not newest_init_data or segment.discontinuity): - # Only use the init data if there's no init data yet (e.g., start of file) - # or if EXT-X-DISCONTINUITY is reached at the same time as EXT-X-MAP. - # Even if a new EXT-X-MAP is supplied, it may just be duplicate and would - # be unnecessary and slow to re-download the init data each time. - if not segment.init_section.uri.startswith(segment.init_section.base_uri): - segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri - - if segment.init_section.byterange: - byte_range = HLS.calculate_byte_range(segment.init_section.byterange) - _ = range_offset.get() - range_offset.put(byte_range.split("-")[0]) - range_header = { - "Range": f"bytes={byte_range}" - } - else: - range_header = {} - - res = session.get(segment.init_section.uri, headers=range_header) - res.raise_for_status() - newest_init_data = res.content - finally: - init_data.put(newest_init_data) - - # handle segment key changes - with drm_lock: - newest_segment_key = segment_key.get() - try: - if segment.keys and newest_segment_key[1] != segment.keys: - drm = HLS.get_drm( - keys=segment.keys, - proxy=proxy - ) - if drm: - track.drm = drm - # license and grab content keys - # TODO: What if we don't want to use the first DRM system? - drm = drm[0] - if isinstance(drm, Widevine): - track_kid = track.get_key_id(newest_init_data) - if not license_widevine: - raise ValueError("license_widevine func must be supplied to use Widevine DRM") - license_widevine(drm, track_kid=track_kid) - newest_segment_key = (drm, segment.keys) - finally: - segment_key.put(newest_segment_key) - - if skip_event.is_set(): - return -1 - if not segment.uri.startswith(segment.base_uri): segment.uri = segment.base_uri + segment.uri @@ -457,21 +386,6 @@ class HLS: f.seek(0) f.write(fixed_segment_data) - # prepend the init data to be able to decrypt - if newest_init_data: - with open(out_path, "rb+") as f: - segment_data = f.read() - f.seek(0) - f.write(newest_init_data) - f.write(segment_data) - - # decrypt segment if encrypted - if newest_segment_key[0]: - newest_segment_key[0].decrypt(out_path) - track.drm = None - if callable(track.OnDecrypted): - track.OnDecrypted(track) - return download_size @staticmethod @@ -482,14 +396,17 @@ class HLS: """ Convert HLS EXT-X-KEY data to initialized DRM objects. - You can supply key data for a single segment or for the entire manifest. - This lets you narrow the results down to each specific segment's DRM status. + Only EXT-X-KEY methods that are currently supported will be returned. + The rest will simply be ignored, unless none of them were supported at + which it will raise a NotImplementedError. - Returns an empty list if there were no supplied EXT-X-KEY data, or if all the - EXT-X-KEY's were of blank data. An empty list signals a DRM-free stream or segment. + If an EXT-X-KEY with the method `NONE` is passed, then an empty list will + be returned. EXT-X-KEY METHOD=NONE means from hence forth the playlist is + no longer encrypted, unless another EXT-X-KEY METHOD is set later. - Will raise a NotImplementedError if EXT-X-KEY data was supplied and none of them - were supported. A DRM-free track will never raise NotImplementedError. + Parameters: + keys: List of Segment Keys or Playlist Session Keys. + proxy: Proxy URI to use when downloading Clear-Key DRM URIs (e.g. AES-128). """ drm = [] unsupported_systems = [] @@ -534,5 +451,124 @@ class HLS: length, offset = parts return f"{offset}-{offset + length - 1}" + @staticmethod + def group_segments( + segments: m3u8.SegmentList, + track: AnyTrack, + license_widevine: Optional[Callable] = None, + proxy: Optional[str] = None, + session: Optional[Session] = None + ) -> list[tuple[Optional[bytes], list[tuple[Optional[DRM_T], list[m3u8.Segment]]]]]: + """ + Group Segments that can be decoded and decrypted with the same information. + It also initializes both the init data and DRM information. + + Since HLS allows you to set, remove, or change Segment Keys and Init Data at any + point at a per-segment level, we need a way to know when we have a set of segments + ready to be initialized with the same data, and decrypted with the same key. + + One way of doing this is by simply doing a linear comparison and blocking them into + lists, which is exactly what this is doing. It groups segments with matching init + data, then within that list, it groups segments with matching keys: + + groups = [( + initData, [( + segmentKey, [ + segment, ... + ] + ), ...] + ), ...] + + Both initData and segmentKey may be set to `None`. The segmentKey may be set to None + if no EXT-X-KEY has been encountered yet, or if an EXT-X-KEY of METHOD=NONE is + encountered. The initData may be None if the HLS manifest doesn't require or use + init data, e.g., transport stream manifests. It is not an error for either or even + both of them to be None. + + Parameters: + segments: A list of Segments from an M3U8. + track: Track to retrieve the initial Session Keys from, and to store the + latest Keys in for Services to use. + license_widevine: Function used to license Widevine DRM objects. It must be passed + if the Segment's DRM uses Widevine. + proxy: Proxy URI to use when downloading Clear-Key DRM URIs (e.g. AES-128) and + when retrieving the Segment's init data. + session: Python-Requests Session used when retrieving the Segment's init data. + """ + current_init_data = None + current_segment_keys = None + current_segment_drm = None + + init_data_group_i = 0 + drm_key_group_i = 0 + + # the variant master playlist had session DRM data + if track.drm: + # TODO: What if we don't want to use the first DRM system? + session_drm = track.drm[0] + if isinstance(session_drm, Widevine): + # license and grab content keys + if not license_widevine: + raise ValueError("license_widevine func must be supplied to use Widevine DRM") + license_widevine(session_drm) + current_segment_drm = session_drm + + init_data_groups: list[tuple[Optional[bytes], list[tuple[Optional[DRM_T], list[m3u8.Segment]]]]] = \ + [(None, [(current_segment_drm, [])])] + + for i, segment in enumerate(segments): + if segment.init_section and (current_init_data is None or segment.discontinuity): + # Only use the init data if there's no init data yet (i.e., start of file) + # or if EXT-X-DISCONTINUITY is reached at the same time as EXT-X-MAP. + # Even if a new EXT-X-MAP is supplied, it may just be duplicate and would + # be unnecessary and slow to re-download the init data each time. + if not segment.init_section.uri.startswith(segment.init_section.base_uri): + segment.init_section.uri = segment.init_section.base_uri + segment.init_section.uri + + if segment.init_section.byterange: + byte_range = HLS.calculate_byte_range(segment.init_section.byterange) + range_header = { + "Range": f"bytes={byte_range}" + } + else: + range_header = {} + + res = session.get( + url=segment.init_section.uri, + headers=range_header, + proxies={"all": proxy} if proxy else None + ) + res.raise_for_status() + current_init_data = res.content + + init_data_groups.append((current_init_data, [(current_segment_drm, [])])) + init_data_group_i += 1 + drm_key_group_i = 0 + + if segment.keys and current_segment_keys != segment.keys: + current_segment_keys = segment.keys + drm = HLS.get_drm( + keys=current_segment_keys, + proxy=proxy + ) + if drm: + track.drm = drm + # license and grab content keys + # TODO: What if we don't want to use the first DRM system? + drm = drm[0] + if isinstance(drm, Widevine): + track_kid = track.get_key_id(current_init_data) + if not license_widevine: + raise ValueError("license_widevine func must be supplied to use Widevine DRM") + license_widevine(drm, track_kid=track_kid) + if current_segment_drm != drm: + current_segment_drm = drm + init_data_groups[init_data_group_i][1].append((current_segment_drm, [])) + drm_key_group_i += 1 + + init_data_groups[init_data_group_i][1][drm_key_group_i][1].append(segment) + + return init_data_groups + __ALL__ = (HLS,)