+ Tidy up code, add comments

This commit is contained in:
BuildTools 2024-11-27 20:13:07 +01:00
parent d6cbc7f867
commit b2e9725b3a
9 changed files with 65 additions and 29 deletions

View File

@ -185,6 +185,8 @@ class _BCertStructs:
class Certificate(_BCertStructs): class Certificate(_BCertStructs):
"""Represents a BCert"""
def __init__( def __init__(
self, self,
parsed_bcert: Container, parsed_bcert: Container,
@ -263,7 +265,7 @@ class Certificate(_BCertStructs):
key=signing_key.public_bytes(), key=signing_key.public_bytes(),
usages_count=1, usages_count=1,
usages=ListContainer([ usages=ListContainer([
1 1 # KEYUSAGE_SIGN
]) ])
) )
cert_key_encrypt = Container( cert_key_encrypt = Container(
@ -273,7 +275,7 @@ class Certificate(_BCertStructs):
key=encryption_key.public_bytes(), key=encryption_key.public_bytes(),
usages_count=1, usages_count=1,
usages=ListContainer([ usages=ListContainer([
2 2 # KEYUSAGE_ENCRYPT_KEY
]) ])
) )
key_info = Container( key_info = Container(
@ -380,6 +382,8 @@ class Certificate(_BCertStructs):
class CertificateChain(_BCertStructs): class CertificateChain(_BCertStructs):
"""Represents a BCertChain"""
def __init__( def __init__(
self, self,
parsed_bcert_chain: Container, parsed_bcert_chain: Container,

View File

@ -40,6 +40,7 @@ class _DeviceStructs:
class Device: class Device:
"""Represents a PlayReady Device (.prd)"""
CURRENT_STRUCT = _DeviceStructs.v2 CURRENT_STRUCT = _DeviceStructs.v2
def __init__( def __init__(

View File

@ -11,15 +11,14 @@ from ecpy.curves import Curve, Point
class ECCKey: class ECCKey:
def __init__(
self,
key: EccKey
):
"""Represents a PlayReady ECC key""" """Represents a PlayReady ECC key"""
def __init__(self, key: EccKey):
self.key = key self.key = key
@classmethod @classmethod
def generate(cls): def generate(cls):
"""Generate a new ECC key pair"""
return cls(key=ECC.generate(curve='P-256')) return cls(key=ECC.generate(curve='P-256'))
@classmethod @classmethod
@ -29,6 +28,7 @@ class ECCKey:
public_key_x: Union[bytes, int], public_key_x: Union[bytes, int],
public_key_y: Union[bytes, int] public_key_y: Union[bytes, int]
): ):
"""Construct an ECC key pair from private/public bytes/ints"""
if isinstance(private_key, bytes): if isinstance(private_key, bytes):
private_key = int.from_bytes(private_key, 'big') private_key = int.from_bytes(private_key, 'big')
if not isinstance(private_key, int): if not isinstance(private_key, int):

View File

@ -5,7 +5,10 @@ import secrets
class ElGamal: class ElGamal:
"""ElGamal ECC utility using ecpy"""
def __init__(self, curve: Curve): def __init__(self, curve: Curve):
"""Initialize the utility with a given curve type ('secp256r1' for PlayReady)"""
self.curve = curve self.curve = curve
@staticmethod @staticmethod
@ -15,21 +18,24 @@ class ElGamal:
byte_len += 1 byte_len += 1
return n.to_bytes(byte_len, 'big') return n.to_bytes(byte_len, 'big')
def encrypt( def encrypt(self, message_point: Point, public_key: Point) -> Tuple[Point, Point]:
self, """
message_point: Point, Encrypt a single point with a given public key
public_key: Point
) -> Tuple[Point, Point]: Returns an encrypted point pair
"""
ephemeral_key = secrets.randbelow(self.curve.order) ephemeral_key = secrets.randbelow(self.curve.order)
point1 = ephemeral_key * self.curve.generator point1 = ephemeral_key * self.curve.generator
point2 = message_point + (ephemeral_key * public_key) point2 = message_point + (ephemeral_key * public_key)
return point1, point2 return point1, point2
@staticmethod @staticmethod
def decrypt( def decrypt(encrypted: Tuple[Point, Point], private_key: int) -> Point:
encrypted: Tuple[Point, Point], """
private_key: int Decrypt and encrypted point pair with a given private key
) -> Point:
Returns a single decrypted point
"""
point1, point2 = encrypted point1, point2 = encrypted
shared_secret = private_key * point1 shared_secret = private_key * point1
decrypted_message = point2 - shared_secret decrypted_message = point2 - shared_secret

View File

@ -11,6 +11,7 @@ class Key:
RC4 = 0x0002 RC4 = 0x0002
AES128ECB = 0x0003 AES128ECB = 0x0003
Cocktail = 0x0004 Cocktail = 0x0004
AES128CBC = 0x0005
UNKNOWN = 0xffff UNKNOWN = 0xffff
@classmethod @classmethod
@ -23,6 +24,7 @@ class Key:
ChainedLicense = 0x0002 ChainedLicense = 0x0002
ECC256 = 0x0003 ECC256 = 0x0003
ECCforScalableLicenses = 0x0004 ECCforScalableLicenses = 0x0004
Scalable = 0x0005
UNKNOWN = 0xffff UNKNOWN = 0xffff
@classmethod @classmethod
@ -47,7 +49,7 @@ class Key:
def kid_to_uuid(kid: Union[str, bytes]) -> UUID: def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
""" """
Convert a Key ID from a string or bytes to a UUID object. Convert a Key ID from a string or bytes to a UUID object.
At first this may seem very simple but some types of Key IDs At first, this may seem very simple, but some types of Key IDs
may not be 16 bytes and some may be decimal vs. hex. may not be 16 bytes and some may be decimal vs. hex.
""" """
if isinstance(kid, str): if isinstance(kid, str):

View File

@ -38,10 +38,13 @@ class _PlayreadyPSSHStructs:
class PSSH(_PlayreadyPSSHStructs): class PSSH(_PlayreadyPSSHStructs):
"""Represents a PlayReady PSSH"""
SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95") SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
def __init__(self, data: Union[str, bytes]): def __init__(self, data: Union[str, bytes]):
"""Represents a PlayReady PSSH""" """Load a PSSH Box, PlayReady Header or PlayReady Object"""
if not data: if not data:
raise InvalidPssh("Data must not be empty") raise InvalidPssh("Data must not be empty")
@ -83,7 +86,12 @@ class PSSH(_PlayreadyPSSHStructs):
) )
)) ))
def get_wrm_headers(self, downgrade_to_v4: bool = False): def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]:
"""
Return a list of all WRM Headers in the PSSH as plaintext strings
downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR
"""
return list(map( return list(map(
lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(), lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(),
self.wrm_headers self.wrm_headers

View File

@ -1,4 +1,5 @@
import base64 import base64
import binascii
from enum import Enum from enum import Enum
from typing import Optional, List, Union, Tuple from typing import Optional, List, Union, Tuple
@ -6,6 +7,8 @@ import xmltodict
class WRMHeader: class WRMHeader:
"""Represents a PlayReady WRM Header"""
class SignedKeyID: class SignedKeyID:
def __init__( def __init__(
self, self,
@ -33,18 +36,16 @@ class WRMHeader:
_RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]] _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
def __init__( def __init__(self, data: Union[str, bytes]):
self, """Load a WRM Header from either a string, base64 encoded data or bytes"""
data: Union[str, bytes]
):
"""Represents a PlayReady WRM Header"""
if not data: if not data:
raise ValueError("Data must not be empty") raise ValueError("Data must not be empty")
if isinstance(data, str): if isinstance(data, str):
try: try:
data = base64.b64decode(data).decode() data = base64.b64decode(data).decode()
except Exception: except (binascii.Error, binascii.Incomplete):
data = data.encode() data = data.encode()
self._raw_data: bytes = data self._raw_data: bytes = data
@ -63,7 +64,11 @@ class WRMHeader:
return element return element
def to_v4_0_0_0(self) -> str: def to_v4_0_0_0(self) -> str:
"""Will ignore any remaining Key IDs if there's more than just one""" """
Build a v4.0.0.0 WRM header from any possible WRM Header version
Note: Will ignore any remaining Key IDs if there's more than just one
"""
return self._build_v4_0_0_0_wrm_header(*self.read_attributes()) return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
@staticmethod @staticmethod
@ -146,6 +151,12 @@ class WRMHeader:
) )
def read_attributes(self) -> _RETURN_STRUCTURE: def read_attributes(self) -> _RETURN_STRUCTURE:
"""
Read any non-custom XML attributes
Returns a tuple structured like this: Tuple[List[SignedKeyID], <LA_URL>, <LUI_URL>, <DS_ID>]
"""
data = self._header.get("DATA") data = self._header.get("DATA")
if not data: if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")

View File

@ -5,6 +5,8 @@ from pyplayready.elgamal import ElGamal
class XmlKey: class XmlKey:
"""Represents a PlayReady XMLKey"""
def __init__(self): def __init__(self):
self._shared_point = ECCKey.generate() self._shared_point = ECCKey.generate()
self.shared_key_x = self._shared_point.key.pointQ.x self.shared_key_x = self._shared_point.key.pointQ.x

View File

@ -200,13 +200,15 @@ class _XMRLicenseStructs:
class XMRLicense(_XMRLicenseStructs): class XMRLicense(_XMRLicenseStructs):
"""Represents an XMRLicense"""
def __init__( def __init__(
self, self,
parsed_license: Container, parsed_license: Container,
license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
): ):
self.parsed = parsed_license self.parsed = parsed_license
self._LICENSE = license_obj self._license_obj = license_obj
@classmethod @classmethod
def loads(cls, data: Union[str, bytes]) -> XMRLicense: def loads(cls, data: Union[str, bytes]) -> XMRLicense:
@ -229,10 +231,10 @@ class XMRLicense(_XMRLicenseStructs):
return cls.loads(f.read()) return cls.loads(f.read())
def dumps(self) -> bytes: def dumps(self) -> bytes:
return self._LICENSE.build(self.parsed) return self._license_obj.build(self.parsed)
def struct(self) -> _XMRLicenseStructs.XmrLicense: def struct(self) -> _XMRLicenseStructs.XmrLicense:
return self._LICENSE return self._license_obj
def _locate(self, container: Container): def _locate(self, container: Container):
if container.flags == 2 or container.flags == 3: if container.flags == 2 or container.flags == 3: