优化直播录制:单次请求刷新所有轨道

This commit is contained in:
nilaoda 2022-11-13 01:32:05 +08:00
parent 7bba10aa0d
commit 06e1302f06
1 changed files with 52 additions and 22 deletions

View File

@ -13,6 +13,7 @@ using N_m3u8DL_RE.Util;
using Spectre.Console;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
@ -30,6 +31,7 @@ namespace N_m3u8DL_RE.DownloadManager
bool STOP_FLAG = false;
int WAIT_SEC = 0; //刷新间隔
ConcurrentDictionary<int, int> RecordingDurDic = new(); //已录制时长
ConcurrentDictionary<int, BufferBlock<List<MediaSegment>>> BlockDic = new(); //各流的Block
ConcurrentDictionary<string, string> LastFileNameDic = new(); //上次下载的文件名
ConcurrentDictionary<string, long> DateTimeDic = new(); //上次下载的dateTime
CancellationTokenSource CancellationTokenSource = new(); //取消Wait
@ -171,7 +173,6 @@ namespace N_m3u8DL_RE.DownloadManager
FileStream? fileOutputStream = null;
WebVttSub currentVtt = new(); //字幕流始终维护一个实例
bool firstSub = true;
task.MaxValue = 0;
task.StartTask();
var name = streamSpec.ToShortString();
@ -426,6 +427,14 @@ namespace N_m3u8DL_RE.DownloadManager
}
}
/*//写出m3u8
if (DownloaderConfig.MyOptions.LiveWriteHLS)
{
var _saveDir = DownloaderConfig.MyOptions.SaveDir ?? Environment.CurrentDirectory;
var _saveName = DownloaderConfig.MyOptions.SaveName ?? DateTime.Now.ToString("yyyyMMddHHmmss");
await StreamingUtil.WriteStreamListAsync(FileDic, task.Id, 0, _saveName, _saveDir);
}*/
//合并逻辑
if (DownloaderConfig.MyOptions.LiveRealTimeMerge)
{
@ -550,33 +559,42 @@ namespace N_m3u8DL_RE.DownloadManager
return true;
}
private async Task PlayListProduceAsync(StreamSpec streamSpec, ProgressTask task, BufferBlock<List<MediaSegment>> target)
private async Task PlayListProduceAsync(Dictionary<StreamSpec, ProgressTask> dic)
{
while (!STOP_FLAG)
{
if (WAIT_SEC != 0)
{
var allHasDatetime = streamSpec.Playlist!.MediaParts[0].MediaSegments.All(s => s.DateTime != null);
//过滤不需要下载的片段
FilterMediaSegments(streamSpec, allHasDatetime);
var newList = streamSpec.Playlist!.MediaParts[0].MediaSegments;
if (newList.Count > 0)
//1. MPD 所有URL相同 单次请求即可获得所有轨道的信息
//2. M3U8 所有URL不同 才需要多次请求
await Parallel.ForEachAsync(dic, async (dic, _) =>
{
//推送给消费者
await target.SendAsync(newList);
//更新最新链接
LastFileNameDic[streamSpec.ToShortString()] = GetSegmentName(newList.Last(), allHasDatetime);
//尝试更新时间戳
var dt = newList.Last().DateTime;
DateTimeDic[streamSpec.ToShortString()] = dt != null ? GetUnixTimestamp(dt.Value) : 0L;
task.MaxValue += newList.Count;
}
var streamSpec = dic.Key;
var task = dic.Value;
var allHasDatetime = streamSpec.Playlist!.MediaParts[0].MediaSegments.All(s => s.DateTime != null);
//过滤不需要下载的片段
FilterMediaSegments(streamSpec, allHasDatetime);
var newList = streamSpec.Playlist!.MediaParts[0].MediaSegments;
if (newList.Count > 0)
{
task.MaxValue += newList.Count;
//推送给消费者
await BlockDic[task.Id].SendAsync(newList);
//更新最新链接
LastFileNameDic[streamSpec.ToShortString()] = GetSegmentName(newList.Last(), allHasDatetime);
//尝试更新时间戳
var dt = newList.Last().DateTime;
DateTimeDic[streamSpec.ToShortString()] = dt != null ? GetUnixTimestamp(dt.Value) : 0L;
}
});
try
{
//Logger.WarnMarkUp($"wait {waitSec}s");
if (!STOP_FLAG) await Task.Delay(WAIT_SEC * 1000, CancellationTokenSource.Token);
//刷新列表
if (!STOP_FLAG) await StreamExtractor.RefreshPlayListAsync(new List<StreamSpec>() { streamSpec });
if (!STOP_FLAG) await StreamExtractor.RefreshPlayListAsync(dic.Keys.ToList());
}
catch (OperationCanceledException oce) when (oce.CancellationToken == CancellationTokenSource.Token)
{
@ -585,7 +603,10 @@ namespace N_m3u8DL_RE.DownloadManager
}
}
target.Complete();
foreach (var target in BlockDic.Values)
{
target.Complete();
}
}
private void FilterMediaSegments(StreamSpec streamSpec, bool allHasDatetime)
@ -643,6 +664,14 @@ namespace N_m3u8DL_RE.DownloadManager
Logger.WarnMarkUp($"set refresh interval to {WAIT_SEC} seconds");
}
/*//写出master
if (DownloaderConfig.MyOptions.LiveWriteHLS)
{
var saveDir = DownloaderConfig.MyOptions.SaveDir ?? Environment.CurrentDirectory;
var saveName = DownloaderConfig.MyOptions.SaveName ?? DateTime.Now.ToString("yyyyMMddHHmmss");
await StreamingUtil.WriteMasterListAsync(SelectedSteams, saveName, saveDir);
}*/
var progress = AnsiConsole.Progress().AutoClear(true);
//进度条的列定义
@ -661,9 +690,10 @@ namespace N_m3u8DL_RE.DownloadManager
//创建任务
var dic = SelectedSteams.Select(item =>
{
var task = ctx.AddTask(item.ToShortString(), autoStart: false);
var task = ctx.AddTask(item.ToShortString(), autoStart: false, maxValue: 0);
SpeedContainerDic[task.Id] = new SpeedContainer(); //速度计算
RecordingDurDic[task.Id] = 0;
BlockDic[task.Id] = new BufferBlock<List<MediaSegment>>();
return (item, task);
}).ToDictionary(item => item.item, item => item.task);
@ -681,13 +711,13 @@ namespace N_m3u8DL_RE.DownloadManager
{
MaxDegreeOfParallelism = SelectedSteams.Count
};
//开始刷新
var producerTask = PlayListProduceAsync(dic);
//并发下载
await Parallel.ForEachAsync(dic, options, async (kp, _) =>
{
var task = kp.Value;
var list = new BufferBlock<List<MediaSegment>>();
var consumerTask = RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], list);
await PlayListProduceAsync(kp.Key, task, list);
var consumerTask = RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], BlockDic[task.Id]);
Results[kp.Key] = await consumerTask;
});
});