From c969d809314a3c005610b0e2841c3d77e9c93bd0 Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Thu, 4 Aug 2022 04:15:37 +0100 Subject: [PATCH] Cdm: Change construction interface to allow manual creation This is so you can construct a Cdm object without using `.wvd` files (nor the Device class). It also improves enforcement of some required data from the Device. The underlying Device object is discarded for it's data as it won't be required. Note that the Client ID and Private Key related variables are now stored as private `__var` variables to further amplify their private nature and to really discourage manual read write. This is not impossible to workaround in Python but further discourages manual read/writes to the variable that could cause serious issues. The RSA Key is also no longer stored as-is. It is now stored as PSS and PKCS1_OAEP objects, as they will be used like so. This makes it even more annoying to directly read/write the RSA key (but not impossible). --- pywidevine/cdm.py | 75 ++++++++++++++++++++++++++++++++++++--------- pywidevine/main.py | 2 +- pywidevine/serve.py | 6 ++-- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/pywidevine/cdm.py b/pywidevine/cdm.py index be14a36..1045e23 100644 --- a/pywidevine/cdm.py +++ b/pywidevine/cdm.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import base64 import binascii import random @@ -62,14 +64,61 @@ class Cdm: MAX_NUM_OF_SESSIONS = 50 # most common limit - def __init__(self, device: Device): + def __init__( + self, + device_type: Device.Types, + system_id: int, + security_level: int, + client_id: ClientIdentification, + rsa_key: RSA.RsaKey + ): """Initialize a Widevine Content Decryption Module (CDM).""" - if not device: - raise ValueError("A Widevine Device must be provided.") - self.device = device + if not device_type: + raise ValueError("Device Type must be provided") + if not isinstance(device_type, Device.Types): + raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}") + + if not system_id: + raise ValueError("System ID must be provided") + if not isinstance(system_id, int): + raise TypeError(f"Expected system_id to be a {int} not {system_id!r}") + + if not security_level: + raise ValueError("Security Level must be provided") + if not isinstance(security_level, int): + raise TypeError(f"Expected security_level to be a {int} not {security_level!r}") + + if not client_id: + raise ValueError("Client ID must be provided") + if not isinstance(client_id, ClientIdentification): + raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}") + + if not rsa_key: + raise ValueError("RSA Key must be provided") + if not isinstance(rsa_key, RSA.RsaKey): + raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}") + + self.device_type = device_type + self.system_id = system_id + self.security_level = security_level + self.__client_id = client_id + + self.__signer = pss.new(rsa_key) + self.__decrypter = PKCS1_OAEP.new(rsa_key) self._sessions: dict[bytes, Session] = {} + @classmethod + def from_device(cls, device: Device) -> Cdm: + """Initialize a Widevine CDM from a Widevine Device (.wvd) file.""" + return cls( + device_type=device.type, + system_id=device.system_id, + security_level=device.security_level, + client_id=device.client_id, + rsa_key=device.private_key + ) + def open(self) -> bytes: """ Open a Widevine Content Decryption Module (CDM) session. @@ -238,19 +287,16 @@ class Cdm: if session.service_certificate and privacy_mode: # encrypt the client id for privacy mode license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id( - client_id=self.device.client_id, + client_id=self.__client_id, service_certificate=session.service_certificate )) else: - license_request.client_id.CopyFrom(self.device.client_id) + license_request.client_id.CopyFrom(self.__client_id) license_message = SignedMessage() license_message.type = SignedMessage.MessageType.LICENSE_REQUEST license_message.msg = license_request.SerializeToString() - - license_message.signature = pss. \ - new(self.device.private_key). \ - sign(SHA1.new(license_message.msg)) + license_message.signature = self.__signer.sign(SHA1.new(license_message.msg)) session.context[request_id] = self.derive_context(license_message.msg) @@ -315,11 +361,10 @@ class Cdm: if not context: raise InvalidContext("Cannot parse a license message without first making a license request") - session_key = PKCS1_OAEP. \ - new(self.device.private_key). \ - decrypt(license_message.session_key) - - enc_key, mac_key_server, _ = self.derive_keys(*context, session_key) + enc_key, mac_key_server, _ = self.derive_keys( + *context, + key=self.__decrypter.decrypt(license_message.session_key) + ) computed_signature = HMAC. \ new(mac_key_server, digestmod=SHA256). \ diff --git a/pywidevine/main.py b/pywidevine/main.py index cfbc24d..f52b3be 100644 --- a/pywidevine/main.py +++ b/pywidevine/main.py @@ -68,7 +68,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool): log.debug(device) # load cdm - cdm = Cdm(device) + cdm = Cdm.from_device(device) log.info(f"[+] Loaded CDM") log.debug(cdm) diff --git a/pywidevine/serve.py b/pywidevine/serve.py index 2f775d6..42b0612 100644 --- a/pywidevine/serve.py +++ b/pywidevine/serve.py @@ -66,7 +66,7 @@ async def open(request: web.Request) -> web.Response: cdm = request.app["cdms"].get((secret_key, device_name)) if not cdm: device = Device.load(request.app["config"]["devices"][device_name]) - cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device) + cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device) try: session_id = cdm.open() @@ -82,8 +82,8 @@ async def open(request: web.Request) -> web.Response: "data": { "session_id": session_id.hex(), "device": { - "system_id": cdm.device.system_id, - "security_level": cdm.device.security_level + "system_id": cdm.system_id, + "security_level": cdm.security_level } } })