From 1f755f993dc98998d76873449dd2921199c9b6eb Mon Sep 17 00:00:00 2001 From: hyugogirubato <65763543+hyugogirubato@users.noreply.github.com> Date: Sun, 27 Oct 2024 19:39:09 +0100 Subject: [PATCH] add keybox dump (experimental) --- keydive/cdm.py | 36 +++++++++-- keydive/core.py | 9 ++- keydive/keybox.py | 153 +++++++++++++++++++++++++++++++++++++++++++++ keydive/keydive.js | 69 ++++++++++++++++++-- 4 files changed, 254 insertions(+), 13 deletions(-) create mode 100644 keydive/keybox.py diff --git a/keydive/cdm.py b/keydive/cdm.py index 22ecaa8..f0a3101 100644 --- a/keydive/cdm.py +++ b/keydive/cdm.py @@ -15,6 +15,7 @@ from pywidevine.license_protocol_pb2 import (SignedMessage, LicenseRequest, Clie DrmCertificate, EncryptedClientIdentification) from keydive.constants import OEM_CRYPTO_API +from keydive.keybox import Keybox class Cdm: @@ -23,18 +24,20 @@ class Cdm: extracting and storing private keys, and exporting device information. """ - def __init__(self): + def __init__(self, keybox: bool = False): """ Initializes the Cdm object, setting up a logger and containers for client IDs and private keys. Attributes: client_id (dict[int, ClientIdentification]): Stores client identification info mapped by key modulus. private_key (dict[int, RsaKey]): Stores private keys mapped by key modulus. + keybox (bool, optional): Initializes a Keybox instance for secure key management. """ self.logger = logging.getLogger(self.__class__.__name__) # https://github.com/devine-dl/pywidevine self.client_id: dict[int, ClientIdentification] = {} self.private_key: dict[int, RsaKey] = {} + self.keybox = Keybox() if keybox else None @staticmethod def __client_info(client_id: ClientIdentification) -> dict: @@ -97,7 +100,7 @@ class Cdm: # https://integration.widevine.com/diagnostics encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id if encrypted_client_id.SerializeToString(): - self.logger.debug('Receive encrypted client id: \n\n%s\n', json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)) + self.logger.info('Receive encrypted client id: \n\n%s\n', json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)) self.logger.warning('The client ID of the challenge is encrypted') else: client_id: ClientIdentification = license_request.client_id @@ -124,10 +127,10 @@ class Cdm: try: key = RSA.import_key(data) if key.n not in self.private_key: - self.logger.debug('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8')) + self.logger.info('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8')) if name and name not in OEM_CRYPTO_API: - self.logger.warning(f'The function "{name}" does not belong to the referenced functions. Communicate it to the developer to improve the tool.') + self.logger.warning('The function "%s" does not belong to the referenced functions. Communicate it to the developer to improve the tool.', name) self.private_key[key.n] = key except Exception as e: @@ -157,11 +160,31 @@ class Cdm: key = RSA.importKey(public_key) if key.n not in self.client_id: - self.logger.debug('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2)) + self.logger.info('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2)) self.client_id[key.n] = client_id except Exception as e: self.logger.debug('Failed to set client ID: %s', e) + def set_device_id(self, data: bytes) -> None: + """ + Sets the device ID in the keybox if it is enabled. + + Args: + data (bytes): The device ID data to be stored in the keybox. + """ + if self.keybox: + self.keybox.set_device_id(data=data) + + def set_keybox(self, data: bytes) -> None: + """ + Sets the keybox data. + + Args: + data (bytes): The keybox data to be set. + """ + if self.keybox: + self.keybox.set_keybox(data=data) + def export(self, parent: Path, wvd: bool = False) -> bool: """ Exports the client ID and private key to disk. @@ -210,6 +233,9 @@ class Cdm: path_wvd.write_bytes(data=wvd_bin) self.logger.info('Exported WVD: %s', path_wvd) + if self.keybox and not self.keybox.export(parent=parent): + self.logger.warning('The keybox has not been intercepted or decrypted') + return len(keys) > 0 diff --git a/keydive/core.py b/keydive/core.py index dfbec0d..6b400ea 100644 --- a/keydive/core.py +++ b/keydive/core.py @@ -36,9 +36,10 @@ class Core: self.adb = adb # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 + # Flag to skip predefined functions based on the vendor's API level self.skip = skip - # Load the hook script + # Load the hook script and prepare for injection self.functions = functions self.script = self.__prepare_hook_script() self.logger.info('Script loaded successfully') @@ -134,8 +135,10 @@ class Core: self.cdm.set_private_key(data=data, name=level['private_key']) elif level == 'challenge': self.cdm.set_challenge(data=data) - elif level == 'client_id': - self.cdm.set_client_id(data=data) + elif level == 'device_id': + self.cdm.set_device_id(data) + elif level == 'keybox': + self.cdm.set_keybox(data) def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool: """ diff --git a/keydive/keybox.py b/keydive/keybox.py new file mode 100644 index 0000000..f80979c --- /dev/null +++ b/keydive/keybox.py @@ -0,0 +1,153 @@ +import base64 +import json +import logging + +from typing import Literal +from uuid import UUID + +from pathlib import Path + + +def bytes2int(value: bytes, byteorder: Literal['big', 'little'] = 'big', signed: bool = False) -> int: + """ + Convert bytes to an integer. + + Args: + value (bytes): The byte sequence to convert. + byteorder (Literal['big', 'little'], optional): The byte order for conversion. Defaults to 'big'. + signed (bool, optional): Indicates if the integer is signed. Defaults to False. + + Returns: + int: The converted integer. + """ + return int.from_bytes(value, byteorder=byteorder, signed=signed) + + +class Keybox: + """ + The Keybox class handles the storage and management of device IDs and keybox data. + """ + + def __init__(self): + """ + Initializes the Keybox object, setting up a logger and containers for device IDs and keyboxes. + + Attributes: + logger (Logger): Logger instance for logging messages. + device_id (list[bytes]): List to store unique device IDs (32 bytes each). + keybox (dict[bytes, bytes]): Dictionary to map device IDs to their respective keybox data. + """ + self.logger = logging.getLogger(self.__class__.__name__) + # https://github.com/kaltura/kaltura-device-info-android/blob/master/app/src/main/java/com/kaltura/kalturadeviceinfo/MainActivity.java#L203 + self.device_id = [] + self.keybox = {} + + def set_device_id(self, data: bytes) -> None: + """ + Set the device ID from the provided data. + + Args: + data (bytes): The device ID, expected to be 32 bytes long. + + Raises: + AssertionError: If the data length is not 32 bytes. + """ + try: + size = len(data) + assert size == 32, f'Invalid keybox length: {size}. Should be 32 bytes' + + if data not in self.device_id: + self.logger.info('Receive device id: \n\n%s\n', base64.b64encode(data).decode('utf-8')) + self.device_id.append(data) + except Exception as e: + self.logger.debug('Failed to set device id: %s', e) + + def set_keybox(self, data: bytes) -> None: + """ + Set the keybox from the provided data. + + Args: + data (bytes): The keybox data, expected to be either 128 or 132 bytes long. + + Raises: + AssertionError: If the data length is not 128 or 132 bytes or does not meet other criteria. + """ + # https://github.com/wvdumper/dumper/blob/main/Helpers/Keybox.py#L51 + try: + size = len(data) + assert size in (128, 132), f'Invalid keybox length: {size}. Should be 128 or 132 bytes' + + if size == 132: + assert data[128:132] == b"LVL1", 'QSEE style keybox does not end in bytes "LVL1"' + + assert data[120:124] == b"kbox", 'Invalid keybox magic' + + device_id = data[0:32] + self.set_device_id(data=device_id) + + if device_id not in self.keybox: + # https://github.com/wvdumper/dumper/blob/main/Helpers/Keybox.py#L51 + self.logger.info('Receive keybox: \n\n%s\n', json.dumps(self.__keybox_info(data), indent=2)) + + self.keybox[device_id] = data + except Exception as e: + self.logger.debug('Failed to set keybox: %s', e) + + @staticmethod + def __keybox_info(data: bytes) -> dict: + """ + Extract keybox information from the provided data. + + Args: + data (bytes): The keybox data. + + Returns: + dict: A dictionary containing extracted keybox information. + """ + # Extract key components from the keybox data based on defined byte offsets. + content = { + 'device_id': data[0:32], # Unique identifier for the device (32 bytes). + 'device_key': data[32:48], # Device-specific cryptographic key (16 bytes). + 'device_token': data[48:120], # AES key used for device token encryption (72 bytes). + 'keybox_tag': data[120:124].decode('utf-8'), # Magic tag indicating keybox format (4 bytes). + 'crc32': bytes2int(data[124:128]), # CRC32 checksum for data integrity verification (4 bytes). + 'level_tag': data[128:132].decode('utf-8') or None, # Optional tag indicating keybox level (4 bytes). + + # Key components extracted from the device token (Bytes 48–119). + 'flags': bytes2int(data[48:52]), # Flags indicating various settings (4 bytes). + 'system_id': bytes2int(data[52:56]), # System identifier for the device (4 bytes). + + # Provisioning ID, encrypted and derived from the unique ID in the system. + 'provisioning_id': UUID(bytes_le=data[56:72]), # Unique ID for provisioning (16 bytes). + + # Encrypted bits containing device key, key hash, and additional flags. + 'encrypted_bits': data[72:120] # Encrypted data relevant to the device (48 bytes). + } + + # Encode certain fields in base64 and convert UUIDs to string + return { + k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else str(v) if isinstance(v, UUID) else v + for k, v in content.items() + } + + def export(self, parent: Path) -> bool: + """ + Export the keybox data to a file in the specified parent directory. + + Args: + parent (Path): The parent directory where the keybox data will be saved. + + Returns: + bool: True if any keybox were exported, otherwise False. + """ + keys = self.device_id & self.keybox.keys() + for k in keys: + parent.mkdir(parents=True, exist_ok=True) + path_keybox_bin = parent / 'keybox.bin' + path_keybox_bin.write_bytes(self.keybox[k]) + + self.logger.info('Exported Keybox: %s', path_keybox_bin) + return len(keys) > 0 + + +__all__ = ('Keybox',) diff --git a/keydive/keydive.js b/keydive/keydive.js index b71a051..cdff549 100644 --- a/keydive/keydive.js +++ b/keydive/keydive.js @@ -1,5 +1,5 @@ /** - * Date: 2024-10-20 + * Date: 2024-10-27 * Description: DRM key extraction for research and educational purposes. * Source: https://github.com/hyugogirubato/KeyDive */ @@ -253,7 +253,7 @@ const GetDeviceID = (address) => { try { const size = Memory.readPointer(this.size).toInt32(); const data = Memory.readByteArray(this.data, size); - data && send('client_id', data); + data && send('device_id', data); } catch (e) { print(Level.ERROR, `Failed to dump device ID.`); } @@ -262,10 +262,49 @@ const GetDeviceID = (address) => { } +const memcpy = (address) => { + // libc::memcpy + Interceptor.attach(address, { + onEnter: function (args) { + // Convert size argument from hexadecimal to integer + const size = parseInt(args[2], 16); + + // Check if the size matches known keybox sizes (128 or 132 bytes) + if ([128, 132].includes(size)) { + const data = Memory.readByteArray(args[1], size); + + // Define the target bytes for "kbox" + const targetBytes = [0x6b, 0x62, 0x6f, 0x78]; // "kbox" in hex + + // Function to check if target bytes are present + function containsTargetBytes(buffer, target) { + const dataArray = Array.from(new Uint8Array(buffer)); + for (let i = 0; i <= dataArray.length - target.length; i++) { + if (target.every((byte, index) => dataArray[i + index] === byte)) { + return true; + } + } + return false; + } + + // Check if "kbox" is present in the data + if (data && containsTargetBytes(data, targetBytes)) { + print(Level.DEBUG, `[*] Keybox: memcpy`); + send('keybox', data); + } + } + }, + onLeave: function (retval) { + // print(Level.DEBUG, `[-] onLeave: memcpy`); + } + }); +} + + // @Hooks const hookLibrary = (name) => { // https://github.com/poxyran/misc/blob/master/frida-enumerate-imports.py - const library = getLibrary(name); + let library = getLibrary(name); if (!library) return false; let functions; @@ -281,7 +320,7 @@ const hookLibrary = (name) => { } functions = functions.filter(f => !NATIVE_C_API.includes(f.name)); - const targets = SKIP ? [] : functions.filter(f => OEM_CRYPTO_API.includes(f.name)).map(f => f.name); + let targets = SKIP ? [] : functions.filter(f => OEM_CRYPTO_API.includes(f.name)).map(f => f.name); const hooked = []; functions.forEach(func => { @@ -295,11 +334,19 @@ const hookLibrary = (name) => { GetCdmClientPropertySet(funcAddr); } else if (funcName.includes('PrepareKeyRequest')) { PrepareKeyRequest(funcAddr); - } else if (funcName.includes('getOemcryptoDeviceId')) { + } else if (funcName.includes('_lcc07') || funcName.includes('_oecc07') || funcName.includes('getOemcryptoDeviceId')) { GetDeviceID(funcAddr); } else if (targets.includes(funcName) || (!targets.length && funcName.match(/^[a-z]+$/))) { LoadDeviceRSAKey(funcAddr, funcName); } else { + /* + 1. wvcdm::CdmEngine::GetProvisioningRequest + 2. wvcdm::ClientIdentification::GetProvisioningTokenType + 3. wvcdm::CryptoSession::GetProvisioningToken + 1. wvcdm::CryptoSession::GetTokenFromOemCert + 2. wvcdm::CryptoSession::GetTokenFromKeybox + */ + // print(Level.WARNING, `Hooked (${funcAddr}): ${funcName}`); return; } @@ -315,6 +362,18 @@ const hookLibrary = (name) => { return false; } + // Keybox (experimental) + library = getLibrary('libc.so'); + targets = getFunctions(library).find(func => func.name === 'memcpy' && func.type === 'function'); + if (targets) { + try { + memcpy(targets.address); + print(Level.DEBUG, `Hooked (${targets.address}): ${targets.name}`); + } catch (e) { + print(Level.ERROR, `${e.message} for ${targets.name}`); + } + } + // https://github.com/hzy132/liboemcryptodisabler/blob/master/customize.sh#L33 disableLibrary('liboemcrypto.so'); return true;