update logic to adb instance

This commit is contained in:
hyugogirubato 2024-10-26 15:21:06 +02:00
parent 9480d34151
commit 372bb43f57
1 changed files with 64 additions and 35 deletions

View File

@ -1,6 +1,5 @@
import argparse import argparse
import logging import logging
import subprocess
import time import time
from datetime import datetime from datetime import datetime
@ -10,23 +9,24 @@ import coloredlogs
import keydive import keydive
from keydive.adb import ADB
from keydive.cdm import Cdm from keydive.cdm import Cdm
from keydive.constants import CDM_VENDOR_API from keydive.constants import CDM_VENDOR_API, DRM_PLAYER
from keydive.core import Core from keydive.core import Core
def configure_logging(path: Path, verbose: bool) -> Path: def configure_logging(path: Path = None, verbose: bool = False) -> Path:
""" """
Configures logging for the application. Configures logging for the application.
Args: Args:
path (Path, optional): The path for log files. path (Path, optional): The directory to store log files.
verbose (bool): Whether to enable verbose logging. verbose (bool, optional): Flag to enable detailed debug logging.
Returns: Returns:
Path: The path of log file. Path: The path of log file.
""" """
# Get the root logger # Set up the root logger with the desired logging level
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG if verbose else logging.INFO) root_logger.setLevel(logging.DEBUG if verbose else logging.INFO)
@ -36,9 +36,9 @@ def configure_logging(path: Path, verbose: bool) -> Path:
file_path = None file_path = None
if path: if path:
# Ensure the log directory exists
if path.is_file(): if path.is_file():
path = path.parent path = path.parent
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
# Create a file handler # Create a file handler
@ -47,6 +47,7 @@ def configure_logging(path: Path, verbose: bool) -> Path:
file_handler = logging.FileHandler(file_path) file_handler = logging.FileHandler(file_path)
file_handler.setLevel(logging.DEBUG) file_handler.setLevel(logging.DEBUG)
# Set log formatting
formatter = logging.Formatter( formatter = logging.Formatter(
fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s', fmt='%(asctime)s [%(levelname).1s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
@ -67,24 +68,37 @@ def configure_logging(path: Path, verbose: bool) -> Path:
def main() -> None: def main() -> None:
"""
Main entry point for the KeyDive application.
This application extracts Widevine L3 keys from an Android device.
It supports device management via ADB and allows hooking into Widevine processes.
"""
parser = argparse.ArgumentParser(description='Extract Widevine L3 keys from an Android device.') parser = argparse.ArgumentParser(description='Extract Widevine L3 keys from an Android device.')
# Global options # Global arguments for the application
opt_global = parser.add_argument_group('Global options') group_global = parser.add_argument_group('Global')
opt_global.add_argument('-d', '--device', required=False, type=str, metavar='<id>', help='Specify the target Android device ID to connect with via ADB.') group_global.add_argument('-d', '--device', required=False, type=str, metavar='<id>', help='Specify the target Android device ID for ADB connection.')
opt_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.') group_global.add_argument('-v', '--verbose', required=False, action='store_true', help='Enable verbose logging for detailed debug output.')
opt_global.add_argument('-l', '--log', required=False, type=Path, metavar='<dir>', help='Directory to store log files.') group_global.add_argument('-l', '--log', required=False, type=Path, metavar='<dir>', help='Directory to store log files.')
opt_global.add_argument('--delay', required=False, type=float, metavar='<delay>', default=1, help='Delay (in seconds) between process checks in the watcher.') group_global.add_argument('--delay', required=False, type=float, metavar='<delay>', default=1, help='Delay (in seconds) between process checks.')
opt_global.add_argument('--version', required=False, action='store_true', help='Display KeyDive version information.') group_global.add_argument('--version', required=False, action='store_true', help='Display KeyDive version information.')
# Arguments specific to the CDM (Content Decryption Module)
group_cdm = parser.add_argument_group('Cdm')
group_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path('device'), metavar='<dir>', help='Output directory for extracted data.')
group_cdm.add_argument('-w', '--wvd', required=False, action='store_true', help='Generate a pywidevine WVD device file.')
group_cdm.add_argument('-s', '--skip', required=False, action='store_true', help='Skip auto-detection of the private function.')
group_cdm.add_argument('-a', '--auto', required=False, action='store_true', help='Automatically start the Bitmovin web player.')
group_cdm.add_argument('-p', '--player', required=False, action='store_true', help='Install and start the Kaltura app automatically.')
# Advanced options
group_advanced = parser.add_argument_group('Advanced')
group_advanced.add_argument('-f', '--functions', required=False, type=Path, metavar='<file>', help='Path to Ghidra XML functions file.')
# group_advanced.add_argument('-k', '--keybox', required=False, action='store_true', help='Export keybox if available.')
group_advanced.add_argument('--challenge', required=False, type=Path, metavar='<file>', help='Path to unencrypted challenge for extracting client ID.')
group_advanced.add_argument('--private-key', required=False, type=Path, metavar='<file>', help='Path to private key for extracting client ID.')
# Cdm options
opt_cdm = parser.add_argument_group('Cdm options')
opt_cdm.add_argument('-a', '--auto', required=False, action='store_true', help='Automatically open Bitmovin\'s demo.')
opt_cdm.add_argument('-c', '--challenge', required=False, type=Path, metavar='<file>', help='Path to unencrypted challenge for extracting client ID.')
opt_cdm.add_argument('-w', '--wvd', required=False, action='store_true', help='Generate a pywidevine WVD device file.')
opt_cdm.add_argument('-o', '--output', required=False, type=Path, default=Path('device'), metavar='<dir>', help='Output directory path for extracted data.')
opt_cdm.add_argument('-f', '--functions', required=False, type=Path, metavar='<file>', help='Path to Ghidra XML functions file.')
opt_cdm.add_argument('-s', '--skip', required=False, action='store_true', help='Skip auto-detect of private function.')
args = parser.parse_args() args = parser.parse_args()
if args.version: if args.version:
@ -97,31 +111,32 @@ def main() -> None:
logger.info('Version: %s', keydive.__version__) logger.info('Version: %s', keydive.__version__)
try: try:
# Start the ADB server if not already running # Connect to the specified Android device
sp = subprocess.run(['adb', 'start-server'], capture_output=True) adb = ADB(device=args.device)
if sp.returncode != 0:
raise EnvironmentError('ADB is not recognized as an environment variable, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/PACKAGE.md#adb-android-debug-bridge')
# Initialize Cdm instance # Initialize Cdm instance
cdm = Cdm() cdm = Cdm()
if args.challenge: if args.challenge:
cdm.set_challenge(data=args.challenge) cdm.set_challenge(data=args.challenge)
if args.private_key:
cdm.set_private_key(data=args.private_key, name=None)
# Initialize Core instance for interacting with the device # Initialize Core instance for interacting with the device
core = Core(cdm=cdm, device=args.device, functions=args.functions, skip=args.skip) core = Core(adb=adb, cdm=cdm, functions=args.functions, skip=args.skip)
# Process watcher loop # Process watcher loop
logger.info('Watcher delay: %ss' % args.delay) logger.info('Watcher delay: %ss' % args.delay)
current = None current = None # Variable to track the current Widevine process
while core.running: while core.running:
# Check if for current process data has been exported # Check if for current process data has been exported
if current and cdm.export(args.output, args.wvd): if current and cdm.export(args.output, args.wvd):
raise KeyboardInterrupt raise KeyboardInterrupt # Stop if export is complete
# https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792 # https://github.com/hyugogirubato/KeyDive/issues/14#issuecomment-2146788792
# Get the currently running Widevine processes
processes = { processes = {
key: (name, pid) key: (name, pid)
for name, pid in core.enumerate_processes().items() for name, pid in adb.enumerate_processes().items()
for key in CDM_VENDOR_API.keys() for key in CDM_VENDOR_API.keys()
if key in name or key.replace('-service', '-service-lazy') in name if key in name or key.replace('-service', '-service-lazy') in name
} }
@ -149,13 +164,27 @@ def main() -> None:
elif not core.running: elif not core.running:
raise KeyboardInterrupt raise KeyboardInterrupt
# Setup actions based on user arguments
if current: if current:
logger.info('Successfully hooked') logger.info('Successfully hooked')
if args.auto: if args.player:
logger.info('Starting DRM player launch process...') package = DRM_PLAYER['package']
sp = subprocess.run(['adb', '-s', str(core.device.id), 'shell', 'am', 'start', '-a', 'android.intent.action.VIEW', '-d', 'https://bitmovin.com/demos/drm'], capture_output=True)
if sp.returncode != 0: # Check if the application is already installed
logger.error('Error launching DRM player: %s' % sp.stdout.decode('utf-8').strip()) if not package in adb.list_applications(user=True, system=False):
logger.debug('Application %s not found. Installing...', package)
if not adb.install_application(path=DRM_PLAYER['path'], url=DRM_PLAYER['url']):
logger.error('Failed to install application')
continue # Skip starting the application if installation failed
# Start the application
logger.info('Starting application: %s', package)
adb.start_application(package)
elif args.auto:
logger.info('Opening the Bitmovin web player...')
adb.open_url('https://bitmovin.com/demos/drm')
logger.info('Setup completed')
else: else:
logger.warning('Widevine library not found, searching...') logger.warning('Widevine library not found, searching...')