2024-07-06 18:01:47 +00:00
import json
import logging
import re
2024-07-17 22:09:04 +00:00
2024-07-06 18:01:47 +00:00
from pathlib import Path
import frida
import xmltodict
2024-07-17 22:09:04 +00:00
2024-10-26 13:21:54 +00:00
from frida . core import Session , Script
2024-07-06 18:01:47 +00:00
2024-10-26 13:21:54 +00:00
from keydive . adb import ADB
2024-07-06 18:01:47 +00:00
from keydive . cdm import Cdm
from keydive . constants import OEM_CRYPTO_API , NATIVE_C_API , CDM_FUNCTION_API
from keydive . vendor import Vendor
class Core:
    """
    Core class for managing DRM operations and interactions with Android devices.

    It builds the Frida hook script from the ``keydive.js`` template, optionally
    enriches it with symbols taken from a Ghidra XML export, injects it into a
    target process and forwards the data sent back by the script to the Cdm.
    """

    # Matches names made solely of lowercase ASCII letters; compiled once because
    # it is evaluated for every function of the Ghidra export.
    _LOWERCASE_NAME = re.compile(r'^[a-z]+$')

    def __init__(self, adb: ADB, cdm: Cdm, functions: Path = None, skip: bool = False):
        """
        Initializes a Core instance.

        Args:
            adb (ADB): ADB instance for device communication.
            cdm (Cdm): Instance of Cdm for managing DRM related operations.
            functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None.
            skip (bool, optional): Flag to determine whether to skip predefined functions (e.g., OEM_CRYPTO_API).
                Defaults to False.

        Raises:
            FileNotFoundError: If ``functions`` is given but is not an existing file.
            ValueError: If the functions file cannot be parsed.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.running = True
        self.cdm = cdm
        self.adb = adb

        # https://github.com/hyugogirubato/KeyDive/issues/38#issuecomment-2411932679
        # Flag to skip predefined functions based on the vendor's API level
        self.skip = skip

        # Load the hook script and prepare for injection
        self.functions = functions
        self.script = self.__prepare_hook_script()
        self.logger.info('Script loaded successfully')

    def __prepare_hook_script(self) -> str:
        """
        Prepares the hook script content by injecting the library-specific scripts.

        The ``keydive.js`` file located next to this module is used as a template;
        each ``${...}`` placeholder is replaced by its textual value.

        Returns:
            str: The prepared script content.
        """
        content = Path(__file__).with_name('keydive.js').read_text(encoding='utf-8')
        symbols = self.__prepare_symbols(self.functions)

        # Replace placeholders in script template
        replacements = {
            '${OEM_CRYPTO_API}': json.dumps(list(OEM_CRYPTO_API)),
            '${NATIVE_C_API}': json.dumps(list(NATIVE_C_API)),
            '${SYMBOLS}': json.dumps(symbols),
            # Inserted as the Python textual form ('True' / 'False')
            '${SKIP}': str(self.skip)
        }

        for placeholder, value in replacements.items():
            content = content.replace(placeholder, value)

        return content

    @staticmethod
    def _as_list(value) -> list:
        """
        Normalizes an xmltodict child value to a list.

        xmltodict yields a list only when an element is repeated; a single child
        comes back as a plain mapping and a missing/empty one as None.

        Args:
            value: The value read from the parsed document.

        Returns:
            list: ``value`` itself if it is already a list, ``[]`` for None,
            otherwise a one-element list wrapping ``value``.
        """
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    def __prepare_symbols(self, path: Path) -> list:
        """
        Parses the provided XML functions file to select relevant functions.

        Args:
            path (Path): Path to Ghidra XML functions file.

        Returns:
            list: List of selected functions as dictionaries with the keys
            ``type``, ``name`` and ``address`` (hex offset from the image base).
            Empty if no path is given.

        Raises:
            FileNotFoundError: If the functions file is not found.
            ValueError: If functions extraction fails.
        """
        if not path:
            return []
        if not path.is_file():
            raise FileNotFoundError('Functions file not found')

        try:
            program = xmltodict.parse(path.read_bytes())['PROGRAM']
            addr_base = int(program['@IMAGE_BASE'], 16)
            # A document with a single FUNCTION element is parsed as a dict, not a list
            functions = self._as_list(program['FUNCTIONS']['FUNCTION'])

            # Find a target function from a predefined list
            target = None if self.skip else next(
                (f['@NAME'] for f in functions if f['@NAME'] in OEM_CRYPTO_API),
                None
            )

            # Extract relevant functions (first occurrence of each name wins)
            selected = {}
            for func in functions:
                name = func['@NAME']
                if name in selected:
                    continue

                # Count registers, not the attributes of a lone REGISTER_VAR mapping
                args = len(self._as_list(func.get('REGISTER_VAR')))

                # Add function if it matches specific criteria
                if (
                        name == target
                        # With skip enabled every name matches (as long as CDM_FUNCTION_API is not empty)
                        or any(self.skip or keyword in name for keyword in CDM_FUNCTION_API)
                        # Fallback when no predefined function was found: lowercase-only name with >= 6 registers
                        or (not target and self._LOWERCASE_NAME.match(name) and args >= 6)
                ):
                    selected[name] = {
                        'type': 'function',
                        'name': name,
                        'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)
                    }

            return list(selected.values())
        except Exception as e:
            raise ValueError('Failed to extract functions from Ghidra') from e

    def __process_message(self, message: dict, data: bytes) -> None:
        """
        Handles messages received from the Frida script.

        An integer payload is treated as a logging level for the text carried in
        ``data``; other payloads route ``data`` to the matching Cdm setter.
        Messages without a recognized payload are ignored.

        Args:
            message (dict): The message payload.
            data (bytes): The raw data associated with the message.
        """
        logger = logging.getLogger('Script')
        payload = message.get('payload')

        if isinstance(payload, int):
            # Process logging messages from Frida script
            text = data.decode('utf-8') if data is not None else ''
            logger.log(level=payload, msg=text)
            if payload in (logging.FATAL, logging.CRITICAL):
                # Signal the caller that the script hit an unrecoverable error
                self.running = False
        elif isinstance(payload, dict) and 'private_key' in payload:
            self.cdm.set_private_key(data=data, name=payload['private_key'])
        elif payload == 'challenge':
            self.cdm.set_challenge(data=data)
        elif payload == 'device_id':
            self.cdm.set_device_id(data)
        elif payload == 'keybox':
            self.cdm.set_keybox(data)

    def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool:
        """
        Hooks into the specified process.

        Args:
            pid (int): The process ID to hook.
            vendor (Vendor): Instance of Vendor class representing the vendor information.
            timeout (int, optional): Value passed to the device ``attach`` call as ``persist_timeout``.
                Defaults to 0.

        Returns:
            bool: The result of the script's ``hooklibrary`` export when the vendor
            library is found; False if attaching fails or the library is not found.

        Raises:
            EnvironmentError: If the Frida server is not running.
        """
        try:
            session: Session = self.adb.device.attach(pid, persist_timeout=timeout)
        except frida.ServerNotRunningError as e:
            raise EnvironmentError('Frida server is not running') from e
        except Exception as e:
            self.logger.error(e)
            return False

        def on_destroyed() -> None:
            # Release the session once the script is gone
            session.detach()

        script: Script = session.create_script(self.script)
        script.on('message', self.__process_message)
        script.on('destroyed', on_destroyed)
        script.load()

        library = script.exports_sync.getlibrary(vendor.name)
        if library:
            self.logger.info('Library: %s (%s)', library['name'], library['path'])

            # Check if Ghidra XML functions loaded
            if vendor.oem > 17 and not self.functions:
                self.logger.warning('For OEM API > 17, specifying "functions" is required, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
            elif vendor.oem < 18 and self.functions:
                self.logger.warning('The "functions" attribute is deprecated for OEM API < 18')

            return script.exports_sync.hooklibrary(vendor.name)

        script.unload()
        self.logger.warning('Library not found: %s', vendor.name)
        return False
# Public API of this module; the entry must match the class name exactly
__all__ = ('Core',)