mirror of https://github.com/devine-dl/devine.git
Implement --no-proxy to disable all uses proxies and proxy providers
This prevents a service from setting a proxy if geofenced, and also discards any manually provided proxy from `--proxy`.
This commit is contained in:
parent
f28a6dc28a
commit
58cb00b18b
|
@ -145,6 +145,7 @@ class dl:
|
|||
def __init__(
|
||||
self,
|
||||
ctx: click.Context,
|
||||
no_proxy: bool,
|
||||
profile: Optional[str] = None,
|
||||
proxy: Optional[str] = None,
|
||||
group: Optional[str] = None,
|
||||
|
@ -192,49 +193,52 @@ class dl:
|
|||
self.vaults.load(vault_type, **vault)
|
||||
self.log.info(f"Loaded {len(self.vaults)} Vaults")
|
||||
|
||||
with console.status("Loading Proxy Providers...", spinner="dots"):
|
||||
self.proxy_providers = []
|
||||
if config.proxy_providers.get("basic"):
|
||||
self.proxy_providers.append(Basic(**config.proxy_providers["basic"]))
|
||||
if config.proxy_providers.get("nordvpn"):
|
||||
self.proxy_providers.append(NordVPN(**config.proxy_providers["nordvpn"]))
|
||||
if get_binary_path("hola-proxy"):
|
||||
self.proxy_providers.append(Hola())
|
||||
for proxy_provider in self.proxy_providers:
|
||||
self.log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}")
|
||||
self.proxy_providers = []
|
||||
if no_proxy:
|
||||
ctx.params["proxy"] = None
|
||||
else:
|
||||
with console.status("Loading Proxy Providers...", spinner="dots"):
|
||||
if config.proxy_providers.get("basic"):
|
||||
self.proxy_providers.append(Basic(**config.proxy_providers["basic"]))
|
||||
if config.proxy_providers.get("nordvpn"):
|
||||
self.proxy_providers.append(NordVPN(**config.proxy_providers["nordvpn"]))
|
||||
if get_binary_path("hola-proxy"):
|
||||
self.proxy_providers.append(Hola())
|
||||
for proxy_provider in self.proxy_providers:
|
||||
self.log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}")
|
||||
|
||||
if proxy:
|
||||
requested_provider = None
|
||||
if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE):
|
||||
# requesting proxy from a specific proxy provider
|
||||
requested_provider, proxy = proxy.split(":", maxsplit=1)
|
||||
if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE):
|
||||
proxy = proxy.lower()
|
||||
with console.status(f"Getting a Proxy to {proxy}...", spinner="dots"):
|
||||
if requested_provider:
|
||||
proxy_provider = next((
|
||||
x
|
||||
for x in self.proxy_providers
|
||||
if x.__class__.__name__.lower() == requested_provider
|
||||
), None)
|
||||
if not proxy_provider:
|
||||
self.log.error(f"The proxy provider '{requested_provider}' was not recognised.")
|
||||
sys.exit(1)
|
||||
proxy_uri = proxy_provider.get_proxy(proxy)
|
||||
if not proxy_uri:
|
||||
self.log.error(f"The proxy provider {requested_provider} had no proxy for {proxy}")
|
||||
sys.exit(1)
|
||||
proxy = ctx.params["proxy"] = proxy_uri
|
||||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
|
||||
else:
|
||||
for proxy_provider in self.proxy_providers:
|
||||
if proxy:
|
||||
requested_provider = None
|
||||
if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE):
|
||||
# requesting proxy from a specific proxy provider
|
||||
requested_provider, proxy = proxy.split(":", maxsplit=1)
|
||||
if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE):
|
||||
proxy = proxy.lower()
|
||||
with console.status(f"Getting a Proxy to {proxy}...", spinner="dots"):
|
||||
if requested_provider:
|
||||
proxy_provider = next((
|
||||
x
|
||||
for x in self.proxy_providers
|
||||
if x.__class__.__name__.lower() == requested_provider
|
||||
), None)
|
||||
if not proxy_provider:
|
||||
self.log.error(f"The proxy provider '{requested_provider}' was not recognised.")
|
||||
sys.exit(1)
|
||||
proxy_uri = proxy_provider.get_proxy(proxy)
|
||||
if proxy_uri:
|
||||
proxy = ctx.params["proxy"] = proxy_uri
|
||||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
|
||||
break
|
||||
else:
|
||||
self.log.info(f"Using explicit Proxy: {proxy}")
|
||||
if not proxy_uri:
|
||||
self.log.error(f"The proxy provider {requested_provider} had no proxy for {proxy}")
|
||||
sys.exit(1)
|
||||
proxy = ctx.params["proxy"] = proxy_uri
|
||||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
|
||||
else:
|
||||
for proxy_provider in self.proxy_providers:
|
||||
proxy_uri = proxy_provider.get_proxy(proxy)
|
||||
if proxy_uri:
|
||||
proxy = ctx.params["proxy"] = proxy_uri
|
||||
self.log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
|
||||
break
|
||||
else:
|
||||
self.log.info(f"Using explicit Proxy: {proxy}")
|
||||
|
||||
ctx.obj = ContextData(
|
||||
config=self.service_config,
|
||||
|
@ -274,6 +278,7 @@ class dl:
|
|||
skip_dl: bool,
|
||||
export: Optional[Path],
|
||||
cdm_only: Optional[bool],
|
||||
no_proxy: bool,
|
||||
no_folder: bool,
|
||||
no_source: bool,
|
||||
workers: int,
|
||||
|
|
|
@ -41,39 +41,40 @@ class Service(metaclass=ABCMeta):
|
|||
self.session = self.get_session()
|
||||
self.cache = Cacher(self.__class__.__name__)
|
||||
|
||||
if ctx.parent:
|
||||
self.proxy = ctx.parent.params["proxy"]
|
||||
else:
|
||||
self.proxy = None
|
||||
if not ctx.parent or not ctx.parent.params.get("no_proxy"):
|
||||
if ctx.parent:
|
||||
proxy = ctx.parent.params["proxy"]
|
||||
else:
|
||||
proxy = None
|
||||
|
||||
if not self.proxy:
|
||||
# don't override the explicit proxy set by the user, even if they may be geoblocked
|
||||
with console.status("Checking if current region is Geoblocked...", spinner="dots"):
|
||||
if self.GEOFENCE:
|
||||
# no explicit proxy, let's get one to GEOFENCE if needed
|
||||
current_region = get_ip_info(self.session)["country"].lower()
|
||||
if any(x.lower() == current_region for x in self.GEOFENCE):
|
||||
self.log.info("Service is not Geoblocked in your region")
|
||||
if not proxy:
|
||||
# don't override the explicit proxy set by the user, even if they may be geoblocked
|
||||
with console.status("Checking if current region is Geoblocked...", spinner="dots"):
|
||||
if self.GEOFENCE:
|
||||
# no explicit proxy, let's get one to GEOFENCE if needed
|
||||
current_region = get_ip_info(self.session)["country"].lower()
|
||||
if any(x.lower() == current_region for x in self.GEOFENCE):
|
||||
self.log.info("Service is not Geoblocked in your region")
|
||||
else:
|
||||
requested_proxy = self.GEOFENCE[0] # first is likely main region
|
||||
self.log.info(f"Service is Geoblocked in your region, getting a Proxy to {requested_proxy}")
|
||||
for proxy_provider in ctx.obj.proxy_providers:
|
||||
proxy = proxy_provider.get_proxy(requested_proxy)
|
||||
if proxy:
|
||||
self.log.info(f"Got Proxy from {proxy_provider.__class__.__name__}")
|
||||
break
|
||||
else:
|
||||
requested_proxy = self.GEOFENCE[0] # first is likely main region
|
||||
self.log.info(f"Service is Geoblocked in your region, getting a Proxy to {requested_proxy}")
|
||||
for proxy_provider in ctx.obj.proxy_providers:
|
||||
self.proxy = proxy_provider.get_proxy(requested_proxy)
|
||||
if self.proxy:
|
||||
self.log.info(f"Got Proxy from {proxy_provider.__class__.__name__}")
|
||||
break
|
||||
else:
|
||||
self.log.info("Service has no Geofence")
|
||||
self.log.info("Service has no Geofence")
|
||||
|
||||
if self.proxy:
|
||||
self.session.proxies.update({"all": self.proxy})
|
||||
proxy_parse = urlparse(self.proxy)
|
||||
if proxy_parse.username and proxy_parse.password:
|
||||
self.session.headers.update({
|
||||
"Proxy-Authorization": base64.b64encode(
|
||||
f"{proxy_parse.username}:{proxy_parse.password}".encode("utf8")
|
||||
).decode()
|
||||
})
|
||||
if proxy:
|
||||
self.session.proxies.update({"all": proxy})
|
||||
proxy_parse = urlparse(proxy)
|
||||
if proxy_parse.username and proxy_parse.password:
|
||||
self.session.headers.update({
|
||||
"Proxy-Authorization": base64.b64encode(
|
||||
f"{proxy_parse.username}:{proxy_parse.password}".encode("utf8")
|
||||
).decode()
|
||||
})
|
||||
|
||||
# Optional Abstract functions
|
||||
# The following functions may be implemented by the Service.
|
||||
|
|
Loading…
Reference in New Issue