Merge HLS segments first by discontinuity then via FFmpeg

HLS playlists where each segment is in an MP4 container seem to corrupt when the EXT-X-MAP is swapped out mid-playlist, unless you first merge the segments of each discontinuity and then merge those merges via FFmpeg (which demuxes all of the merged segment continuities and then concatenates them back together, likely giving the result new init data too).
rlaphoenix 2024-02-09 07:44:27 +00:00
parent 167b45475e
commit a544b1e867
1 changed file with 131 additions and 67 deletions
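
For context, the failure case is a playlist of roughly this shape (an illustrative sketch; the URIs and durations are made up): each EXT-X-DISCONTINUITY begins a new continuity with its own EXT-X-MAP init data, so binary-concatenating every segment into one file mixes init segments from different encodes:

#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:6
#EXT-X-MAP:URI="init_main.mp4"
#EXTINF:6.0,
main_segment_1.mp4
#EXTINF:6.0,
main_segment_2.mp4
#EXT-X-DISCONTINUITY
#EXT-X-MAP:URI="init_ad.mp4"
#EXTINF:6.0,
ad_segment_1.mp4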


@@ -3,6 +3,8 @@ from __future__ import annotations
 import html
 import logging
 import re
+import shutil
+import subprocess
 import sys
 import time
 from concurrent import futures
@@ -29,7 +31,7 @@ from devine.core.downloaders import downloader
 from devine.core.downloaders import requests as requests_downloader
 from devine.core.drm import DRM_T, ClearKey, Widevine
 from devine.core.tracks import Audio, Subtitle, Tracks, Video
-from devine.core.utilities import is_close_match, try_ensure_utf8
+from devine.core.utilities import get_binary_path, is_close_match, try_ensure_utf8
 
 
 class HLS:
@@ -250,84 +252,115 @@ class HLS:
         range_offset.put(0)
         drm_lock = Lock()
 
-        with ThreadPoolExecutor(max_workers=16) as pool:
-            for i, download in enumerate(futures.as_completed((
-                pool.submit(
-                    HLS.download_segment,
-                    segment=segment,
-                    out_path=(save_dir / str(n).zfill(len(str(len(master.segments))))).with_suffix(".mp4"),
-                    track=track,
-                    init_data=init_data,
-                    segment_key=segment_key,
-                    range_offset=range_offset,
-                    drm_lock=drm_lock,
-                    progress=progress,
-                    license_widevine=license_widevine,
-                    session=session,
-                    proxy=proxy
-                )
-                for n, segment in enumerate(master.segments)
-            ))):
-                try:
-                    download_size = download.result()
-                except KeyboardInterrupt:
-                    DOWNLOAD_CANCELLED.set()  # skip pending track downloads
-                    progress(downloaded="[yellow]CANCELLING")
-                    pool.shutdown(wait=True, cancel_futures=True)
-                    progress(downloaded="[yellow]CANCELLED")
-                    # tell dl that it was cancelled
-                    # the pool is already shut down, so exiting loop is fine
-                    raise
-                except Exception as e:
-                    DOWNLOAD_CANCELLED.set()  # skip pending track downloads
-                    progress(downloaded="[red]FAILING")
-                    pool.shutdown(wait=True, cancel_futures=True)
-                    progress(downloaded="[red]FAILED")
-                    # tell dl that it failed
-                    # the pool is already shut down, so exiting loop is fine
-                    raise e
-                else:
-                    # it successfully downloaded, and it was not cancelled
-                    progress(advance=1)
-
-                if download_size == -1:  # skipped for --skip-dl
-                    progress(downloaded="[yellow]SKIPPING")
-                    continue
-
-                now = time.time()
-                time_since = now - last_speed_refresh
-
-                if download_size:  # no size == skipped dl
-                    download_sizes.append(download_size)
-
-                if download_sizes and (time_since > download_speed_window or i == len(master.segments)):
-                    data_size = sum(download_sizes)
-                    download_speed = data_size / (time_since or 1)
-                    progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s")
-                    last_speed_refresh = now
-                    download_sizes.clear()
+        discontinuities: list[list[segment]] = []
+        discontinuity_index = -1
+        for i, segment in enumerate(master.segments):
+            if i == 0 or segment.discontinuity:
+                discontinuity_index += 1
+                discontinuities.append([])
+            discontinuities[discontinuity_index].append(segment)
+
+        for d_i, discontinuity in enumerate(discontinuities):
+            # each discontinuity is a separate 'file'/encode and must be processed separately
+            discontinuity_save_dir = save_dir / str(d_i).zfill(len(str(len(discontinuities))))
+            discontinuity_save_path = discontinuity_save_dir.with_suffix(Path(discontinuity[0].uri).suffix)
+
+            with ThreadPoolExecutor(max_workers=16) as pool:
+                for i, download in enumerate(futures.as_completed((
+                    pool.submit(
+                        HLS.download_segment,
+                        segment=segment,
+                        out_path=(
+                            discontinuity_save_dir /
+                            str(s_i).zfill(len(str(len(discontinuity))))
+                        ).with_suffix(Path(segment.uri).suffix),
+                        track=track,
+                        init_data=init_data,
+                        segment_key=segment_key,
+                        range_offset=range_offset,
+                        drm_lock=drm_lock,
+                        progress=progress,
+                        license_widevine=license_widevine,
+                        session=session,
+                        proxy=proxy
+                    )
+                    for s_i, segment in enumerate(discontinuity)
+                ))):
+                    try:
+                        download_size = download.result()
+                    except KeyboardInterrupt:
+                        DOWNLOAD_CANCELLED.set()  # skip pending track downloads
+                        progress(downloaded="[yellow]CANCELLING")
+                        pool.shutdown(wait=True, cancel_futures=True)
+                        progress(downloaded="[yellow]CANCELLED")
+                        # tell dl that it was cancelled
+                        # the pool is already shut down, so exiting loop is fine
+                        raise
+                    except Exception as e:
+                        DOWNLOAD_CANCELLED.set()  # skip pending track downloads
+                        progress(downloaded="[red]FAILING")
+                        pool.shutdown(wait=True, cancel_futures=True)
+                        progress(downloaded="[red]FAILED")
+                        # tell dl that it failed
+                        # the pool is already shut down, so exiting loop is fine
+                        raise e
+                    else:
+                        # it successfully downloaded, and it was not cancelled
+                        progress(advance=1)
+
+                    if download_size == -1:  # skipped for --skip-dl
+                        progress(downloaded="[yellow]SKIPPING")
+                        continue
+
+                    now = time.time()
+                    time_since = now - last_speed_refresh
+
+                    if download_size:  # no size == skipped dl
+                        download_sizes.append(download_size)
+
+                    if download_sizes and (time_since > download_speed_window or i == len(master.segments)):
+                        data_size = sum(download_sizes)
+                        download_speed = data_size / (time_since or 1)
+                        progress(downloaded=f"HLS {filesize.decimal(download_speed)}/s")
+                        last_speed_refresh = now
+                        download_sizes.clear()
+
+            with open(discontinuity_save_path, "wb") as f:
+                for segment_file in sorted(discontinuity_save_dir.iterdir()):
+                    segment_data = segment_file.read_bytes()
+                    if isinstance(track, Subtitle):
+                        segment_data = try_ensure_utf8(segment_data)
+                        if track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML):
+                            # decode text direction entities or SubtitleEdit's /ReverseRtlStartEnd won't work
+                            segment_data = segment_data.decode("utf8"). \
+                                replace("&lrm;", html.unescape("&lrm;")). \
+                                replace("&rlm;", html.unescape("&rlm;")). \
+                                encode("utf8")
+                    f.write(segment_data)
+                    segment_file.unlink()
+            shutil.rmtree(discontinuity_save_dir)
 
         if DOWNLOAD_LICENCE_ONLY.is_set():
             return
 
-        with open(save_path, "wb") as f:
-            for segment_file in sorted(save_dir.iterdir()):
-                segment_data = segment_file.read_bytes()
-                if isinstance(track, Subtitle):
-                    segment_data = try_ensure_utf8(segment_data)
-                    if track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML):
-                        # decode text direction entities or SubtitleEdit's /ReverseRtlStartEnd won't work
-                        segment_data = segment_data.decode("utf8"). \
-                            replace("&lrm;", html.unescape("&lrm;")). \
-                            replace("&rlm;", html.unescape("&rlm;")). \
-                            encode("utf8")
-                f.write(segment_data)
-                segment_file.unlink()
+        if isinstance(track, (Video, Audio)):
+            progress(downloaded="Merging")
+            HLS.merge_segments(
+                segments=sorted(list(save_dir.iterdir())),
+                save_path=save_path
+            )
+            shutil.rmtree(save_dir)
+        else:
+            with open(save_path, "wb") as f:
+                for discontinuity_file in sorted(save_dir.iterdir()):
+                    discontinuity_data = discontinuity_file.read_bytes()
+                    f.write(discontinuity_data)
+                    discontinuity_file.unlink()
+            save_dir.rmdir()
 
         progress(downloaded="Downloaded")
 
         track.path = save_path
-        save_dir.rmdir()
 
     @staticmethod
     def download_segment(
@@ -482,6 +515,37 @@ class HLS:
 
         return download_size
 
+    @staticmethod
+    def merge_segments(segments: list[Path], save_path: Path) -> int:
+        """
+        Concatenate Segments by first demuxing with FFmpeg.
+
+        Returns the file size of the merged file.
+        """
+        ffmpeg = get_binary_path("ffmpeg")
+        if not ffmpeg:
+            raise EnvironmentError("FFmpeg executable was not found but is required to merge HLS segments.")
+
+        demuxer_file = segments[0].parent / "ffmpeg_concat_demuxer.txt"
+        demuxer_file.write_text("\n".join([
+            f"file '{segment}'"
+            for segment in segments
+        ]))
+
+        subprocess.check_call([
+            ffmpeg, "-hide_banner",
+            "-loglevel", "panic",
+            "-f", "concat",
+            "-safe", "0",
+            "-i", demuxer_file,
+            "-map", "0",
+            "-c", "copy",
+            save_path
+        ])
+        demuxer_file.unlink()
+
+        return save_path.stat().st_size
+
     @staticmethod
     def get_drm(
         keys: list[Union[m3u8.model.SessionKey, m3u8.model.Key]],
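
For reference, the demuxer text file that merge_segments writes is FFmpeg's concat demuxer input format, one file directive per merged continuity (paths here are hypothetical):

file '/tmp/track/0.mp4'
file '/tmp/track/1.mp4'

making the subprocess call above equivalent to running the following, where -safe 0 permits the absolute segment paths in the list:

ffmpeg -hide_banner -loglevel panic -f concat -safe 0 -i ffmpeg_concat_demuxer.txt -map 0 -c copy merged.mp4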