Cdm: Change construction interface to allow manual creation

This is so you can construct a Cdm object without using `.wvd` files (nor the Device class). It also improves enforcement of some required data from the Device. The underlying Device object is discarded for it's data as it won't be required.

Note that the Client ID and Private Key related variables are now stored as private `__var` variables to further amplify their private nature and to really discourage manual read write. This is not impossible to workaround in Python but further discourages manual read/writes to the variable that could cause serious issues.

The RSA Key is also no longer stored as-is. It is now stored as PSS and PKCS1_OAEP objects, as they will be used like so. This makes it even more annoying to directly read/write the RSA key (but not impossible).
This commit is contained in:
rlaphoenix 2022-08-04 04:15:37 +01:00
parent f1a38d1966
commit c969d80931
3 changed files with 64 additions and 19 deletions

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import base64 import base64
import binascii import binascii
import random import random
@ -62,14 +64,61 @@ class Cdm:
MAX_NUM_OF_SESSIONS = 50 # most common limit MAX_NUM_OF_SESSIONS = 50 # most common limit
def __init__(self, device: Device): def __init__(
self,
device_type: Device.Types,
system_id: int,
security_level: int,
client_id: ClientIdentification,
rsa_key: RSA.RsaKey
):
"""Initialize a Widevine Content Decryption Module (CDM).""" """Initialize a Widevine Content Decryption Module (CDM)."""
if not device: if not device_type:
raise ValueError("A Widevine Device must be provided.") raise ValueError("Device Type must be provided")
self.device = device if not isinstance(device_type, Device.Types):
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
if not system_id:
raise ValueError("System ID must be provided")
if not isinstance(system_id, int):
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
if not security_level:
raise ValueError("Security Level must be provided")
if not isinstance(security_level, int):
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
if not client_id:
raise ValueError("Client ID must be provided")
if not isinstance(client_id, ClientIdentification):
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
if not rsa_key:
raise ValueError("RSA Key must be provided")
if not isinstance(rsa_key, RSA.RsaKey):
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
self.device_type = device_type
self.system_id = system_id
self.security_level = security_level
self.__client_id = client_id
self.__signer = pss.new(rsa_key)
self.__decrypter = PKCS1_OAEP.new(rsa_key)
self._sessions: dict[bytes, Session] = {} self._sessions: dict[bytes, Session] = {}
@classmethod
def from_device(cls, device: Device) -> Cdm:
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
return cls(
device_type=device.type,
system_id=device.system_id,
security_level=device.security_level,
client_id=device.client_id,
rsa_key=device.private_key
)
def open(self) -> bytes: def open(self) -> bytes:
""" """
Open a Widevine Content Decryption Module (CDM) session. Open a Widevine Content Decryption Module (CDM) session.
@ -238,19 +287,16 @@ class Cdm:
if session.service_certificate and privacy_mode: if session.service_certificate and privacy_mode:
# encrypt the client id for privacy mode # encrypt the client id for privacy mode
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id( license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
client_id=self.device.client_id, client_id=self.__client_id,
service_certificate=session.service_certificate service_certificate=session.service_certificate
)) ))
else: else:
license_request.client_id.CopyFrom(self.device.client_id) license_request.client_id.CopyFrom(self.__client_id)
license_message = SignedMessage() license_message = SignedMessage()
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
license_message.msg = license_request.SerializeToString() license_message.msg = license_request.SerializeToString()
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
license_message.signature = pss. \
new(self.device.private_key). \
sign(SHA1.new(license_message.msg))
session.context[request_id] = self.derive_context(license_message.msg) session.context[request_id] = self.derive_context(license_message.msg)
@ -315,11 +361,10 @@ class Cdm:
if not context: if not context:
raise InvalidContext("Cannot parse a license message without first making a license request") raise InvalidContext("Cannot parse a license message without first making a license request")
session_key = PKCS1_OAEP. \ enc_key, mac_key_server, _ = self.derive_keys(
new(self.device.private_key). \ *context,
decrypt(license_message.session_key) key=self.__decrypter.decrypt(license_message.session_key)
)
enc_key, mac_key_server, _ = self.derive_keys(*context, session_key)
computed_signature = HMAC. \ computed_signature = HMAC. \
new(mac_key_server, digestmod=SHA256). \ new(mac_key_server, digestmod=SHA256). \

View File

@ -68,7 +68,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
log.debug(device) log.debug(device)
# load cdm # load cdm
cdm = Cdm(device) cdm = Cdm.from_device(device)
log.info(f"[+] Loaded CDM") log.info(f"[+] Loaded CDM")
log.debug(cdm) log.debug(cdm)

View File

@ -66,7 +66,7 @@ async def open(request: web.Request) -> web.Response:
cdm = request.app["cdms"].get((secret_key, device_name)) cdm = request.app["cdms"].get((secret_key, device_name))
if not cdm: if not cdm:
device = Device.load(request.app["config"]["devices"][device_name]) device = Device.load(request.app["config"]["devices"][device_name])
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device) cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device)
try: try:
session_id = cdm.open() session_id = cdm.open()
@ -82,8 +82,8 @@ async def open(request: web.Request) -> web.Response:
"data": { "data": {
"session_id": session_id.hex(), "session_id": session_id.hex(),
"device": { "device": {
"system_id": cdm.device.system_id, "system_id": cdm.system_id,
"security_level": cdm.device.security_level "security_level": cdm.security_level
} }
} }
}) })