Rework the Requests and Curl-Impersonate Downloaders

- Downloads are now multithreaded directly in the downloader.
- Requests and Curl-Impersonate use one singular Session for all downloads, keeping connections alive and cached so it doesn't have to close and reopen connections for every single download.
- Progress updates are now yielded back to the caller instead of drilling down a progress callable.
This commit is contained in:
rlaphoenix 2024-02-15 11:12:17 +00:00
parent 709901176e
commit 0e96d18af6
2 changed files with 480 additions and 154 deletions

View File

@ -1,7 +1,9 @@
import math
import time
from functools import partial
from concurrent import futures
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Any, MutableMapping, Optional, Union
from typing import Any, Generator, MutableMapping, Optional, Union
from curl_cffi.requests import Session
from requests.cookies import RequestsCookieJar
@ -12,38 +14,203 @@ from devine.core.constants import DOWNLOAD_CANCELLED
MAX_ATTEMPTS = 5
RETRY_WAIT = 2
CHUNK_SIZE = 1024
PROGRESS_WINDOW = 5
BROWSER = config.curl_impersonate.get("browser", "chrome110")
def curl_impersonate(
uri: Union[str, list[str]],
out: Path,
headers: Optional[dict] = None,
cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] = None,
proxy: Optional[str] = None,
progress: Optional[partial] = None,
*_: Any,
**__: Any
) -> int:
def download(
url: str,
save_path: Path,
session: Optional[Session] = None,
**kwargs: Any
) -> Generator[dict[str, Any], None, None]:
"""
Download files using Curl Impersonate.
https://github.com/lwthiker/curl-impersonate
If multiple URLs are provided they will be downloaded in the provided order
to the output directory. They will not be merged together.
"""
if isinstance(uri, list) and len(uri) == 1:
uri = uri[0]
Yields the following download status updates while chunks are downloading:
if isinstance(uri, list):
if out.is_file():
raise ValueError("Expecting out to be a Directory path not a File as multiple URLs were provided")
uri = [
(url, out / f"{i:08}.mp4")
for i, url in enumerate(uri)
]
- {total: 123} (there are 123 chunks to download)
- {total: None} (there are an unknown number of chunks to download)
- {advance: 1} (one chunk was downloaded)
- {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
- {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size)
The data is in the same format accepted by rich's progress.update() function. The
`downloaded` key is custom and is not natively accepted by all rich progress bars.
Parameters:
url: Web URL of a file to download.
save_path: The path to save the file to. If the save path's directory does not
exist then it will be made automatically.
session: The Requests or Curl-Impersonate Session to make HTTP requests with.
Useful to set Header, Cookie, and Proxy data. Connections are saved and
re-used with the session so long as the server keeps the connection alive.
kwargs: Any extra keyword arguments to pass to the session.get() call. Use this
for one-time request changes like a header, cookie, or proxy. For example,
to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`.
"""
if not session:
session = Session(impersonate=BROWSER)
save_dir = save_path.parent
control_file = save_path.with_name(f"{save_path.name}.!dev")
save_dir.mkdir(parents=True, exist_ok=True)
if control_file.exists():
# consider the file corrupt if the control file exists
save_path.unlink(missing_ok=True)
control_file.unlink()
elif save_path.exists():
# if it exists, and no control file, then it should be safe
yield dict(
file_downloaded=save_path,
written=save_path.stat().st_size
)
# TODO: Design a control file format so we know how much of the file is missing
control_file.write_bytes(b"")
attempts = 1
try:
while True:
written = 0
download_sizes = []
last_speed_refresh = time.time()
try:
stream = session.get(url, stream=True, **kwargs)
stream.raise_for_status()
try:
content_length = int(stream.headers.get("Content-Length", "0"))
except ValueError:
content_length = 0
if content_length > 0:
yield dict(total=math.ceil(content_length / CHUNK_SIZE))
else:
uri = [(uri, out.parent / out.name)]
# we have no data to calculate total chunks
yield dict(total=None) # indeterminate mode
with open(save_path, "wb") as f:
for chunk in stream.iter_content(chunk_size=CHUNK_SIZE):
download_size = len(chunk)
f.write(chunk)
written += download_size
yield dict(advance=1)
now = time.time()
time_since = now - last_speed_refresh
download_sizes.append(download_size)
if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE:
data_size = sum(download_sizes)
download_speed = math.ceil(data_size / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now
download_sizes.clear()
yield dict(
file_downloaded=save_path,
written=written
)
break
except Exception as e:
save_path.unlink(missing_ok=True)
if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS:
raise e
time.sleep(RETRY_WAIT)
attempts += 1
finally:
control_file.unlink()
def curl_impersonate(
urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]],
output_dir: Path,
filename: str,
headers: Optional[MutableMapping[str, Union[str, bytes]]] = None,
cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] = None,
proxy: Optional[str] = None,
max_workers: Optional[int] = None
) -> Generator[dict[str, Any], None, None]:
"""
Download files using Curl Impersonate.
https://github.com/lwthiker/curl-impersonate
Yields the following download status updates while chunks are downloading:
- {total: 123} (there are 123 chunks to download)
- {total: None} (there are an unknown number of chunks to download)
- {advance: 1} (one chunk was downloaded)
- {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
- {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size)
The data is in the same format accepted by rich's progress.update() function.
However, The `downloaded`, `file_downloaded` and `written` keys are custom and not
natively accepted by rich progress bars.
Parameters:
urls: Web URL(s) to file(s) to download. You can use a dictionary with the key
"url" for the URI, and other keys for extra arguments to use per-URL.
output_dir: The folder to save the file into. If the save path's directory does
not exist then it will be made automatically.
filename: The filename or filename template to use for each file. The variables
you can use are `i` for the URL index and `ext` for the URL extension.
headers: A mapping of HTTP Header Key/Values to use for all downloads.
cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads.
proxy: An optional proxy URI to route connections through for all downloads.
max_workers: The maximum amount of threads to use for downloads. Defaults to
min(32,(cpu_count+4)).
"""
if not urls:
raise ValueError("urls must be provided and not empty")
elif not isinstance(urls, (str, dict, list)):
raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}")
if not output_dir:
raise ValueError("output_dir must be provided")
elif not isinstance(output_dir, Path):
raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}")
if not filename:
raise ValueError("filename must be provided")
elif not isinstance(filename, str):
raise TypeError(f"Expected filename to be {str}, not {type(filename)}")
if not isinstance(headers, (MutableMapping, type(None))):
raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}")
if not isinstance(cookies, (MutableMapping, RequestsCookieJar, type(None))):
raise TypeError(f"Expected cookies to be {MutableMapping} or {RequestsCookieJar}, not {type(cookies)}")
if not isinstance(proxy, (str, type(None))):
raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}")
if not isinstance(max_workers, (int, type(None))):
raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}")
if not isinstance(urls, list):
urls = [urls]
urls = [
dict(
save_path=save_path,
**url
) if isinstance(url, dict) else dict(
url=url,
save_path=save_path
)
for i, url in enumerate(urls)
for save_path in [output_dir / filename.format(
i=i,
ext=Path((url["url"]) if isinstance(url, dict) else url).suffix
)]
]
session = Session(impersonate=BROWSER)
if headers:
@ -57,65 +224,65 @@ def curl_impersonate(
session.cookies.update(cookies)
if proxy:
session.proxies.update({
"http": proxy,
"https": proxy
"http": proxy.replace("https://", "http://"),
"https": proxy.replace("https://", "http://")
})
if progress:
progress(total=len(uri))
yield dict(total=len(urls))
download_sizes = []
last_speed_refresh = time.time()
for url, out_path in uri:
out_path.parent.mkdir(parents=True, exist_ok=True)
control_file = out_path.with_name(f"{out_path.name}.!dev")
if control_file.exists():
# consider the file corrupt if the control file exists
# TODO: Design a control file format so we know how much of the file is missing
out_path.unlink(missing_ok=True)
control_file.unlink()
elif out_path.exists():
continue
control_file.write_bytes(b"")
attempts = 1
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for i, future in enumerate(futures.as_completed((
pool.submit(
download,
session=session,
**url
)
for url in urls
))):
file_path, download_size = None, None
try:
while True:
try:
stream = session.get(url, stream=True)
stream.raise_for_status()
with open(out_path, "wb") as f:
written = 0
for chunk in stream.iter_content(chunk_size=1024):
download_size = len(chunk)
f.write(chunk)
written += download_size
if progress:
progress(advance=1)
for status_update in future.result():
if status_update.get("file_downloaded") and status_update.get("written"):
file_path = status_update["file_downloaded"]
download_size = status_update["written"]
elif len(urls) == 1:
# these are per-chunk updates, only useful if it's one big file
yield status_update
except KeyboardInterrupt:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[yellow]CANCELLING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[yellow]CANCELLED")
# tell dl that it was cancelled
# the pool is already shut down, so exiting loop is fine
raise
except Exception:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[red]FAILING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[red]FAILED")
# tell dl that it failed
# the pool is already shut down, so exiting loop is fine
raise
else:
yield dict(file_downloaded=file_path)
yield dict(advance=1)
now = time.time()
time_since = now - last_speed_refresh
if download_size: # no size == skipped dl
download_sizes.append(download_size)
if time_since > 5 or download_size < 1024:
if download_sizes and (time_since > PROGRESS_WINDOW or i == len(urls)):
data_size = sum(download_sizes)
download_speed = data_size / (time_since or 1)
progress(downloaded=f"{filesize.decimal(download_speed)}/s")
download_speed = math.ceil(data_size / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now
download_sizes.clear()
break
except Exception as e:
out_path.unlink(missing_ok=True)
if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS:
raise e
time.sleep(RETRY_WAIT)
attempts += 1
finally:
control_file.unlink()
return 0
__all__ = ("curl_impersonate",)

View File

@ -1,8 +1,9 @@
import math
import time
from functools import partial
from concurrent import futures
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Any, MutableMapping, Optional, Union
from typing import Any, Generator, MutableMapping, Optional, Union
from requests import Session
from requests.cookies import RequestsCookieJar
@ -12,37 +13,201 @@ from devine.core.constants import DOWNLOAD_CANCELLED
MAX_ATTEMPTS = 5
RETRY_WAIT = 2
CHUNK_SIZE = 1024
PROGRESS_WINDOW = 5
def download(
url: str,
save_path: Path,
session: Optional[Session] = None,
**kwargs: Any
) -> Generator[dict[str, Any], None, None]:
"""
Download a file using Python Requests.
https://requests.readthedocs.io
Yields the following download status updates while chunks are downloading:
- {total: 123} (there are 123 chunks to download)
- {total: None} (there are an unknown number of chunks to download)
- {advance: 1} (one chunk was downloaded)
- {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
- {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size)
The data is in the same format accepted by rich's progress.update() function. The
`downloaded` key is custom and is not natively accepted by all rich progress bars.
Parameters:
url: Web URL of a file to download.
save_path: The path to save the file to. If the save path's directory does not
exist then it will be made automatically.
session: The Requests Session to make HTTP requests with. Useful to set Header,
Cookie, and Proxy data. Connections are saved and re-used with the session
so long as the server keeps the connection alive.
kwargs: Any extra keyword arguments to pass to the session.get() call. Use this
for one-time request changes like a header, cookie, or proxy. For example,
to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`.
"""
session = session or Session()
save_dir = save_path.parent
control_file = save_path.with_name(f"{save_path.name}.!dev")
save_dir.mkdir(parents=True, exist_ok=True)
if control_file.exists():
# consider the file corrupt if the control file exists
save_path.unlink(missing_ok=True)
control_file.unlink()
elif save_path.exists():
# if it exists, and no control file, then it should be safe
yield dict(
file_downloaded=save_path,
written=save_path.stat().st_size
)
# TODO: Design a control file format so we know how much of the file is missing
control_file.write_bytes(b"")
attempts = 1
try:
while True:
written = 0
download_sizes = []
last_speed_refresh = time.time()
try:
stream = session.get(url, stream=True, **kwargs)
stream.raise_for_status()
try:
content_length = int(stream.headers.get("Content-Length", "0"))
except ValueError:
content_length = 0
if content_length > 0:
yield dict(total=math.ceil(content_length / CHUNK_SIZE))
else:
# we have no data to calculate total chunks
yield dict(total=None) # indeterminate mode
with open(save_path, "wb") as f:
for chunk in stream.iter_content(chunk_size=CHUNK_SIZE):
download_size = len(chunk)
f.write(chunk)
written += download_size
yield dict(advance=1)
now = time.time()
time_since = now - last_speed_refresh
download_sizes.append(download_size)
if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE:
data_size = sum(download_sizes)
download_speed = math.ceil(data_size / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now
download_sizes.clear()
yield dict(
file_downloaded=save_path,
written=written
)
break
except Exception as e:
save_path.unlink(missing_ok=True)
if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS:
raise e
time.sleep(RETRY_WAIT)
attempts += 1
finally:
control_file.unlink()
def requests(
uri: Union[str, list[str]],
out: Path,
headers: Optional[dict] = None,
urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]],
output_dir: Path,
filename: str,
headers: Optional[MutableMapping[str, Union[str, bytes]]] = None,
cookies: Optional[Union[MutableMapping[str, str], RequestsCookieJar]] = None,
proxy: Optional[str] = None,
progress: Optional[partial] = None,
*_: Any,
**__: Any
) -> int:
max_workers: Optional[int] = None
) -> Generator[dict[str, Any], None, None]:
"""
Download files using Python Requests.
Download a file using Python Requests.
https://requests.readthedocs.io
If multiple URLs are provided they will be downloaded in the provided order
to the output directory. They will not be merged together.
"""
if isinstance(uri, list) and len(uri) == 1:
uri = uri[0]
Yields the following download status updates while chunks are downloading:
if isinstance(uri, list):
if out.is_file():
raise ValueError("Expecting out to be a Directory path not a File as multiple URLs were provided")
uri = [
(url, out / f"{i:08}.mp4")
for i, url in enumerate(uri)
- {total: 123} (there are 123 chunks to download)
- {total: None} (there are an unknown number of chunks to download)
- {advance: 1} (one chunk was downloaded)
- {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
- {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size)
The data is in the same format accepted by rich's progress.update() function.
However, The `downloaded`, `file_downloaded` and `written` keys are custom and not
natively accepted by rich progress bars.
Parameters:
urls: Web URL(s) to file(s) to download. You can use a dictionary with the key
"url" for the URI, and other keys for extra arguments to use per-URL.
output_dir: The folder to save the file into. If the save path's directory does
not exist then it will be made automatically.
filename: The filename or filename template to use for each file. The variables
you can use are `i` for the URL index and `ext` for the URL extension.
headers: A mapping of HTTP Header Key/Values to use for all downloads.
cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads.
proxy: An optional proxy URI to route connections through for all downloads.
max_workers: The maximum amount of threads to use for downloads. Defaults to
min(32,(cpu_count+4)).
"""
if not urls:
raise ValueError("urls must be provided and not empty")
elif not isinstance(urls, (str, dict, list)):
raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}")
if not output_dir:
raise ValueError("output_dir must be provided")
elif not isinstance(output_dir, Path):
raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}")
if not filename:
raise ValueError("filename must be provided")
elif not isinstance(filename, str):
raise TypeError(f"Expected filename to be {str}, not {type(filename)}")
if not isinstance(headers, (MutableMapping, type(None))):
raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}")
if not isinstance(cookies, (MutableMapping, RequestsCookieJar, type(None))):
raise TypeError(f"Expected cookies to be {MutableMapping} or {RequestsCookieJar}, not {type(cookies)}")
if not isinstance(proxy, (str, type(None))):
raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}")
if not isinstance(max_workers, (int, type(None))):
raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}")
if not isinstance(urls, list):
urls = [urls]
urls = [
dict(
save_path=save_path,
**url
) if isinstance(url, dict) else dict(
url=url,
save_path=save_path
)
for i, url in enumerate(urls)
for save_path in [output_dir / filename.format(
i=i,
ext=Path((url["url"]) if isinstance(url, dict) else url).suffix
)]
]
else:
uri = [(uri, out.parent / out.name)]
session = Session()
if headers:
@ -57,67 +222,61 @@ def requests(
if proxy:
session.proxies.update({"all": proxy})
if progress:
progress(total=len(uri))
yield dict(total=len(urls))
download_sizes = []
last_speed_refresh = time.time()
for url, out_path in uri:
out_path.parent.mkdir(parents=True, exist_ok=True)
control_file = out_path.with_name(f"{out_path.name}.!dev")
if control_file.exists():
# consider the file corrupt if the control file exists
# TODO: Design a control file format so we know how much of the file is missing
out_path.unlink(missing_ok=True)
control_file.unlink()
elif out_path.exists():
continue
control_file.write_bytes(b"")
attempts = 1
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for i, future in enumerate(futures.as_completed((
pool.submit(
download,
session=session,
**url
)
for url in urls
))):
file_path, download_size = None, None
try:
while True:
try:
stream = session.get(url, stream=True)
stream.raise_for_status()
if len(uri) == 1 and progress:
content_length = int(stream.headers.get("Content-Length", "0"))
if content_length > 0:
progress(total=math.ceil(content_length / 1024))
with open(out_path, "wb") as f:
written = 0
for chunk in stream.iter_content(chunk_size=1024):
download_size = len(chunk)
f.write(chunk)
written += download_size
if progress:
progress(advance=1)
for status_update in future.result():
if status_update.get("file_downloaded") and status_update.get("written"):
file_path = status_update["file_downloaded"]
download_size = status_update["written"]
elif len(urls) == 1:
# these are per-chunk updates, only useful if it's one big file
yield status_update
except KeyboardInterrupt:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[yellow]CANCELLING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[yellow]CANCELLED")
# tell dl that it was cancelled
# the pool is already shut down, so exiting loop is fine
raise
except Exception:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[red]FAILING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[red]FAILED")
# tell dl that it failed
# the pool is already shut down, so exiting loop is fine
raise
else:
yield dict(file_downloaded=file_path, written=download_size)
yield dict(advance=1)
now = time.time()
time_since = now - last_speed_refresh
if download_size: # no size == skipped dl
download_sizes.append(download_size)
if time_since > 5 or download_size < 1024:
if download_sizes and (time_since > PROGRESS_WINDOW or i == len(urls)):
data_size = sum(download_sizes)
download_speed = data_size / (time_since or 1)
progress(downloaded=f"{filesize.decimal(download_speed)}/s")
download_speed = math.ceil(data_size / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now
download_sizes.clear()
break
except Exception as e:
out_path.unlink(missing_ok=True)
if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS:
raise e
time.sleep(RETRY_WAIT)
attempts += 1
finally:
control_file.unlink()
return 0
__all__ = ("requests",)