diff --git a/README.md b/README.md index 96903bc..963f8ef 100644 --- a/README.md +++ b/README.md @@ -55,8 +55,7 @@ pssh = PSSH( "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" ) -# set to `True` if your device doesn't support scalable licenses (this projects also doesn't yet) to downgrade the WRMHEADERs to v4.0.0.0 -wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False) +wrm_headers = pssh.get_wrm_headers() request = cdm.get_license_challenge(session_id, wrm_headers[0]) response = requests.post( diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py index 7a4435e..0ff5604 100644 --- a/pyplayready/__init__.py +++ b/pyplayready/__init__.py @@ -11,4 +11,4 @@ from pyplayready.system.pssh import * from pyplayready.system.session import * -__version__ = "0.4.5" +__version__ = "0.5.0" diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py index 5153527..d56f890 100644 --- a/pyplayready/cdm.py +++ b/pyplayready/cdm.py @@ -18,7 +18,6 @@ from pyplayready.crypto import Crypto from pyplayready.system.bcert import CertificateChain from pyplayready.crypto.ecc_key import ECCKey from pyplayready.license.key import Key -from pyplayready.license.xml_key import XmlKey from pyplayready.license.xmrlicense import XMRLicense from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense) from pyplayready.system.session import Session @@ -49,6 +48,7 @@ class Cdm: y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562, curve=Curve.get_curve("secp256r1") ) + self._rgbMagicConstantZero = bytes([0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f, 0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb]) self.__sessions: dict[bytes, Session] = {} @@ -74,7 +74,6 @@ class Cdm: session = Session(len(self.__sessions) + 1) self.__sessions[session.id] = session - session.xml_key = XmlKey() return session.id @@ -101,7 +100,17 @@ class Cdm: def _get_cipher_data(self, session: Session) -> bytes: b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() - body = f"{b64_chain}" + body = ( + "" + f"{b64_chain}" + "" + '""' + "" + "" + "" + "" + "" + ) cipher = AES.new( key=session.xml_key.aes_key, @@ -242,21 +251,52 @@ class Cdm: if not self._verify_encryption_key(session, parsed_licence): raise InvalidLicense("Public encryption key does not match") + is_scalable = bool(next(parsed_licence.get_object(81), None)) + for content_key in parsed_licence.get_content_keys(): - if Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256: - key = self.__crypto.ecc256_decrypt( - private_key=session.encryption_key, - ciphertext=content_key.encrypted_key - )[16:32] - else: - continue + cipher_type = Key.CipherType(content_key.cipher_type) + + if not cipher_type in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC): + raise InvalidLicense(f"Invalid cipher type {cipher_type}") + + via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC + + decrypted = self.__crypto.ecc256_decrypt( + private_key=session.encryption_key, + ciphertext=content_key.encrypted_key + ) + ci, ck = decrypted[:16], decrypted[16:32] + + if is_scalable: + ci, ck = decrypted[::2][:16], decrypted[1::2][:16] + + if via_symmetric: + embedded_root_license = content_key.encrypted_key[:144] + embedded_leaf_license = content_key.encrypted_key[144:] + + rgb_key = bytes(ck[i] ^ self._rgbMagicConstantZero[i] for i in range(16)) + content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key) + + aux_key = next(parsed_licence.get_object(81))["auxiliary_keys"][0]["key"] + derived_aux_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key) + + uplink_x_key = bytes(bytearray(16)[i] ^ derived_aux_key[i] for i in range(16)) + secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:]) + + embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license) + embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license) + + ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:] + + if not parsed_licence.check_signature(ci): + raise InvalidLicense("License integrity signature does not match") session.keys.append(Key( key_id=UUID(bytes_le=content_key.key_id), key_type=content_key.key_type, cipher_type=content_key.cipher_type, key_length=content_key.key_length, - key=key + key=ck )) except InvalidLicense as e: raise InvalidLicense(e) diff --git a/pyplayready/device/__init__.py b/pyplayready/device/__init__.py index a23da4d..c4aaaf2 100644 --- a/pyplayready/device/__init__.py +++ b/pyplayready/device/__init__.py @@ -3,16 +3,16 @@ from __future__ import annotations import base64 from enum import IntEnum from pathlib import Path -from typing import Union, Any +from typing import Union, Any, Optional from pyplayready.device.structs import DeviceStructs +from pyplayready.exceptions import OutdatedDevice from pyplayready.system.bcert import CertificateChain from pyplayready.crypto.ecc_key import ECCKey class Device: """Represents a PlayReady Device (.prd)""" - CURRENT_STRUCT = DeviceStructs.v3 CURRENT_VERSION = 3 class SecurityLevel(IntEnum): @@ -23,7 +23,7 @@ class Device: def __init__( self, *_: Any, - group_key: Union[str, bytes, None], + group_key: Optional[str, bytes, None], encryption_key: Union[str, bytes], signing_key: Union[str, bytes], group_certificate: Union[str, bytes], @@ -60,14 +60,11 @@ class Device: if not isinstance(data, bytes): raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - prd_header = DeviceStructs.header.parse(data) - if prd_header.version == 2: - return cls( - group_key=None, - **DeviceStructs.v2.parse(data) - ) - - return cls(**cls.CURRENT_STRUCT.parse(data)) + parsed = DeviceStructs.prd.parse(data) + return cls(**{ + **parsed, + 'group_key': parsed.get('group_key', None) + }) @classmethod def load(cls, path: Union[Path, str]) -> Device: @@ -77,7 +74,10 @@ class Device: return cls.loads(f.read()) def dumps(self) -> bytes: - return self.CURRENT_STRUCT.build(dict( + if not self.group_key: + raise OutdatedDevice("Cannot dump a v2 device, re-create it or use a Device with a version of 3 or higher") + + return DeviceStructs.prd.build(dict( version=self.CURRENT_VERSION, group_key=self.group_key.dumps(), encryption_key=self.encryption_key.dumps(), diff --git a/pyplayready/device/structs.py b/pyplayready/device/structs.py index cf6ca02..7a7bda2 100644 --- a/pyplayready/device/structs.py +++ b/pyplayready/device/structs.py @@ -1,18 +1,11 @@ -from construct import Struct, Const, Int8ub, Bytes, this, Int32ub +from construct import Struct, Const, Int8ub, Bytes, this, Int32ub, Switch, Embedded class DeviceStructs: magic = Const(b"PRD") - header = Struct( - "signature" / magic, - "version" / Int8ub, - ) - # was never in production v1 = Struct( - "signature" / magic, - "version" / Int8ub, "group_key_length" / Int32ub, "group_key" / Bytes(this.group_key_length), "group_certificate_length" / Int32ub, @@ -20,8 +13,6 @@ class DeviceStructs: ) v2 = Struct( - "signature" / magic, - "version" / Int8ub, "group_certificate_length" / Int32ub, "group_certificate" / Bytes(this.group_certificate_length), "encryption_key" / Bytes(96), @@ -29,11 +20,22 @@ class DeviceStructs: ) v3 = Struct( - "signature" / magic, - "version" / Int8ub, "group_key" / Bytes(96), "encryption_key" / Bytes(96), "signing_key" / Bytes(96), "group_certificate_length" / Int32ub, "group_certificate" / Bytes(this.group_certificate_length), ) + + prd = Struct( + "signature" / magic, + "version" / Int8ub, + Embedded(Switch( + lambda ctx: ctx.version, + { + 1: v1, + 2: v2, + 3: v3 + } + )) + ) diff --git a/pyplayready/exceptions.py b/pyplayready/exceptions.py index 8f2281f..117a2da 100644 --- a/pyplayready/exceptions.py +++ b/pyplayready/exceptions.py @@ -26,9 +26,13 @@ class InvalidLicense(PyPlayreadyException): """Unable to parse XMR License.""" -class InvalidCertificateChain(PyPlayreadyException): +class InvalidCertificate(PyPlayreadyException): """The BCert is not correctly formatted.""" +class InvalidCertificateChain(PyPlayreadyException): + """The BCertChain is not correctly formatted.""" + + class OutdatedDevice(PyPlayreadyException): """The PlayReady Device is outdated and does not support a specific operation.""" diff --git a/pyplayready/license/xmrlicense.py b/pyplayready/license/xmrlicense.py index 76255b6..1e0421d 100644 --- a/pyplayready/license/xmrlicense.py +++ b/pyplayready/license/xmrlicense.py @@ -1,9 +1,10 @@ from __future__ import annotations import base64 -from pathlib import Path from typing import Union +from Crypto.Cipher import AES +from Crypto.Hash import CMAC from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container @@ -98,7 +99,7 @@ class _XMRLicenseStructs: PolicyMetadataObject = Struct( "metadata_type" / Bytes(16), - "policy_data" / Bytes(this._.length) + "policy_data" / Bytes(this._.length - 24) ) SecureStopRestrictionObject = Struct( @@ -223,13 +224,6 @@ class XMRLicense(_XMRLicenseStructs): license_obj=licence ) - @classmethod - def load(cls, path: Union[Path, str]) -> XMRLicense: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - def dumps(self) -> bytes: return self._license_obj.build(self.parsed) @@ -250,3 +244,11 @@ class XMRLicense(_XMRLicenseStructs): def get_content_keys(self): yield from self.get_object(10) + + def check_signature(self, integrity_key: bytes) -> bool: + cmac = CMAC.new(integrity_key, ciphermod=AES) + + signature_data = next(self.get_object(11)) + cmac.update(self.dumps()[:-(signature_data.signature_data_length + 12)]) + + return signature_data.signature_data == cmac.digest() diff --git a/pyplayready/main.py b/pyplayready/main.py index 73c8d49..e18a927 100644 --- a/pyplayready/main.py +++ b/pyplayready/main.py @@ -7,7 +7,7 @@ import click import requests from Crypto.Random import get_random_bytes -from pyplayready import __version__ +from pyplayready import __version__, InvalidCertificateChain from pyplayready.system.bcert import CertificateChain, Certificate from pyplayready.cdm import Cdm from pyplayready.device import Device @@ -56,7 +56,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None: session_id = cdm.open() log.info("Opened Session") - challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0]) + challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers()[0]) log.info("Created License Request (Challenge)") log.debug(challenge) @@ -69,7 +69,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None: ) if license_res.status_code != 200: - log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}") + log.error(f"Failed to send challenge [{license_res.status_code}]: {license_res.text}") return licence = license_res.text @@ -88,8 +88,10 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None: @main.command() @click.argument("device", type=Path) +@click.option("-c", "--ckt", type=click.Choice(["aesctr", "aescbc"], case_sensitive=False), default="aesctr", help="Content Key Encryption Type") +@click.option("-sl", "--security_level", type=click.Choice(["150", "2000", "3000"], case_sensitive=False), default="2000", help="Minimum Security Level") @click.pass_context -def test(ctx: click.Context, device: Path) -> None: +def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> None: """ Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server. https://testweb.playready.microsoft.com/Content/Content2X @@ -113,7 +115,7 @@ def test(ctx: click.Context, device: Path) -> None: "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" ) - license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)" + license_server = f"https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:{security_level},ckt:{ckt})" ctx.invoke( license_, @@ -148,6 +150,9 @@ def create_device( group_key = ECCKey.load(group_key) certificate_chain = CertificateChain.load(group_certificate) + if certificate_chain.get(0).get_issuer_key() != group_key.public_bytes(): + raise InvalidCertificateChain("Group key does not match this certificate") + new_certificate = Certificate.new_leaf_cert( cert_id=get_random_bytes(16), security_level=certificate_chain.get_security_level(), @@ -159,6 +164,8 @@ def create_device( ) certificate_chain.prepend(new_certificate) + certificate_chain.verify() + device = Device( group_key=group_key.dumps(), encryption_key=encryption_key.dumps(), diff --git a/pyplayready/remote/serve.py b/pyplayready/remote/serve.py index b6d5a0a..64a0b69 100644 --- a/pyplayready/remote/serve.py +++ b/pyplayready/remote/serve.py @@ -136,7 +136,7 @@ async def get_license_challenge(request: web.Request) -> web.Response: if not init_data.startswith(" Certificate: basic_info = Container( cert_id=cert_id, @@ -230,9 +227,9 @@ class Certificate(_BCertStructs): ) device_info = Container( - max_license=max_license, - max_header=max_header, - max_chain_depth=max_chain_depth + max_license=10240, + max_header=15360, + max_chain_depth=2 ) device_info_attribute = Container( flags=1, @@ -301,7 +298,7 @@ class Certificate(_BCertStructs): attribute=key_info ) - manufacturer_info = parent.get_certificate(0).get_attribute(7) + manufacturer_info = parent.get(0).get_attribute(7) new_bcert_container = Container( signature=b"CERT", @@ -354,13 +351,6 @@ class Certificate(_BCertStructs): bcert_obj=cert ) - @classmethod - def load(cls, path: Union[Path, str]) -> Certificate: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - def get_attribute(self, type_: int): for attribute in self.parsed.attributes: if attribute.tag == type_: @@ -380,35 +370,54 @@ class Certificate(_BCertStructs): if manufacturer_info: return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}" + def get_issuer_key(self) -> Union[bytes, None]: + key_info_object = self.get_attribute(6) + if not key_info_object: + return + + key_info_attribute = key_info_object.attribute + return next(map(lambda key: key.key, filter(lambda key: 6 in key.usages, key_info_attribute.cert_keys)), None) + def dumps(self) -> bytes: return self._BCERT.build(self.parsed) def struct(self) -> _BCertStructs.BCert: return self._BCERT - def verify_signature(self): + def verify(self, public_key: bytes, index: int): signature_object = self.get_attribute(8) + if not signature_object: + raise InvalidCertificate(f"No signature object found in certificate {index}") + signature_attribute = signature_object.attribute - sign_payload = self.dumps()[:-signature_object.length] - raw_signature_key = signature_attribute.signature_key + if public_key != raw_signature_key: + raise InvalidCertificate(f"Signature keys of certificate {index} do not match") + signature_key = ECC.construct( curve='P-256', point_x=int.from_bytes(raw_signature_key[:32], 'big'), point_y=int.from_bytes(raw_signature_key[32:], 'big') ) - return Crypto.ecc256_verify( + sign_payload = self.dumps()[:-signature_object.length] + + if not Crypto.ecc256_verify( public_key=signature_key, data=sign_payload, signature=signature_attribute.signature - ) + ): + raise InvalidCertificate(f"Signature of certificate {index} is not authentic") + + return self.get_issuer_key() class CertificateChain(_BCertStructs): """Represents a BCertChain""" + ECC256MSBCertRootIssuerPubKey = bytes.fromhex("864d61cff2256e422c568b3c28001cfb3e1527658584ba0521b79b1828d936de1d826a8fc3e6e7fa7a90d5ca2946f1f64a2efb9f5dcffe7e434eb44293fac5ab") + def __init__( self, parsed_bcert_chain: Container, @@ -443,15 +452,27 @@ class CertificateChain(_BCertStructs): def struct(self) -> _BCertStructs.BCertChain: return self._BCERT_CHAIN - def get_certificate(self, index: int) -> Certificate: - return Certificate(self.parsed.certificates[index]) - def get_security_level(self) -> int: # not sure if there's a better way than this - return self.get_certificate(0).get_security_level() + return self.get(0).get_security_level() def get_name(self) -> str: - return self.get_certificate(0).get_name() + return self.get(0).get_name() + + def verify(self) -> bool: + issuer_key = self.ECC256MSBCertRootIssuerPubKey + + try: + for i in reversed(range(self.count())): + certificate = self.get(i) + issuer_key = certificate.verify(issuer_key, i) + + if not issuer_key and i != 0: + raise InvalidCertificate(f"Certificate {i} is not valid") + except InvalidCertificate as e: + raise InvalidCertificateChain(e) + + return True def append(self, bcert: Certificate) -> None: self.parsed.certificate_count += 1 @@ -464,21 +485,20 @@ class CertificateChain(_BCertStructs): self.parsed.total_length += len(bcert.dumps()) def remove(self, index: int) -> None: - if self.parsed.certificate_count <= 0: + if self.count() <= 0: raise InvalidCertificateChain("CertificateChain does not contain any Certificates") - if index >= self.parsed.certificate_count: - raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") + if index >= self.count(): + raise IndexError(f"No Certificate at index {index}, {self.count()} total") self.parsed.certificate_count -= 1 - bcert = Certificate(self.parsed.certificates[index]) - self.parsed.total_length -= len(bcert.dumps()) + self.parsed.total_length -= len(self.get(index).dumps()) self.parsed.certificates.pop(index) def get(self, index: int) -> Certificate: - if self.parsed.certificate_count <= 0: + if self.count() <= 0: raise InvalidCertificateChain("CertificateChain does not contain any Certificates") - if index >= self.parsed.certificate_count: - raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") + if index >= self.count(): + raise IndexError(f"No Certificate at index {index}, {self.count()} total") return Certificate(self.parsed.certificates[index]) diff --git a/pyplayready/system/pssh.py b/pyplayready/system/pssh.py index 5ce5b87..16f6f12 100644 --- a/pyplayready/system/pssh.py +++ b/pyplayready/system/pssh.py @@ -86,13 +86,11 @@ class PSSH(_PlayreadyPSSHStructs): ) )) - def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]: + def get_wrm_headers(self) -> List[str]: """ Return a list of all WRM Headers in the PSSH as plaintext strings - - downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR """ return list(map( - lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(), + lambda wrm_header: wrm_header.dumps(), self.wrm_headers )) diff --git a/pyplayready/system/wrmheader.py b/pyplayready/system/wrmheader.py index da9ba87..6d70d66 100644 --- a/pyplayready/system/wrmheader.py +++ b/pyplayready/system/wrmheader.py @@ -63,14 +63,6 @@ class WRMHeader: return [element] return element - def to_v4_0_0_0(self) -> str: - """ - Build a v4.0.0.0 WRM header from any possible WRM Header version - - Note: Will ignore any remaining Key IDs if there's more than just one - """ - return self._build_v4_0_0_0_wrm_header(*self.read_attributes()) - @staticmethod def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE: protect_info = data.get("PROTECTINFO") @@ -156,7 +148,6 @@ class WRMHeader: Returns a tuple structured like this: Tuple[List[SignedKeyID], , , ] """ - data = self._header.get("DATA") if not data: raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") @@ -170,32 +161,5 @@ class WRMHeader: elif self.version == self.Version.VERSION_4_3_0_0: return self._read_v4_3_0_0(data) - @staticmethod - def _build_v4_0_0_0_wrm_header( - key_ids: List[SignedKeyID], - la_url: Optional[str], - lui_url: Optional[str], - ds_id: Optional[str] - ) -> str: - if len(key_ids) == 0: - raise Exception("No Key IDs available") - - key_id = key_ids[0] - return ( - '' - '' - '' - '16' - 'AESCTR' - '' - f'{key_id.value}' + - (f'{la_url}' if la_url else '') + - (f'{lui_url}' if lui_url else '') + - (f'{ds_id}' if ds_id else '') + - (f'{key_id.checksum}' if key_id.checksum else '') + - '' - '' - ) - def dumps(self) -> str: return self._raw_data.decode("utf-16-le") diff --git a/pyproject.toml b/pyproject.toml index 3f725d3..519aa62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyplayready" -version = "0.4.5" +version = "0.5.0" description = "pyplayready CDM (Content Decryption Module) implementation in Python." license = "CC BY-NC-ND 4.0" authors = ["DevLARLEY, Erevoc", "DevataDev"]