diff --git a/src/N_m3u8DL-RE/DownloadManager/SimpleLiveRecordManager2.cs b/src/N_m3u8DL-RE/DownloadManager/SimpleLiveRecordManager2.cs index 28f1595..b047397 100644 --- a/src/N_m3u8DL-RE/DownloadManager/SimpleLiveRecordManager2.cs +++ b/src/N_m3u8DL-RE/DownloadManager/SimpleLiveRecordManager2.cs @@ -13,6 +13,7 @@ using N_m3u8DL_RE.Util; using Spectre.Console; using System.Collections.Concurrent; using System.Text; +using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Xml.Linq; @@ -30,6 +31,7 @@ namespace N_m3u8DL_RE.DownloadManager bool STOP_FLAG = false; int WAIT_SEC = 0; //刷新间隔 ConcurrentDictionary RecordingDurDic = new(); //已录制时长 + ConcurrentDictionary>> BlockDic = new(); //各流的Block ConcurrentDictionary LastFileNameDic = new(); //上次下载的文件名 ConcurrentDictionary DateTimeDic = new(); //上次下载的dateTime CancellationTokenSource CancellationTokenSource = new(); //取消Wait @@ -171,7 +173,6 @@ namespace N_m3u8DL_RE.DownloadManager FileStream? fileOutputStream = null; WebVttSub currentVtt = new(); //字幕流始终维护一个实例 bool firstSub = true; - task.MaxValue = 0; task.StartTask(); var name = streamSpec.ToShortString(); @@ -426,6 +427,14 @@ namespace N_m3u8DL_RE.DownloadManager } } + /*//写出m3u8 + if (DownloaderConfig.MyOptions.LiveWriteHLS) + { + var _saveDir = DownloaderConfig.MyOptions.SaveDir ?? Environment.CurrentDirectory; + var _saveName = DownloaderConfig.MyOptions.SaveName ?? DateTime.Now.ToString("yyyyMMddHHmmss"); + await StreamingUtil.WriteStreamListAsync(FileDic, task.Id, 0, _saveName, _saveDir); + }*/ + //合并逻辑 if (DownloaderConfig.MyOptions.LiveRealTimeMerge) { @@ -550,33 +559,42 @@ namespace N_m3u8DL_RE.DownloadManager return true; } - private async Task PlayListProduceAsync(StreamSpec streamSpec, ProgressTask task, BufferBlock> target) + private async Task PlayListProduceAsync(Dictionary dic) { while (!STOP_FLAG) { if (WAIT_SEC != 0) { - var allHasDatetime = streamSpec.Playlist!.MediaParts[0].MediaSegments.All(s => s.DateTime != null); - //过滤不需要下载的片段 - FilterMediaSegments(streamSpec, allHasDatetime); - var newList = streamSpec.Playlist!.MediaParts[0].MediaSegments; - if (newList.Count > 0) + //1. MPD 所有URL相同 单次请求即可获得所有轨道的信息 + //2. M3U8 所有URL不同 才需要多次请求 + + await Parallel.ForEachAsync(dic, async (dic, _) => { - //推送给消费者 - await target.SendAsync(newList); - //更新最新链接 - LastFileNameDic[streamSpec.ToShortString()] = GetSegmentName(newList.Last(), allHasDatetime); - //尝试更新时间戳 - var dt = newList.Last().DateTime; - DateTimeDic[streamSpec.ToShortString()] = dt != null ? GetUnixTimestamp(dt.Value) : 0L; - task.MaxValue += newList.Count; - } + var streamSpec = dic.Key; + var task = dic.Value; + var allHasDatetime = streamSpec.Playlist!.MediaParts[0].MediaSegments.All(s => s.DateTime != null); + //过滤不需要下载的片段 + FilterMediaSegments(streamSpec, allHasDatetime); + var newList = streamSpec.Playlist!.MediaParts[0].MediaSegments; + if (newList.Count > 0) + { + task.MaxValue += newList.Count; + //推送给消费者 + await BlockDic[task.Id].SendAsync(newList); + //更新最新链接 + LastFileNameDic[streamSpec.ToShortString()] = GetSegmentName(newList.Last(), allHasDatetime); + //尝试更新时间戳 + var dt = newList.Last().DateTime; + DateTimeDic[streamSpec.ToShortString()] = dt != null ? GetUnixTimestamp(dt.Value) : 0L; + } + }); + try { //Logger.WarnMarkUp($"wait {waitSec}s"); if (!STOP_FLAG) await Task.Delay(WAIT_SEC * 1000, CancellationTokenSource.Token); //刷新列表 - if (!STOP_FLAG) await StreamExtractor.RefreshPlayListAsync(new List() { streamSpec }); + if (!STOP_FLAG) await StreamExtractor.RefreshPlayListAsync(dic.Keys.ToList()); } catch (OperationCanceledException oce) when (oce.CancellationToken == CancellationTokenSource.Token) { @@ -585,7 +603,10 @@ namespace N_m3u8DL_RE.DownloadManager } } - target.Complete(); + foreach (var target in BlockDic.Values) + { + target.Complete(); + } } private void FilterMediaSegments(StreamSpec streamSpec, bool allHasDatetime) @@ -643,6 +664,14 @@ namespace N_m3u8DL_RE.DownloadManager Logger.WarnMarkUp($"set refresh interval to {WAIT_SEC} seconds"); } + /*//写出master + if (DownloaderConfig.MyOptions.LiveWriteHLS) + { + var saveDir = DownloaderConfig.MyOptions.SaveDir ?? Environment.CurrentDirectory; + var saveName = DownloaderConfig.MyOptions.SaveName ?? DateTime.Now.ToString("yyyyMMddHHmmss"); + await StreamingUtil.WriteMasterListAsync(SelectedSteams, saveName, saveDir); + }*/ + var progress = AnsiConsole.Progress().AutoClear(true); //进度条的列定义 @@ -661,9 +690,10 @@ namespace N_m3u8DL_RE.DownloadManager //创建任务 var dic = SelectedSteams.Select(item => { - var task = ctx.AddTask(item.ToShortString(), autoStart: false); + var task = ctx.AddTask(item.ToShortString(), autoStart: false, maxValue: 0); SpeedContainerDic[task.Id] = new SpeedContainer(); //速度计算 RecordingDurDic[task.Id] = 0; + BlockDic[task.Id] = new BufferBlock>(); return (item, task); }).ToDictionary(item => item.item, item => item.task); @@ -681,13 +711,13 @@ namespace N_m3u8DL_RE.DownloadManager { MaxDegreeOfParallelism = SelectedSteams.Count }; + //开始刷新 + var producerTask = PlayListProduceAsync(dic); //并发下载 await Parallel.ForEachAsync(dic, options, async (kp, _) => { var task = kp.Value; - var list = new BufferBlock>(); - var consumerTask = RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], list); - await PlayListProduceAsync(kp.Key, task, list); + var consumerTask = RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], BlockDic[task.Id]); Results[kp.Key] = await consumerTask; }); });