diff --git a/CHANGELOG.md b/CHANGELOG.md index d4cadfc..3f042d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.2.0] - Not Released + +### Added + +- Added support for dynamic interception without the need for Ghidra (available only for Frida server versions greater than 16.6.0). + +### Changed + +- Added additional comments to help understand the script. +- Optimized file path management in parameters. +- Refactored the code globally. + +### Fixed + +- Inconsistency in logging messages with certain functions. +- Fixing the server-generated curl command. + ## [2.1.5] - 2025-01-12 ### Added diff --git a/docs/server/server.py b/docs/server/server.py index 7001af4..b681f6d 100644 --- a/docs/server/server.py +++ b/docs/server/server.py @@ -3,22 +3,23 @@ import json import time import logging +from pathlib import Path + import requests -from pathlib import Path from flask import Flask, Response, request, redirect from keydive.__main__ import configure_logging # Suppress urllib3 warnings -logging.getLogger('urllib3.connectionpool').setLevel(logging.ERROR) +logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) # Initialize Flask application app = Flask(__name__) # Define paths, constants, and global flags PARENT = Path(__file__).parent -VERSION = '1.0.0' +VERSION = "1.0.1" KEYBOX = False DELAY = 10 @@ -31,7 +32,7 @@ def health_check() -> Response: Returns: Response: A simple "pong" message with a 200 OK status. """ - return Response(response='pong', status=200, content_type='text/html; charset=utf-8') + return Response(response="pong", status=200, content_type="text/html; charset=utf-8") @app.route('/shaka-demo-assets/angel-one-widevine/', methods=['GET']) @@ -40,17 +41,17 @@ def shaka_demo_assets(file) -> Response: Serves cached assets for Widevine demo content. If the requested file is not available locally, it fetches it from a remote server and caches it. - Args: + Parameters: file (str): File path requested by the client. Returns: Response: File content as a byte stream, or a 404 error if not found. """ - logger = logging.getLogger('Shaka') - logger.info('%s %s', request.method, request.path) + logger = logging.getLogger("Shaka") + logger.info("%s %s", request.method, request.path) try: - path = PARENT / '.assets' / file + path = PARENT / ".assets" / file path.parent.mkdir(parents=True, exist_ok=True) if path.is_file(): @@ -59,20 +60,20 @@ def shaka_demo_assets(file) -> Response: else: # Fetch the file from remote storage if not cached locally r = requests.get( - url=f'https://storage.googleapis.com/shaka-demo-assets/angel-one-widevine/{file}', + url=f"https://storage.googleapis.com/shaka-demo-assets/angel-one-widevine/{file}", headers={ - 'Accept': '*/*', - 'User-Agent': 'KalturaDeviceInfo/1.4.1 (Linux;Android 10) ExoPlayerLib/2.9.3' + "Accept": "*/*", + "User-Agent": "KalturaDeviceInfo/1.4.1 (Linux;Android 10) ExoPlayerLib/2.9.3" } ) r.raise_for_status() path.write_bytes(r.content) # Cache the downloaded content content = r.content - logger.debug('Downloaded assets: %s', path) + logger.debug("Downloaded assets: %s", path) - return Response(response=content, status=200, content_type='application/octet-stream') + return Response(response=content, status=200, content_type="application/octet-stream") except Exception as e: - return Response(response=str(e), status=404, content_type='text/html; charset=utf-8') + return Response(response=str(e), status=404, content_type="text/html; charset=utf-8") @app.route('/certificateprovisioning/v1/devicecertificates/create', methods=['POST']) @@ -86,37 +87,37 @@ def certificate_provisioning() -> Response: Response: JSON response if provisioning is complete, else a redirection. """ global KEYBOX, DELAY - logger = logging.getLogger('Google') - logger.info('%s %s', request.method, request.path) + logger = logging.getLogger("Google") + logger.info("%s %s", request.method, request.path) if KEYBOX: - logger.warning('Provisioning request aborted to prevent keybox spam') - return Response(response='Internal Server Error', status=500, content_type='text/html; charset=utf-8') + logger.warning("Provisioning request aborted to prevent keybox spam") + return Response(response="Internal Server Error", status=500, content_type="text/html; charset=utf-8") # Generate a curl command from the incoming request for debugging or testing - user_agent = request.headers.get('User-Agent', 'Unknown') - url = request.url.replace('http://', 'https://') + user_agent = request.headers.get("User-Agent", "Unknown") + url = request.url.replace("http://", "https://") prompt = [ - "curl --request POST", - f"--url '{url}'", - "--compressed", - "--header 'accept-encoding: gzip'", - "--header 'connection: Keep-Alive'", - "--header 'content-type: application/x-www-form-urlencoded'", - "--header 'host: www.googleapis.com'", - f"--header 'user-agent: {user_agent}'" + 'curl', + '--request', 'POST', + '--compressed', + '--header', '"Accept-Encoding: gzip"', + '--header', '"Connection: Keep-Alive"', + '--header', '"Content-Type: application/x-www-form-urlencoded"', + '--header', '"Host: www.googleapis.com"', + '--header', f'"User-Agent: {user_agent}"' ] # Save the curl command for potential replay or inspection - curl = PARENT / 'curl.txt' - curl.write_text(' \\\n '.join(prompt)) - logger.debug('Saved curl command to: %s', curl) + curl = PARENT / "curl.txt" + curl.write_text(" \\\n ".join(prompt)) + logger.debug("Saved curl command to: %s", curl) # Wait for provisioning response data with retries - logger.warning('Waiting for provisioning response...') - provision = PARENT / 'provisioning.json' + logger.warning("Waiting for provisioning response...") + provision = PARENT / "provisioning.json" provision.unlink(missing_ok=True) - provision.write_bytes(b'') # Create empty file for manual input if needed + provision.write_bytes(b"") # Create empty file for manual input if needed # Poll for the presence of a response up to DELAY times with 1-second intervals for _ in range(DELAY): @@ -126,13 +127,13 @@ def certificate_provisioning() -> Response: # Cleanup after successful response curl.unlink(missing_ok=True) provision.unlink(missing_ok=True) - return Response(response=content, status=200, content_type='application/json') + return Response(response=content, status=200, content_type="application/json") except Exception as e: pass # Continue waiting if file is empty or not yet ready time.sleep(1) # Redirect to the secure URL if response is not available - logger.warning('Redirecting to avoid timeout') + logger.warning("Redirecting to avoid timeout") return redirect(url, code=302) @@ -142,31 +143,32 @@ def main() -> None: to set global parameters and configures logging, then starts the Flask server. """ global VERSION, DELAY, KEYBOX - parser = argparse.ArgumentParser(description='Local DRM provisioning video player.') + parser = argparse.ArgumentParser(description="Local DRM provisioning video player.") # Global arguments for the application - group_global = parser.add_argument_group('Global') - group_global.add_argument('--host', required=False, type=str, default='127.0.0.1', metavar='', help='Host address for the server to bind to.') - group_global.add_argument('--port', required=False, type=int, default=9090, metavar='', help='Port number for the server to listen on.') - group_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.') - group_global.add_argument('-l', '--log', required=False, type=Path, metavar='', help='Directory to store log files.') - group_global.add_argument('--version', required=False, action='store_true', help='Display Server version information.') + group_global = parser.add_argument_group("Global") + group_global.add_argument('--host', required=False, type=str, default="127.0.0.1", metavar="", help="Host address for the server to bind to.") + group_global.add_argument('--port', required=False, type=int, default=9090, metavar="", help="Port number for the server to listen on.") + group_global.add_argument('-v', '--verbose', required=False, action="store_true", help="Enable verbose logging for detailed debug output.") + group_global.add_argument('-l', '--log', required=False, type=Path, metavar="", help="Directory to store log files.") + group_global.add_argument('--version', required=False, action="store_true", help="Display Server version information.") # Advanced options - group_advanced = parser.add_argument_group('Advanced') - group_advanced.add_argument('-d', '--delay', required=False, type=int, metavar='', default=10, help='Delay (in seconds) between successive checks for provisioning responses.') - group_advanced.add_argument('-k', '--keybox', required=False, action='store_true', help='Enable keybox mode, which aborts provisioning requests to prevent spam.') + group_advanced = parser.add_argument_group("Advanced") + group_advanced.add_argument('-d', '--delay', required=False, type=int, metavar="", default=10, help="Delay (in seconds) between successive checks for provisioning responses.") + group_advanced.add_argument('-k', '--keybox', required=False, action="store_true", help="Enable keybox mode, which aborts provisioning requests to prevent spam.") args = parser.parse_args() + # Handle version display if args.version: - print(f'Server {VERSION}') + print(f"Server {VERSION}") exit(0) - # Configure logging + # Configure logging (file and console) log_path = configure_logging(path=args.log, verbose=args.verbose) - logger = logging.getLogger('Server') - logger.info('Version: %s', VERSION) + logger = logging.getLogger("Server") + logger.info("Version: %s", VERSION) try: # Set global variables based on parsed arguments @@ -174,7 +176,7 @@ def main() -> None: KEYBOX = args.keybox # Start Flask app with specified host, port, and debug mode - logging.getLogger('werkzeug').setLevel(logging.INFO if args.verbose else logging.ERROR) + logging.getLogger("werkzeug").setLevel(logging.INFO if args.verbose else logging.ERROR) app.run(host=args.host, port=args.port, debug=False) except KeyboardInterrupt: pass @@ -183,9 +185,9 @@ def main() -> None: # Final logging and exit if log_path: - logger.info('Log file: %s' % log_path) - logger.info('Exiting') + logger.info("Log file: %s" % log_path) + logger.info("Exiting") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/keydive/__init__.py b/keydive/__init__.py index 856a36b..1589c41 100644 --- a/keydive/__init__.py +++ b/keydive/__init__.py @@ -4,4 +4,4 @@ from .cdm import Cdm from .vendor import Vendor from .keybox import Keybox -__version__ = '2.1.5' +__version__ = "2.2.0" diff --git a/keydive/__main__.py b/keydive/__main__.py index 5f00a63..93bfd24 100644 --- a/keydive/__main__.py +++ b/keydive/__main__.py @@ -4,6 +4,7 @@ import time from datetime import datetime from pathlib import Path +from typing import Optional import coloredlogs @@ -15,16 +16,16 @@ from keydive.constants import CDM_VENDOR_API, DRM_PLAYER from keydive.core import Core -def configure_logging(path: Path = None, verbose: bool = False) -> Path: +def configure_logging(path: Path = None, verbose: bool = False) -> Optional[Path]: """ Configures logging for the application. - Args: + Parameters: path (Path, optional): The directory to store log files. verbose (bool, optional): Flag to enable detailed debug logging. Returns: - Path: The path of log file. + Path: The path of the log file, or None if no log file is created. """ # Set up the root logger with the desired logging level root_logger = logging.getLogger() @@ -42,15 +43,15 @@ def configure_logging(path: Path = None, verbose: bool = False) -> Path: path.mkdir(parents=True, exist_ok=True) # Create a file handler - file_path = path / ('keydive_%s.log' % datetime.now().strftime('%Y-%m-%d_%H-%M-%S')) + file_path = path / ("keydive_%s.log" % datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) file_path = file_path.resolve(strict=False) file_handler = logging.FileHandler(file_path) file_handler.setLevel(logging.DEBUG) # Set log formatting formatter = logging.Formatter( - fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + fmt="%(asctime)s [%(levelname).1s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" ) file_handler.setFormatter(formatter) @@ -59,8 +60,8 @@ def configure_logging(path: Path = None, verbose: bool = False) -> Path: # Configure coloredlogs for console output coloredlogs.install( - fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', + fmt="%(asctime)s [%(levelname).1s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", level=logging.DEBUG if verbose else logging.INFO, logger=root_logger ) @@ -74,47 +75,48 @@ def main() -> None: This application extracts Widevine L3 keys from an Android device. It supports device management via ADB and allows hooking into Widevine processes. """ - parser = argparse.ArgumentParser(description='Extract Widevine L3 keys from an Android device.') + parser = argparse.ArgumentParser(description="Extract Widevine L3 keys from an Android device.") # Global arguments for the application - group_global = parser.add_argument_group('Global') - group_global.add_argument('-d', '--device', required=False, type=str, metavar='', help='Specify the target Android device ID for ADB connection.') - group_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.') - group_global.add_argument('-l', '--log', required=False, type=Path, metavar='', help='Directory to store log files.') - group_global.add_argument('--delay', required=False, type=float, metavar='', default=1, help='Delay (in seconds) between process checks.') - group_global.add_argument('--version', required=False, action='store_true', help='Display KeyDive version information.') + group_global = parser.add_argument_group("Global") + group_global.add_argument('-d', '--device', required=False, type=str, metavar="", help="Specify the target Android device ID for ADB connection.") + group_global.add_argument('-v', '--verbose', required=False, action="store_true", help="Enable verbose logging for detailed debug output.") + group_global.add_argument('-l', '--log', required=False, type=Path, metavar="", help="Directory to store log files.") + group_global.add_argument('--delay', required=False, type=float, metavar="", default=1, help="Delay (in seconds) between process checks.") + group_global.add_argument('--version', required=False, action="store_true", help="Display KeyDive version information.") # Arguments specific to the CDM (Content Decryption Module) - group_cdm = parser.add_argument_group('Cdm') - group_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path('device'), metavar='', help='Output directory for extracted data.') - group_cdm.add_argument('-w', '--wvd', required=False, action='store_true', help='Generate a pywidevine WVD device file.') - group_cdm.add_argument('-s', '--skip', required=False, action='store_true', help='Skip auto-detection of the private function.') - group_cdm.add_argument('-a', '--auto', required=False, action='store_true', help='Automatically start the Bitmovin web player.') - group_cdm.add_argument('-p', '--player', required=False, action='store_true', help='Install and start the Kaltura app automatically.') + group_cdm = parser.add_argument_group("Cdm") + group_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path("device"), metavar="", help="Output directory for extracted data.") + group_cdm.add_argument('-w', '--wvd', required=False, action="store_true", help="Generate a pywidevine WVD device file.") + group_cdm.add_argument('-s', '--skip', required=False, action="store_true", help="Skip auto-detection of the private function.") + group_cdm.add_argument('-a', '--auto', required=False, action="store_true", help="Automatically start the Bitmovin web player.") + group_cdm.add_argument('-p', '--player', required=False, action="store_true", help="Install and start the Kaltura app automatically.") # Advanced options - group_advanced = parser.add_argument_group('Advanced') - group_advanced.add_argument('-f', '--functions', required=False, type=Path, metavar='', help='Path to Ghidra XML functions file.') - group_advanced.add_argument('-k', '--keybox', required=False, action='store_true', help='Enable export of the Keybox data if it is available.') - group_advanced.add_argument('--challenge', required=False, type=Path, metavar='', help='Path to unencrypted challenge for extracting client ID.') - group_advanced.add_argument('--private-key', required=False, type=Path, metavar='', help='Path to private key for extracting client ID.') + group_advanced = parser.add_argument_group("Advanced") + group_advanced.add_argument('-f', '--functions', required=False, type=Path, metavar="", help="Path to Ghidra XML functions file.") + group_advanced.add_argument('-k', '--keybox', required=False, action="store_true", help="Enable export of the Keybox data if it is available.") + group_advanced.add_argument('--challenge', required=False, type=Path, metavar="", help="Path to unencrypted challenge for extracting client ID.") + group_advanced.add_argument('--private-key', required=False, type=Path, metavar="", help="Path to private key for extracting client ID.") args = parser.parse_args() + # Handle version display if args.version: - print(f'KeyDive {keydive.__version__}') + print(f"KeyDive {keydive.__version__}") exit(0) - # Configure logging + # Configure logging (file and console) log_path = configure_logging(path=args.log, verbose=args.verbose) - logger = logging.getLogger('KeyDive') - logger.info('Version: %s', keydive.__version__) + logger = logging.getLogger("KeyDive") + logger.info("Version: %s", keydive.__version__) try: # Connect to the specified Android device adb = ADB(device=args.device) - # Initialize Cdm instance + # Initialize Cdm instance for content decryption module (with optional arguments) cdm = Cdm(keybox=args.keybox) if args.challenge: cdm.set_challenge(data=args.challenge) @@ -124,70 +126,70 @@ def main() -> None: # Initialize Core instance for interacting with the device core = Core(adb=adb, cdm=cdm, functions=args.functions, skip=args.skip) - # Setup actions based on user arguments + # Setup actions based on user arguments (for DRM player, Bitmovin player, etc.) if args.player: - package = DRM_PLAYER['package'] + package = DRM_PLAYER["package"] # Check if the application is already installed - installed = package in adb.list_applications(user=True, system=False) + installed = package in adb.list_applications(user=True, system=False) if not installed: - logger.debug('Application %s not found. Installing...', package) - installed = adb.install_application(path=DRM_PLAYER['path'], url=DRM_PLAYER['url']) + logger.debug("Application %s not found. Installing...", package) + installed = adb.install_application(path=DRM_PLAYER["path"], url=DRM_PLAYER["url"]) # Skip starting the application if installation failed if installed: # Start the application - logger.info('Starting application: %s', package) + logger.info("Starting application: %s", package) adb.start_application(package) elif args.auto: - logger.info('Opening the Bitmovin web player...') - adb.open_url('https://bitmovin.com/demos/drm') - logger.info('Setup completed') + logger.info("Opening the Bitmovin web player...") + adb.open_url("https://bitmovin.com/demos/drm") + logger.info("Setup completed") - # Process watcher loop - logger.info('Watcher delay: %ss' % args.delay) + # Process watcher loop: continuously checks for Widevine processes + logger.info("Watcher delay: %ss" % args.delay) current = None # Variable to track the current Widevine process while core.running: # Check if for current process data has been exported if current and cdm.export(args.output, args.wvd): raise KeyboardInterrupt # Stop if export is complete - # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792 # Get the currently running Widevine processes + # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792 processes = { key: (name, pid) for name, pid in adb.enumerate_processes().items() for key in CDM_VENDOR_API.keys() - if key in name or key.replace('-service', '-service-lazy') in name + if key in name or key.replace("-service", "-service-lazy") in name } if not processes: - raise EnvironmentError('Unable to detect Widevine, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info') + raise EnvironmentError("Unable to detect Widevine, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#drm-info") # Check if the current process has changed if current and current not in [v[1] for v in processes.values()]: - logger.warning('Widevine process has changed') + logger.warning("Widevine process has changed") current = None # If current process not found, attempt to hook into the detected processes if not current: - logger.debug('Analysing...') + logger.debug("Analysing...") for key, (name, pid) in processes.items(): if current: break for vendor in CDM_VENDOR_API[key]: if core.hook_process(pid=pid, vendor=vendor): - logger.info('Process: %s (%s)', pid, name) + logger.info("Process: %s (%s)", pid, name) current = pid break elif not core.running: raise KeyboardInterrupt if current: - logger.info('Successfully hooked') + logger.info("Successfully hooked") else: - logger.warning('Widevine library not found, searching...') + logger.warning("Widevine library not found, searching...") # Delay before next iteration time.sleep(args.delay) @@ -198,9 +200,9 @@ def main() -> None: # Final logging and exit if log_path: - logger.info('Log file: %s' % log_path) - logger.info('Exiting') + logger.info("Log file: %s" % log_path) + logger.info("Exiting") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/keydive/adb.py b/keydive/adb.py index 01af92c..725d0f3 100644 --- a/keydive/adb.py +++ b/keydive/adb.py @@ -3,29 +3,30 @@ import re import shutil import subprocess +from pathlib import Path + import frida import requests -from pathlib import Path from frida.core import Device # Suppress urllib3 warnings -logging.getLogger('urllib3.connectionpool').setLevel(logging.ERROR) +logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) def shell(prompt: list) -> subprocess.CompletedProcess: """ - Executes a shell command and returns the completed process. + Executes a shell command and returns the result. - Args: - prompt (list): The command to be executed as a list of strings. + Parameters: + prompt (list): The command to execute as a list of strings. Returns: - subprocess.CompletedProcess: The completed process object containing return code, stdout, and stderr. + subprocess.CompletedProcess: The result containing return code, stdout, and stderr. """ - prompt = list(map(str, prompt)) - # logging.getLogger('Shell').debug(' '.join(prompt)) - return subprocess.run(prompt, capture_output=True) + prompt = list(map(str, prompt)) # Ensure all command parts are strings + # logging.getLogger("Shell").debug(" ".join(prompt)) + return subprocess.run(prompt, capture_output=True) # Run the command and capture output class ADB: @@ -35,76 +36,76 @@ class ADB: def __init__(self, device: str = None, timeout: int = 5): """ - Initializes the ADB connection to a device. + Initializes ADB connection to the device. - Args: - device (str, optional): The ID of the device to connect to. If None, defaults to the first USB device. - timeout (int, optional): The timeout for device connection in seconds. Defaults to 5. + Parameters: + device (str, optional): Device ID to connect to, defaults to the first USB device. + timeout (int, optional): Timeout for connection in seconds. Defaults to 5. Raises: EnvironmentError: If ADB is not found in the system path. - Exception: If the connection to the device fails. + Exception: If connection to the device fails. """ self.logger = logging.getLogger(self.__class__.__name__) # Ensure ADB is available - if not shutil.which('adb'): + if not shutil.which("adb"): raise EnvironmentError( - 'ADB is not recognized as an environment variable. ' - 'Ensure ADB is installed and refer to the documentation: ' - 'https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge' + "ADB is not recognized as an environment variable. " + "Ensure ADB is installed and refer to the documentation: " + "https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge" ) # Start the ADB server if not already running sp = shell(['adb', 'start-server']) if sp.returncode != 0: - self.logger.warning('ADB server startup failed (Error: %s)', sp.stdout.decode('utf-8').strip()) + self.logger.warning("ADB server startup failed (Error: %s)", sp.stdout.decode("utf-8").strip()) - # Select device based on provided ID or default to the first USB device + # Connect to device (or default to the first USB device) try: self.device: Device = frida.get_device(id=device, timeout=timeout) if device else frida.get_usb_device(timeout=timeout) - self.logger.info('Connected to device: %s (%s)', self.device.name, self.device.id) + self.logger.info("Connected to device: %s (%s)", self.device.name, self.device.id) except Exception as e: - self.logger.error('Failed to connect to device: %s', e) + self.logger.error("Failed to connect to device: %s", e) raise e self.prompt = ['adb', '-s', self.device.id, 'shell'] - # Obtain device properties + # Retrieve and log device properties properties = self.device_properties() if properties: - self.logger.info('SDK API: %s', properties.get('ro.build.version.sdk', 'Unknown')) - self.logger.info('ABI CPU: %s', properties.get('ro.product.cpu.abi', 'Unknown')) + self.logger.info("SDK API: %s", properties.get("ro.build.version.sdk", "Unknown")) + self.logger.info("ABI CPU: %s", properties.get("ro.product.cpu.abi", "Unknown")) else: - self.logger.warning('No device properties were retrieved.') + self.logger.warning("No device properties retrieved") def device_properties(self) -> dict: """ - Retrieves system properties from the connected device using ADB shell commands. + Retrieves system properties from the device. Returns: - dict: A dictionary mapping device property keys to their corresponding values. + dict: A dictionary mapping property keys to their corresponding values. """ # https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands properties = {} - # Execute shell command to retrieve device properties + # Execute the shell command to retrieve device properties sp = shell([*self.prompt, 'getprop']) if sp.returncode != 0: - self.logger.error('Failed to retrieve device properties (Error: %s)', sp.stdout.decode('utf-8').strip()) + self.logger.error("Failed to retrieve device properties (Error: %s)", sp.stdout.decode("utf-8").strip()) return properties - # Parse the output to fill the properties dictionary - for line in sp.stdout.decode('utf-8').splitlines(): - match = re.match(r'\[(.*?)\]: \[(.*?)\]', line) + # Parse the output and cast values accordingly + for line in sp.stdout.decode("utf-8").splitlines(): + match = re.match(r"\[(.*?)\]: \[(.*?)\]", line) if match: key, value = match.groups() - # Attempt to cast numeric and boolean values + # Cast numeric and boolean values where appropriate if value.isdigit(): value = int(value) - elif value.lower() in ('true', 'false'): - value = value.lower() == 'true' + elif value.lower() in ("true", "false"): + value = value.lower() == "true" properties[key] = value @@ -112,37 +113,38 @@ class ADB: def list_applications(self, user: bool = True, system: bool = False) -> dict: """ - Lists installed applications on the device, filtering by user and/or system apps. + Lists installed applications on the device, with optional filters for user/system apps. - Args: - user (bool, optional): Whether to include user-installed applications. Defaults to True. - system (bool, optional): Whether to include system applications. Defaults to False. + Parameters: + user (bool, optional): Include user-installed apps. Defaults to True. + system (bool, optional): Include system apps. Defaults to False. Returns: - dict: A dictionary mapping application packages to their file paths. + dict: A dictionary of application packages and their file paths. """ applications = {} - # Validate input and set the appropriate prompt + # Validate input; return empty dict if no filter is set if not user and not system: return applications + # Set the appropriate shell command based on user/system filters prompt = [*self.prompt, 'pm', 'list', 'packages', '-f'] if user and not system: - prompt.append('-3') + prompt.append("-3") elif not user and system: - prompt.append('-s') + prompt.append("-s") - # Execute shell command to list applications + # Execute the shell command to list applications sp = shell(prompt) if sp.returncode != 0: - self.logger.error('Failed to retrieve application list (Error: %s)', sp.stdout.decode('utf-8').strip()) + self.logger.error("Failed to retrieve app list (Error: %s)", sp.stdout.decode("utf-8").strip()) return applications - # Parse and store applications in the dictionary - for line in sp.stdout.decode('utf-8').splitlines(): + # Parse and add applications to the dictionary + for line in sp.stdout.decode("utf-8").splitlines(): try: - path, package = line.strip().split(':', 1)[1].rsplit('=', 1) + path, package = line.strip().split(":", 1)[1].rsplit("=", 1) applications[package] = path except Exception as e: pass @@ -153,68 +155,68 @@ class ADB: """ Starts an application by its package name. - Args: - package (str): The package name of the application to start. + Parameters: + package (str): The package name of the application. Returns: - bool: True if the application was started successfully, False otherwise. + bool: True if the app was started successfully, False otherwise. """ - # Get package information + # Get package information using dumpsys sp = shell([*self.prompt, 'dumpsys', 'package', package]) - lines = sp.stdout.decode('utf-8').splitlines() + lines = sp.stdout.decode("utf-8").splitlines() # Remove empty lines to ensure backwards compatibility lines = [l.strip() for l in lines if l.strip()] - # Look for main activity in package information + # Look for MAIN activity to identify entry point for i, line in enumerate(lines): - if 'android.intent.action.MAIN' in line: - match = re.search(fr'({package}/[^ ]+)', lines[i + 1]) + if "android.intent.action.MAIN" in line: + match = re.search(fr"({package}/[^ ]+)", lines[i + 1]) if match: - # Attempt to start the application + # Start the application by its main activity main_activity = match.group() sp = shell([*self.prompt, 'am', 'start', '-n', main_activity]) if sp.returncode == 0: return True - self.logger.error('Failed to start application %s (Error: %s)', package, sp.stdout.decode('utf-8').strip()) + self.logger.error("Failed to start app %s (Error: %s)", package, sp.stdout.decode("utf-8").strip()) break - self.logger.error('Package %s not found or has no MAIN intent action.', package) + self.logger.error("Package %s not found or no MAIN intent", package) return False def enumerate_processes(self) -> dict: """ - Lists running processes on the device, mapping process names to PIDs. + Lists running processes and maps process names to their PIDs. Returns: - dict: A dictionary mapping process names to their corresponding PIDs. + dict: Dictionary of process names and corresponding PIDs. """ # https://github.com/frida/frida/issues/1225#issuecomment-604181822 processes = {} - # Try to get the list of processes using `ps -A` + # Attempt to get the list of processes using the 'ps -A' command prompt = [*self.prompt, 'ps'] sp = shell([*prompt, '-A']) - lines = sp.stdout.decode('utf-8').splitlines() + lines = sp.stdout.decode("utf-8").splitlines() - # If the output has less than 10 lines, try the alternative `ps` command + # If the output has less than 10 lines, retry with a simpler 'ps' command if len(lines) < 10: sp = shell(prompt) if sp.returncode != 0: - self.logger.error('Failed to execute ps command (Error: %s)', sp.stdout.decode('utf-8').strip()) + self.logger.error("Failed to execute ps command (Error: %s)", sp.stdout.decode("utf-8").strip()) return processes - lines = sp.stdout.decode('utf-8').splitlines() + lines = sp.stdout.decode("utf-8").splitlines() # Iterate through lines starting from the second line (skipping header) for line in lines[1:]: try: parts = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME pid = int(parts[1]) # Extract PID - name = ' '.join(parts[8:]).strip() # Extract process name + name = " ".join(parts[8:]).strip() # Extract process name - # Handle cases where the name might be in brackets - name = name if name.startswith('[') else Path(name).name + # Handle cases where process name might be in brackets (e.g., kernel threads) + name = name if name.startswith("[") else Path(name).name processes[name] = pid except Exception as e: pass @@ -223,61 +225,63 @@ class ADB: def install_application(self, path: Path = None, url: str = None) -> bool: """ - Installs an application from a local file path or a URL. + Installs an application on the device either from a local file or by downloading from a URL. - Args: + Parameters: path (Path, optional): The local file path of the APK to install. Defaults to None. url (str, optional): The URL to download the APK from. Defaults to None. Returns: bool: True if the installation was successful, False otherwise. """ + # Prepare the shell command for installation prompt = [*self.prompt[:-1], 'install'] - # Install from a local file path if provided + # Install from a local file path if a valid path is provided if path and path.is_file(): - sp = shell([*prompt, path]) + sp = shell([*prompt, path]) # Run the installation command with the local file path if sp.returncode == 0: return True - self.logger.error('Installation failed for local path: %s (Error: %s)', path, sp.stdout.decode('utf-8').strip()) + self.logger.error("Installation failed for local path: %s (Error: %s)", path, sp.stdout.decode("utf-8").strip()) - # Install from a URL if provided + # If URL is provided, attempt to download the APK and install it status = False if url: - file = Path('tmp.apk') + file = Path("tmp.apk") # Temporary file to store the downloaded APK try: - r = requests.get(url, headers={'Accept': '*/*', 'User-Agent': 'KeyDive/ADB'}) + # Download the APK from the provided URL + r = requests.get(url, headers={"Accept": "*/*", "User-Agent": "KeyDive/ADB"}) r.raise_for_status() - # Write downloaded content to temporary APK file + # Save the downloaded APK to a temporary file file.write_bytes(r.content) - # Attempt installation from the downloaded file + # Attempt installation from the downloaded APK status = self.install_application(path=file) except Exception as e: - self.logger.error('Failed to download application from URL: %s (Error: %s)', url, e) - file.unlink(missing_ok=True) + self.logger.error("Failed to download application from URL: %s (Error: %s)", url, e) + file.unlink(missing_ok=True) # Clean up the temporary file, even if there was an error return status def open_url(self, url: str) -> bool: """ - Opens a specified URL on the connected device. + Opens a specified URL on the device. - Args: - url (str): The URL to open on the device. + Parameters: + url (str): The URL to be opened on the device. Returns: - bool: True if the URL was opened successfully, False otherwise. + bool: True if the URL was successfully opened, False otherwise. """ - # Execute the shell command to open the URL + # Execute the shell command to open the URL using the Android 'am' (Activity Manager) command. sp = shell([*self.prompt, 'am', 'start', '-a', 'android.intent.action.VIEW', '-d', url]) - # Check the result and log accordingly + # Check the result of the command execution and log if there is an error if sp.returncode != 0: - self.logger.error('URL open failed for: %s (Return: %s)', url, sp.stdout.decode('utf-8').strip()) + self.logger.error("URL open failed for: %s (Return: %s)", url, sp.stdout.decode("utf-8").strip()) return False return True -__all__ = ('ADB',) +__all__ = ("ADB",) diff --git a/keydive/cdm.py b/keydive/cdm.py index 280d893..72413d0 100644 --- a/keydive/cdm.py +++ b/keydive/cdm.py @@ -2,17 +2,18 @@ import base64 import json import logging -from pathlib import Path from typing import Union from zlib import crc32 from unidecode import unidecode +from pathlib import Path from pathvalidate import sanitize_filepath, sanitize_filename from Cryptodome.PublicKey import RSA from Cryptodome.PublicKey.RSA import RsaKey from pywidevine.device import Device, DeviceTypes -from pywidevine.license_protocol_pb2 import (SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate, - DrmCertificate, EncryptedClientIdentification) +from pywidevine.license_protocol_pb2 import ( + SignedMessage, LicenseRequest, ClientIdentification, SignedDrmCertificate, DrmCertificate, + EncryptedClientIdentification) from keydive.constants import OEM_CRYPTO_API from keydive.keybox import Keybox @@ -28,15 +29,15 @@ class Cdm: """ Initializes the Cdm object, setting up a logger and containers for client IDs and private keys. - Attributes: - client_id (dict[int, ClientIdentification]): Stores client identification info mapped by key modulus. - private_key (dict[int, RsaKey]): Stores private keys mapped by key modulus. + Parameters: keybox (bool, optional): Initializes a Keybox instance for secure key management. """ self.logger = logging.getLogger(self.__class__.__name__) # https://github.com/devine-dl/pywidevine self.client_id: dict[int, ClientIdentification] = {} self.private_key: dict[int, RsaKey] = {} + + # Optionally initialize a Keybox instance for secure key management if 'keybox' is True self.keybox = Keybox() if keybox else None @staticmethod @@ -44,7 +45,7 @@ class Cdm: """ Converts client identification information to a dictionary. - Args: + Parameters: client_id (ClientIdentification): The client identification. Returns: @@ -57,120 +58,143 @@ class Cdm: """ Converts encrypted client identification information to a dictionary. - Args: + Parameters: encrypted_client_id (EncryptedClientIdentification): The encrypted client identification. Returns: dict: A dictionary of encrypted client information. """ content = { - 'providerId': encrypted_client_id.provider_id, - 'serviceCertificateSerialNumber': encrypted_client_id.service_certificate_serial_number, - 'encryptedClientId': encrypted_client_id.encrypted_client_id, - 'encryptedClientIdIv': encrypted_client_id.encrypted_client_id_iv, - 'encryptedPrivacyKey': encrypted_client_id.encrypted_privacy_key + "providerId": encrypted_client_id.provider_id, + "serviceCertificateSerialNumber": encrypted_client_id.service_certificate_serial_number, + "encryptedClientId": encrypted_client_id.encrypted_client_id, + "encryptedClientIdIv": encrypted_client_id.encrypted_client_id_iv, + "encryptedPrivacyKey": encrypted_client_id.encrypted_privacy_key } return { - k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else v + k: base64.b64encode(v).decode("utf-8") if isinstance(v, bytes) else v for k, v in content.items() } def set_challenge(self, data: Union[Path, bytes]) -> None: """ - Sets the challenge data by extracting device information. + Sets the challenge data by extracting device information and client ID. - Args: - data (Union[Path, bytes]): The challenge data as a file path or bytes. + Parameters: + data (Union[Path, bytes]): Challenge data as a file path or raw bytes. Raises: - FileNotFoundError: If the provided file path does not exist. + FileNotFoundError: If the file path doesn't exist. + Exception: Logs any other exceptions that occur. """ - if isinstance(data, Path): - if not data.is_file(): - raise FileNotFoundError(data) - data = data.read_bytes() - try: + # Check if the data is a Path object, indicating it's a file path + if isinstance(data, Path): + data = data.read_bytes() + + # Parse the signed message from the data signed_message = SignedMessage() signed_message.ParseFromString(data) + # Parse the license request from the signed message license_request = LicenseRequest() license_request.ParseFromString(signed_message.msg) + # Extract the encrypted client ID, if available # https://integration.widevine.com/diagnostics encrypted_client_id: EncryptedClientIdentification = license_request.encrypted_client_id if encrypted_client_id.SerializeToString(): - self.logger.info('Receive encrypted client id: \n\n%s\n', json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)) - self.logger.warning('The client ID of the challenge is encrypted') + # If encrypted, log the encrypted client ID and indicate encryption + self.logger.info("Receive encrypted client id: \n\n%s\n", json.dumps(self.__encrypted_client_info(encrypted_client_id), indent=2)) + self.logger.warning("The client ID of the challenge is encrypted") else: + # If unencrypted, extract and set the client ID client_id: ClientIdentification = license_request.client_id self.set_client_id(data=client_id) + + except FileNotFoundError as e: + raise FileNotFoundError(f"Challenge file not found: {data}") from e except Exception as e: - self.logger.debug('Failed to set challenge data: %s', e) + self.logger.debug("Failed to set challenge data: %s", e) def set_private_key(self, data: Union[Path, bytes], name: str = None) -> None: """ Sets the private key from the provided data. - Args: - data (Union[Path, bytes]): The private key data, either as a file path or bytes. + Parameters: + data (Union[Path, bytes]): The private key data, either as a file path or byte data. name (str, optional): Function name for verification against known functions. Raises: - FileNotFoundError: If the provided file path does not exist or is not a file. + FileNotFoundError: If the file path doesn't exist. + Exception: Logs any other exceptions that occur. """ - if isinstance(data, Path): - if not data.is_file(): - raise FileNotFoundError(data) - data = data.read_bytes() - try: + # Check if the data is a Path object, indicating it's a file path + if isinstance(data, Path): + data = data.read_bytes() + + # Import the private key using the RSA module key = RSA.import_key(data) + + # Log the private key if it's not already in the dictionary if key.n not in self.private_key: - self.logger.info('Receive private key: \n\n%s\n', key.exportKey('PEM').decode('utf-8')) + self.logger.info("Receive private key: \n\n%s\n", key.exportKey("PEM").decode("utf-8")) + # If a function name is provided, verify it against known functions if name and name not in OEM_CRYPTO_API: - self.logger.warning('The function "%s" does not belong to the referenced functions. Communicate it to the developer to improve the tool.', name) + self.logger.warning("The function '%s' does not belong to the referenced functions. Communicate it to the developer to improve the tool.",name) + # Store the private key in the dictionary, using the modulus (key.n) as the key self.private_key[key.n] = key + except FileNotFoundError as e: + raise FileNotFoundError(f"Private key file not found: {data}") from e except Exception as e: - self.logger.debug('Failed to set private key: %s', e) + self.logger.debug("Failed to set private key: %s", e) def set_client_id(self, data: Union[ClientIdentification, bytes]) -> None: """ Sets the client ID from the provided data. - Args: + Parameters: data (Union[ClientIdentification, bytes]): The client ID data. """ try: + # Check if the provided data is already a `ClientIdentification` object if isinstance(data, ClientIdentification): client_id = data else: + # Deserialize the byte data into a `ClientIdentification` object client_id = ClientIdentification() client_id.ParseFromString(data) + # Initialize objects for parsing the DRM certificate and signed certificate signed_drm_certificate = SignedDrmCertificate() drm_certificate = DrmCertificate() + # Parse the signed DRM certificate from the client ID token signed_drm_certificate.ParseFromString(client_id.token) drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) + # Extract the public key from the DRM certificate public_key = drm_certificate.public_key key = RSA.importKey(public_key) + # Check if this public key has already been recorded and log the client ID info if key.n not in self.client_id: - self.logger.info('Receive client id: \n\n%s\n', json.dumps(self.__client_info(client_id), indent=2)) + self.logger.info("Receive client id: \n\n%s\n", json.dumps(self.__client_info(client_id), indent=2)) + + # Store the client ID in the client_id dictionary, using the public key modulus (`key.n`) as the key self.client_id[key.n] = client_id except Exception as e: - self.logger.debug('Failed to set client ID: %s', e) + self.logger.debug("Failed to set client ID: %s", e) def set_device_id(self, data: bytes) -> None: """ - Sets the device ID in the keybox if it is enabled. + Sets the device ID in the keybox. - Args: - data (bytes): The device ID data to be stored in the keybox. + Parameters: + data (bytes): The device ID to be stored in the keybox. """ if self.keybox: self.keybox.set_device_id(data=data) @@ -179,7 +203,7 @@ class Cdm: """ Sets the keybox data. - Args: + Parameters: data (bytes): The keybox data to be set. """ if self.keybox: @@ -187,56 +211,69 @@ class Cdm: def export(self, parent: Path, wvd: bool = False) -> bool: """ - Exports the client ID and private key to disk. + Exports client ID, private key, and optionally WVD files to disk. - Args: - parent (Path): The parent directory to export the files to. - wvd (bool): Whether to export WVD files. + Parameters: + parent (Path): Directory to export the files to. + wvd (bool, optional): Whether to export WVD files. Defaults to False. Returns: bool: True if any keys were exported, otherwise False. """ + # Find the intersection of client IDs and private keys keys = self.client_id.keys() & self.private_key.keys() + for k in keys: + # Retrieve client information based on the client ID client_info = self.__client_info(self.client_id[k]) + # https://github.com/devine-dl/pywidevine/blob/master/pywidevine/main.py#L211 device = Device( client_id=self.client_id[k].SerializeToString(), - private_key=self.private_key[k].exportKey('PEM'), + private_key=self.private_key[k].exportKey("PEM"), type_=DeviceTypes.ANDROID, security_level=3, flags=None ) + # Generate a sanitized file path for exporting the data # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146958022 - parent = sanitize_filepath(parent / client_info['company_name'] / client_info['model_name'] / str(device.system_id) / str(k)[:10]) + parent = sanitize_filepath(parent / client_info["company_name"] / client_info["model_name"] / str(device.system_id) / str(k)[:10]) parent.mkdir(parents=True, exist_ok=True) - path_id_bin = parent / 'client_id.bin' + # Export the client ID to a binary file + path_id_bin = parent / "client_id.bin" path_id_bin.write_bytes(data=device.client_id.SerializeToString()) - self.logger.info('Exported client ID: %s', path_id_bin) + self.logger.info("Exported client ID: %s", path_id_bin) - path_key_bin = parent / 'private_key.pem' - path_key_bin.write_bytes(data=device.private_key.exportKey('PEM')) - self.logger.info('Exported private key: %s', path_key_bin) + # Export the private key to a PEM file + path_key_bin = parent / "private_key.pem" + path_key_bin.write_bytes(data=device.private_key.exportKey("PEM")) + self.logger.info("Exported private key: %s", path_key_bin) + # If the WVD option is enabled, export the WVD file if wvd: + # Serialize the device to WVD format wvd_bin = device.dumps() + # Generate a unique name for the WVD file using client and device details name = f"{client_info['company_name']} {client_info['model_name']}" - if client_info.get('widevine_cdm_version'): + if client_info.get("widevine_cdm_version"): name += f" {client_info['widevine_cdm_version']}" name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}" - name = unidecode(name.strip().lower().replace(' ', '_')) - path_wvd = parent / sanitize_filename(f'{name}_{device.system_id}_l{device.security_level}.wvd') + name = unidecode(name.strip().lower().replace(" ", "_")) + path_wvd = parent / sanitize_filename(f"{name}_{device.system_id}_l{device.security_level}.wvd") + # Export the WVD file to disk path_wvd.write_bytes(data=wvd_bin) - self.logger.info('Exported WVD: %s', path_wvd) + self.logger.info("Exported WVD: %s", path_wvd) + # If keybox is available and hasn't been exported, issue a warning if self.keybox and not self.keybox.export(parent=parent.parent): - self.logger.warning('The keybox has not been intercepted or decrypted') + self.logger.warning("The keybox has not been intercepted or decrypted") + # Return True if any keys were exported, otherwise return False return len(keys) > 0 -__all__ = ('Cdm',) +__all__ = ("Cdm",) diff --git a/keydive/constants.py b/keydive/constants.py index 79bfe32..52ad020 100644 --- a/keydive/constants.py +++ b/keydive/constants.py @@ -1,143 +1,145 @@ -import re from pathlib import Path from keydive.vendor import Vendor -# https://developer.android.com/ndk/guides/cpp-support +# https://cs.android.com/android/platform/superproject/+/android14-qpr3-release:bionic/libc/libc.map.txt NATIVE_C_API = { # BUILT-IN - 'main', + "main", # STDIO - 'fclose', 'fflush', 'fgetc', 'fgetpos', 'fgets', 'fopen', 'fprintf', 'fputc', 'fputs', 'fread', 'freopen', - 'fscanf', 'fseek', 'fsetpos', 'ftell', 'fwrite', 'getc', 'getchar', 'gets', 'perror', 'printf', 'putc', - 'putchar', 'puts', 'remove', 'rename', 'rewind', 'scanf', 'setbuf', 'setvbuf', 'sprintf', 'sscanf', 'tmpfile', - 'tmpnam', 'ungetc', 'vfprintf', 'vprintf', 'vsprintf', 'fileno', 'feof', 'ferror', 'snprintf', + "fclose", "fflush", "fgetc", "fgetpos", "fgets", "fopen", "fprintf", "fputc", "fputs", "fread", "freopen", + "fscanf", "fseek", "fsetpos", "ftell", "fwrite", "getc", "getchar", "gets", "perror", "printf", "putc", + "putchar", "puts", "remove", "rename", "rewind", "scanf", "setbuf", "setvbuf", "sprintf", "sscanf", "tmpfile", + "tmpnam", "ungetc", "vfprintf", "vprintf", "vsprintf", "fileno", "feof", "ferror", "snprintf", # STDLIB - 'abort', 'abs', 'atexit', 'atof', 'atoi', 'atol', 'bsearch', 'calloc', 'div', 'exit', 'free', 'getenv', 'labs', - 'ldiv', 'malloc', 'mblen', 'mbstowcs', 'mbtowc', 'qsort', 'rand', 'realloc', 'srand', 'strtod', 'strtol', - 'strtoul', 'system', 'wcstombs', 'wctomb', + "abort", "abs", "atexit", "atof", "atoi", "atol", "bsearch", "calloc", "div", "exit", "free", "getenv", "labs", + "ldiv", "malloc", "mblen", "mbstowcs", "mbtowc", "qsort", "rand", "realloc", "srand", "strtod", "strtol", + "strtoul", "system", "wcstombs", "wctomb", # STRING - 'memchr', 'memcmp', 'memcpy', 'memmove', 'memset', 'strcat', 'strchr', 'strcmp', 'strcoll', 'strcpy', 'strcspn', - 'strerror', 'strlen', 'strncat', 'strncmp', 'strncpy', 'strpbrk', 'strrchr', 'strspn', 'strstr', 'strtok', - 'strxfrm', 'strncasecmp', + "memchr", "memcmp", "memcpy", "memmove", "memset", "strcat", "strchr", "strcmp", "strcoll", "strcpy", "strcspn", + "strerror", "strlen", "strncat", "strncmp", "strncpy", "strpbrk", "strrchr", "strspn", "strstr", "strtok", + "strxfrm", "strncasecmp", # MATH - 'acos', 'asin', 'atan', 'atan2', 'cos', 'cosh', 'exp', 'fabs', 'floor', 'fmod', 'frexp', 'ldexp', 'log', - 'log10', 'modf', 'pow', 'sin', 'sinh', 'sqrt', 'tan', 'tanh', + "acos", "asin", "atan", "atan2", "cos", "cosh", "exp", "fabs", "floor", "fmod", "frexp", "ldexp", "log", + "log10", "modf", "pow", "sin", "sinh", "sqrt", "tan", "tanh", # CTYPE - 'isalnum', 'isalpha', 'iscntrl', 'isdigit', 'isgraph', 'islower', 'isprint', 'ispunct', 'isspace', 'isupper', - 'isxdigit', 'tolower', 'toupper', + "isalnum", "isalpha", "iscntrl", "isdigit", "isgraph", "islower", "isprint", "ispunct", "isspace", "isupper", + "isxdigit", "tolower", "toupper", # TIME - 'asctime', 'clock', 'ctime', 'difftime', 'gmtime', 'localtime', 'mktime', 'strftime', 'time', + "asctime", "clock", "ctime", "difftime", "gmtime", "localtime", "mktime", "strftime", "time", # UNISTD - 'access', 'alarm', 'chdir', 'chown', 'close', 'dup', 'dup2', 'execle', 'execv', 'execve', 'execvp', 'fork', - 'fpathconf', 'getcwd', 'getegid', 'geteuid', 'getgid', 'getgroups', 'getlogin', 'getopt', 'getpgid', 'getpgrp', - 'getpid', 'getppid', 'getuid', 'isatty', 'lseek', 'pathconf', 'pause', 'pipe', 'read', 'rmdir', 'setgid', - 'setpgid', 'setsid', 'setuid', 'sleep', 'sysconf', 'tcgetpgrp', 'tcsetpgrp', 'ttyname', 'ttyname_r', 'write', - 'fsync', 'unlink', 'syscall', 'getpagesize', + "access", "alarm", "chdir", "chown", "close", "dup", "dup2", "execle", "execv", "execve", "execvp", "fork", + "fpathconf", "getcwd", "getegid", "geteuid", "getgid", "getgroups", "getlogin", "getopt", "getpgid", "getpgrp", + "getpid", "getppid", "getuid", "isatty", "lseek", "pathconf", "pause", "pipe", "read", "rmdir", "setgid", + "setpgid", "setsid", "setuid", "sleep", "sysconf", "tcgetpgrp", "tcsetpgrp", "ttyname", "ttyname_r", "write", + "fsync", "unlink", "syscall", "getpagesize", # FCNTL - 'creat', 'fcntl', 'open', + "creat", "fcntl", "open", # SYS_TYPE - 'fd_set', 'FD_CLR', 'FD_ISSET', 'FD_SET', 'FD_ZERO', + "fd_set", "FD_CLR", "FD_ISSET", "FD_SET", "FD_ZERO", # SYS_STAT - 'chmod', 'fchmod', 'fstat', 'mkdir', 'mkfifo', 'stat', 'umask', + "chmod", "fchmod", "fstat", "mkdir", "mkfifo", "stat", "umask", # SYS_TIME - 'gettimeofday', 'select', 'settimeofday', + "gettimeofday", "select", "settimeofday", # SIGNAL - 'signal', 'raise', 'kill', 'sigaction', 'sigaddset', 'sigdelset', 'sigemptyset', 'sigfillset', 'sigismember', - 'sigpending', 'sigprocmask', 'sigsuspend', 'alarm', 'pause', + "signal", "raise", "kill", "sigaction", "sigaddset", "sigdelset", "sigemptyset", "sigfillset", "sigismember", + "sigpending", "sigprocmask", "sigsuspend", "alarm", "pause", # SETJMP - 'longjmp', 'setjmp', + "longjmp", "setjmp", # ERRNO - 'errno', 'strerror', 'perror', + "errno", "strerror", "perror", # ASSERT - 'assert', + "assert", # LOCAL - 'localeconv', 'setlocale', + "localeconv", "setlocale", # WCHAR - 'btowc', 'fgetwc', 'fgetws', 'fputwc', 'fputws', 'fwide', 'fwprintf', 'fwscanf', 'getwc', 'getwchar', 'mbrlen', - 'mbrtowc', 'mbsinit', 'mbsrtowcs', 'putwc', 'putwchar', 'swprintf', 'swscanf', 'ungetwc', 'vfwprintf', - 'vfwscanf', 'vwprintf', 'vwscanf', 'wcrtomb', 'wcscat', 'wcschr', 'wcscmp', 'wcscoll', 'wcscpy', 'wcscspn', - 'wcsftime', 'wcslen', 'wcsncat', 'wcsncmp', 'wcsncpy', 'wcspbrk', 'wcsrchr', 'wcsrtombs', 'wcsspn', 'wcsstr', - 'wcstod', 'wcstok', 'wcstol', 'wcstombs', 'wcstoul', 'wcsxfrm', 'wctob', 'wmemchr', 'wmemcmp', 'wmemcpy', - 'wmemmove', 'wmemset', 'wprintf', 'wscanf', + "btowc", "fgetwc", "fgetws", "fputwc", "fputws", "fwide", "fwprintf", "fwscanf", "getwc", "getwchar", "mbrlen", + "mbrtowc", "mbsinit", "mbsrtowcs", "putwc", "putwchar", "swprintf", "swscanf", "ungetwc", "vfwprintf", + "vfwscanf", "vwprintf", "vwscanf", "wcrtomb", "wcscat", "wcschr", "wcscmp", "wcscoll", "wcscpy", "wcscspn", + "wcsftime", "wcslen", "wcsncat", "wcsncmp", "wcsncpy", "wcspbrk", "wcsrchr", "wcsrtombs", "wcsspn", "wcsstr", + "wcstod", "wcstok", "wcstol", "wcstombs", "wcstoul", "wcsxfrm", "wctob", "wmemchr", "wmemcmp", "wmemcpy", + "wmemmove", "wmemset", "wprintf", "wscanf", # WCTYPE - 'iswalnum', 'iswalpha', 'iswcntrl', 'iswdigit', 'iswgraph', 'iswlower', 'iswprint', 'iswpunct', 'iswspace', - 'iswupper', 'iswxdigit', 'towlower', 'towupper', 'iswctype', 'wctype', + "iswalnum", "iswalpha", "iswcntrl", "iswdigit", "iswgraph", "iswlower", "iswprint", "iswpunct", "iswspace", + "iswupper", "iswxdigit", "towlower", "towupper", "iswctype", "wctype", # STDDEF - 'NULL', 'offsetof', 'ptrdiff_t', 'size_t', 'wchar_t', + "NULL", "offsetof", "ptrdiff_t", "size_t", "wchar_t", # STDARG - 'va_arg', 'va_end', 'va_start', + "va_arg", "va_end", "va_start", # DLFCN - 'dlclose', 'dlerror', 'dlopen', 'dlsym', + "dlclose", "dlerror", "dlopen", "dlsym", # DIRENT - 'closedir', 'opendir', 'readdir', + "closedir", "opendir", "readdir", # SYS_SENDFILE - 'sendfile', + "sendfile", # SYS_MMAN - 'mmap', 'mprotect', 'munmap', + "mmap", "mprotect", "munmap", # SYS_UTSNAME - 'uname', + "uname", # LINK - 'dladdr' + "dladdr" } +# https://cs.android.com/search?q=oemcrypto&sq=&ss=android%2Fplatform%2Fsuperproject OEM_CRYPTO_API = { # Mapping of function names across different API levels (obfuscated names may vary). - 'rnmsglvj', 'polorucp', 'kqzqahjq', 'pldrclfq', 'kgaitijd', 'cwkfcplc', 'crhqcdet', 'ulns', 'dnvffnze', 'ygjiljer', - 'qbjxtubz', 'qkfrcjtw', 'rbhjspoh', 'zgtjmxko', 'igrqajte', 'ofskesua', 'qllcoacg', 'pukctkiv', 'ehdqmfmd', - 'xftzvkwx', 'gndskkuk', 'wcggmnnx', 'kaatohcz', 'ktmgdchz', 'jkcwonus', 'ehmduqyt', 'vewtuecx', 'mxrbzntq', - 'isyowgmp', 'flzfkhbc' + "rnmsglvj", "polorucp", "kqzqahjq", "pldrclfq", "kgaitijd", "cwkfcplc", "crhqcdet", "ulns", "dnvffnze", "ygjiljer", + "qbjxtubz", "qkfrcjtw", "rbhjspoh", "zgtjmxko", "igrqajte", "ofskesua", "qllcoacg", "pukctkiv", "ehdqmfmd", + "xftzvkwx", "gndskkuk", "wcggmnnx", "kaatohcz", "ktmgdchz", "jkcwonus", "ehmduqyt", "vewtuecx", "mxrbzntq", + "isyowgmp", "flzfkhbc" # Add more as needed for different versions. } # https://developer.android.com/tools/releases/platforms CDM_VENDOR_API = { - 'mediaserver': [ - Vendor(22, 11, '1.0', r'libwvdrmengine(?:@\S+)?\.so') + "mediaserver": [ + Vendor(22, 11, "1.0", r"libwvdrmengine(?:@\S+)?\.so") ], - 'mediadrmserver': [ - Vendor(24, 11, '1.0', r'libwvdrmengine(?:@\S+)?\.so') + "mediadrmserver": [ + Vendor(24, 11, "1.0", r"libwvdrmengine(?:@\S+)?\.so") ], - 'android.hardware.drm@1.0-service.widevine': [ - Vendor(26, 13, '5.1.0', r'libwvhidl(?:@\S+)?\.so') + "android.hardware.drm@1.0-service.widevine": [ + Vendor(26, 13, "5.1.0", r"libwvhidl(?:@\S+)?\.so") ], - 'android.hardware.drm@1.1-service.widevine': [ - Vendor(28, 14, '14.0.0', r'libwvhidl(?:@\S+)?\.so') + "android.hardware.drm@1.1-service.widevine": [ + Vendor(28, 14, "14.0.0", r"libwvhidl(?:@\S+)?\.so") ], - 'android.hardware.drm@1.2-service.widevine': [ - Vendor(29, 15, '15.0.0', r'libwvhidl(?:@\S+)?\.so') + "android.hardware.drm@1.2-service.widevine": [ + Vendor(29, 15, "15.0.0", r"libwvhidl(?:@\S+)?\.so") ], - 'android.hardware.drm@1.3-service.widevine': [ - Vendor(30, 16, '16.0.0', r'libwvhidl(?:@\S+)?\.so') + "android.hardware.drm@1.3-service.widevine": [ + Vendor(30, 16, "16.0.0", r"libwvhidl(?:@\S+)?\.so") ], - 'android.hardware.drm@1.4-service.widevine': [ - Vendor(31, 16, '16.1.0', r'libwvhidl(?:@\S+)?\.so') + "android.hardware.drm@1.4-service.widevine": [ + Vendor(31, 16, "16.1.0", r"libwvhidl(?:@\S+)?\.so") ], - 'android.hardware.drm-service.widevine': [ - Vendor(33, 17, '17.0.0', r'libwvaidl(?:@\S+)?\.so'), - Vendor(34, 18, '18.0.0', r'android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?'), - Vendor(35, 18, '19.0.1', r'android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?') + "android.hardware.drm-service.widevine": [ + Vendor(33, 17, "17.0.0", r"libwvaidl(?:@\S+)?\.so"), + Vendor(34, 18, "18.0.0", r"android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?"), + Vendor(35, 18, "19.0.1", r"android\.hardware\.drm-service(?:-lazy)?\.widevine(?:@\S+)?") ] } # https://developers.google.com/widevine CDM_FUNCTION_API = { - 'UsePrivacyMode', - 'GetCdmClientPropertySet', - 'PrepareKeyRequest', - 'getOemcryptoDeviceId', - 'lcc07', - 'oecc07', - 'Read', - 'x1c36', - 'runningcrc' + "UsePrivacyMode", + "GetCdmClientPropertySet", + "PrepareKeyRequest", + "getOemcryptoDeviceId", + "lcc07", + "oecc07", + "Read", + "x1c36", + "runningcrc" } +# Maximum clear API level for Keybox +# https://cs.android.com/android/platform/superproject/+/android14-qpr3-release:trusty/user/app/sample/hwcrypto/keybox/keybox.c KEYBOX_MAX_CLEAR_API = 28 # https://github.com/kaltura/kaltura-device-info-android DRM_PLAYER = { - 'package': 'com.kaltura.kalturadeviceinfo', - 'path': Path(__file__).parent.parent / 'docs' / 'server' / 'kaltura.apk', - 'url': 'https://github.com/kaltura/kaltura-device-info-android/releases/download/t3/kaltura-device-info-release.apk' + "package": "com.kaltura.kalturadeviceinfo", + "path": Path(__file__).parent.parent / "docs" / "server" / "kaltura.apk", + "url": "https://github.com/kaltura/kaltura-device-info-android/releases/download/t3/kaltura-device-info-release.apk" } diff --git a/keydive/core.py b/keydive/core.py index 4752fd5..f9a8126 100644 --- a/keydive/core.py +++ b/keydive/core.py @@ -24,55 +24,59 @@ class Core: """ Initializes a Core instance. - Args: + Parameters: adb (ADB): ADB instance for device communication. - cdm (Cdm): Instance of Cdm for managing DRM related operations. - functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None. - skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API). + cdm (Cdm): Instance for handling DRM-related operations. + functions (Path, optional): Path to Ghidra XML file for symbol extraction. Defaults to None. + skip (bool, optional): Whether to skip predefined functions (e.g., OEM_CRYPTO_API). Defaults to False. """ self.logger = logging.getLogger(self.__class__.__name__) self.running = True self.cdm = cdm self.adb = adb - # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 # Flag to skip predefined functions based on the vendor's API level + # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 self.skip = skip - # Load the hook script and prepare for injection + # Load the hook script with relevant data and prepare for injection self.functions = functions self.script = self.__prepare_hook_script() - self.logger.info('Script loaded successfully') + self.logger.info("Hook script prepared successfully") def __prepare_hook_script(self) -> str: """ - Prepares the hook script content by injecting the library-specific scripts. + Prepares the hook script by injecting library-specific data. Returns: - str: The prepared script content. + str: The finalized hook script content with placeholders replaced. """ - content = Path(__file__).with_name('keydive.js').read_text(encoding='utf-8') + # Read the base JavaScript template file + content = Path(__file__).with_name("keydive.js").read_text(encoding="utf-8") + + # Generate the list of symbols from the functions file symbols = self.__prepare_symbols(self.functions) - # Replace placeholders in script template + # Define the placeholder replacements replacements = { - '${OEM_CRYPTO_API}': json.dumps(list(OEM_CRYPTO_API)), - '${NATIVE_C_API}': json.dumps(list(NATIVE_C_API)), - '${SYMBOLS}': json.dumps(symbols), - '${SKIP}': str(self.skip) + "${OEM_CRYPTO_API}": json.dumps(list(OEM_CRYPTO_API)), + "${NATIVE_C_API}": json.dumps(list(NATIVE_C_API)), + "${SYMBOLS}": json.dumps(symbols), + "${SKIP}": str(self.skip) } + # Replace placeholders in the script content for placeholder, value in replacements.items(): - content = content.replace(placeholder, value) + content = content.replace(placeholder, value, 1) return content def __prepare_symbols(self, path: Path) -> list: """ - Parses the provided XML functions file to select relevant functions. + Extracts relevant functions from a Ghidra XML file. - Args: - path (Path): Path to Ghidra XML functions file. + Parameters: + path (Path): Path to the Ghidra XML functions file. Returns: list: List of selected functions as dictionaries. @@ -81,70 +85,83 @@ class Core: FileNotFoundError: If the functions file is not found. ValueError: If functions extraction fails. """ + # Return an empty list if no path is provided if not path: return [] - elif not path.is_file(): - raise FileNotFoundError('Functions file not found') try: - program = xmltodict.parse(path.read_bytes())['PROGRAM'] - addr_base = int(program['@IMAGE_BASE'], 16) - functions = program['FUNCTIONS']['FUNCTION'] + # Parse the XML file and extract program data + program = xmltodict.parse(path.read_bytes())["PROGRAM"] + addr_base = int(program["@IMAGE_BASE"], 16) # Base address for function addresses + functions = program["FUNCTIONS"]["FUNCTION"] # List of functions in the XML - # Find a target function from a predefined list - target = None if self.skip else next((f['@NAME'] for f in functions if f['@NAME'] in OEM_CRYPTO_API), None) + # Identify a target function from the predefined OEM_CRYPTO_API list (if not skipped) + target = next((f["@NAME"] for f in functions if f["@NAME"] in OEM_CRYPTO_API and not self.skip), None) - # Extract relevant functions + # Prepare a dictionary to store selected functions selected = {} for func in functions: - name = func['@NAME'] - args = len(func.get('REGISTER_VAR', [])) + name = func["@NAME"] # Function name + args = len(func.get("REGISTER_VAR", [])) # Number of arguments - # Add function if it matches specific criteria + """ + Add the function if it matches specific criteria + - Match the target function if identified + - Match API keywords + - Match unnamed functions with 6+ args + """ if name not in selected and ( name == target or any(True if self.skip else keyword in name for keyword in CDM_FUNCTION_API) - or (not target and re.match(r'^[a-z]+$', name) and args >= 6) + or (not target and re.match(r"^[a-z]+$", name) and args >= 6) ): selected[name] = { - 'type': 'function', - 'name': name, - 'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base) + "type": "function", + "name": name, + "address": hex(int(func["@ENTRY_POINT"], 16) - addr_base) # Calculate relative address } + + # Return the list of selected functions return list(selected.values()) + except FileNotFoundError as e: + raise FileNotFoundError(f"Functions file not found: {path}") from e except Exception as e: - raise ValueError('Failed to extract functions from Ghidra') from e + raise ValueError("Failed to extract functions from Ghidra XML file") from e def __process_message(self, message: dict, data: bytes) -> None: """ Handles messages received from the Frida script. - Args: + Parameters: message (dict): The message payload. data (bytes): The raw data associated with the message. """ - logger = logging.getLogger('Script') - level = message.get('payload') + logger = logging.getLogger("Script") + level = message.get("payload") if isinstance(level, int): - # Process logging messages from Frida script - logger.log(level=level, msg=data.decode('utf-8')) + # Log the message based on its severity level + logger.log(level=level, msg=data.decode("utf-8")) if level in (logging.FATAL, logging.CRITICAL): - self.running = False - elif isinstance(level, dict) and 'private_key' in level: - self.cdm.set_private_key(data=data, name=level['private_key']) - elif level == 'challenge': + self.running = False # Stop the process on critical errors + elif isinstance(level, dict) and "private_key" in level: + # Set the private key in the DRM handler + self.cdm.set_private_key(data=data, name=level["private_key"]) + elif level == "challenge": + # Set the challenge data in the DRM handler self.cdm.set_challenge(data=data) - elif level == 'device_id': + elif level == "device_id": + # Set the device ID in the DRM handler self.cdm.set_device_id(data) - elif level == 'keybox': + elif level == "keybox": + # Set the keybox data in the DRM handler self.cdm.set_keybox(data) def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool: """ Hooks into the specified process. - Args: + Parameters: pid (int): The process ID to hook. vendor (Vendor): Instance of Vendor class representing the vendor information. timeout (int, optional): Timeout for attaching to the process. Defaults to 0. @@ -153,38 +170,57 @@ class Core: bool: True if the process was successfully hooked, otherwise False. """ try: + # Attach to the target process using the specified PID. + # The 'persist_timeout' parameter ensures the session persists for the given duration. session: Session = self.adb.device.attach(pid, persist_timeout=timeout) except frida.ServerNotRunningError as e: - raise EnvironmentError('Frida server is not running') from e + # Handle the case where the Frida server is not running on the device. + raise EnvironmentError("Frida server is not running") from e except Exception as e: + # Log other exceptions and return False to indicate failure. self.logger.error(e) return False + # Define a callback to handle when the process is destroyed. def __process_destroyed() -> None: session.detach() + # Create a Frida script object using the prepared script content. script: Script = session.create_script(self.script) - script.on('message', self.__process_message) - script.on('destroyed', __process_destroyed) + script.on("message", self.__process_message) + script.on("destroyed", __process_destroyed) script.load() + # Fetch a list of libraries loaded by the target process. libraries = script.exports_sync.getlibraries() - library = next((l for l in libraries if re.match(vendor.pattern, l['name'])), None) + library = next((l for l in libraries if re.match(vendor.pattern, l["name"])), None) if library: - self.logger.info('Library: %s (%s)', library['name'], library['path']) + # Log information about the library if it is found. + self.logger.info("Library: %s (%s)", library["name"], library["path"]) - # Check if Ghidra XML functions loaded - if vendor.oem > 17 and not self.functions: - self.logger.warning('For OEM API > 17, specifying "functions" is required, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md') - elif vendor.oem < 18 and self.functions: - self.logger.warning('The "functions" attribute is deprecated for OEM API < 18') + # Retrieve and log the version of the Frida server. + version = script.exports_sync.getversion() + self.logger.debug(f"Server: %s", version) - return script.exports_sync.hooklibrary(library['name']) + # Determine if the Frida server version is older than 16.6.0. + code = tuple(map(int, version.split("."))) + minimum = code[0] < 16 or (code == 16 and code[1] < 6) + # Warn the user if certain conditions related to the functions option are met. + if minimum and self.functions: + self.logger.warning("The '--functions' option is deprecated starting from Frida 16.6.0") + elif not minimum and vendor.oem < 18 and self.functions: + self.logger.warning("The '--functions' option is deprecated for OEM API < 18") + elif not minimum and vendor.oem > 17 and not self.functions: + self.logger.warning("For OEM API > 17, specifying '--functions' is required. Refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md") + + return script.exports_sync.hooklibrary(library["name"]) + + # Unload the script if the target library is not found. script.unload() - self.logger.warning('Library not found: %s' % vendor.pattern) + self.logger.warning("Library not found: %s" % vendor.pattern) return False -__all__ = ('Core',) +__all__ = ("Core",) diff --git a/keydive/keybox.py b/keydive/keybox.py index aeaeb14..e707b24 100644 --- a/keydive/keybox.py +++ b/keydive/keybox.py @@ -5,21 +5,20 @@ import logging from json.encoder import encode_basestring_ascii from typing import Literal from uuid import UUID - from pathlib import Path -def bytes2int(value: bytes, byteorder: Literal['big', 'little'] = 'big', signed: bool = False) -> int: +def bytes2int(value: bytes, byteorder: Literal["big", "little"] = "big", signed: bool = False) -> int: """ - Convert bytes to an integer. + Convert a byte sequence to an integer. - Args: + Parameters: value (bytes): The byte sequence to convert. - byteorder (Literal['big', 'little'], optional): The byte order for conversion. Defaults to 'big'. - signed (bool, optional): Indicates if the integer is signed. Defaults to False. + byteorder (str, optional): Byte order for conversion. 'big' or 'little'. Defaults to 'big'. + signed (bool, optional): Whether the integer is signed. Defaults to False. Returns: - int: The converted integer. + int: The integer representation of the byte sequence. """ return int.from_bytes(value, byteorder=byteorder, signed=signed) @@ -31,12 +30,7 @@ class Keybox: def __init__(self): """ - Initializes the Keybox object, setting up a logger and containers for device IDs and keyboxes. - - Attributes: - logger (Logger): Logger instance for logging messages. - device_id (list[bytes]): List of unique device IDs (32 bytes each). - keybox (dict[bytes, bytes]): Dictionary mapping device IDs to their respective keybox data. + Initializes the Keybox object, setting up logger and containers for device IDs and keyboxes. """ self.logger = logging.getLogger(self.__class__.__name__) # https://github.com/kaltura/kaltura-device-info-android/blob/master/app/src/main/java/com/kaltura/kalturadeviceinfo/MainActivity.java#L203 @@ -47,7 +41,7 @@ class Keybox: """ Set the device ID from the provided data. - Args: + Parameters: data (bytes): The device ID, expected to be 32 bytes long. Raises: @@ -55,19 +49,22 @@ class Keybox: """ try: size = len(data) - assert size == 32, f'Invalid keybox length: {size}. Should be 32 bytes' + # Ensure the device ID is exactly 32 bytes long + assert size == 32, f"Invalid device ID length: {size}. Should be 32 bytes" + # Add device ID to the list if it's not already present if data not in self.device_id: - self.logger.info('Receive device id: \n\n%s\n', encode_basestring_ascii(data.decode('utf-8'))) + self.logger.info("Receive device id: \n\n%s\n", encode_basestring_ascii(data.decode("utf-8"))) self.device_id.append(data) + except Exception as e: - self.logger.debug('Failed to set device id: %s', e) + self.logger.debug("Failed to set device id: %s", e) def set_keybox(self, data: bytes) -> None: """ Set the keybox from the provided data. - Args: + Parameters: data (bytes): The keybox data, expected to be either 128 or 132 bytes long. Raises: @@ -76,68 +73,74 @@ class Keybox: # https://github.com/zybpp/Python/tree/master/Python/keybox try: size = len(data) - assert size in (128, 132), f'Invalid keybox length: {size}. Should be 128 or 132 bytes' + # Validate the keybox size (128 or 132 bytes) + assert size in (128, 132), f"Invalid keybox length: {size}. Should be 128 or 132 bytes" - if size == 132: - assert data[128:132] == b"LVL1", 'QSEE-style keybox must end with bytes "LVL1"' + # Validate the QSEE-style keybox end + assert size == 128 or data[128:132] == b"LVL1", "QSEE-style keybox must end with bytes 'LVL1'" - assert data[120:124] == b"kbox", 'Invalid keybox magic' + # Validate the keybox magic (should be 'kbox') + assert data[120:124] == b"kbox", "Invalid keybox magic" - device_id = data[0:32] + device_id = data[0:32] # Extract the device ID from the first 32 bytes - # Retrieve structured keybox info and log it + # Retrieve and log the structured keybox information infos = self.__keybox_info(data) - encrypted = infos['flags'] > 10 - self.set_device_id(data=device_id) + encrypted = infos["flags"] > 10 # Check if the keybox is encrypted + self.set_device_id(data=device_id) # Set the device ID + # Log and store the keybox data if it's a new keybox or the device ID is updated if (device_id in self.keybox and self.keybox[device_id] != (data, encrypted)) or device_id not in self.keybox: - self.logger.info('Receive keybox: \n\n%s\n', json.dumps(infos, indent=2)) + self.logger.info("Receive keybox: \n\n%s\n", json.dumps(infos, indent=2)) - # Warn if flags indicate encryption, which requires an unencrypted device token + # Warn if keybox is encrypted and interception of plaintext device token is needed if encrypted: - self.logger.warning('Keybox contains encrypted data. Interception of plaintext device token is needed') + self.logger.warning("Keybox contains encrypted data. Interception of plaintext device token is needed") - # Store or update the keybox for the device if it's not already saved + # Store the keybox (encrypted or not) for the device ID if (device_id in self.keybox and not encrypted) or device_id not in self.keybox: self.keybox[device_id] = (data, encrypted) except Exception as e: - self.logger.debug('Failed to set keybox: %s', e) + self.logger.debug("Failed to set keybox: %s", e) @staticmethod def __keybox_info(data: bytes) -> dict: """ Extract keybox information from the provided data. - Args: + Parameters: data (bytes): The keybox data. Returns: dict: A dictionary containing extracted keybox information. """ # https://github.com/wvdumper/dumper/blob/main/Helpers/Keybox.py#L51 + + # Extract device-specific information from the keybox data device_token = data[48:120] + + # Prepare the keybox content dictionary content = { - 'device_id': data[0:32].decode('utf-8'), # Device's unique identifier (32 bytes) - 'device_key': data[32:48], # Device-specific cryptographic key (16 bytes) - 'device_token': device_token, # Token used for device authentication (72 bytes) - 'keybox_tag': data[120:124].decode('utf-8'), # Magic tag indicating keybox format (4 bytes) - 'crc32': bytes2int(data[124:128]), # CRC32 checksum for data integrity verification (4 bytes) - 'level_tag': data[128:132].decode('utf-8') or None, # Optional tag indicating keybox level (4 bytes). + "device_id": data[0:32].decode("utf-8"), # Device's unique identifier (32 bytes) + "device_key": data[32:48], # Device cryptographic key (16 bytes) + "device_token": device_token, # Token for device authentication (72 bytes) + "keybox_tag": data[120:124].decode("utf-8"), # Magic tag (4 bytes) + "crc32": bytes2int(data[124:128]), # CRC32 checksum (4 bytes) + "level_tag": data[128:132].decode("utf-8") or None, # Optional level tag (4 bytes) - # Additional metadata parsed from the device token (Bytes 48–120). - 'flags': bytes2int(device_token[0:4]), # Flags indicating specific device capabilities (4 bytes). - 'system_id': bytes2int(device_token[4:8]), # System identifier (4 bytes). - - # Provisioning ID, encrypted and derived from the unique ID in the system. - 'provisioning_id': UUID(bytes_le=device_token[8:24]), # Provisioning UUID (16 bytes). - - # Encrypted bits containing device key, key hash, and additional flags. - 'encrypted_bits': device_token[24:72] ## Encrypted device-specific information (48 bytes). + # Extract metadata from the device token (Bytes 48–120) + "flags": bytes2int(device_token[0:4]), # Device flags (4 bytes) + "system_id": bytes2int(device_token[4:8]), # System identifier (4 bytes) + "provisioning_id": UUID(bytes_le=device_token[8:24]), # Provisioning UUID (16 bytes) + "encrypted_bits": device_token[24:72] # Encrypted device-specific information (48 bytes) } - # Encode certain fields in base64 and convert UUIDs to string + # https://github.com/ThatNotEasy/Parser-DRM/blob/main/modules/widevine.py#L84 + # TODO: decrypt device token value + + # Encode bytes as base64 and convert UUIDs to string return { - k: base64.b64encode(v).decode('utf-8') if isinstance(v, bytes) else str(v) if isinstance(v, UUID) else v + k: base64.b64encode(v).decode("utf-8") if isinstance(v, bytes) else str(v) if isinstance(v, UUID) else v for k, v in content.items() } @@ -145,24 +148,33 @@ class Keybox: """ Export the keybox data to a file in the specified parent directory. - Args: + Parameters: parent (Path): The parent directory where the keybox data will be saved. Returns: bool: True if any keybox were exported, otherwise False. """ + # Find matching keyboxes based on the device_id keys = self.device_id & self.keybox.keys() + for k in keys: - # Prepare target directory and export file path + # Create the parent directory if it doesn't exist parent.mkdir(parents=True, exist_ok=True) - path_keybox_bin = parent / f"keybox.{'enc' if self.keybox[k][1] else 'bin'}" + + # Define the export file path and extension (encrypted or binary) + path_keybox_bin = parent / ("keybox." + ("enc" if self.keybox[k][1] else "bin")) + + # Write the keybox data to the file path_keybox_bin.write_bytes(self.keybox[k][0]) + # Log export status based on whether the keybox is encrypted if self.keybox[k][1]: - self.logger.warning('Exported encrypted keybox: %s', path_keybox_bin) + self.logger.warning("Exported encrypted keybox: %s", path_keybox_bin) else: - self.logger.info('Exported keybox: %s', path_keybox_bin) + self.logger.info("Exported keybox: %s", path_keybox_bin) + + # Return True if any keyboxes were exported, otherwise False return len(keys) > 0 -__all__ = ('Keybox',) +__all__ = ("Keybox",) diff --git a/keydive/keydive.js b/keydive/keydive.js index ce9c9a4..166bd06 100644 --- a/keydive/keydive.js +++ b/keydive/keydive.js @@ -1,5 +1,5 @@ /** - * Date: 2025-01-11 + * Date: 2025-01-18 * Description: DRM key extraction for research and educational purposes. * Source: https://github.com/hyugogirubato/KeyDive */ @@ -65,6 +65,8 @@ const print = (level, message) => { send(level, message); } +const getVersion = () => Frida.version; + // @Utils const getLibraries = (name) => { @@ -85,7 +87,20 @@ const getLibrary = (name) => { const getFunctions = (library) => { try { - return library.enumerateExports(); + // https://frida.re/news/2025/01/09/frida-16-6-0-released/ + const functions = library.enumerateSymbols().map(item => ({ + type: item.type, + name: item.name, + address: item.address + })); + + library.enumerateExports().forEach(item => { + if (!functions.includes(item)) { + functions.push(item); + } + }); + + return functions; } catch (e) { print(Level.CRITICAL, e.message); return []; @@ -453,6 +468,7 @@ const hookLibrary = (name) => { // RPC interfaces exposed to external calls. rpc.exports = { + getversion: getVersion, getlibraries: getLibraries, hooklibrary: hookLibrary }; diff --git a/keydive/vendor.py b/keydive/vendor.py index 3e85ae5..06af8d2 100644 --- a/keydive/vendor.py +++ b/keydive/vendor.py @@ -7,11 +7,11 @@ class Vendor: """ Initializes a Vendor instance. - Args: - sdk (int): Minimum SDK version required. - oem (int): OEM identifier. + Parameters: + sdk (int): Minimum SDK version required by the vendor. + oem (int): OEM identifier for the vendor. version (str): Version of the vendor. - pattern (str): Name of the vendor. + pattern (str): Name pattern of the vendor. """ self.sdk = sdk self.oem = oem @@ -23,12 +23,12 @@ class Vendor: Returns a string representation of the Vendor instance. Returns: - str: String representation of the Vendor instance. + str: String representation of the Vendor instance with its attributes. """ - return '{name}({items})'.format( + return "{name}({items})".format( name=self.__class__.__name__, - items=', '.join([f'{k}={repr(v)}' for k, v in self.__dict__.items()]) + items=", ".join([f"{k}={repr(v)}" for k, v in self.__dict__.items()]) ) -__all__ = ('Vendor',) +__all__ = ("Vendor",) diff --git a/pyproject.toml b/pyproject.toml index 0ef2bcf..22a45e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "keydive" -version = "2.1.5" +version = "2.2.0" description = "Extract Widevine L3 keys from Android devices effortlessly, spanning multiple Android versions for DRM research and education." license = "MIT" authors = ["hyugogirubato <65763543+hyugogirubato@users.noreply.github.com>"] @@ -36,7 +36,7 @@ include = [ [tool.poetry.dependencies] python = "^3.8" coloredlogs = "^15.0.1" -frida = "^16.5.6" +frida = "^16.6.0" pathlib = "^1.0.1" pycryptodomex = "^3.21.0" pywidevine = "^1.8.0"