diff --git a/devine/core/downloaders/aria2c.py b/devine/core/downloaders/aria2c.py index ba30b77..e97b37d 100644 --- a/devine/core/downloaders/aria2c.py +++ b/devine/core/downloaders/aria2c.py @@ -17,28 +17,33 @@ from rich.text import Text from devine.core.config import config from devine.core.console import console +from devine.core.constants import DOWNLOAD_CANCELLED from devine.core.utilities import get_binary_path, get_free_port -def rpc(caller: Callable, secret: str, method: str, *params: Any) -> dict[str, Any]: +def rpc(caller: Callable, secret: str, method: str, params: Optional[list[Any]] = None) -> Any: """Make a call to Aria2's JSON-RPC API.""" - rpc_res = caller( - json={ - "jsonrpc": "2.0", - "id": get_random_bytes(16).hex(), - "method": method, - "params": [f"token:{secret}", *params] - } - ).json() - if rpc_res.get("code"): - # wrap to console width - padding - '[Aria2c]: ' - error_pretty = "\n ".join(textwrap.wrap( - f"RPC Error: {rpc_res['message']} ({rpc_res['code']})".strip(), - width=console.width - 20, - initial_indent="" - )) - console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty)) - return rpc_res["result"] + try: + rpc_res = caller( + json={ + "jsonrpc": "2.0", + "id": get_random_bytes(16).hex(), + "method": method, + "params": [f"token:{secret}", *(params or [])] + } + ).json() + if rpc_res.get("code"): + # wrap to console width - padding - '[Aria2c]: ' + error_pretty = "\n ".join(textwrap.wrap( + f"RPC Error: {rpc_res['message']} ({rpc_res['code']})".strip(), + width=console.width - 20, + initial_indent="" + )) + console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty)) + return rpc_res["result"] + except requests.exceptions.ConnectionError: + # absorb, process likely ended as it was calling RPC + return def download( @@ -176,6 +181,8 @@ def download( continue arguments.extend(["--header", f"{header}: {value}"]) + yield dict(total=len(urls)) + try: p = subprocess.Popen( [ @@ -190,28 +197,50 @@ def download( p.stdin.close() while p.poll() is None: - global_stats = rpc( + global_stats: dict[str, Any] = rpc( caller=partial(rpc_session.post, url=rpc_uri), secret=rpc_secret, method="aria2.getGlobalStat" ) if global_stats: - active = int(global_stats["numActive"]) - waiting = int(global_stats["numWaiting"]) - stopped = int(global_stats["numStopped"]) - total = active + waiting + stopped yield dict( - total=total, - completed=stopped, downloaded=f"{filesize.decimal(int(global_stats['downloadSpeed']))}/s" ) - if total == stopped: - rpc( - caller=partial(rpc_session.post, url=rpc_uri), - secret=rpc_secret, - method="aria2.shutdown" + + stopped_downloads: list[dict[str, Any]] = rpc( + caller=partial(rpc_session.post, url=rpc_uri), + secret=rpc_secret, + method="aria2.tellStopped", + params=[0, 999999] + ) + for dl in stopped_downloads or []: + if dl["status"] == "complete": + yield dict(advance=1) + elif dl["status"] == "error": + used_uri = next( + uri["uri"] + for file in dl["files"] + if file["selected"] == "true" + for uri in file["uris"] + if uri["status"] == "used" ) - break + error = f"Download Error (#{dl['gid']}): {dl['errorMessage']} ({dl['errorCode']}), {used_uri}" + error_pretty = "\n ".join(textwrap.wrap( + error, + width=console.width - 20, + initial_indent="" + )) + console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty)) + raise ValueError(error) + + if len(stopped_downloads) == len(urls): + rpc( + caller=partial(rpc_session.post, url=rpc_uri), + secret=rpc_secret, + method="aria2.shutdown" + ) + break + time.sleep(1) p.wait() @@ -227,6 +256,20 @@ def download( # 0xC000013A is when it never got the chance to raise KeyboardInterrupt() raise + except KeyboardInterrupt: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[yellow]CANCELLED") + raise + except Exception: + DOWNLOAD_CANCELLED.set() # skip pending track downloads + yield dict(downloaded="[red]FAILED") + raise + finally: + rpc( + caller=partial(rpc_session.post, url=rpc_uri), + secret=rpc_secret, + method="aria2.shutdown" + ) def aria2c(