KeyDive/keydive.py

57 lines
2.0 KiB
Python
Raw Permalink Normal View History

2024-03-30 19:03:15 +00:00
import argparse
import logging
2024-04-01 10:24:00 +00:00
import subprocess
2024-03-30 19:03:15 +00:00
import time
import coloredlogs
from _frida import Process
2024-03-31 13:27:10 +00:00
from pathlib import Path
2024-03-30 19:03:15 +00:00
import extractor
2024-03-30 19:03:15 +00:00
from extractor.cdm import Cdm
# Install the coloredlogs handler at DEBUG level; each record is rendered as
# "<timestamp> [<first letter of level>] <logger name>: <message>".
coloredlogs.install(
    level=logging.DEBUG,
    fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
if __name__ == '__main__':
    # Script entry point: start ADB, build the CDM handler, hook the Widevine
    # process and then idle for as long as the handler reports it is running.
    logger = logging.getLogger('KeyDive')

    # Command-line options: optional device ID and optional Ghidra functions file
    parser = argparse.ArgumentParser(description='Extract Widevine L3 keys from an Android device.')
    parser.add_argument('-d', '--device', required=False, type=str, help='Target Android device ID.')
    parser.add_argument('-f', '--functions', required=False, type=Path, help='Ghidra XML functions file.')
    args = parser.parse_args()

    try:
        logger.info('Version: %s', extractor.__version__)

        # Bring up the ADB server; any non-zero exit status aborts the run
        status, _output = subprocess.getstatusoutput('adb start-server')
        if status != 0:
            raise EnvironmentError('ADB is not recognized as an environment variable')

        # Build the CDM handler for the requested device
        cdm = Cdm(device=args.device, functions=args.functions)

        # Take the first process whose name equals the vendor's process name;
        # stays None when nothing matches
        process: Process = None
        for candidate in cdm.device.enumerate_processes():
            if cdm.vendor.process == candidate.name:
                process = candidate
                break
        if not process:
            raise Exception('Failed to find Widevine process')
        logger.info('Process: %s (%s)', process.pid, process.name)

        # Attach to the process; a falsy result from hook_process is treated as failure
        if not cdm.hook_process(process):
            raise Exception('Failed to hook into the process')
        logger.info('Successfully hooked. To test, play a DRM-protected video: https://bitmovin.com/demos/drm')

        # Poll once per second until the handler clears its running flag
        while cdm.running:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; fall through to the exit message
        pass
    except Exception as e:
        logger.critical(e)
    logger.info('Exiting')