import json
import logging
import re
import subprocess
from pathlib import Path

import xmltodict
import frida
from frida.core import Device, Session, Script
from Cryptodome.PublicKey import RSA

from extractor.license_protocol_pb2 import SignedMessage, LicenseRequest, ClientIdentification, DrmCertificate, SignedDrmCertificate
from extractor.vendor import Vendor
# NOTE: this import rebinds the name `Device` (previously frida.core.Device) to the
# pywidevine class. Inside this module `Device` therefore always means the pywidevine
# WVD container; the Frida device type is referred to as `frida.core.Device`.
from pywidevine.device import Device, DeviceTypes

SCRIPT_PATH = Path(__file__).parent / 'script.js'

# One line of `adb shell getprop` output: "[key]: [value]". Compiled once, used per line.
_GETPROP_LINE = re.compile(r'\[(.*?)\]: \[(.*?)\]')


class Cdm:
    """
    Manages the capture and processing of DRM keys from a specified device using Frida to inject custom hooks.
    """

    OEM_CRYPTO_API = {
        # Set of known function names across different API levels (obfuscated names may vary).
        'rnmsglvj', 'polorucp', 'kqzqahjq', 'pldrclfq', 'kgaitijd', 'cwkfcplc', 'crhqcdet',
        'ulns', 'dnvffnze', 'ygjiljer', 'qbjxtubz', 'qkfrcjtw', 'rbhjspoh', 'zgtjmxko',
        'igrqajte', 'ofskesua', 'qllcoqcg'
        # Add more as needed for different versions.
    }

    def __init__(self, device: str = None, functions: Path = None, force: bool = False, wvd: bool = False):
        """
        Connects to the target device, reads its properties and prepares the hook script.

        Args:
            device: Frida device ID to use; when falsy, the first USB device is used.
            functions: Optional path to an XML functions file (see _select_functions).
            force: When True, skip vendor detection and use the device SDK API as-is.
            wvd: When True, a `device.wvd` file is also written when keys are dumped.

        Raises:
            KeyError: If the device does not report `ro.build.version.sdk` or
                `ro.product.cpu.abi` (e.g. when the `adb getprop` call returned nothing).
            EnvironmentError: If no Widevine library could be detected (see _prepare_vendor_api).
        """
        self.logger = logging.getLogger('Cdm')
        self.functions = functions
        self.running = True
        self.keys = {}

        # Select device based on provided ID or default to the first USB device.
        self.device: frida.core.Device = (
            frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5)
        )
        self.logger.info('Device: %s (%s)', self.device.name, self.device.id)

        # Select whether a WVD file should be created
        self.wvd = wvd

        # Obtain device properties
        self.properties = self._fetch_device_properties()
        self.sdk_api = self.properties['ro.build.version.sdk']
        self.logger.info('SDK API: %s', self.sdk_api)
        self.logger.info('ABI CPU: %s', self.properties['ro.product.cpu.abi'])

        # Load the hook script
        self.script = self._prepare_hook_script()
        self.logger.info('Script loaded successfully')

        # Determine vendor based on device SDK API
        vendor_api = self._prepare_vendor_api(force=force)
        self.vendor = Vendor.from_sdk_api(vendor_api)

        # Update script for specific vendor API, if necessary
        if vendor_api != self.sdk_api:
            self.sdk_api = vendor_api
            self.script = self._prepare_hook_script()
            self.logger.info('Script updated for vendor API')

    def _fetch_device_properties(self) -> dict:
        """
        Retrieves system properties from the connected device using ADB shell commands.

        Returns:
            Mapping of property name to value. Values that parse as integers are stored as
            int, the strings 'true'/'false' (any case) as bool, everything else as str.
            Lines that do not look like "[key]: [value]" are ignored.
        """
        # https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
        properties = {}
        sp = subprocess.run(['adb', '-s', self.device.id, 'shell', 'getprop'], capture_output=True)
        for line in sp.stdout.decode('utf-8').splitlines():
            match = _GETPROP_LINE.match(line)
            if not match:
                continue

            key, value = match.groups()
            # Attempt to cast numeric and boolean values to appropriate types
            try:
                value = int(value)
            except ValueError:
                if value.lower() in ('true', 'false'):
                    value = value.lower() == 'true'
            properties[key] = value

        return properties

    def _prepare_hook_script(self) -> str:
        """
        Prepares the Frida hook script, injecting dynamic content like SDK API and selected functions.

        Returns:
            The content of SCRIPT_PATH with the `${SDK_API}`, `${OEM_CRYPTO_API}` and
            `${SYMBOLS}` placeholders substituted.
        """
        content = SCRIPT_PATH.read_text(encoding='utf-8')
        selected = self._select_functions() if self.functions else {}

        # Replace placeholders in script template.
        # The name set is sorted so the generated script is identical from run to run
        # (iteration order of a set of strings is otherwise arbitrary).
        replacements = {
            '${SDK_API}': str(self.sdk_api),
            '${OEM_CRYPTO_API}': json.dumps(sorted(self.OEM_CRYPTO_API)),
            '${SYMBOLS}': json.dumps(list(selected.values())),
        }
        for placeholder, real_value in replacements.items():
            content = content.replace(placeholder, real_value)

        return content

    @staticmethod
    def _as_list(value) -> list:
        """
        Wraps a non-list value in a list (xmltodict yields a bare item for a single child element).
        """
        return value if isinstance(value, list) else [value]

    def _select_functions(self) -> dict:
        """
        Parses the provided XML functions file to select relevant functions.

        Returns:
            Mapping of function name to `{'name': ..., 'address': ...}` where address is
            the hex string of the entry point relative to the program image base.

        Raises:
            FileNotFoundError: If `self.functions` is not an existing file.
            ValueError: If the file cannot be parsed or lacks the expected structure
                (the underlying error is attached as `__cause__`).
        """
        if not self.functions.is_file():
            raise FileNotFoundError('Functions file not found')

        try:
            program = xmltodict.parse(self.functions.read_bytes())['PROGRAM']
            addr_base = int(program['@IMAGE_BASE'], 16)
            functions = self._as_list(program['FUNCTIONS']['FUNCTION'])

            # Find a target function from a predefined list
            target = next((f['@NAME'] for f in functions if f['@NAME'] in self.OEM_CRYPTO_API), None)

            # Extract relevant functions
            selected = {}
            for func in functions:
                name = func['@NAME']
                if name in selected:
                    continue

                # Number of REGISTER_VAR children (a single child is not wrapped in a list).
                args = len(self._as_list(func.get('REGISTER_VAR', [])))

                # Add function if it matches specific criteria
                if (
                        name == target
                        or any(keyword in name for keyword in ['UsePrivacyMode', 'PrepareKeyRequest'])
                        # Fallback when no known name was found: lowercase-only names with >= 6 register vars
                        or (not target and re.match(r'^[a-z]+$', name) and args >= 6)
                ):
                    selected[name] = {
                        'name': name,
                        'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)
                    }
            return selected
        except Exception as e:
            # Keep the original exception type for callers but preserve the root cause.
            raise ValueError('Failed to extract functions from Ghidra') from e

    def enumerate_processes(self) -> dict:
        """
        Lists processes running on the device, returning a mapping of process names to PIDs.

        Returns:
            Mapping of process name to PID. Names starting with '[' are kept verbatim,
            other names are reduced to their final path component. When several processes
            share a name, the last one listed wins. Unparsable rows are skipped.
        """
        # https://github.com/frida/frida/issues/1225#issuecomment-604181822
        processes = {}
        sp = subprocess.run(['adb', '-s', self.device.id, 'shell', 'ps'], capture_output=True)

        # Iterate through lines starting from the second line (skipping header)
        for line in sp.stdout.decode('utf-8').splitlines()[1:]:
            # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
            columns = line.split()
            name = ' '.join(columns[8:]).strip()
            if not name:
                # Row without a NAME column: nothing meaningful to register.
                continue

            name = name if name.startswith('[') else Path(name).name
            try:
                processes[name] = int(columns[1])
            except (IndexError, ValueError):
                # Best effort: ignore rows whose PID column is missing or not numeric.
                continue

        return processes

    def _prepare_vendor_api(self, force: bool = False) -> int:
        """
        Determines the most compatible vendor API version based on device processes.

        Args:
            force: When True, return the device SDK API without probing any process.

        Returns:
            The SDK API version whose vendor entry should be used.

        Raises:
            EnvironmentError: If none of the known vendor processes exposes its library.
        """
        if force:
            self.logger.warning('Using default vendor due to force flag')
            return self.sdk_api

        # Forcing is not enabled: probe each known vendor process for its library
        details: list = []
        processes = self.enumerate_processes()
        for k, v in Vendor.SDK_VERSIONS.items():
            pid = processes.get(v[2])
            if pid:
                self.logger.debug('Analysing... (%s)', v[2])
                session: Session = self.device.attach(pid)
                try:
                    script: Script = session.create_script(self.script)
                    script.load()
                    if script.exports_sync.getlibrary(v[3]):
                        details.append(k)
                finally:
                    # Always release the probe session, even if loading or the RPC call failed.
                    session.detach()

        # If at least one compatible version was found
        if details:
            # Find the closest SDK version to the current one, preferring lower matches in case of a tie
            # (the secondary sort key makes this independent of the iteration order above).
            sdk_api = min(details, key=lambda x: (abs(x - self.sdk_api), x))

            # Adjust SDK version if it exceeds the maximum supported version
            if sdk_api == Vendor.SDK_MAX and self.sdk_api > Vendor.SDK_MAX:
                sdk_api = self.sdk_api
            elif sdk_api != self.sdk_api:
                self.logger.warning('Using non-default Widevine version for SDK %s', sdk_api)

            return sdk_api

        raise EnvironmentError('Unable to detect Widevine, see: https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info')

    def _process_message(self, message: dict, data: bytes) -> None:
        """
        Handles messages received from the Frida script.

        Args:
            message: Message dict delivered by Frida. Its 'payload' is either an int
                (a logging level, with the log text in `data`), 'device_info' or 'private_key'.
            data: Binary data attached to the message; may be None.
        """
        logger = logging.getLogger('Script')

        if message.get('type') == 'error':
            # Script-side exceptions carry no payload; surface them instead of dropping them.
            logger.error('%s', message.get('stack') or message.get('description') or message)
            return

        level = message.get('payload')

        if isinstance(level, int):
            # Process logging messages from Frida script
            logger.log(level=level, msg=data.decode('utf-8') if data else '')
            # logging.FATAL is an alias of logging.CRITICAL
            if level == logging.CRITICAL:
                self.running = False
        elif level == 'device_info':
            if data:
                self._extract_device_info(data)
            else:
                logger.critical('No data for device info, invalid argument position')
                self.running = False
        elif level == 'private_key':
            self._extract_private_key(data)

    def _extract_private_key(self, data: bytes) -> None:
        """
        Extracts and stores the private key from the provided data.

        The key is stored in `self.keys` under its RSA modulus; a modulus that is
        already known is ignored.
        """
        key = RSA.import_key(data)
        key_id = key.n
        if key_id not in self.keys:
            self.keys[key_id] = key
            self.logger.debug('Retrieved key: \n\n%s\n', key.export_key('PEM').decode('utf-8'))

    def _extract_device_info(self, data: bytes) -> None:
        """
        Extracts device information and associated private keys, storing them to disk.

        `data` is parsed as SignedMessage -> LicenseRequest -> client ID -> DRM certificate.
        The certificate's public key modulus is looked up in `self.keys`; when the matching
        private key was captured, the client ID and the key (plus a WVD file if enabled)
        are written under `device/<device name>/private_keys/<system id>/<key id prefix>/`
        and `self.running` is cleared. Otherwise only a warning is logged.
        """
        # https://github.com/devine-dl/pywidevine
        signed_message = SignedMessage()
        signed_message.ParseFromString(data)

        license_request = LicenseRequest()
        license_request.ParseFromString(signed_message.msg)

        client_id: ClientIdentification = license_request.client_id

        signed_drm_certificate = SignedDrmCertificate()
        drm_certificate = DrmCertificate()

        signed_drm_certificate.ParseFromString(client_id.token)
        drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)

        public_key = drm_certificate.public_key
        key = RSA.import_key(public_key)
        key_id = key.n

        private_key = self.keys.get(key_id)
        if not private_key:
            self.logger.warning('Failed to intercept the private key')
            return

        # Serialize once; both the raw dump and the WVD use the same bytes.
        client_id_blob = client_id.SerializeToString()
        private_key_pem = private_key.export_key('PEM')

        path = Path() / 'device' / self.device.name / 'private_keys' / str(drm_certificate.system_id) / str(key_id)[:10]
        path.mkdir(parents=True, exist_ok=True)
        path_client_id = path / 'client_id.bin'
        path_private_key = path / 'private_key.pem'

        path_client_id.write_bytes(data=client_id_blob)
        path_private_key.write_bytes(data=private_key_pem)

        self.logger.info('Dumped client ID: %s', path_client_id)
        self.logger.info('Dumped private key: %s', path_private_key)

        if self.wvd:
            path_wvd = path / 'device.wvd'
            Device(
                client_id=client_id_blob,
                private_key=private_key_pem,
                type_=DeviceTypes['ANDROID'],
                security_level=3,
                flags=None
            ).dump(path_wvd)
            self.logger.info('Created WVD: %s', path_wvd)

        self.running = False

    def hook_process(self, pid: int) -> bool:
        """
        Hooks into the specified process to intercept DRM keys.

        Args:
            pid: PID of the process to attach to.

        Returns:
            The result of the script's `hooklibrary` call when the vendor library is
            present in the process, otherwise False (the session is detached in that case).

        Raises:
            AttributeError: If the SDK API is above 33 and no functions file was supplied.
        """
        session: Session = self.device.attach(pid)
        try:
            script: Script = session.create_script(self.script)
            script.on('message', self._process_message)
            script.load()

            library_info = script.exports_sync.getlibrary(self.vendor.library)
            if library_info:
                self.logger.info('Library: %s (%s)', library_info['name'], library_info['path'])

                # Check if Ghidra XML functions loaded
                if self.sdk_api > 33:
                    if not self.functions:
                        raise AttributeError('For SDK API > 33, specifying "functions" is required, see: https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
                elif self.functions:
                    self.logger.warning('The "functions" attribute is deprecated for SDK API < 34')

                # The session is intentionally left attached so the hooks keep running.
                return script.exports_sync.hooklibrary(library_info['name'])
        except Exception:
            # Do not leave a dangling session behind when hooking could not be set up.
            session.detach()
            raise

        # Library not present in this process: nothing to hook, release the session.
        session.detach()
        return False