Don't run aria2c under asyncio, further improve progress updates

I've removed asyncio usage as it's generally unnecessary. If you want to run aria2c under a thread, run it under a thread. In the case for devine, this would take another thread, and would be another thread layer deep. Pointless. Would affect speed.

With this change I've been able to improve the aria2c progress capture code quite a bit.
This commit is contained in:
rlaphoenix 2023-02-28 08:18:30 +00:00
parent d427ec8472
commit ce53a1b636
4 changed files with 54 additions and 73 deletions

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import asyncio
import html
import logging
import math
@ -756,13 +755,13 @@ class dl:
# no else-if as DASH may convert the track to URL descriptor
if track.descriptor == track.Descriptor.URL:
asyncio.run(aria2c(
track.url,
save_path,
service.session.headers,
proxy if track.needs_proxy else None,
aria2c(
uri=track.url,
out=save_path,
headers=service.session.headers,
proxy=proxy if track.needs_proxy else None,
progress=progress
))
)
track.path = save_path
if not track.drm and isinstance(track, (Video, Audio)):

View File

@ -1,6 +1,4 @@
import asyncio
import subprocess
from asyncio import IncompleteReadError
from functools import partial
from pathlib import Path
from typing import Optional, Union
@ -12,7 +10,7 @@ from devine.core.console import console
from devine.core.utilities import get_binary_path, start_pproxy
async def aria2c(
def aria2c(
uri: Union[str, list[str]],
out: Path,
headers: Optional[dict] = None,
@ -81,68 +79,54 @@ async def aria2c(
if proxy.lower().split(":")[0] != "http":
# HTTPS proxies are not supported by aria2(c).
# Proxy the proxy via pproxy to access it as an HTTP proxy.
async with start_pproxy(proxy) as pproxy_:
return await aria2c(uri, out, headers, pproxy_)
with start_pproxy(proxy) as pproxy_:
return aria2c(uri, out, headers, pproxy_)
arguments += ["--all-proxy", proxy]
p = await asyncio.create_subprocess_exec(
executable,
*arguments,
p = subprocess.Popen(
[executable, *arguments],
stdin=subprocess.PIPE,
stderr=[None, subprocess.DEVNULL][silent],
stdout=(
subprocess.PIPE if progress else
subprocess.DEVNULL if silent else
None
)
),
universal_newlines=True
)
p.stdin.write(uri.encode())
await p.stdin.drain()
p.stdin.close()
p._stdin_write(uri) # noqa
if progress:
def update_progress_bar(data: str):
if "%" in data:
# id, dledMiB/totalMiB(x%), CN:xx, DL:xxMiB, ETA:Xs
# eta may not always be available
data_parts = data[1:-1].split()
perc_parts = data_parts[1].split("(")
if len(perc_parts) == 2:
# might otherwise be e.g., 0B/0B, with no % symbol provided
progress(
total=100,
completed=int(perc_parts[1][:-2]),
downloaded=f"{data_parts[3].split(':')[1]}/s"
)
is_dl_summary = False
for line in iter(p.stdout.readline, ""):
line = line.strip()
if line:
if line.startswith("[") and line.endswith("]"):
if "%" in line:
# id, dledMiB/totalMiB(x%), CN:xx, DL:xxMiB, ETA:Xs
# eta may not always be available
data_parts = line[1:-1].split()
perc_parts = data_parts[1].split("(")
if len(perc_parts) == 2:
# might otherwise be e.g., 0B/0B, with no % symbol provided
progress(
total=100,
completed=int(perc_parts[1][:-2]),
downloaded=f"{data_parts[3].split(':')[1]}/s"
)
elif line.startswith("Download Results"):
# we know it's 100% downloaded, but let's use the avg dl speed value
is_dl_summary = True
elif is_dl_summary and "OK" in line and "|" in line:
gid, status, avg_speed, path_or_uri = line.split("|")
progress(total=100, completed=100, downloaded=avg_speed.strip())
elif not is_dl_summary:
buffer_msg = line.split(" ", maxsplit=2)
buffer_msg = f"[Aria2c]: {buffer_msg[-1].strip()}"
console.log(Text.from_ansi(buffer_msg))
# I'm sorry for this shameful code, aria2(c) is annoying as f!!!
while not p.stdout.at_eof():
try:
buffer = await p.stdout.readuntil(b"\r")
except IncompleteReadError as e:
buffer = e.partial
buffer = buffer.decode().strip()
if buffer:
buffer_lines = buffer.splitlines()
is_dl_summary = False
for line in buffer_lines:
if line:
if line.startswith("[") and line.endswith("]"):
update_progress_bar(line)
elif line.startswith("Download Results"):
# we know it's 100% downloaded, but let's use the avg dl speed value
is_dl_summary = True
elif is_dl_summary and "OK" in line and "|" in line:
gid, status, avg_speed, path_or_uri = line.split("|")
progress(total=100, completed=100, downloaded=avg_speed.strip())
elif not is_dl_summary:
buffer_msg = line.split(" ", maxsplit=2)
buffer_msg = f"[Aria2c]: {buffer_msg[-1].strip()}"
console.log(Text.from_ansi(buffer_msg))
await p.wait()
p.wait()
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, arguments)

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import asyncio
import base64
import logging
import math
@ -469,13 +468,13 @@ class DASH:
segment_save_path.parent.mkdir(parents=True, exist_ok=True)
segment_save_path.write_bytes(res.content)
else:
asyncio.run(aria2c(
segment_uri,
segment_save_path,
session.headers,
proxy,
aria2c(
uri=segment_uri,
out=segment_save_path,
headers=session.headers,
proxy=proxy,
silent=True
))
)
data_size = segment_save_path.stat().st_size

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import asyncio
import logging
import re
import sys
@ -289,13 +288,13 @@ class HLS:
segment_save_path.parent.mkdir(parents=True, exist_ok=True)
segment_save_path.write_bytes(res.content)
else:
asyncio.run(aria2c(
segment.uri,
segment_save_path,
session.headers,
proxy,
aria2c(
uri=segment.uri,
out=segment_save_path,
headers=session.headers,
proxy=proxy,
silent=True
))
)
data_size = segment_save_path.stat().st_size