KeyDive/keydive/cdm.py

280 lines
12 KiB
Python
Raw Normal View History

2024-07-08 17:01:25 +00:00
import base64
2024-07-06 18:01:47 +00:00
import json
import logging
2024-07-17 22:09:04 +00:00
2024-07-06 18:01:47 +00:00
from typing import Union
from zlib import crc32
2024-07-17 22:09:04 +00:00
from unidecode import unidecode
2025-01-19 13:13:07 +00:00
from pathlib import Path
2024-07-23 20:16:35 +00:00
2024-09-25 13:33:42 +00:00
from pathvalidate import sanitize_filepath, sanitize_filename
2024-07-06 18:01:47 +00:00
from Cryptodome.PublicKey import RSA
2024-07-06 19:11:00 +00:00
from Cryptodome.PublicKey.RSA import RsaKey
2024-07-08 17:01:25 +00:00
from pywidevine.device import Device, DeviceTypes
2025-01-19 13:13:07 +00:00
from pywidevine.license_protocol_pb2 import (
SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate, DrmCertificate,
EncryptedClientIdentification)
2024-07-06 18:01:47 +00:00
2024-10-20 14:06:43 +00:00
from keydive.constants import OEM_CRYPTO_API
2024-10-27 18:39:09 +00:00
from keydive.keybox import Keybox
2024-10-20 14:06:43 +00:00
2024-07-06 18:01:47 +00:00
class Cdm:
    """
    The Cdm class manages CDM-related operations, such as setting challenge data,
    extracting and storing private keys, and exporting device information.

    Client IDs and private keys are stored in two dictionaries that are both keyed by
    the RSA modulus, so a client ID and a private key belonging to the same key pair
    end up under the same dictionary key and can be matched at export time.
    """

    def __init__(self, keybox: bool = False):
        """
        Initializes the Cdm object, setting up a logger and containers for client IDs and private keys.

        Parameters:
            keybox (bool, optional): When True, a Keybox instance is created; it is then used by
                set_device_id, set_keybox and export. Defaults to False (no keybox handling).
        """
        self.logger = logging.getLogger(self.__class__.__name__)

        # https://github.com/devine-dl/pywidevine
        # Both containers are indexed by the RSA modulus (key.n) of the related key pair
        self.client_id: dict[int, ClientIdentification] = {}
        self.private_key: dict[int, RsaKey] = {}

        # Optional keybox handler; None disables every keybox-related operation
        self.keybox = Keybox() if keybox else None

    @staticmethod
    def __client_info(client_id: "ClientIdentification") -> dict:
        """
        Converts client identification information to a dictionary.

        Parameters:
            client_id (ClientIdentification): The client identification.

        Returns:
            dict: Mapping of each client_info entry name to its value.
        """
        return {e.name: e.value for e in client_id.client_info}

    @staticmethod
    def __encrypted_client_info(encrypted_client_id: "EncryptedClientIdentification") -> dict:
        """
        Converts encrypted client identification information to a JSON-friendly dictionary.

        Parameters:
            encrypted_client_id (EncryptedClientIdentification): The encrypted client identification.

        Returns:
            dict: The five fields of the message; bytes values are base64-encoded to text,
                any other value is returned unchanged.
        """
        content = {
            "providerId": encrypted_client_id.provider_id,
            "serviceCertificateSerialNumber": encrypted_client_id.service_certificate_serial_number,
            "encryptedClientId": encrypted_client_id.encrypted_client_id,
            "encryptedClientIdIv": encrypted_client_id.encrypted_client_id_iv,
            "encryptedPrivacyKey": encrypted_client_id.encrypted_privacy_key
        }
        # Raw bytes cannot be dumped as JSON, so they are rendered as base64 text
        return {
            k: base64.b64encode(v).decode("utf-8") if isinstance(v, bytes) else v
            for k, v in content.items()
        }

    def set_challenge(self, data: Union[Path, bytes]) -> None:
        """
        Sets the challenge data by extracting the client ID from a license request.

        Parameters:
            data (Union[Path, bytes]): Challenge data as a file path or raw bytes.

        Raises:
            FileNotFoundError: If a file path is given and the file doesn't exist.

        Any other exception is caught and only logged at debug level.
        """
        try:
            # A Path means the challenge has to be loaded from disk first
            if isinstance(data, Path):
                data = data.read_bytes()

            # Parse the signed message from the data
            signed_message = SignedMessage()
            signed_message.ParseFromString(data)

            # Parse the license request from the signed message
            license_request = LicenseRequest()
            license_request.ParseFromString(signed_message.msg)

            # https://integration.widevine.com/diagnostics
            encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id
            if encrypted_client_id.SerializeToString():
                # A non-empty serialization means the request carries an encrypted client ID:
                # it can only be displayed, not stored
                self.logger.info("Receive encrypted client id: \n\n%s\n", json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2))
                self.logger.warning("The client ID of the challenge is encrypted")
            else:
                # Otherwise the plain client ID of the request is registered
                client_id: ClientIdentification = license_request.client_id
                self.set_client_id(data=client_id)
        except FileNotFoundError as e:
            # read_bytes() failed before `data` was rebound, so `data` is still the path here
            raise FileNotFoundError(f"Challenge file not found: {data}") from e
        except Exception as e:
            self.logger.debug("Failed to set challenge data: %s", e)

    def set_private_key(self, data: Union[Path, bytes], name: str = None) -> None:
        """
        Sets the private key from the provided data.

        Parameters:
            data (Union[Path, bytes]): The private key data, either as a file path or byte data.
            name (str, optional): Function name for verification against known functions.

        Raises:
            FileNotFoundError: If a file path is given and the file doesn't exist.

        Any other exception is caught and only logged at debug level.
        """
        try:
            # A Path means the key has to be loaded from disk first
            if isinstance(data, Path):
                data = data.read_bytes()

            # Import the private key using the RSA module
            key = RSA.import_key(data)

            # Only a key that has not been seen yet is displayed
            if key.n not in self.private_key:
                self.logger.info("Receive private key: \n\n%s\n", key.exportKey("PEM").decode("utf-8"))

                # If a function name is provided, verify it against known functions
                if name and name not in OEM_CRYPTO_API:
                    self.logger.warning("The function '%s' does not belong to the referenced functions. Communicate it to the developer to improve the tool.", name)

            # Store the private key in the dictionary, using the modulus (key.n) as the key
            self.private_key[key.n] = key
        except FileNotFoundError as e:
            # read_bytes() failed before `data` was rebound, so `data` is still the path here
            raise FileNotFoundError(f"Private key file not found: {data}") from e
        except Exception as e:
            self.logger.debug("Failed to set private key: %s", e)

    def set_client_id(self, data: "Union[ClientIdentification, bytes]") -> None:
        """
        Sets the client ID from the provided data.

        Parameters:
            data (Union[ClientIdentification, bytes]): The client ID, either already parsed
                or as its serialized bytes.

        Any exception raised while parsing is caught and only logged at debug level.
        """
        try:
            if isinstance(data, ClientIdentification):
                client_id = data
            else:
                # Deserialize the byte data into a `ClientIdentification` object
                client_id = ClientIdentification()
                client_id.ParseFromString(data)

            # The client ID token is a signed DRM certificate wrapping the DRM certificate itself
            signed_drm_certificate = SignedDrmCertificate()
            drm_certificate = DrmCertificate()
            signed_drm_certificate.ParseFromString(client_id.token)
            drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)

            # The modulus of the certificate public key is what links the client ID to its private key
            public_key = drm_certificate.public_key
            key = RSA.import_key(public_key)

            # Only a client ID that has not been seen yet is displayed
            if key.n not in self.client_id:
                self.logger.info("Receive client id: \n\n%s\n", json.dumps(self.__client_info(client_id), indent=2))

            # Store the client ID in the client_id dictionary, using the public key modulus (`key.n`) as the key
            self.client_id[key.n] = client_id
        except Exception as e:
            self.logger.debug("Failed to set client ID: %s", e)

    def set_device_id(self, data: bytes) -> None:
        """
        Sets the device ID in the keybox.

        Does nothing when the Cdm was created without keybox support.

        Parameters:
            data (bytes): The device ID to be stored in the keybox.
        """
        if self.keybox:
            self.keybox.set_device_id(data=data)

    def set_keybox(self, data: bytes) -> None:
        """
        Sets the keybox data.

        Does nothing when the Cdm was created without keybox support.

        Parameters:
            data (bytes): The keybox data to be set.
        """
        if self.keybox:
            self.keybox.set_keybox(data=data)

    def export(self, parent: Path, wvd: bool = False) -> bool:
        """
        Exports client ID, private key, and optionally WVD files to disk.

        Each device is written to
        <parent>/<company_name>/<model_name>/<system_id>/<first 10 digits of the modulus>/.

        Parameters:
            parent (Path): Base directory to export the files to.
            wvd (bool, optional): Whether to export WVD files. Defaults to False.

        Returns:
            bool: True if at least one client ID / private key pair was exported, otherwise False.
        """
        # Only moduli known on both sides form a complete (client ID, private key) pair
        keys = self.client_id.keys() & self.private_key.keys()

        for k in keys:
            client_info = self.__client_info(self.client_id[k])

            # https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211
            device = Device(
                client_id=self.client_id[k].SerializeToString(),
                private_key=self.private_key[k].exportKey("PEM"),
                type_=DeviceTypes.ANDROID,
                security_level=3,
                flags=None
            )

            # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022
            # The device directory is kept in its own variable: rebinding `parent` here would nest
            # every following device inside the directory of the previous one.
            target = sanitize_filepath(parent / client_info["company_name"] / client_info["model_name"] / str(device.system_id) / str(k)[:10])
            target.mkdir(parents=True, exist_ok=True)

            # Export the client ID to a binary file
            path_id_bin = target / "client_id.bin"
            path_id_bin.write_bytes(data=device.client_id.SerializeToString())
            self.logger.info("Exported client ID: %s", path_id_bin)

            # Export the private key to a PEM file
            path_key_bin = target / "private_key.pem"
            path_key_bin.write_bytes(data=device.private_key.exportKey("PEM"))
            self.logger.info("Exported private key: %s", path_key_bin)

            if wvd:
                # Serialize the device to WVD format
                wvd_bin = device.dumps()

                # File name: company, model, optional CDM version and the CRC32 of the WVD content
                name = f"{client_info['company_name']} {client_info['model_name']}"
                if client_info.get("widevine_cdm_version"):
                    name += f" {client_info['widevine_cdm_version']}"
                name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
                name = unidecode(name.strip().lower().replace(" ", "_"))
                path_wvd = target / sanitize_filename(f"{name}_{device.system_id}_l{device.security_level}.wvd")

                # Export the WVD file to disk
                path_wvd.write_bytes(data=wvd_bin)
                self.logger.info("Exported WVD: %s", path_wvd)

            # The keybox is exported one level up, in the <system_id> directory
            if self.keybox and not self.keybox.export(parent=target.parent):
                self.logger.warning("The keybox has not been intercepted or decrypted")

        # Return True if any keys were exported, otherwise return False
        return bool(keys)
2025-01-19 13:13:07 +00:00
# Public API of this module: a star-import only exposes the Cdm class.
__all__ = ("Cdm",)