2024-07-08 17:01:25 +00:00
|
|
|
import base64
|
2024-07-06 18:01:47 +00:00
|
|
|
import json
|
|
|
|
import logging
|
2024-07-17 22:09:04 +00:00
|
|
|
|
2024-07-06 18:01:47 +00:00
|
|
|
from pathlib import Path
|
|
|
|
from typing import Union
|
|
|
|
from zlib import crc32
|
2024-07-17 22:09:04 +00:00
|
|
|
from unidecode import unidecode
|
2024-07-23 20:16:35 +00:00
|
|
|
|
2024-09-25 13:33:42 +00:00
|
|
|
from pathvalidate import sanitize_filepath, sanitize_filename
|
2024-07-06 18:01:47 +00:00
|
|
|
from Cryptodome.PublicKey import RSA
|
2024-07-06 19:11:00 +00:00
|
|
|
from Cryptodome.PublicKey.RSA import RsaKey
|
2024-07-08 17:01:25 +00:00
|
|
|
from pywidevine.device import Device, DeviceTypes
|
|
|
|
from pywidevine.license_protocol_pb2 import (SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate,
|
|
|
|
DrmCertificate, EncryptedClientIdentification)
|
2024-07-06 18:01:47 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Cdm:
    """
    The Cdm class manages CDM-related operations, such as setting challenge data,
    extracting and storing private keys, and exporting device information.

    Client IDs and private keys are collected independently of each other. Both
    are stored keyed by their RSA modulus so that `export` can pair a client ID
    with the private key that matches its certificate's public key.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        # https://github.com/devine-dl/pywidevine
        # Both maps are keyed by the RSA modulus (`key.n`).
        self.client_id: dict[int, ClientIdentification] = {}
        self.private_key: dict[int, RsaKey] = {}

    @staticmethod
    def __client_info(client_id: ClientIdentification) -> dict:
        """
        Converts client identification information to a dictionary.

        Args:
            client_id (ClientIdentification): The client identification.

        Returns:
            dict: A dictionary mapping each `client_info` entry name to its value.
        """
        return {e.name: e.value for e in client_id.client_info}

    @staticmethod
    def __encrypted_client_info(encrypted_client_id: EncryptedClientIdentification) -> dict:
        """
        Converts encrypted client identification information to a dictionary.

        Args:
            encrypted_client_id (EncryptedClientIdentification): The encrypted client identification.

        Returns:
            dict: A dictionary of encrypted client information in which every
                bytes value is base64-encoded so the result is JSON-serializable.
        """
        content = {
            'providerId': encrypted_client_id.provider_id,
            'serviceCertificateSerialNumber': encrypted_client_id.service_certificate_serial_number,
            'encryptedClientId': encrypted_client_id.encrypted_client_id,
            'encryptedClientIdIv': encrypted_client_id.encrypted_client_id_iv,
            'encryptedPrivacyKey': encrypted_client_id.encrypted_privacy_key
        }
        return {
            k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else v
            for k, v in content.items()
        }

    def set_challenge(self, data: Union[Path, bytes]) -> None:
        """
        Sets the challenge data by extracting device information.

        Parsing failures are logged at debug level and otherwise ignored.

        Args:
            data (Union[Path, bytes]): The challenge data as a file path or bytes.

        Raises:
            FileNotFoundError: If the provided file path does not exist.
        """
        if isinstance(data, Path):
            if not data.is_file():
                raise FileNotFoundError(data)
            data = data.read_bytes()

        try:
            signed_message = SignedMessage()
            signed_message.ParseFromString(data)

            license_request = LicenseRequest()
            license_request.ParseFromString(signed_message.msg)

            # https://integration.widevine.com/diagnostics
            encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id
            # A non-empty serialization means the encrypted client ID field carries data.
            if encrypted_client_id.SerializeToString():
                self.logger.debug(
                    'Receive encrypted client id: \n\n%s\n',
                    json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)
                )
                self.logger.warning('The client ID of the challenge is encrypted')
            else:
                client_id: ClientIdentification = license_request.client_id
                self.set_client_id(data=client_id)
        except Exception as e:
            self.logger.debug('Failed to set challenge data: %s', e)

    def set_private_key(self, data: bytes) -> None:
        """
        Sets the private key from the provided data.

        The key is stored under its RSA modulus; a key whose modulus is already
        known is ignored. Import failures are logged at debug level.

        Args:
            data (bytes): The private key data.
        """
        try:
            key = RSA.import_key(data)
            if key.n not in self.private_key:
                self.logger.debug('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8'))
                self.private_key[key.n] = key
        except Exception as e:
            self.logger.debug('Failed to set private key: %s', e)

    def set_client_id(self, data: Union[ClientIdentification, bytes]) -> None:
        """
        Sets the client ID from the provided data.

        The client ID is stored under the RSA modulus of the public key found in
        its DRM certificate; a client ID whose modulus is already known is
        ignored. Parsing failures are logged at debug level.

        Args:
            data (Union[ClientIdentification, bytes]): The client ID data.
        """
        try:
            if isinstance(data, ClientIdentification):
                client_id = data
            else:
                client_id = ClientIdentification()
                client_id.ParseFromString(data)

            signed_drm_certificate = SignedDrmCertificate()
            drm_certificate = DrmCertificate()

            signed_drm_certificate.ParseFromString(client_id.token)
            drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)

            public_key = drm_certificate.public_key
            # Same entry point as set_private_key (importKey is only an alias of it).
            key = RSA.import_key(public_key)

            if key.n not in self.client_id:
                self.logger.debug('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2))
                self.client_id[key.n] = client_id
        except Exception as e:
            self.logger.debug('Failed to set client ID: %s', e)

    def export(self, parent: Path, wvd: bool = False) -> bool:
        """
        Exports the client ID and private key to disk.

        Only devices for which both a client ID and a private key with the same
        RSA modulus are known get exported. Each device is written to its own
        directory below `parent`:
        `<company_name>/<model_name>/<system_id>/<first 10 digits of the modulus>`.

        Args:
            parent (Path): The parent directory to export the files to.
            wvd (bool): Whether to export WVD files.

        Returns:
            bool: True if any keys were exported, otherwise False.

        Raises:
            KeyError: If a client ID has no 'company_name' or 'model_name' entry.
        """
        keys = self.client_id.keys() & self.private_key.keys()
        for k in keys:
            client_info = self.__client_info(self.client_id[k])
            # https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211
            device = Device(
                client_id=self.client_id[k].SerializeToString(),
                private_key=self.private_key[k].exportKey('PEM'),
                type_=DeviceTypes.ANDROID,
                security_level=3,
                flags=None
            )

            # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022
            # Use a per-device variable: rebinding `parent` here would nest every
            # subsequent device's directory inside the previous device's directory.
            output_dir = sanitize_filepath(
                parent / client_info['company_name'] / client_info['model_name']
                / str(device.system_id) / str(k)[:10]
            )
            output_dir.mkdir(parents=True, exist_ok=True)

            path_id_bin = output_dir / 'client_id.bin'
            path_id_bin.write_bytes(data=device.client_id.SerializeToString())
            self.logger.info('Exported client ID: %s', path_id_bin)

            path_key_bin = output_dir / 'private_key.pem'
            path_key_bin.write_bytes(data=device.private_key.exportKey('PEM'))
            self.logger.info('Exported private key: %s', path_key_bin)

            if wvd:
                wvd_bin = device.dumps()

                # File name: company, model, optional CDM version and the CRC32 of
                # the WVD payload, lower-cased, underscore-separated and ASCII-folded.
                name = f"{client_info['company_name']} {client_info['model_name']}"
                if client_info.get('widevine_cdm_version'):
                    name += f" {client_info['widevine_cdm_version']}"
                name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
                name = unidecode(name.strip().lower().replace(' ', '_'))
                path_wvd = output_dir / sanitize_filename(f'{name}_{device.system_id}_l{device.security_level}.wvd')

                path_wvd.write_bytes(data=wvd_bin)
                self.logger.info('Exported WVD: %s', path_wvd)

        return bool(keys)
|
|
|
|
|
|
|
|
|
|
|
|
# Names exported by `from <module> import *`; only the Cdm class is public.
__all__ = ('Cdm',)
|