KeyDive/keydive/core.py

237 lines
8.8 KiB
Python
Raw Normal View History

2024-07-06 18:01:47 +00:00
import json
import logging
import re
import subprocess
from pathlib import Path
import frida
import xmltodict
from frida.core import Device, Session, Script
from keydive.cdm import Cdm
from keydive.constants import OEM_CRYPTO_API, NATIVE_C_API, CDM_FUNCTION_API
from keydive.vendor import Vendor
class Core:
"""
Core class for handling DRM operations and device interactions.
"""
2024-07-07 08:22:21 +00:00
def __init__(self, cdm: Cdm, device: str = None, functions: Path = None):
2024-07-06 18:01:47 +00:00
"""
Initializes a Core instance.
Args:
cdm (Cdm): Instance of Cdm for managing DRM related operations.
device (str, optional): ID of the Android device to connect to via ADB. Defaults to None (uses USB device).
2024-07-07 08:22:21 +00:00
functions (Path, optional): Path to Ghidra XML functions file for symbol extraction. Defaults to None.
2024-07-06 18:01:47 +00:00
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.running = True
self.cdm = cdm
# Select device based on provided ID or default to the first USB device.
self.device: Device = frida.get_device(id=device, timeout=5) if device else frida.get_usb_device(timeout=5)
self.logger.info('Device: %s (%s)', self.device.name, self.device.id)
# Obtain device properties
properties = self.device_properties()
2024-07-08 17:23:49 +00:00
self.logger.info('SDK API: %s', properties['ro.build.version.sdk'])
2024-07-06 18:01:47 +00:00
self.logger.info('ABI CPU: %s', properties['ro.product.cpu.abi'])
# Load the hook script
2024-07-07 08:22:21 +00:00
self.functions = functions
2024-07-06 18:01:47 +00:00
self.script = self.__prepare_hook_script()
self.logger.info('Script loaded successfully')
def __prepare_hook_script(self) -> str:
"""
Prepares the hook script content by injecting the library-specific scripts.
Returns:
str: The prepared script content.
"""
2024-07-07 08:22:55 +00:00
content = Path(__file__).with_name('keydive.js').read_text(encoding='utf-8')
2024-07-07 08:22:21 +00:00
symbols = self.__prepare_symbols(self.functions)
2024-07-06 18:01:47 +00:00
# Replace placeholders in script template
replacements = {
'${OEM_CRYPTO_API}': json.dumps(list(OEM_CRYPTO_API)),
'${NATIVE_C_API}': json.dumps(list(NATIVE_C_API)),
2024-07-07 08:22:21 +00:00
'${SYMBOLS}': json.dumps(symbols)
2024-07-06 18:01:47 +00:00
}
for placeholder, value in replacements.items():
content = content.replace(placeholder, value)
return content
def __prepare_symbols(self, path: Path) -> list:
"""
Parses the provided XML functions file to select relevant functions.
Args:
path (Path): Path to Ghidra XML functions file.
Returns:
list: List of selected functions as dictionaries.
Raises:
FileNotFoundError: If the functions file is not found.
ValueError: If functions extraction fails.
"""
if not path:
return []
elif not path.is_file():
raise FileNotFoundError('Functions file not found')
try:
program = xmltodict.parse(path.read_bytes())['PROGRAM']
addr_base = int(program['@IMAGE_BASE'], 16)
functions = program['FUNCTIONS']['FUNCTION']
# Find a target function from a predefined list
target = next((f['@NAME'] for f in functions if f['@NAME'] in OEM_CRYPTO_API), None)
# Extract relevant functions
selected = {}
for func in functions:
name = func['@NAME']
args = len(func.get('REGISTER_VAR', []))
# Add function if it matches specific criteria
if name not in selected and (
name == target
or any(keyword in name for keyword in CDM_FUNCTION_API)
or (not target and re.match(r'^[a-z]+$', name) and args >= 6)
):
selected[name] = {
'type': 'function',
'name': name,
'address': hex(int(func['@ENTRY_POINT'], 16) - addr_base)
}
return list(selected.values())
except Exception as e:
raise ValueError('Failed to extract functions from Ghidra') from e
def device_properties(self) -> dict:
"""
Retrieves system properties from the connected device using ADB shell commands.
Returns:
dict: A dictionary of device properties.
"""
# https://source.android.com/docs/core/architecture/configuration/add-system-properties?#shell-commands
properties = {}
sp = subprocess.run(['adb', '-s', str(self.device.id), 'shell', 'getprop'], capture_output=True)
for line in sp.stdout.decode('utf-8').splitlines():
match = re.match(r'\[(.*?)\]: \[(.*?)\]', line)
if match:
key, value = match.groups()
# Attempt to cast numeric and boolean values to appropriate types
try:
value = int(value)
except ValueError:
if value.lower() in ('true', 'false'):
value = value.lower() == 'true'
properties[key] = value
return properties
def enumerate_processes(self) -> dict:
"""
Lists processes running on the device, returning a mapping of process names to PIDs.
Returns:
dict: A dictionary mapping process names to PIDs.
"""
processes = {}
# https://github.com/frida/frida/issues/1225#issuecomment-604181822
prompt = ['adb', '-s', str(self.device.id), 'shell', 'ps']
sp = subprocess.run([*prompt, '-A'], capture_output=True)
if sp.returncode != 0:
sp = subprocess.run(prompt, capture_output=True)
# Iterate through lines starting from the second line (skipping header)
for line in sp.stdout.decode('utf-8').splitlines()[1:]:
try:
line = line.split() # USER,PID,PPID,VSZ,RSS,WCHAN,ADDR,S,NAME
name = ' '.join(line[8:]).strip()
name = name if name.startswith('[') else Path(name).name
processes[name] = int(line[1])
except Exception:
pass
return processes
def __process_message(self, message: dict, data: bytes) -> None:
"""
Handles messages received from the Frida script.
Args:
message (dict): The message payload.
data (bytes): The raw data associated with the message.
"""
logger = logging.getLogger('Script')
level = message.get('payload')
if isinstance(level, int):
# Process logging messages from Frida script
logger.log(level=level, msg=data.decode('utf-8'))
if level in (logging.FATAL, logging.CRITICAL):
self.running = False
elif level == 'challenge':
self.cdm.set_challenge(data=data)
elif level == 'private_key':
self.cdm.set_private_key(data=data)
elif level == 'client_id':
self.cdm.set_client_id(data=data)
def hook_process(self, pid: int, vendor: Vendor, timeout: int = 0) -> bool:
"""
Hooks into the specified process.
Args:
pid (int): The process ID to hook.
vendor (Vendor): Instance of Vendor class representing the vendor information.
timeout (int, optional): Timeout for attaching to the process. Defaults to 0.
Returns:
bool: True if the process was successfully hooked, otherwise False.
"""
try:
session: Session = self.device.attach(pid, persist_timeout=timeout)
2024-07-06 19:41:33 +00:00
except frida.ServerNotRunningError as e:
raise EnvironmentError('Frida server is not running') from e
2024-07-06 18:01:47 +00:00
except Exception as e:
self.logger.error(e)
return False
def __process_destroyed() -> None:
session.detach()
script: Script = session.create_script(self.script)
script.on('message', self.__process_message)
script.on('destroyed', __process_destroyed)
script.load()
library = script.exports_sync.getlibrary(vendor.name)
if library:
self.logger.info('Library: %s (%s)', library['name'], library['path'])
# Check if Ghidra XML functions loaded
2024-07-07 08:22:21 +00:00
if vendor.oem > 17 and not self.functions:
2024-07-06 18:01:47 +00:00
self.logger.warning('For OEM API > 17, specifying "functions" is required, refer to https://github.com/hyugogirubato/KeyDive/blob/main/docs/FUNCTIONS.md')
2024-07-07 08:22:21 +00:00
elif vendor.oem < 18 and self.functions:
2024-07-06 18:01:47 +00:00
self.logger.warning('The "functions" attribute is deprecated for OEM API < 18')
return script.exports_sync.hooklibrary(vendor.name)
script.unload()
self.logger.warning('Library not found: %s' % vendor.name)
return False
__all__ = ('Core',)