直播录制使用生产者消费者模式

This commit is contained in:
nilaoda 2022-09-18 18:01:43 +08:00
parent 1192cba42e
commit a0f3709559
3 changed files with 70 additions and 43 deletions

View File

@ -334,7 +334,7 @@ namespace N_m3u8DL_RE.CommandLine
public static async Task<int> InvokeArgs(string[] args, Func<MyOption, Task> action)
{
var rootCommand = new RootCommand("N_m3u8DL-RE (Beta version) 20220917")
var rootCommand = new RootCommand("N_m3u8DL-RE (Beta version) 20220918")
{
Input, TmpDir, SaveDir, SaveName, BaseUrl, ThreadCount, DownloadRetryCount, AutoSelect, SkipMerge, SkipDownload, CheckSegmentsCount,
BinaryMerge, DelAfterDone, WriteMetaJson, AppendUrlParams, ConcurrentDownload, Headers, /**SavePattern,**/ SubOnly, SubtitleFormat, AutoSubtitleFix,

View File

@ -13,19 +13,22 @@ using N_m3u8DL_RE.Util;
using NiL.JS;
using NiL.JS.BaseLibrary;
using Spectre.Console;
using Spectre.Console.Rendering;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.CommandLine.Parsing;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using static System.Net.Mime.MediaTypeNames;
namespace N_m3u8DL_RE.DownloadManager
{
internal class SimpleLiveRecordManager
internal class SimpleLiveRecordManager2
{
IDownloader Downloader;
DownloaderConfig DownloaderConfig;
@ -33,8 +36,12 @@ namespace N_m3u8DL_RE.DownloadManager
List<StreamSpec> SelectedSteams;
DateTime NowDateTime;
bool STOP_FLAG = false;
int WAIT_SEC = 0; //刷新间隔
ConcurrentDictionary<int, int> RecordingDurDic = new(); //已录制时长
ConcurrentDictionary<string, string> LastUrlDic = new(); //上次下载的url
CancellationTokenSource CancellationTokenSource = new(); //取消Wait
public SimpleLiveRecordManager(DownloaderConfig downloaderConfig, List<StreamSpec> selectedSteams, StreamExtractor streamExtractor)
public SimpleLiveRecordManager2(DownloaderConfig downloaderConfig, List<StreamSpec> selectedSteams, StreamExtractor streamExtractor)
{
this.DownloaderConfig = downloaderConfig;
Downloader = new SimpleDownloader(DownloaderConfig);
@ -106,7 +113,7 @@ namespace N_m3u8DL_RE.DownloadManager
}
}
private async Task<bool> RecordStreamAsync(StreamSpec streamSpec, ProgressTask task, SpeedContainer speedContainer, ConcurrentDictionary<string, string> dic, ConcurrentDictionary<int, int> recodingDurDic)
private async Task<bool> RecordStreamAsync(StreamSpec streamSpec, ProgressTask task, SpeedContainer speedContainer, ISourceBlock<List<MediaSegment>> source)
{
//mp4decrypt
var mp4decrypt = DownloaderConfig.MyOptions.DecryptionBinaryPath!;
@ -116,7 +123,6 @@ namespace N_m3u8DL_RE.DownloadManager
bool useAACFilter = false; //ffmpeg合并flag
bool initDownloaded = false; //是否下载过init文件
bool hls = StreamExtractor.ExtractorType == ExtractorType.HLS;
int waitSec = 0; //刷新间隔
ConcurrentDictionary<MediaSegment, DownloadResult?> FileDic = new();
List<Mediainfo> mediaInfos = new();
FileStream? fileOutputStream = null;
@ -137,21 +143,10 @@ namespace N_m3u8DL_RE.DownloadManager
if (!Directory.Exists(tmpDir)) Directory.CreateDirectory(tmpDir);
if (!Directory.Exists(saveDir)) Directory.CreateDirectory(saveDir);
while (!STOP_FLAG)
while (!STOP_FLAG && await source.OutputAvailableAsync())
{
//过滤不需要下载的片段
FilterMediaSegments(streamSpec, dic[name]);
//只下载没下过的片段
var segments = streamSpec.Playlist!.MediaParts[0].MediaSegments.AsEnumerable();
task.MaxValue += segments.Count();
//设置等待时间
if (waitSec == 0)
{
waitSec = (int)segments.Sum(s => s.Duration) / 2;
Logger.WarnMarkUp($"set refresh interval to {waitSec} seconds");
}
//接收新片段
var segments = (await source.ReceiveAsync()).AsEnumerable();
//下载init
if (!initDownloaded && streamSpec.Playlist?.MediaInit != null)
@ -213,8 +208,6 @@ namespace N_m3u8DL_RE.DownloadManager
{
var seg = segments.First();
segments = segments.Skip(1);
//记录最新url
dic[name] = GetPath(seg.Url);
//获取文件名
var filename = hls ? seg.Index.ToString(pad) : OtherUtil.GetFileNameFromInput(seg.Url, false);
var index = seg.Index;
@ -284,11 +277,7 @@ namespace N_m3u8DL_RE.DownloadManager
}
});
//记录最新url
if (segments.Any())
dic[name] = GetPath(segments.Last().Url);
recodingDurDic[task.Id] += (int)segments.Sum(s => s.Duration);
RecordingDurDic[task.Id] += (int)segments.Sum(s => s.Duration);
//自动修复VTT raw字幕
if (DownloaderConfig.MyOptions.AutoSubtitleFix && streamSpec.MediaType == Common.Enum.MediaType.SUBTITLES
@ -480,23 +469,21 @@ namespace N_m3u8DL_RE.DownloadManager
File.Delete(inputFilePath);
}
}
}
//刷新buffer
if (fileOutputStream != null)
{
fileOutputStream.Flush();
}
}
//检测时长限制
if (recodingDurDic.All(d => d.Value >= DownloaderConfig.MyOptions.LiveRecordLimit?.TotalSeconds))
if (!STOP_FLAG && RecordingDurDic.All(d => d.Value >= DownloaderConfig.MyOptions.LiveRecordLimit?.TotalSeconds))
{
Logger.WarnMarkUp($"[darkorange3_1]{ResString.liveLimitReached}[/]");
STOP_FLAG = true;
}
if (!STOP_FLAG)
{
//Logger.WarnMarkUp($"wait {waitSec}s");
await Task.Delay(waitSec * 1000);
//刷新列表
await StreamExtractor.RefreshPlayListAsync(new List<StreamSpec>() { streamSpec });
CancellationTokenSource.Cancel();
}
}
@ -509,6 +496,40 @@ namespace N_m3u8DL_RE.DownloadManager
return true;
}
private async Task PlayListProduceAsync(StreamSpec streamSpec, ProgressTask task, ITargetBlock<List<MediaSegment>> target)
{
while (!STOP_FLAG)
{
if (WAIT_SEC != 0)
{
//过滤不需要下载的片段
FilterMediaSegments(streamSpec, LastUrlDic[streamSpec.ToShortString()]);
var newList = streamSpec.Playlist!.MediaParts[0].MediaSegments;
if (newList.Count > 0)
{
//推送给消费者
await target.SendAsync(newList);
//更新最新链接
LastUrlDic[streamSpec.ToShortString()] = GetPath(newList.Last().Url);
task.MaxValue += newList.Count;
}
try
{
//Logger.WarnMarkUp($"wait {waitSec}s");
if (!STOP_FLAG) await Task.Delay(WAIT_SEC * 1000, CancellationTokenSource.Token);
//刷新列表
if (!STOP_FLAG) await StreamExtractor.RefreshPlayListAsync(new List<StreamSpec>() { streamSpec });
}
catch (OperationCanceledException oce) when (oce.CancellationToken == CancellationTokenSource.Token)
{
//不需要做事
}
}
}
target.Complete();
}
private void FilterMediaSegments(StreamSpec streamSpec, string lastUrl)
{
if (string.IsNullOrEmpty(lastUrl)) return;
@ -529,8 +550,6 @@ namespace N_m3u8DL_RE.DownloadManager
{
var takeLastCount = 15;
ConcurrentDictionary<int, SpeedContainer> SpeedContainerDic = new(); //速度计算
ConcurrentDictionary<int, int> recordingDurDic = new(); //已录制时长
ConcurrentDictionary<string, string> lastUrlDic = new(); //上次下载的url
ConcurrentDictionary<StreamSpec, bool?> Results = new();
//取最后15个分片
var minIndex = SelectedSteams.Max(s => s.Playlist!.MediaParts.Min(p => p.MediaSegments.Min(s => s.Index)));
@ -544,7 +563,13 @@ namespace N_m3u8DL_RE.DownloadManager
//初始化dic
foreach (var item in SelectedSteams)
{
lastUrlDic[item.ToShortString()] = "";
LastUrlDic[item.ToShortString()] = "";
}
//设置等待时间
if (WAIT_SEC == 0)
{
WAIT_SEC = (int)(SelectedSteams.Min(s => s.Playlist!.MediaParts[0].MediaSegments.Sum(s => s.Duration)) / 2);
Logger.WarnMarkUp($"set refresh interval to {WAIT_SEC} seconds");
}
var progress = AnsiConsole.Progress().AutoClear(true);
@ -553,7 +578,7 @@ namespace N_m3u8DL_RE.DownloadManager
progress.Columns(new ProgressColumn[]
{
new TaskDescriptionColumn() { Alignment = Justify.Left },
new RecordingDurationColumn(recordingDurDic), //时长显示
new RecordingDurationColumn(RecordingDurDic), //时长显示
new RecordingStatusColumn(),
new PercentageColumn(),
new DownloadSpeedColumn(SpeedContainerDic), //速度计算
@ -567,7 +592,7 @@ namespace N_m3u8DL_RE.DownloadManager
{
var task = ctx.AddTask(item.ToShortString(), autoStart: false);
SpeedContainerDic[task.Id] = new SpeedContainer(); //速度计算
recordingDurDic[task.Id] = 0;
RecordingDurDic[task.Id] = 0;
return (item, task);
}).ToDictionary(item => item.item, item => item.task);
@ -581,8 +606,10 @@ namespace N_m3u8DL_RE.DownloadManager
await Parallel.ForEachAsync(dic, async (kp, _) =>
{
var task = kp.Value;
var result = await RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], lastUrlDic, recordingDurDic);
Results[kp.Key] = result;
var list = new BufferBlock<List<MediaSegment>>();
var consumerTask = RecordStreamAsync(kp.Key, task, SpeedContainerDic[task.Id], list);
await PlayListProduceAsync(kp.Key, task, list);
Results[kp.Key] = await consumerTask;
});
});

View File

@ -305,7 +305,7 @@ namespace N_m3u8DL_RE
}
else
{
var sldm = new SimpleLiveRecordManager(downloadConfig, selectedStreams, extractor);
var sldm = new SimpleLiveRecordManager2(downloadConfig, selectedStreams, extractor);
var result = await sldm.StartRecordAsync();
if (result)
Logger.InfoMarkUp("[white on green]Done[/]");