diff --git a/keydive/core.py b/keydive/core.py index 792e615..dfbec0d 100644 --- a/keydive/core.py +++ b/keydive/core.py @@ -1,15 +1,15 @@ import json import logging import re -import subprocess from pathlib import Path import frida import xmltodict -from frida.core import Device, Session, Script +from frida.core import Session, Script +from keydive.adb import ADB from keydive.cdm import Cdm from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API from keydive.vendor import Vendor @@ -17,35 +17,27 @@ from keydive.vendor import Vendor class Core: """ - Core class for handling DRM operations and device interactions. + Core class for managing DRM operations and interactions with Android devices. """ - def __init__(self, cdm: Cdm, device: str = None, functions: Path = None, skip: bool = False): + def __init__(self, adb: ADB, cdm: Cdm, functions: Path = None, skip: bool = False): """ Initializes a Core instance. Args: + adb (ADB): ADB instance for device communication. cdm (Cdm): Instance of Cdm for managing DRM related operations. - device (str, optional): ID of the Android device to connect to via ADB. Defaults to None (uses USB device). functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None. skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API). """ self.logger = logging.getLogger(self.__class__.__name__) self.running = True self.cdm = cdm + self.adb = adb # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 self.skip = skip - # Select device based on provided ID or default to the first USB device. - self.device: Device = frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5) - self.logger.info('Device: %s (%s)', self.device.name, self.device.id) - - # Obtain device properties - properties = self.device_properties() - self.logger.info('SDK API: %s', properties['ro.build.version.sdk']) - self.logger.info('ABI CPU: %s', properties['ro.product.cpu.abi']) - # Load the hook script self.functions = functions self.script = self.__prepare_hook_script() @@ -122,55 +114,6 @@ class Core: except Exception as e: raise ValueError('Failed to extract functions from Ghidra') from e - def device_properties(self) -> dict: - """ - Retrieves system properties from the connected device using ADB shell commands. - - Returns: - dict: A dictionary of device properties. - """ - # https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands - properties = {} - sp = subprocess.run(['adb', '-s', str(self.device.id), 'shell', 'getprop'], capture_output=True) - for line in sp.stdout.decode('utf-8').splitlines(): - match = re.match(r'\[(.*?)\]: \[(.*?)\]', line) - if match: - key, value = match.groups() - # Attempt to cast numeric and boolean values to appropriate types - try: - value = int(value) - except ValueError: - if value.lower() in ('true', 'false'): - value = value.lower() == 'true' - properties[key] = value - return properties - - def enumerate_processes(self) -> dict: - """ - Lists processes running on the device, returning a mapping of process names to PIDs. - - Returns: - dict: A dictionary mapping process names to PIDs. - """ - processes = {} - - # https://github.com/frida/frida/issues/1225#issuecomment-604181822 - prompt = ['adb', '-s', str(self.device.id), 'shell', 'ps'] - lines = subprocess.run([*prompt, '-A'], capture_output=True).stdout.decode('utf-8').splitlines() - if len(lines) < 10: - lines = subprocess.run(prompt, capture_output=True).stdout.decode('utf-8').splitlines() - # Iterate through lines starting from the second line (skipping header) - for line in lines[1:]: - try: - line = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME - name = ' '.join(line[8:]).strip() - name = name if name.startswith('[') else Path(name).name - processes[name] = int(line[1]) - except Exception: - pass - - return processes - def __process_message(self, message: dict, data: bytes) -> None: """ Handles messages received from the Frida script. @@ -207,7 +150,7 @@ class Core: bool: True if the process was successfully hooked, otherwise False. """ try: - session: Session = self.device.attach(pid, persist_timeout=timeout) + session: Session = self.adb.device.attach(pid, persist_timeout=timeout) except frida.ServerNotRunningError as e: raise EnvironmentError('Frida server is not running') from e except Exception as e: