# KeyDive/keydive/core.py
#
# 227 lines
# 9.2 KiB
# Python

import json
import logging
import re
from pathlib import Path
import frida
import xmltodict
from frida.core import Session, Script
from keydive.adb import ADB
from keydive.cdm import Cdm
from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API
from keydive.vendor import Vendor
class Core:
    """
    Core class for managing DRM operations and interactions with Android devices.

    The class builds a Frida hook script from the bundled ``keydive.js`` template,
    attaches it to a target process and forwards the data sent back by the script
    to the ``Cdm`` handler.
    """

    def __init__(self, adb: ADB, cdm: Cdm, functions: Path = None, skip: bool = False):
        """
        Initializes a Core instance.

        Parameters:
            adb (ADB): ADB instance for device communication.
            cdm (Cdm): Instance for handling DRM-related operations.
            functions (Path, optional): Path to Ghidra XML file for symbol extraction. Defaults to None.
            skip (bool, optional): Whether to skip predefined functions (e.g., OEM_CRYPTO_API). Defaults to False.

        Raises:
            FileNotFoundError: If the functions file is given but does not exist.
            ValueError: If the functions file cannot be parsed.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.running = True
        self.cdm = cdm
        self.adb = adb

        # Flag to skip predefined functions based on the vendor's API level
        # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679
        self.skip = skip

        # Load the hook script with relevant data and prepare for injection
        self.functions = functions
        self.script = self.__prepare_hook_script()
        self.logger.info("Hook script prepared successfully")

    @staticmethod
    def _as_list(value) -> list:
        """
        Normalizes a value parsed by xmltodict into a list.

        xmltodict yields a single mapping (instead of a one-element list) when an
        element has exactly one child of a given tag, and None for an empty element.

        Parameters:
            value: None, a single item, or a list of items.

        Returns:
            list: An empty list for None, the value itself if it already is a list,
            otherwise a one-element list wrapping the value.
        """
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    @staticmethod
    def _version_at_least(version: str, major: int, minor: int) -> bool:
        """
        Checks whether a dotted version string is at least ``major.minor``.

        Only the leading numeric ``major[.minor]`` part is inspected, so suffixes
        such as ``-dev.3`` or a patch component do not break the comparison.

        Parameters:
            version (str): Version string, e.g. "16.6.0".
            major (int): Minimum major version.
            minor (int): Minimum minor version.

        Returns:
            bool: True if the version is >= major.minor, False otherwise or when the
            string does not start with a number.
        """
        match = re.match(r"\s*(\d+)(?:\.(\d+))?", str(version))
        if not match:
            return False
        found = (int(match.group(1)), int(match.group(2) or 0))
        return found >= (major, minor)

    def __prepare_hook_script(self) -> str:
        """
        Prepares the hook script by injecting library-specific data.

        Returns:
            str: The finalized hook script content with placeholders replaced.
        """
        # Read the base JavaScript template file located next to this module
        content = Path(__file__).with_name("keydive.js").read_text(encoding="utf-8")

        # Generate the list of symbols from the functions file
        symbols = self.__prepare_symbols(self.functions)

        # Define the placeholder replacements
        replacements = {
            "${OEM_CRYPTO_API}": json.dumps(list(OEM_CRYPTO_API)),
            "${NATIVE_C_API}": json.dumps(list(NATIVE_C_API)),
            "${SYMBOLS}": json.dumps(symbols),
            # NOTE(review): rendered as Python's "True"/"False", not JSON; the
            # template presumably expects exactly this spelling - confirm before changing.
            "${SKIP}": str(self.skip)
        }

        # Replace only the first occurrence of each placeholder in the script content
        for placeholder, value in replacements.items():
            content = content.replace(placeholder, value, 1)
        return content

    def __prepare_symbols(self, path: Path) -> list:
        """
        Extracts relevant functions from a Ghidra XML file.

        Parameters:
            path (Path): Path to the Ghidra XML functions file.

        Returns:
            list: List of selected functions as dictionaries with the keys
            "type", "name" and "address" (hex offset relative to the image base).

        Raises:
            FileNotFoundError: If the functions file is not found.
            ValueError: If functions extraction fails.
        """
        # Return an empty list if no path is provided
        if not path:
            return []

        try:
            # Parse the XML file and extract program data
            program = xmltodict.parse(path.read_bytes())["PROGRAM"]
            addr_base = int(program["@IMAGE_BASE"], 16)  # Base address for function addresses

            # A file with a single FUNCTION element is parsed as a dict, not a list
            functions = self._as_list(program["FUNCTIONS"]["FUNCTION"])

            # Identify a target function from the predefined OEM_CRYPTO_API list (if not skipped)
            target = None
            if not self.skip:
                target = next((f["@NAME"] for f in functions if f["@NAME"] in OEM_CRYPTO_API), None)

            # Selected functions keyed by name, so the first occurrence of a name wins
            selected = {}
            for func in functions:
                name = func["@NAME"]
                if name in selected:
                    continue

                # Number of register arguments; a single REGISTER_VAR is parsed as a
                # dict, so it must be wrapped before counting
                args = len(self._as_list(func.get("REGISTER_VAR")))

                # Add the function if it matches one of these criteria:
                # - it is the identified target function
                # - its name contains an API keyword (every keyword matches when skipping)
                # - no target was found, the name is lowercase letters only and it has 6+ args
                if (
                        name == target
                        or any(True if self.skip else keyword in name for keyword in CDM_FUNCTION_API)
                        or (not target and re.match(r"^[a-z]+$", name) and args >= 6)
                ):
                    selected[name] = {
                        "type": "function",
                        "name": name,
                        "address": hex(int(func["@ENTRY_POINT"], 16) - addr_base)  # Calculate relative address
                    }

            # Return the list of selected functions
            return list(selected.values())
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Functions file not found: {path}") from e
        except Exception as e:
            raise ValueError("Failed to extract functions from Ghidra XML file") from e

    def __process_message(self, message: dict, data: bytes) -> None:
        """
        Handles messages received from the Frida script.

        Parameters:
            message (dict): The message payload.
            data (bytes): The raw data associated with the message (may be None).
        """
        logger = logging.getLogger("Script")

        # Errors raised inside the script carry no payload; report them instead of
        # dropping them silently
        if message.get("type") == "error":
            logger.error("%s", message.get("stack") or message.get("description") or message)
            return

        level = message.get("payload")
        if isinstance(level, int):
            # Log the message based on its severity level; tolerate a missing or
            # badly encoded text payload rather than failing inside the callback
            text = data.decode("utf-8", errors="replace") if data else ""
            logger.log(level=level, msg=text)
            if level == logging.CRITICAL:  # logging.FATAL is an alias of CRITICAL
                self.running = False  # Stop the process on critical errors
        elif isinstance(level, dict) and "private_key" in level:
            # Set the private key in the DRM handler
            self.cdm.set_private_key(data=data, name=level["private_key"])
        elif level == "challenge":
            # Set the challenge data in the DRM handler
            self.cdm.set_challenge(data=data)
        elif level == "device_id":
            # Set the device ID in the DRM handler
            self.cdm.set_device_id(data)
        elif level == "keybox":
            # Set the keybox data in the DRM handler
            self.cdm.set_keybox(data)

    def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool:
        """
        Hooks into the specified process.

        Parameters:
            pid (int): The process ID to hook.
            vendor (Vendor): Instance of Vendor class representing the vendor information.
            timeout (int, optional): Timeout for attaching to the process. Defaults to 0.

        Returns:
            bool: True if the process was successfully hooked, otherwise False.

        Raises:
            EnvironmentError: If the Frida server is not running on the device.
        """
        try:
            # Attach to the target process using the specified PID.
            # The timeout is forwarded as the session's 'persist_timeout'.
            session: Session = self.adb.device.attach(pid, persist_timeout=timeout)
        except frida.ServerNotRunningError as e:
            # Handle the case where the Frida server is not running on the device.
            raise EnvironmentError("Frida server is not running") from e
        except Exception as e:
            # Log other exceptions and return False to indicate failure.
            self.logger.error(e)
            return False

        # Detach the session once the script has been destroyed.
        def on_destroyed() -> None:
            session.detach()

        # Create a Frida script object using the prepared script content.
        script: Script = session.create_script(self.script)
        script.on("message", self.__process_message)
        script.on("destroyed", on_destroyed)
        script.load()

        # Fetch a list of libraries loaded by the target process.
        libraries = script.exports_sync.getlibraries()
        library = next((lib for lib in libraries if re.match(vendor.pattern, lib["name"])), None)

        if not library:
            # Unload the script if the target library is not found.
            script.unload()
            self.logger.warning("Library not found: %s", vendor.pattern)
            return False

        # Log information about the library that was found.
        self.logger.info("Library: %s (%s)", library["name"], library["path"])

        # Retrieve and log the version of the Frida server.
        version = script.exports_sync.getversion()
        self.logger.debug("Server: %s", version)

        # Determine if the Frida server version is 16.6.0 or newer.
        minimum = self._version_at_least(version, 16, 6)

        # Warn the user if certain conditions related to the functions option are met.
        if self.functions:
            if minimum:
                self.logger.warning("The '--functions' option is deprecated starting from Frida 16.6.0")
            elif vendor.oem < 18:
                self.logger.warning("The '--functions' option is deprecated for OEM API < 18")
        elif not minimum and vendor.oem > 17:
            self.logger.warning("For OEM API > 17, specifying '--functions' is required. Refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md")

        return script.exports_sync.hooklibrary(library["name"])
# Public API of this module: a wildcard import exposes only the Core class.
__all__ = ("Core",)