From b2e9725b3a49ce5118afb5891e0166364b83b680 Mon Sep 17 00:00:00 2001 From: BuildTools Date: Wed, 27 Nov 2024 20:13:07 +0100 Subject: [PATCH] + Tidy up code, add comments --- pyplayready/bcert.py | 8 ++++++-- pyplayready/device.py | 1 + pyplayready/ecc_key.py | 10 +++++----- pyplayready/elgamal.py | 24 +++++++++++++++--------- pyplayready/key.py | 4 +++- pyplayready/pssh.py | 12 ++++++++++-- pyplayready/wrmheader.py | 25 ++++++++++++++++++------- pyplayready/xml_key.py | 2 ++ pyplayready/xmrlicense.py | 8 +++++--- 9 files changed, 65 insertions(+), 29 deletions(-) diff --git a/pyplayready/bcert.py b/pyplayready/bcert.py index 5769958..fac1fd5 100644 --- a/pyplayready/bcert.py +++ b/pyplayready/bcert.py @@ -185,6 +185,8 @@ class _BCertStructs: class Certificate(_BCertStructs): + """Represents a BCert""" + def __init__( self, parsed_bcert: Container, @@ -263,7 +265,7 @@ class Certificate(_BCertStructs): key=signing_key.public_bytes(), usages_count=1, usages=ListContainer([ - 1 + 1 # KEYUSAGE_SIGN ]) ) cert_key_encrypt = Container( @@ -273,7 +275,7 @@ class Certificate(_BCertStructs): key=encryption_key.public_bytes(), usages_count=1, usages=ListContainer([ - 2 + 2 # KEYUSAGE_ENCRYPT_KEY ]) ) key_info = Container( @@ -380,6 +382,8 @@ class Certificate(_BCertStructs): class CertificateChain(_BCertStructs): + """Represents a BCertChain""" + def __init__( self, parsed_bcert_chain: Container, diff --git a/pyplayready/device.py b/pyplayready/device.py index 46ffd65..a77259f 100644 --- a/pyplayready/device.py +++ b/pyplayready/device.py @@ -40,6 +40,7 @@ class _DeviceStructs: class Device: + """Represents a PlayReady Device (.prd)""" CURRENT_STRUCT = _DeviceStructs.v2 def __init__( diff --git a/pyplayready/ecc_key.py b/pyplayready/ecc_key.py index bdd72f2..49a0d16 100644 --- a/pyplayready/ecc_key.py +++ b/pyplayready/ecc_key.py @@ -11,15 +11,14 @@ from ecpy.curves import Curve, Point class ECCKey: - def __init__( - self, - key: EccKey - ): - """Represents a PlayReady ECC key""" + """Represents a PlayReady ECC key""" + + def __init__(self, key: EccKey): self.key = key @classmethod def generate(cls): + """Generate a new ECC key pair""" return cls(key=ECC.generate(curve='P-256')) @classmethod @@ -29,6 +28,7 @@ class ECCKey: public_key_x: Union[bytes, int], public_key_y: Union[bytes, int] ): + """Construct an ECC key pair from private/public bytes/ints""" if isinstance(private_key, bytes): private_key = int.from_bytes(private_key, 'big') if not isinstance(private_key, int): diff --git a/pyplayready/elgamal.py b/pyplayready/elgamal.py index 0f0b9b5..ed6c3db 100644 --- a/pyplayready/elgamal.py +++ b/pyplayready/elgamal.py @@ -5,7 +5,10 @@ import secrets class ElGamal: + """ElGamal ECC utility using ecpy""" + def __init__(self, curve: Curve): + """Initialize the utility with a given curve type ('secp256r1' for PlayReady)""" self.curve = curve @staticmethod @@ -15,21 +18,24 @@ class ElGamal: byte_len += 1 return n.to_bytes(byte_len, 'big') - def encrypt( - self, - message_point: Point, - public_key: Point - ) -> Tuple[Point, Point]: + def encrypt(self, message_point: Point, public_key: Point) -> Tuple[Point, Point]: + """ + Encrypt a single point with a given public key + + Returns an encrypted point pair + """ ephemeral_key = secrets.randbelow(self.curve.order) point1 = ephemeral_key * self.curve.generator point2 = message_point + (ephemeral_key * public_key) return point1, point2 @staticmethod - def decrypt( - encrypted: Tuple[Point, Point], - private_key: int - ) -> Point: + def decrypt(encrypted: Tuple[Point, Point], private_key: int) -> Point: + """ + Decrypt and encrypted point pair with a given private key + + Returns a single decrypted point + """ point1, point2 = encrypted shared_secret = private_key * point1 decrypted_message = point2 - shared_secret diff --git a/pyplayready/key.py b/pyplayready/key.py index 4acc555..601afde 100644 --- a/pyplayready/key.py +++ b/pyplayready/key.py @@ -11,6 +11,7 @@ class Key: RC4 = 0x0002 AES128ECB = 0x0003 Cocktail = 0x0004 + AES128CBC = 0x0005 UNKNOWN = 0xffff @classmethod @@ -23,6 +24,7 @@ class Key: ChainedLicense = 0x0002 ECC256 = 0x0003 ECCforScalableLicenses = 0x0004 + Scalable = 0x0005 UNKNOWN = 0xffff @classmethod @@ -47,7 +49,7 @@ class Key: def kid_to_uuid(kid: Union[str, bytes]) -> UUID: """ Convert a Key ID from a string or bytes to a UUID object. - At first this may seem very simple but some types of Key IDs + At first, this may seem very simple, but some types of Key IDs may not be 16 bytes and some may be decimal vs. hex. """ if isinstance(kid, str): diff --git a/pyplayready/pssh.py b/pyplayready/pssh.py index 29cc0cc..122105b 100644 --- a/pyplayready/pssh.py +++ b/pyplayready/pssh.py @@ -38,10 +38,13 @@ class _PlayreadyPSSHStructs: class PSSH(_PlayreadyPSSHStructs): + """Represents a PlayReady PSSH""" + SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95") def __init__(self, data: Union[str, bytes]): - """Represents a PlayReady PSSH""" + """Load a PSSH Box, PlayReady Header or PlayReady Object""" + if not data: raise InvalidPssh("Data must not be empty") @@ -83,7 +86,12 @@ class PSSH(_PlayreadyPSSHStructs): ) )) - def get_wrm_headers(self, downgrade_to_v4: bool = False): + def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]: + """ + Return a list of all WRM Headers in the PSSH as plaintext strings + + downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR + """ return list(map( lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(), self.wrm_headers diff --git a/pyplayready/wrmheader.py b/pyplayready/wrmheader.py index 61b0798..da9ba87 100644 --- a/pyplayready/wrmheader.py +++ b/pyplayready/wrmheader.py @@ -1,4 +1,5 @@ import base64 +import binascii from enum import Enum from typing import Optional, List, Union, Tuple @@ -6,6 +7,8 @@ import xmltodict class WRMHeader: + """Represents a PlayReady WRM Header""" + class SignedKeyID: def __init__( self, @@ -33,18 +36,16 @@ class WRMHeader: _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]] - def __init__( - self, - data: Union[str, bytes] - ): - """Represents a PlayReady WRM Header""" + def __init__(self, data: Union[str, bytes]): + """Load a WRM Header from either a string, base64 encoded data or bytes""" + if not data: raise ValueError("Data must not be empty") if isinstance(data, str): try: data = base64.b64decode(data).decode() - except Exception: + except (binascii.Error, binascii.Incomplete): data = data.encode() self._raw_data: bytes = data @@ -63,7 +64,11 @@ class WRMHeader: return element def to_v4_0_0_0(self) -> str: - """Will ignore any remaining Key IDs if there's more than just one""" + """ + Build a v4.0.0.0 WRM header from any possible WRM Header version + + Note: Will ignore any remaining Key IDs if there's more than just one + """ return self._build_v4_0_0_0_wrm_header(*self.read_attributes()) @staticmethod @@ -146,6 +151,12 @@ class WRMHeader: ) def read_attributes(self) -> _RETURN_STRUCTURE: + """ + Read any non-custom XML attributes + + Returns a tuple structured like this: Tuple[List[SignedKeyID], , , ] + """ + data = self._header.get("DATA") if not data: raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") diff --git a/pyplayready/xml_key.py b/pyplayready/xml_key.py index 25872ef..468a3bd 100644 --- a/pyplayready/xml_key.py +++ b/pyplayready/xml_key.py @@ -5,6 +5,8 @@ from pyplayready.elgamal import ElGamal class XmlKey: + """Represents a PlayReady XMLKey""" + def __init__(self): self._shared_point = ECCKey.generate() self.shared_key_x = self._shared_point.key.pointQ.x diff --git a/pyplayready/xmrlicense.py b/pyplayready/xmrlicense.py index eb348c9..bdc92a9 100644 --- a/pyplayready/xmrlicense.py +++ b/pyplayready/xmrlicense.py @@ -200,13 +200,15 @@ class _XMRLicenseStructs: class XMRLicense(_XMRLicenseStructs): + """Represents an XMRLicense""" + def __init__( self, parsed_license: Container, license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense ): self.parsed = parsed_license - self._LICENSE = license_obj + self._license_obj = license_obj @classmethod def loads(cls, data: Union[str, bytes]) -> XMRLicense: @@ -229,10 +231,10 @@ class XMRLicense(_XMRLicenseStructs): return cls.loads(f.read()) def dumps(self) -> bytes: - return self._LICENSE.build(self.parsed) + return self._license_obj.build(self.parsed) def struct(self) -> _XMRLicenseStructs.XmrLicense: - return self._LICENSE + return self._license_obj def _locate(self, container: Container): if container.flags == 2 or container.flags == 3: