remove device adb process

This commit is contained in:
hyugogirubato 2024-10-26 15:21:54 +02:00
parent a3f0dac84c
commit 57e71699f1
1 changed files with 7 additions and 64 deletions

View File

@ -1,15 +1,15 @@
import json import json
import logging import logging
import re import re
import subprocess
from pathlib import Path from pathlib import Path
import frida import frida
import xmltodict import xmltodict
from frida.core import Device, Session, Script from frida.core import Session, Script
from keydive.adb import ADB
from keydive.cdm import Cdm from keydive.cdm import Cdm
from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API
from keydive.vendor import Vendor from keydive.vendor import Vendor
@ -17,35 +17,27 @@ from keydive.vendor import Vendor
class Core: class Core:
""" """
Core class for handling DRM operations and device interactions. Core class for managing DRM operations and interactions with Android devices.
""" """
def __init__(self, cdm: Cdm, device: str = None, functions: Path = None, skip: bool = False): def __init__(self, adb: ADB, cdm: Cdm, functions: Path = None, skip: bool = False):
""" """
Initializes a Core instance. Initializes a Core instance.
Args: Args:
adb (ADB): ADB instance for device communication.
cdm (Cdm): Instance of Cdm for managing DRM related operations. cdm (Cdm): Instance of Cdm for managing DRM related operations.
device (str, optional): ID of the Android device to connect to via ADB. Defaults to None (uses USB device).
functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None. functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None.
skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API). skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API).
""" """
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.running = True self.running = True
self.cdm = cdm self.cdm = cdm
self.adb = adb
# https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679 # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679
self.skip = skip self.skip = skip
# Select device based on provided ID or default to the first USB device.
self.device: Device = frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5)
self.logger.info('Device: %s (%s)', self.device.name, self.device.id)
# Obtain device properties
properties = self.device_properties()
self.logger.info('SDK API: %s', properties['ro.build.version.sdk'])
self.logger.info('ABI CPU: %s', properties['ro.product.cpu.abi'])
# Load the hook script # Load the hook script
self.functions = functions self.functions = functions
self.script = self.__prepare_hook_script() self.script = self.__prepare_hook_script()
@ -122,55 +114,6 @@ class Core:
except Exception as e: except Exception as e:
raise ValueError('Failed to extract functions from Ghidra') from e raise ValueError('Failed to extract functions from Ghidra') from e
def device_properties(self) -> dict:
"""
Retrieves system properties from the connected device using ADB shell commands.
Returns:
dict: A dictionary of device properties.
"""
# https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
properties = {}
sp = subprocess.run(['adb', '-s', str(self.device.id), 'shell', 'getprop'], capture_output=True)
for line in sp.stdout.decode('utf-8').splitlines():
match = re.match(r'\[(.*?)\]: \[(.*?)\]', line)
if match:
key, value = match.groups()
# Attempt to cast numeric and boolean values to appropriate types
try:
value = int(value)
except ValueError:
if value.lower() in ('true', 'false'):
value = value.lower() == 'true'
properties[key] = value
return properties
def enumerate_processes(self) -> dict:
"""
Lists processes running on the device, returning a mapping of process names to PIDs.
Returns:
dict: A dictionary mapping process names to PIDs.
"""
processes = {}
# https://github.com/frida/frida/issues/1225#issuecomment-604181822
prompt = ['adb', '-s', str(self.device.id), 'shell', 'ps']
lines = subprocess.run([*prompt, '-A'], capture_output=True).stdout.decode('utf-8').splitlines()
if len(lines) < 10:
lines = subprocess.run(prompt, capture_output=True).stdout.decode('utf-8').splitlines()
# Iterate through lines starting from the second line (skipping header)
for line in lines[1:]:
try:
line = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
name = ' '.join(line[8:]).strip()
name = name if name.startswith('[') else Path(name).name
processes[name] = int(line[1])
except Exception:
pass
return processes
def __process_message(self, message: dict, data: bytes) -> None: def __process_message(self, message: dict, data: bytes) -> None:
""" """
Handles messages received from the Frida script. Handles messages received from the Frida script.
@ -207,7 +150,7 @@ class Core:
bool: True if the process was successfully hooked, otherwise False. bool: True if the process was successfully hooked, otherwise False.
""" """
try: try:
session: Session = self.device.attach(pid, persist_timeout=timeout) session: Session = self.adb.device.attach(pid, persist_timeout=timeout)
except frida.ServerNotRunningError as e: except frida.ServerNotRunningError as e:
raise EnvironmentError('Frida server is not running') from e raise EnvironmentError('Frida server is not running') from e
except Exception as e: except Exception as e: