Compare commits

...

2 Commits

Author SHA1 Message Date
rlaphoenix 0530d24110 Translate SSL/TLS and connection refused logs to ConnectionRefusedErrors 2023-03-17 22:02:29 +00:00
rlaphoenix 1ef7419966 Dynamically reduce DASH worker pool on connection errors 2023-03-17 22:02:29 +00:00
2 changed files with 28 additions and 4 deletions

View File

@ -140,9 +140,18 @@ async def aria2c(
log_buffer += f"{line.strip()}\n" log_buffer += f"{line.strip()}\n"
if log_buffer: if log_buffer:
log_buffer = log_buffer.rstrip()
refused_errors = (
"the target machine actively refused it",
"SSL/TLS handshake failure"
)
if segmented and any(x in log_buffer for x in refused_errors):
# likely too many connections
raise ConnectionRefusedError("Aria2 could not connect as the target machine actively refused it.")
# wrap to console width - padding - '[Aria2c]: ' # wrap to console width - padding - '[Aria2c]: '
log_buffer = "\n ".join(textwrap.wrap( log_buffer = "\n ".join(textwrap.wrap(
log_buffer.rstrip(), log_buffer,
width=console.width - 20, width=console.width - 20,
initial_indent="" initial_indent=""
)) ))

View File

@ -13,6 +13,7 @@ from copy import copy
from functools import partial from functools import partial
from hashlib import md5 from hashlib import md5
from pathlib import Path from pathlib import Path
from queue import Queue
from threading import Event from threading import Event
from typing import Any, Callable, Optional, Union from typing import Any, Callable, Optional, Union
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
@ -454,7 +455,7 @@ class DASH:
else: else:
drm = None drm = None
def download_segment(filename: str, segment: tuple[str, Optional[str]]) -> int: def download_segment(filename: str, segment: tuple[str, Optional[str]], workers: Queue) -> int:
if stop_event.is_set(): if stop_event.is_set():
# the track already started downloading, but another failed or was stopped # the track already started downloading, but another failed or was stopped
raise KeyboardInterrupt() raise KeyboardInterrupt()
@ -465,6 +466,7 @@ class DASH:
attempts = 1 attempts = 1
while True: while True:
workers.put(None)
try: try:
if segment_range: if segment_range:
# aria2(c) doesn't support byte ranges, let's use python-requests (likely slower) # aria2(c) doesn't support byte ranges, let's use python-requests (likely slower)
@ -486,12 +488,23 @@ class DASH:
silent=attempts != 5, silent=attempts != 5,
segmented=True segmented=True
)) ))
break except ConnectionRefusedError:
# server likely blocking more than a few connections
# reduce max workers but let this thread continue
with workers.mutex:
_ = workers._get() # take back this thread's queue item
if workers.maxsize > 1:
workers.maxsize -= 1
print(f"REDUCED TRAIN SIZE TO {workers.maxsize}")
except Exception as ee: except Exception as ee:
_ = workers.get()
if stop_event.is_set() or attempts == 5: if stop_event.is_set() or attempts == 5:
raise ee raise ee
time.sleep(2) time.sleep(2)
attempts += 1 attempts += 1
else:
_ = workers.get()
break
data_size = segment_save_path.stat().st_size data_size = segment_save_path.stat().st_size
@ -523,6 +536,7 @@ class DASH:
progress(total=len(segments)) progress(total=len(segments))
pool_workers = Queue(maxsize=16)
finished_threads = 0 finished_threads = 0
download_sizes = [] download_sizes = []
last_speed_refresh = time.time() last_speed_refresh = time.time()
@ -532,7 +546,8 @@ class DASH:
pool.submit( pool.submit(
download_segment, download_segment,
filename=str(i).zfill(len(str(len(segments)))), filename=str(i).zfill(len(str(len(segments)))),
segment=segment segment=segment,
workers=pool_workers
) )
for i, segment in enumerate(segments) for i, segment in enumerate(segments)
)): )):