Fixed RemoteCDM
This commit is contained in:
parent
8a4be776eb
commit
374a0c16cc
|
@ -1,5 +1,7 @@
|
|||
import base64
|
||||
from enum import Enum
|
||||
from uuid import UUID
|
||||
from typing import Optional, Union
|
||||
|
||||
|
||||
class Key:
|
||||
|
@ -40,3 +42,23 @@ class Key:
|
|||
self.cipher_type = self.CipherType(cipher_type)
|
||||
self.key_length = key_length
|
||||
self.key = key
|
||||
|
||||
@staticmethod
|
||||
def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
|
||||
"""
|
||||
Convert a Key ID from a string or bytes to a UUID object.
|
||||
At first this may seem very simple but some types of Key IDs
|
||||
may not be 16 bytes and some may be decimal vs. hex.
|
||||
"""
|
||||
if isinstance(kid, str):
|
||||
kid = base64.b64decode(kid)
|
||||
if not kid:
|
||||
kid = b"\x00" * 16
|
||||
|
||||
if kid.decode(errors="replace").isdigit():
|
||||
return UUID(int=int(kid.decode()))
|
||||
|
||||
if len(kid) < 16:
|
||||
kid += b"\x00" * (16 - len(kid))
|
||||
|
||||
return UUID(bytes=kid)
|
||||
|
|
|
@ -50,7 +50,7 @@ class RemoteCdm(Cdm):
|
|||
self.device_name = device_name
|
||||
|
||||
# spoof certificate_chain and ecc_key just so we can construct via super call
|
||||
super().__init__(security_level, CertificateChain(), ECCKey(), ECCKey())
|
||||
super().__init__(security_level, CertificateChain, ECCKey, ECCKey)
|
||||
|
||||
self.__session = requests.Session()
|
||||
self.__session.headers.update({
|
||||
|
@ -78,13 +78,11 @@ class RemoteCdm(Cdm):
|
|||
r = self.__session.get(
|
||||
url=f"{self.host}/{self.device_name}/open"
|
||||
).json()
|
||||
|
||||
if r['status'] != 200:
|
||||
raise ValueError(f"Cannot Open CDM Session, {r['message']} [{r['status']}]")
|
||||
r = r["data"]
|
||||
|
||||
if int(r["device"]["system_id"]) != self.system_id:
|
||||
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
|
||||
|
||||
if int(r["device"]["security_level"]) != self.security_level:
|
||||
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
|
||||
|
||||
|
@ -100,19 +98,19 @@ class RemoteCdm(Cdm):
|
|||
def get_license_challenge(
|
||||
self,
|
||||
session_id: bytes,
|
||||
pssh: PSSH,
|
||||
pssh: str,
|
||||
downgrade: str
|
||||
) -> str:
|
||||
if not pssh:
|
||||
raise InvalidInitData("A pssh must be provided.")
|
||||
if not isinstance(pssh, PSSH):
|
||||
if not isinstance(pssh, str):
|
||||
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/get_license_challenge",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"init_data": pssh.dumps(),
|
||||
"init_data": pssh,
|
||||
"downgrade": downgrade,
|
||||
}
|
||||
).json()
|
||||
|
@ -152,10 +150,11 @@ class RemoteCdm(Cdm):
|
|||
|
||||
return [
|
||||
Key(
|
||||
type_=key["type"],
|
||||
kid=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
|
||||
key_type=key["type"],
|
||||
key_id=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
|
||||
key=bytes.fromhex(key["key"]),
|
||||
cipher_type=key["cipher_type"]
|
||||
cipher_type=key["cipher_type"],
|
||||
key_length=key["key_length"]
|
||||
)
|
||||
for key in r["keys"]
|
||||
]
|
||||
|
|
|
@ -256,6 +256,7 @@ async def get_keys(request: web.Request) -> web.Response:
|
|||
"key": key.key.hex(),
|
||||
"type": str(key.key_type),
|
||||
"cipher_type": str(key.cipher_type),
|
||||
"key_length": str(key.key_length),
|
||||
}
|
||||
for key in keys
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue