KeyDive/extractor/cdm.py

315 lines
13 KiB
Python
Raw Normal View History

2024-03-31 13:27:10 +00:00
import json
2024-03-30 19:03:15 +00:00
import logging
import re
import subprocess
from pathlib import Path
from zlib import crc32
2024-03-30 19:03:15 +00:00
2024-03-31 13:27:10 +00:00
import xmltodict
import frida
2024-04-01 11:17:28 +00:00
from frida.core import Device, Session, Script
2024-03-30 19:03:15 +00:00
from Cryptodome.PublicKey import RSA
2024-05-21 12:26:31 +00:00
from pywidevine.device import Device, DeviceTypes
from pywidevine.license_protocol_pb2 import SignedMessage, LicenseRequest, ClientIdentification, DrmCertificate, SignedDrmCertificate
from unidecode import unidecode
from extractor.vendor import Vendor
2024-03-30 19:03:15 +00:00
PARENT = Path(__file__).parent
2024-03-30 19:03:15 +00:00
class Cdm:
"""
Manages the capture and processing of DRM keys from a specified device using Frida to inject custom hooks.
"""
OEM_CRYPTO_API = {
2024-03-31 13:27:10 +00:00
# Mapping of function names across different API levels (obfuscated names may vary).
'rnmsglvj', 'polorucp', 'kqzqahjq', 'pldrclfq', 'kgaitijd',
'cwkfcplc', 'crhqcdet', 'ulns', 'dnvffnze', 'ygjiljer',
2024-05-12 13:00:32 +00:00
'qbjxtubz', 'qkfrcjtw', 'rbhjspoh', 'zgtjmxko', 'igrqajte',
'ofskesua', 'qllcoacg', 'pukctkiv', 'ehmduqyt'
2024-03-31 13:27:10 +00:00
# Add more as needed for different versions.
}
2024-03-31 13:27:10 +00:00
2024-05-21 12:26:31 +00:00
def __init__(self, device: str = None, functions: Path = None, force: bool = False, wvd: bool = False):
2024-03-30 19:03:15 +00:00
self.logger = logging.getLogger('Cdm')
2024-04-06 13:24:58 +00:00
self.functions = functions
2024-03-30 19:03:15 +00:00
self.running = True
self.keys = {}
2024-04-06 13:24:58 +00:00
# Select device based on provided ID or default to the first USB device.
2024-03-30 19:03:15 +00:00
self.device: Device = frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5)
self.logger.info('Device: %s (%s)', self.device.name, self.device.id)
2024-05-21 12:26:31 +00:00
# Select if create WVD or not
self.wvd = wvd
2024-04-06 13:24:58 +00:00
# Obtain device properties
2024-03-30 19:03:15 +00:00
self.properties = self._fetch_device_properties()
2024-04-06 13:24:58 +00:00
2024-03-30 19:03:15 +00:00
self.sdk_api = self.properties['ro.build.version.sdk']
self.logger.info('SDK API: %s', self.sdk_api)
self.logger.info('ABI CPU: %s', self.properties['ro.product.cpu.abi'])
2024-04-06 13:24:58 +00:00
# Load the hook scrip
self.script = self._prepare_hook_script()
self.logger.info('Script loaded successfully')
# Determine vendor based on device SDK API
vendor_api = self._prepare_vendor_api(force=force)
self.vendor = Vendor.from_sdk_api(vendor_api)
# Update script for specific vendor API, if necessary
if vendor_api != self.sdk_api:
self.sdk_api = vendor_api
self.script = self._prepare_hook_script()
self.logger.info('Script updated for vendor API')
2024-03-30 19:03:15 +00:00
def _fetch_device_properties(self) -> dict:
"""
Retrieves system properties from the connected device using ADB shell commands.
"""
# https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
properties = {}
2024-05-19 13:41:42 +00:00
sp = subprocess.run(['adb', '-s', self.device.id, 'shell', 'getprop'], capture_output=True)
2024-04-19 19:52:59 +00:00
for line in sp.stdout.decode('utf-8').splitlines():
2024-03-30 19:03:15 +00:00
match = re.match(r'\[(.*?)\]: \[(.*?)\]', line)
if match:
key, value = match.groups()
# Attempt to cast numeric and boolean values to appropriate types
try:
value = int(value)
except ValueError:
if value.lower() in ('true', 'false'):
value = value.lower() == 'true'
properties[key] = value
return properties
2024-04-06 13:24:58 +00:00
def _prepare_hook_script(self) -> str:
2024-03-31 13:27:10 +00:00
"""
2024-04-06 13:24:58 +00:00
Prepares the Frida hook script, injecting dynamic content like SDK API and selected functions.
2024-03-31 13:27:10 +00:00
"""
content = (PARENT / 'keydive.js').read_text(encoding='utf-8')
2024-04-06 13:24:58 +00:00
selected = self._select_functions() if self.functions else {}
2024-04-01 10:24:00 +00:00
2024-04-06 13:24:58 +00:00
# Replace placeholders in script template
replacements = {
'${SDK_API}': str(self.sdk_api),
'${OEM_CRYPTO_API}': json.dumps(list(self.OEM_CRYPTO_API)),
'${SYMBOLS}': json.dumps(list(selected.values())),
}
2024-03-31 13:27:10 +00:00
2024-04-06 13:24:58 +00:00
for placeholder, real_value in replacements.items():
content = content.replace(placeholder, real_value)
2024-03-31 13:27:10 +00:00
return content
2024-04-06 13:24:58 +00:00
def _select_functions(self) -> dict:
"""
Parses the provided XML functions file to select relevant functions.
"""
if not self.functions.is_file():
raise FileNotFoundError('Functions file not found')
try:
program = xmltodict.parse(self.functions.read_bytes())['PROGRAM']
addr_base = int(program['@IMAGE_BASE'], 16)
functions = program['FUNCTIONS']['FUNCTION']
# Find a target function from a predefined list
target = next((f['@NAME'] for f in functions if f['@NAME'] in self.OEM_CRYPTO_API), None)
# Extract relevant functions
selected = {}
for func in functions:
name = func['@NAME']
args = len(func.get('REGISTER_VAR', []))
# Add function if it matches specific criteria
if name not in selected and (
name == target
or any(keyword in name for keyword in ['UsePrivacyMode', 'PrepareKeyRequest'])
or (not target and re.match(r'^[a-z]+$', name) and args >= 6)
):
selected[name] = {'name': name, 'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)}
return selected
except Exception:
pass
raise ValueError('Failed to extract functions from Ghidra')
def enumerate_processes(self) -> dict:
"""
Lists processes running on the device, returning a mapping of process names to PIDs.
"""
2024-04-06 13:29:15 +00:00
# https://github.com/frida/frida/issues/1225#issuecomment-604181822
2024-04-06 13:24:58 +00:00
# Iterate through lines starting from the second line (skipping header)
processes = {}
2024-05-19 13:41:42 +00:00
sp = subprocess.run(['adb', '-s', self.device.id, 'shell', 'ps'], capture_output=True)
2024-04-19 19:52:59 +00:00
for line in sp.stdout.decode('utf-8').splitlines()[1:]:
2024-04-06 13:24:58 +00:00
try:
line = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
name = ' '.join(line[8:]).strip()
name = name if name.startswith('[') else Path(name).name
processes[name] = int(line[1])
except Exception:
pass
return processes
def _prepare_vendor_api(self, force: bool = False) -> int:
2024-03-30 19:03:15 +00:00
"""
2024-04-06 13:24:58 +00:00
Determines the most compatible vendor API version based on device processes.
2024-03-30 19:03:15 +00:00
"""
2024-04-06 13:24:58 +00:00
if force:
self.logger.warning('Using default vendor due to force flag')
return self.sdk_api
# Check if forcing is not enabled and enumerate processes
2024-03-31 13:27:10 +00:00
details: [int] = []
2024-04-06 13:24:58 +00:00
processes = self.enumerate_processes()
for k, v in Vendor.SDK_VERSIONS.items():
pid = processes.get(v[2])
if pid:
self.logger.debug('Analysing... (%s)', v[2])
session: Session = self.device.attach(pid)
script: Script = session.create_script(self.script)
script.load()
if script.exports_sync.getlibrary(v[3]):
details.append(k)
session.detach()
# If no compatible versions found
if details:
# Find the closest SDK version to the current one, preferring lower matches in case of a tie.
sdk_api = min(details, key=lambda x: abs(x - self.sdk_api))
# Adjust SDK version if it exceeds the maximum supported version
if sdk_api == Vendor.SDK_MAX and self.sdk_api > Vendor.SDK_MAX:
sdk_api = self.sdk_api
elif sdk_api != self.sdk_api:
self.logger.warning('Using non-default Widevine version for SDK %s', sdk_api)
return sdk_api
raise EnvironmentError('Unable to detect Widevine, see: https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info')
2024-03-30 19:03:15 +00:00
def _process_message(self, message: dict, data: bytes) -> None:
"""
Handles messages received from the Frida script.
"""
logger = logging.getLogger('Script')
level = message.get('payload')
if isinstance(level, int):
# Process logging messages from Frida script
logger.log(level=level, msg=data.decode('utf-8'))
if level in (logging.FATAL, logging.CRITICAL):
self.running = False
elif level == 'device_info':
if data:
self._extract_device_info(data)
else:
logger.critical('No data for device info, invalid argument position')
self.running = False
elif level == 'private_key':
self._extract_private_key(data)
def _extract_private_key(self, data: bytes) -> None:
"""
Extracts and stores the private key from the provided data.
"""
key = RSA.import_key(data)
key_id = key.n
if key_id not in self.keys:
self.keys[key_id] = key
self.logger.debug('Retrieved key: \n\n%s\n', key.exportKey('PEM').decode('utf-8'))
def _extract_device_info(self, data: bytes) -> None:
"""
Extracts device information and associated private keys, storing them to disk.
"""
# https://github.com/devine-dl/pywidevine
signed_message = SignedMessage()
signed_message.ParseFromString(data)
license_request = LicenseRequest()
license_request.ParseFromString(signed_message.msg)
client_id: ClientIdentification = license_request.client_id
signed_drm_certificate = SignedDrmCertificate()
drm_certificate = DrmCertificate()
signed_drm_certificate.ParseFromString(client_id.token)
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
public_key = drm_certificate.public_key
key = RSA.importKey(public_key)
key_id = key.n
private_key = self.keys.get(key_id)
if private_key:
path = Path() / 'device' / str(self.device.name) / 'private_keys' / str(drm_certificate.system_id) / str(key_id)[:10]
2024-03-30 19:03:15 +00:00
path.mkdir(parents=True, exist_ok=True)
path_client_id = path / 'client_id.bin'
path_private_key = path / 'private_key.pem'
path_client_id.write_bytes(data=client_id.SerializeToString())
path_private_key.write_bytes(data=private_key.exportKey('PEM'))
self.logger.info('Dumped client ID: %s', path_client_id)
self.logger.info('Dumped private key: %s', path_private_key)
2024-05-21 12:26:31 +00:00
if self.wvd:
# https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211
client_info = {}
for entry in client_id.client_info:
client_info[entry.name] = entry.value
device = Device(
2024-05-21 12:26:31 +00:00
client_id=client_id.SerializeToString(),
private_key=private_key.exportKey('PEM'),
type_=DeviceTypes.ANDROID,
2024-05-21 12:26:31 +00:00
security_level=3,
flags=None
)
wvd_bin = device.dumps()
name = f"{client_info['company_name']} {client_info['model_name']}"
if client_info.get('widevine_cdm_version'):
name += f" {client_info['widevine_cdm_version']}"
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
name = unidecode(name.strip().lower().replace(' ', '_'))
out_path = path / f'{name}_{device.system_id}_l{device.security_level}.wvd'
out_path.write_bytes(data=wvd_bin)
self.logger.info('Created WVD: %s', out_path)
2024-05-21 12:26:31 +00:00
2024-03-30 19:03:15 +00:00
self.running = False
else:
self.logger.warning('Failed to intercept the private key')
2024-04-06 13:24:58 +00:00
def hook_process(self, pid: int) -> bool:
2024-03-30 19:03:15 +00:00
"""
Hooks into the specified process to intercept DRM keys.
"""
2024-04-06 13:24:58 +00:00
session: Session = self.device.attach(pid)
2024-03-30 19:03:15 +00:00
script: Script = session.create_script(self.script)
script.on('message', self._process_message)
script.load()
2024-04-01 11:00:44 +00:00
library_info = script.exports_sync.getlibrary(self.vendor.library)
if library_info:
2024-03-30 19:03:15 +00:00
self.logger.info('Library: %s (%s)', library_info['name'], library_info['path'])
2024-04-06 13:24:58 +00:00
# Check if Ghidra XML functions loaded
if self.sdk_api > 33:
if not self.functions:
raise AttributeError('For SDK API > 33, specifying "functions" is required, see: https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
elif self.functions:
self.logger.warning('The "functions" attribute is deprecated for SDK API < 34')
2024-03-30 19:03:15 +00:00
return script.exports_sync.hooklibrary(library_info['name'])
2024-04-01 11:00:44 +00:00
return False