From dc9fc1e217935d6b82e8dd45737f5327a1f21890 Mon Sep 17 00:00:00 2001 From: BuildTools Date: Mon, 11 Nov 2024 20:34:22 +0100 Subject: [PATCH] Initial Commit --- README.md | 71 +++++++ pyplayready/__init__.py | 1 + pyplayready/bcert.py | 428 ++++++++++++++++++++++++++++++++++++++ pyplayready/cdm.py | 216 +++++++++++++++++++ pyplayready/device.py | 105 ++++++++++ pyplayready/ecc_key.py | 105 ++++++++++ pyplayready/elgamal.py | 36 ++++ pyplayready/key.py | 42 ++++ pyplayready/main.py | 230 ++++++++++++++++++++ pyplayready/pssh.py | 87 ++++++++ pyplayready/xml_key.py | 18 ++ pyplayready/xmrlicense.py | 251 ++++++++++++++++++++++ pyproject.toml | 41 ++++ requirements.txt | 5 + 14 files changed, 1636 insertions(+) create mode 100644 README.md create mode 100644 pyplayready/__init__.py create mode 100644 pyplayready/bcert.py create mode 100644 pyplayready/cdm.py create mode 100644 pyplayready/device.py create mode 100644 pyplayready/ecc_key.py create mode 100644 pyplayready/elgamal.py create mode 100644 pyplayready/key.py create mode 100644 pyplayready/main.py create mode 100644 pyplayready/pssh.py create mode 100644 pyplayready/xml_key.py create mode 100644 pyplayready/xmrlicense.py create mode 100644 pyproject.toml create mode 100644 requirements.txt diff --git a/README.md b/README.md new file mode 100644 index 0000000..2dfaeff --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# pyplayready +All of this is already public. 100% of this code has been derived from the mspr_toolkit. + +## Installation +```shell +pip install pyplayready +``` + +Run `pyplayready --help` to view available cli functions + +## Devices +Run the command below to create a Playready Device (.prd) from a `bgroupcert.dat` and `zgpriv.dat`: +```shell +pyplayready create-device -c bgroupcert.dat -g zgpriv.dat +``` + +## Usage +```python +from pyplayready.cdm import Cdm +from pyplayready.device import Device +from pyplayready.pssh import PSSH + +import requests + +device = Device.load("C:/Path/To/A/Device.prd") +cdm = Cdm.from_device(device) + +pssh = PSSH( + "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH" + "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh" + "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg" + "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA" + "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE" + "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD" + "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ" + "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA" + "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF" + "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT" + "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" +) + +request = cdm.get_license_challenge(pssh.wrm_headers[0]) + +response = requests.post( + url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)", + headers={ + 'Content-Type': 'text/xml; charset=UTF-8', + }, + data=request, +) + +cdm.parse_license(response.text) + +for key in cdm.get_keys(): + print(f"{key.key_id.hex}:{key.key.hex()}") +``` + +## Disclaimer + +1. This project requires a valid Microsoft Certificate and Group Key, which are not provided by this project. +2. Public test provisions are available and provided by Microsoft to use for testing projects such as this one. +3. This project does not condone piracy or any action against the terms of the DRM systems. +4. All efforts in this project have been the result of Reverse-Engineering, Publicly available research, and Trial & Error. +5. Do not use this program to decrypt or access any content for which you do not have the legal rights or explicit permission. +6. Unauthorized decryption or distribution of copyrighted materials is a violation of applicable laws and intellectual property rights. +7. This tool must not be used for any illegal activities, including but not limited to piracy, circumventing digital rights management (DRM), or unauthorized access to protected content. +8. The developers, contributors, and maintainers of this program are not responsible for any misuse or illegal activities performed using this software. +9. By using this program, you agree to comply with all applicable laws and regulations governing digital rights and copyright protections. + +## Credits ++ [mspr_toolkit](https://security-explorations.com/materials/mspr_toolkit.zip) diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py new file mode 100644 index 0000000..f102a9c --- /dev/null +++ b/pyplayready/__init__.py @@ -0,0 +1 @@ +__version__ = "0.0.1" diff --git a/pyplayready/bcert.py b/pyplayready/bcert.py new file mode 100644 index 0000000..747e72d --- /dev/null +++ b/pyplayready/bcert.py @@ -0,0 +1,428 @@ +from __future__ import annotations + +import base64 +from pathlib import Path +from typing import Union + +from Crypto.Hash import SHA256 +from Crypto.Signature import DSS +from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer +from construct import Int16ub, Array +from construct import Struct, this + +from pyplayready.ecc_key import ECCKey + + +class _BCertStructs: + DrmBCertBasicInfo = Struct( + "cert_id" / Bytes(16), + "security_level" / Int32ub, + "flags" / Int32ub, + "cert_type" / Int32ub, + "public_key_digest" / Bytes(32), + "expiration_date" / Int32ub, + "client_id" / Bytes(16) + ) + + # TODO: untested + DrmBCertDomainInfo = Struct( + "service_id" / Bytes(16), + "account_id" / Bytes(16), + "revision_timestamp" / Int32ub, + "domain_url_length" / Int32ub, + "domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc) + ) + + # TODO: untested + DrmBCertPCInfo = Struct( + "security_version" / Int32ub + ) + + # TODO: untested + DrmBCertDeviceInfo = Struct( + "max_license" / Int32ub, + "max_header" / Int32ub, + "max_chain_depth" / Int32ub + ) + + DrmBCertFeatureInfo = Struct( + "feature_count" / Int32ub, + "features" / Array(this.feature_count, Int32ub) + ) + + DrmBCertKeyInfo = Struct( + "key_count" / Int32ub, + "cert_keys" / Array(this.key_count, Struct( + "type" / Int16ub, + "length" / Int16ub, + "flags" / Int32ub, + "key" / Bytes(this.length // 8), + "usages_count" / Int32ub, + "usages" / Array(this.usages_count, Int32ub) + )) + ) + + DrmBCertManufacturerInfo = Struct( + "flags" / Int32ub, + "manufacturer_name_length" / Int32ub, + "manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc), + "model_name_length" / Int32ub, + "model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc), + "model_number_length" / Int32ub, + "model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc), + ) + + DrmBCertSignatureInfo = Struct( + "signature_type" / Int16ub, + "signature_size" / Int16ub, + "signature" / Bytes(this.signature_size), + "signature_key_size" / Int32ub, + "signature_key" / Bytes(this.signature_key_size // 8) + ) + + # TODO: untested + DrmBCertSilverlightInfo = Struct( + "security_version" / Int32ub, + "platform_identifier" / Int32ub + ) + + # TODO: untested + DrmBCertMeteringInfo = Struct( + "metering_id" / Bytes(16), + "metering_url_length" / Int32ub, + "metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc) + ) + + # TODO: untested + DrmBCertExtDataSignKeyInfo = Struct( + "type" / Int16ub, + "length" / Int16ub, + "flags" / Int32ub, + "key" / Bytes(this.length // 8) + ) + + # TODO: untested + BCertExtDataRecord = Struct( + "data_size" / Int32ub, + "data" / Bytes(this.data_size) + ) + + # TODO: untested + DrmBCertExtDataSignature = Struct( + "signature_type" / Int16ub, + "signature_size" / Int16ub, + "signature" / Bytes(this.signature_size) + ) + + # TODO: untested + BCertExtDataContainer = Struct( + "record_count" / Int32ub, + "records" / Array(this.record_count, BCertExtDataRecord), + "signature" / DrmBCertExtDataSignature + ) + + # TODO: untested + DrmBCertServerInfo = Struct( + "warning_days" / Int32ub + ) + + # TODO: untested + DrmBcertSecurityVersion = Struct( + "security_version" / Int32ub, + "platform_identifier" / Int32ub + ) + + Attribute = Struct( + "flags" / Int16ub, + "tag" / Int16ub, + "length" / Int32ub, + "attribute" / Switch( + lambda this_: this_.tag, + { + 1: DrmBCertBasicInfo, + 2: DrmBCertDomainInfo, + 3: DrmBCertPCInfo, + 4: DrmBCertDeviceInfo, + 5: DrmBCertFeatureInfo, + 6: DrmBCertKeyInfo, + 7: DrmBCertManufacturerInfo, + 8: DrmBCertSignatureInfo, + 9: DrmBCertSilverlightInfo, + 10: DrmBCertMeteringInfo, + 11: DrmBCertExtDataSignKeyInfo, + 12: BCertExtDataContainer, + 13: DrmBCertExtDataSignature, + 14: Bytes(this.length - 8), + 15: DrmBCertServerInfo, + 16: DrmBcertSecurityVersion, + 17: DrmBcertSecurityVersion + }, + default=Bytes(this.length - 8) + ) + ) + + BCert = Struct( + "signature" / Const(b"CERT"), + "version" / Int32ub, + "total_length" / Int32ub, + "certificate_length" / Int32ub, + "attributes" / GreedyRange(Attribute) + ) + + BCertChain = Struct( + "signature" / Const(b"CHAI"), + "version" / Int32ub, + "total_length" / Int32ub, + "flags" / Int32ub, + "certificate_count" / Int32ub, + "certificates" / GreedyRange(BCert) + ) + + +class Certificate(_BCertStructs): + def __init__( + self, + parsed_bcert: Container, + bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert + ): + self.parsed = parsed_bcert + self._BCERT = bcert_obj + + @classmethod + def new_key_cert( + cls, + cert_id: bytes, + security_level: int, + client_id: bytes, + signing_key: ECCKey, + encryption_key: ECCKey, + group_key: ECCKey, + parent: CertificateChain, + expiry: int = 0xFFFFFFFF, + max_license: int = 10240, + max_header: int = 15360, + max_chain_depth: int = 2 + ) -> Certificate: + if not cert_id: + raise ValueError("Certificate ID is required") + if not client_id: + raise ValueError("Client ID is required") + + basic_info = Container( + cert_id=cert_id, + security_level=security_level, + flags=0, + cert_type=2, + public_key_digest=signing_key.public_sha256_digest(), + expiration_date=expiry, + client_id=client_id + ) + basic_info_attribute = Container( + flags=1, + tag=1, + length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8, + attribute=basic_info + ) + + device_info = Container( + max_license=max_license, + max_header=max_header, + max_chain_depth=max_chain_depth + ) + device_info_attribute = Container( + flags=1, + tag=4, + length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8, + attribute=device_info + ) + + feature = Container( + feature_count=1, + features=ListContainer([ + 4 + ]) + ) + feature_attribute = Container( + flags=1, + tag=5, + length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8, + attribute=feature + ) + + cert_key_sign = Container( + type=1, + length=512, # bits + flags=0, + key=signing_key.public_bytes(), + usages_count=1, + usages=ListContainer([ + 1 + ]) + ) + cert_key_encrypt = Container( + type=1, + length=512, # bits + flags=0, + key=encryption_key.public_bytes(), + usages_count=1, + usages=ListContainer([ + 2 + ]) + ) + key_info = Container( + key_count=2, + cert_keys=ListContainer([ + cert_key_sign, + cert_key_encrypt + ]) + ) + key_info_attribute = Container( + flags=1, + tag=6, + length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8, + attribute=key_info + ) + + manufacturer_info = parent.get_certificate(0).get_attribute(7) + + new_bcert_container = Container( + signature=b"CERT", + version=1, + total_length=0, # filled at a later time + certificate_length=0, # filled at a later time + attributes=ListContainer([ + basic_info_attribute, + device_info_attribute, + feature_attribute, + key_info_attribute, + manufacturer_info, + ]) + ) + + payload = _BCertStructs.BCert.build(new_bcert_container) + new_bcert_container.certificate_length = len(payload) + new_bcert_container.total_length = len(payload) + 144 # signature length + + sign_payload = _BCertStructs.BCert.build(new_bcert_container) + + hash_obj = SHA256.new(sign_payload) + signer = DSS.new(group_key.key, 'fips-186-3') + signature = signer.sign(hash_obj) + + signature_info = Container( + signature_type=1, + signature_size=64, + signature=signature, + signature_key_size=512, # bits + signature_key=group_key.public_bytes() + ) + signature_info_attribute = Container( + flags=1, + tag=8, + length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8, + attribute=signature_info + ) + new_bcert_container.attributes.append(signature_info_attribute) + + return cls(new_bcert_container) + + @classmethod + def loads(cls, data: Union[str, bytes]) -> Certificate: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + cert = _BCertStructs.BCert + return cls( + parsed_bcert=cert.parse(data), + bcert_obj=cert + ) + + @classmethod + def load(cls, path: Union[Path, str]) -> Certificate: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + with Path(path).open(mode="rb") as f: + return cls.loads(f.read()) + + def get_attribute(self, type_: int): + for attribute in self.parsed.attributes: + if attribute.tag == type_: + return attribute + + def get_security_level(self) -> int: + basic_info_attribute = self.get_attribute(1).attribute + if basic_info_attribute: + return basic_info_attribute.security_level + + @staticmethod + def _unpad(name: bytes): + return name.rstrip(b'\x00').decode("utf-8", errors="ignore") + + def get_name(self): + manufacturer_info = self.get_attribute(7).attribute + if manufacturer_info: + return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}" + + def dumps(self) -> bytes: + return self._BCERT.build(self.parsed) + + def struct(self) -> _BCertStructs.BCert: + return self._BCERT + + +class CertificateChain(_BCertStructs): + def __init__( + self, + parsed_bcert_chain: Container, + bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain + ): + self.parsed = parsed_bcert_chain + self._BCERT_CHAIN = bcert_chain_obj + + @classmethod + def loads(cls, data: Union[str, bytes]) -> CertificateChain: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + cert_chain = _BCertStructs.BCertChain + return cls( + parsed_bcert_chain=cert_chain.parse(data), + bcert_chain_obj=cert_chain + ) + + @classmethod + def load(cls, path: Union[Path, str]) -> CertificateChain: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + with Path(path).open(mode="rb") as f: + return cls.loads(f.read()) + + def dumps(self) -> bytes: + return self._BCERT_CHAIN.build(self.parsed) + + def struct(self) -> _BCertStructs.BCertChain: + return self._BCERT_CHAIN + + def get_certificate(self, index: int) -> Certificate: + return Certificate(self.parsed.certificates[index]) + + def get_security_level(self) -> int: + # not sure if there's a better way than this + return self.get_certificate(0).get_security_level() + + def get_name(self) -> str: + return self.get_certificate(0).get_name() + + def append(self, bcert: Certificate) -> None: + self.parsed.certificate_count += 1 + self.parsed.certificates.append(bcert.parsed) + self.parsed.total_length += len(bcert.dumps()) + + def prepend(self, bcert: Certificate) -> None: + self.parsed.certificate_count += 1 + self.parsed.certificates.insert(0, bcert.parsed) + self.parsed.total_length += len(bcert.dumps()) diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py new file mode 100644 index 0000000..61defea --- /dev/null +++ b/pyplayready/cdm.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import base64 +import math +import time +from typing import List +from uuid import UUID +import xml.etree.ElementTree as ET + +from Crypto.Cipher import AES +from Crypto.Hash import SHA256 +from Crypto.Random import get_random_bytes +from Crypto.Signature import DSS +from Crypto.Util.Padding import pad +from ecpy.curves import Point, Curve + +from pyplayready.bcert import CertificateChain +from pyplayready.ecc_key import ECCKey +from pyplayready.key import Key +from pyplayready.xml_key import XmlKey +from pyplayready.elgamal import ElGamal +from pyplayready.xmrlicense import XMRLicense + + +class Cdm: + def __init__( + self, + security_level: int, + certificate_chain: CertificateChain, + encryption_key: ECCKey, + signing_key: ECCKey, + client_version: str = "10.0.16384.10011" + ): + self.security_level = security_level + self.certificate_chain = certificate_chain + self.encryption_key = encryption_key + self.signing_key = signing_key + self.client_version = client_version + + self.curve = Curve.get_curve("secp256r1") + self.elgamal = ElGamal(self.curve) + + self._wmrm_key = Point( + x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b, + y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562, + curve=self.curve + ) + self._xml_key = XmlKey() + + self._keys: List[Key] = [] + + @classmethod + def from_device(cls, device) -> Cdm: + """Initialize a Playready CDM from a Playready Device (.prd) file""" + return cls( + security_level=device.security_level, + certificate_chain=device.group_certificate, + encryption_key=device.encryption_key, + signing_key=device.signing_key + ) + + def get_key_data(self): + point1, point2 = self.elgamal.encrypt( + message_point=self._xml_key.get_point(self.elgamal.curve), + public_key=self._wmrm_key + ) + return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y) + + def get_cipher_data(self): + b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() + body = f"{b64_chain}" + + cipher = AES.new( + key=self._xml_key.aes_key, + mode=AES.MODE_CBC, + iv=self._xml_key.aes_iv + ) + + ciphertext = cipher.encrypt(pad( + body.encode(), + AES.block_size + )) + + return self._xml_key.aes_iv + ciphertext + + def _build_digest_content( + self, + content_header: str, + nonce: str, + wmrm_cipher: str, + cert_cipher: str + ) -> str: + return ( + '' + '1' + f'{content_header}' + '' + f'{self.client_version}' + '' + f'{nonce}' + f'{math.floor(time.time())}' + '' + '' + '' + '' + '' + '' + 'WMRMServer' + '' + '' + f'{wmrm_cipher}' + '' + '' + '' + '' + f'{cert_cipher}' + '' + '' + '' + ) + + @staticmethod + def _build_signed_info(digest_value: str) -> str: + return ( + '' + '' + '' + '' + '' + f'{digest_value}' + '' + '' + ) + + def get_license_challenge(self, content_header: str) -> str: + la_content = self._build_digest_content( + content_header=content_header, + nonce=base64.b64encode(get_random_bytes(16)).decode(), + wmrm_cipher=base64.b64encode(self.get_key_data()).decode(), + cert_cipher=base64.b64encode(self.get_cipher_data()).decode() + ) + + la_hash_obj = SHA256.new() + la_hash_obj.update(la_content.encode()) + la_hash = la_hash_obj.digest() + + signed_info = self._build_signed_info(base64.b64encode(la_hash).decode()) + signed_info_digest = SHA256.new(signed_info.encode()) + + signer = DSS.new(self.signing_key.key, 'fips-186-3') + signature = signer.sign(signed_info_digest) + + # haven't found a better way to do this. xmltodict.unparse doesn't work + main_body = ( + '' + '' + '' + '' + '' + '' + + la_content + + '' + + signed_info + + f'{base64.b64encode(signature).decode()}' + '' + '' + '' + f'{base64.b64encode(self.signing_key.public_bytes()).decode()}' + '' + '' + '' + '' + '' + '' + '' + '' + '' + ) + + return main_body + + def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes: + point1 = Point( + x=int.from_bytes(encrypted_key[:32], 'big'), + y=int.from_bytes(encrypted_key[32:64], 'big'), + curve=self.curve + ) + point2 = Point( + x=int.from_bytes(encrypted_key[64:96], 'big'), + y=int.from_bytes(encrypted_key[96:128], 'big'), + curve=self.curve + ) + + decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d)) + return self.elgamal.to_bytes(decrypted.x)[16:32] + + def parse_license(self, licence: str) -> None: + try: + root = ET.fromstring(licence) + license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License") + for license_element in license_elements: + parsed_licence = XMRLicense.loads(license_element.text) + for key in parsed_licence.get_content_keys(): + if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256: + self._keys.append(Key( + key_id=UUID(bytes_le=key.key_id), + key_type=key.key_type, + cipher_type=key.cipher_type, + key_length=key.key_length, + key=self._decrypt_ecc256_key(key.encrypted_key) + )) + except Exception as e: + raise Exception(f"Unable to parse license, {e}") + + def get_keys(self) -> List[Key]: + return self._keys diff --git a/pyplayready/device.py b/pyplayready/device.py new file mode 100644 index 0000000..3b5f8ce --- /dev/null +++ b/pyplayready/device.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import base64 +from enum import IntEnum +from pathlib import Path +from typing import Union, Any + +from construct import Struct, Const, Int8ub, Bytes, this, Int32ub + +from pyplayready.bcert import CertificateChain +from pyplayready.ecc_key import ECCKey + + +class SecurityLevel(IntEnum): + SL150 = 150 + SL2000 = 2000 + SL3000 = 3000 + + +class _DeviceStructs: + magic = Const(b"PRD") + + v1 = Struct( + "signature" / magic, + "version" / Int8ub, + "group_key_length" / Int32ub, + "group_key" / Bytes(this.group_key_length), + "group_certificate_length" / Int32ub, + "group_certificate" / Bytes(this.group_certificate_length) + ) + + v2 = Struct( + "signature" / magic, + "version" / Int8ub, + "group_certificate_length" / Int32ub, + "group_certificate" / Bytes(this.group_certificate_length), + "encryption_key" / Bytes(96), + "signing_key" / Bytes(96), + ) + + +class Device: + CURRENT_STRUCT = _DeviceStructs.v2 + + def __init__( + self, + *_: Any, + group_certificate: Union[str, bytes], + encryption_key: Union[str, bytes], + signing_key: Union[str, bytes], + **__: Any + ): + if isinstance(group_certificate, str): + group_certificate = base64.b64decode(group_certificate) + if not isinstance(group_certificate, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}") + + if isinstance(encryption_key, str): + encryption_key = base64.b64decode(encryption_key) + if not isinstance(encryption_key, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}") + if isinstance(signing_key, str): + signing_key = base64.b64decode(signing_key) + if not isinstance(signing_key, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}") + + self.group_certificate = CertificateChain.loads(group_certificate) + self.encryption_key = ECCKey.loads(encryption_key) + self.signing_key = ECCKey.loads(signing_key) + self.security_level = self.group_certificate.get_security_level() + + @classmethod + def loads(cls, data: Union[str, bytes]) -> Device: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + return cls(**cls.CURRENT_STRUCT.parse(data)) + + @classmethod + def load(cls, path: Union[Path, str]) -> Device: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + with Path(path).open(mode="rb") as f: + return cls.loads(f.read()) + + def dumps(self) -> bytes: + return self.CURRENT_STRUCT.build(dict( + version=2, + group_certificate_length=len(self.group_certificate.dumps()), + group_certificate=self.group_certificate.dumps(), + encryption_key=self.encryption_key.dumps(), + signing_key=self.signing_key.dumps() + )) + + def dump(self, path: Union[Path, str]) -> None: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(self.dumps()) + + def get_name(self): + name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}" + return ''.join(char for char in name if char.isascii()).strip().lower().replace(" ", "_") diff --git a/pyplayready/ecc_key.py b/pyplayready/ecc_key.py new file mode 100644 index 0000000..bdd72f2 --- /dev/null +++ b/pyplayready/ecc_key.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import base64 +from pathlib import Path +from typing import Union + +from Crypto.Hash import SHA256 +from Crypto.PublicKey import ECC +from Crypto.PublicKey.ECC import EccKey +from ecpy.curves import Curve, Point + + +class ECCKey: + def __init__( + self, + key: EccKey + ): + """Represents a PlayReady ECC key""" + self.key = key + + @classmethod + def generate(cls): + return cls(key=ECC.generate(curve='P-256')) + + @classmethod + def construct( + cls, + private_key: Union[bytes, int], + public_key_x: Union[bytes, int], + public_key_y: Union[bytes, int] + ): + if isinstance(private_key, bytes): + private_key = int.from_bytes(private_key, 'big') + if not isinstance(private_key, int): + raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}") + + if isinstance(public_key_x, bytes): + public_key_x = int.from_bytes(public_key_x, 'big') + if not isinstance(public_key_x, int): + raise ValueError(f"Expecting Bytes or Int input, got {public_key_x!r}") + + if isinstance(public_key_y, bytes): + public_key_y = int.from_bytes(public_key_y, 'big') + if not isinstance(public_key_y, int): + raise ValueError(f"Expecting Bytes or Int input, got {public_key_y!r}") + + # The public is always derived from the private key; loading the other stuff won't work + key = ECC.construct( + curve='P-256', + d=private_key, + ) + + return cls(key=key) + + @classmethod + def loads(cls, data: Union[str, bytes]) -> ECCKey: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + if len(data) not in [96, 32]: + raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}") + + return cls.construct( + private_key=data[:32], + public_key_x=data[32:64], + public_key_y=data[64:96] + ) + + @classmethod + def load(cls, path: Union[Path, str]) -> ECCKey: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + with Path(path).open(mode="rb") as f: + return cls.loads(f.read()) + + def dumps(self): + return self.private_bytes() + self.public_bytes() + + def dump(self, path: Union[Path, str]) -> None: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(self.dumps()) + + def get_point(self, curve: Curve) -> Point: + return Point(self.key.pointQ.x, self.key.pointQ.y, curve) + + def private_bytes(self) -> bytes: + return self.key.d.to_bytes() + + def private_sha256_digest(self) -> bytes: + hash_object = SHA256.new() + hash_object.update(self.private_bytes()) + return hash_object.digest() + + def public_bytes(self) -> bytes: + return self.key.pointQ.x.to_bytes() + self.key.pointQ.y.to_bytes() + + def public_sha256_digest(self) -> bytes: + hash_object = SHA256.new() + hash_object.update(self.public_bytes()) + return hash_object.digest() diff --git a/pyplayready/elgamal.py b/pyplayready/elgamal.py new file mode 100644 index 0000000..0f0b9b5 --- /dev/null +++ b/pyplayready/elgamal.py @@ -0,0 +1,36 @@ +from typing import Tuple + +from ecpy.curves import Curve, Point +import secrets + + +class ElGamal: + def __init__(self, curve: Curve): + self.curve = curve + + @staticmethod + def to_bytes(n: int) -> bytes: + byte_len = (n.bit_length() + 7) // 8 + if byte_len % 2 != 0: + byte_len += 1 + return n.to_bytes(byte_len, 'big') + + def encrypt( + self, + message_point: Point, + public_key: Point + ) -> Tuple[Point, Point]: + ephemeral_key = secrets.randbelow(self.curve.order) + point1 = ephemeral_key * self.curve.generator + point2 = message_point + (ephemeral_key * public_key) + return point1, point2 + + @staticmethod + def decrypt( + encrypted: Tuple[Point, Point], + private_key: int + ) -> Point: + point1, point2 = encrypted + shared_secret = private_key * point1 + decrypted_message = point2 - shared_secret + return decrypted_message diff --git a/pyplayready/key.py b/pyplayready/key.py new file mode 100644 index 0000000..bc656e9 --- /dev/null +++ b/pyplayready/key.py @@ -0,0 +1,42 @@ +from enum import Enum +from uuid import UUID + + +class Key: + class KeyType(Enum): + Invalid = 0x0000 + AES128CTR = 0x0001 + RC4 = 0x0002 + AES128ECB = 0x0003 + Cocktail = 0x0004 + UNKNOWN = 0xffff + + @classmethod + def _missing_(cls, value): + return cls.UNKNOWN + + class CipherType(Enum): + Invalid = 0x0000 + RSA128 = 0x0001 + ChainedLicense = 0x0002 + ECC256 = 0x0003 + ECCforScalableLicenses = 4 + UNKNOWN = 0xffff + + @classmethod + def _missing_(cls, value): + return cls.UNKNOWN + + def __init__( + self, + key_id: UUID, + key_type: int, + cipher_type: int, + key_length: int, + key: bytes + ): + self.key_id = key_id + self.key_type = self.KeyType(key_type) + self.cipher_type = self.CipherType(cipher_type) + self.key_length = key_length + self.key = key diff --git a/pyplayready/main.py b/pyplayready/main.py new file mode 100644 index 0000000..70879f6 --- /dev/null +++ b/pyplayready/main.py @@ -0,0 +1,230 @@ +import logging +from datetime import datetime +from pathlib import Path +from typing import Optional +from zlib import crc32 + +import click +import requests +from Crypto.Random import get_random_bytes + +from pyplayready import __version__ +from pyplayready.bcert import CertificateChain, Certificate +from pyplayready.cdm import Cdm +from pyplayready.device import Device +from pyplayready.ecc_key import ECCKey +from pyplayready.pssh import PSSH + + +@click.group(invoke_without_command=True) +@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.") +@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.") +def main(version: bool, debug: bool) -> None: + """Python PlayReady CDM implementation""" + logging.basicConfig(level=logging.DEBUG if debug else logging.INFO) + log = logging.getLogger() + + current_year = datetime.now().year + copyright_years = f"2024-{current_year}" + + log.info("pyplayready version %s Copyright (c) %s DevLARLEY", __version__, copyright_years) + log.info("https://github.com/ready-dl/pyplayready") + log.info("Run 'pyplayready --help' for help") + if version: + return + + +@main.command(name="license") +@click.argument("device_path", type=Path) +@click.argument("pssh", type=PSSH) +@click.argument("server", type=str) +def license_(device_path: Path, pssh: PSSH, server: str) -> None: + """ + Make a License Request to a server using a given PSSH + Will return a list of all keys within the returned license + + Only works for standard license servers that don't use any license wrapping + """ + log = logging.getLogger("license") + + device = Device.load(device_path) + log.info(f"Loaded Device: {device.get_name()}") + + cdm = Cdm.from_device(device) + log.info("Loaded CDM") + + challenge = cdm.get_license_challenge(pssh.wrm_headers[0]) + log.info("Created License Request (Challenge)") + log.debug(challenge) + + license_res = requests.post( + url=server, + headers={ + 'Content-Type': 'text/xml; charset=UTF-8', + }, + data=challenge + ) + + if license_res.status_code != 200: + log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}") + return + + licence = license_res.text + log.info("Got License Message") + log.debug(licence) + + cdm.parse_license(licence) + log.info("License Parsed Successfully") + + for key in cdm.get_keys(): + log.info(f"{key.key_id.hex}:{key.key.hex()}") + + +@main.command() +@click.argument("device", type=Path) +@click.pass_context +def test(ctx: click.Context, device: Path) -> None: + """ + Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server. + https://testweb.playready.microsoft.com/Content/Content2X + + DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd + + MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest + + The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and + group certificate. + """ + pssh = PSSH( + "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH" + "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh" + "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg" + "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA" + "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE" + "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD" + "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ" + "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA" + "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF" + "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT" + "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" + ) + + license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)" + + ctx.invoke( + license_, + device_path=device, + pssh=pssh, + server=license_server + ) + + +@main.command() +@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key") +@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain") +@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") +@click.pass_context +def create_device( + ctx: click.Context, + group_key: Path, + group_certificate: Path, + output: Optional[Path] = None +) -> None: + """Create a Playready Device (.prd) file from an ECC private group key and group certificate chain""" + if not group_key.is_file(): + raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx) + if not group_certificate.is_file(): + raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx) + + log = logging.getLogger("create-device") + + encryption_key = ECCKey.generate() + signing_key = ECCKey.generate() + + certificate_chain = CertificateChain.load(group_certificate) + group_key = ECCKey.load(group_key) + + new_certificate = Certificate.new_key_cert( + cert_id=get_random_bytes(16), + security_level=certificate_chain.get_security_level(), + client_id=get_random_bytes(16), + signing_key=signing_key, + encryption_key=encryption_key, + group_key=group_key, + parent=certificate_chain + ) + certificate_chain.prepend(new_certificate) + + device = Device( + group_certificate=certificate_chain.dumps(), + encryption_key=encryption_key.dumps(), + signing_key=signing_key.dumps() + ) + + prd_bin = device.dumps() + + if output and output.suffix: + if output.suffix.lower() != ".prd": + log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") + out_path = output + else: + out_dir = output or Path.cwd() + out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd" + + if out_path.exists(): + log.error(f"A file already exists at the path '{out_path}', cannot overwrite.") + return + + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(prd_bin) + + log.info("Created Playready Device (.prd) file, %s", out_path.name) + log.info(" + Security Level: %s", device.security_level) + log.info(" + Group Certificate: %s (%s bytes)", bool(device.group_certificate.dumps()), len(device.group_certificate.dumps())) + log.info(" + Encryption Key: %s (%s bytes)", bool(device.encryption_key.dumps()), len(device.encryption_key.dumps())) + log.info(" + Signing Key: %s (%s bytes)", bool(device.signing_key.dumps()), len(device.signing_key.dumps())) + log.info(" + Saved to: %s", out_path.absolute()) + + +@main.command() +@click.argument("prd_path", type=Path) +@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory") +@click.pass_context +def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None: + """ + Export a Playready Device (.prd) file to a Group Certificate, Encryption Key and Signing Key + If an output directory is not specified, it will be stored in the current working directory + """ + if not prd_path.is_file(): + raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx) + + log = logging.getLogger("export-device") + log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem) + + if not out_dir: + out_dir = Path.cwd() + + out_path = out_dir / prd_path.stem + if out_path.exists(): + if any(out_path.iterdir()): + log.error("Output directory is not empty, cannot overwrite.") + return + else: + log.warning("Output directory already exists, but is empty.") + else: + out_path.mkdir(parents=True) + + device = Device.load(prd_path) + + log.info(f"L{device.security_level} {device.get_name()}") + log.info(f"Saving to: {out_path}") + + client_id_path = out_path / "bgroupcert.dat" + client_id_path.write_bytes(device.group_certificate.dumps()) + log.info("Exported Group Certificate to bgroupcert.dat") + + private_key_path = out_path / "zprivencr.dat" + private_key_path.write_bytes(device.encryption_key.dumps()) + log.info("Exported Encryption Key as zprivencr.dat") + + private_key_path = out_path / "zprivsig.dat" + private_key_path.write_bytes(device.signing_key.dumps()) + log.info("Exported Signing Key as zprivsig.dat") diff --git a/pyplayready/pssh.py b/pyplayready/pssh.py new file mode 100644 index 0000000..1793978 --- /dev/null +++ b/pyplayready/pssh.py @@ -0,0 +1,87 @@ +import base64 +from typing import Union +from uuid import UUID + +from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container + + +class _PlayreadyPSSHStructs: + PSSHBox = Struct( + "length" / Int32ub, + "pssh" / Const(b"pssh"), + "fullbox" / Int32ub, + "system_id" / Bytes(16), + "data_length" / Int32ub, + "data" / Bytes(this.data_length) + ) + + PlayreadyObject = Struct( + "type" / Int16ul, + "length" / Int16ul, + "data" / Switch( + this.type, + { + 1: PaddedString(this.length, "utf16") + }, + default=Bytes(this.length) + ) + ) + + PlayreadyHeader = Struct( + "length" / Int32ul, + "record_count" / Int16ul, + "records" / Array(this.record_count, PlayreadyObject) + ) + + +class PSSH: + SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95") + + def __init__( + self, + data: Union[str, bytes] + ): + """Represents a PlayReady PSSH""" + if not data: + raise ValueError("Data must not be empty") + + if isinstance(data, str): + try: + data = base64.b64decode(data) + except Exception as e: + raise Exception(f"Could not decode data as Base64, {e}") + + try: + if self._is_playready_pssh_box(data): + pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data) + if bool(self._is_utf_16(pssh_box.data)): + self.wrm_headers = [pssh_box.data.decode("utf-16-le")] + elif bool(self._is_utf_16(pssh_box.data[6:])): + self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")] + elif bool(self._is_utf_16(pssh_box.data[10:])): + self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")] + else: + self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data))) + elif bool(self._is_utf_16(data)): + self.wrm_headers = [data.decode("utf-16-le")] + elif bool(self._is_utf_16(data[6:])): + self.wrm_headers = [data[6:].decode("utf-16-le")] + elif bool(self._is_utf_16(data[10:])): + self.wrm_headers = [data[10:].decode("utf-16-le")] + else: + self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data))) + except Exception: + raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader") + + def _is_playready_pssh_box(self, data: bytes) -> bool: + return data[12:28] == self.SYSTEM_ID.bytes + + @staticmethod + def _is_utf_16(data: bytes) -> bool: + return all(map(lambda i: data[i] == 0, range(1, len(data), 2))) + + @staticmethod + def _get_wrm_headers(wrm_header: Container): + for record in wrm_header.records: + if record.type == 1: + yield record.data diff --git a/pyplayready/xml_key.py b/pyplayready/xml_key.py new file mode 100644 index 0000000..25872ef --- /dev/null +++ b/pyplayready/xml_key.py @@ -0,0 +1,18 @@ +from ecpy.curves import Point, Curve + +from pyplayready.ecc_key import ECCKey +from pyplayready.elgamal import ElGamal + + +class XmlKey: + def __init__(self): + self._shared_point = ECCKey.generate() + self.shared_key_x = self._shared_point.key.pointQ.x + self.shared_key_y = self._shared_point.key.pointQ.y + + self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x)) + self.aes_iv = self._shared_key_x_bytes[:16] + self.aes_key = self._shared_key_x_bytes[16:] + + def get_point(self, curve: Curve) -> Point: + return Point(self.shared_key_x, self.shared_key_y, curve) diff --git a/pyplayready/xmrlicense.py b/pyplayready/xmrlicense.py new file mode 100644 index 0000000..fd54125 --- /dev/null +++ b/pyplayready/xmrlicense.py @@ -0,0 +1,251 @@ +from __future__ import annotations + +import base64 +from pathlib import Path +from typing import Union + +from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container + + +class _XMRLicenseStructs: + PlayEnablerType = Struct( + "player_enabler_type" / Bytes(16) + ) + + DomainRestrictionObject = Struct( + "account_id" / Bytes(16), + "revision" / Int32ub + ) + + IssueDateObject = Struct( + "issue_date" / Int32ub + ) + + RevInfoVersionObject = Struct( + "sequence" / Int32ub + ) + + SecurityLevelObject = Struct( + "minimum_security_level" / Int16ub + ) + + EmbeddedLicenseSettingsObject = Struct( + "indicator" / Int16ub + ) + + ECCKeyObject = Struct( + "curve_type" / Int16ub, + "key_length" / Int16ub, + "key" / Bytes(this.key_length) + ) + + SignatureObject = Struct( + "signature_type" / Int16ub, + "signature_data_length" / Int16ub, + "signature_data" / Bytes(this.signature_data_length) + ) + + ContentKeyObject = Struct( + "key_id" / Bytes(16), + "key_type" / Int16ub, + "cipher_type" / Int16ub, + "key_length" / Int16ub, + "encrypted_key" / Bytes(this.key_length) + ) + + RightsSettingsObject = Struct( + "rights" / Int16ub + ) + + OutputProtectionLevelRestrictionObject = Struct( + "minimum_compressed_digital_video_opl" / Int16ub, + "minimum_uncompressed_digital_video_opl" / Int16ub, + "minimum_analog_video_opl" / Int16ub, + "minimum_digital_compressed_audio_opl" / Int16ub, + "minimum_digital_uncompressed_audio_opl" / Int16ub, + ) + + ExpirationRestrictionObject = Struct( + "begin_date" / Int32ub, + "end_date" / Int32ub + ) + + RemovalDateObject = Struct( + "removal_date" / Int32ub + ) + + UplinkKIDObject = Struct( + "uplink_kid" / Bytes(16), + "chained_checksum_type" / Int16ub, + "chained_checksum_length" / Int16ub, + "chained_checksum" / Bytes(this.chained_checksum_length) + ) + + AnalogVideoOutputConfigurationRestriction = Struct( + "video_output_protection_id" / Bytes(16), + "binary_configuration_data" / Bytes(this._.length - 24) + ) + + DigitalVideoOutputRestrictionObject = Struct( + "video_output_protection_id" / Bytes(16), + "binary_configuration_data" / Bytes(this._.length - 24) + ) + + DigitalAudioOutputRestrictionObject = Struct( + "audio_output_protection_id" / Bytes(16), + "binary_configuration_data" / Bytes(this._.length - 24) + ) + + PolicyMetadataObject = Struct( + "metadata_type" / Bytes(16), + "policy_data" / Bytes(this._.length) + ) + + SecureStopRestrictionObject = Struct( + "metering_id" / Bytes(16) + ) + + MeteringRestrictionObject = Struct( + "metering_id" / Bytes(16) + ) + + ExpirationAfterFirstPlayRestrictionObject = Struct( + "seconds" / Int32ub + ) + + GracePeriodObject = Struct( + "grace_period" / Int32ub + ) + + SourceIdObject = Struct( + "source_id" / Int32ub + ) + + AuxiliaryKey = Struct( + "location" / Int32ub, + "key" / Bytes(16) + ) + + AuxiliaryKeysObject = Struct( + "count" / Int16ub, + "auxiliary_keys" / Array(this.count, AuxiliaryKey) + ) + + UplinkKeyObject3 = Struct( + "uplink_key_id" / Bytes(16), + "chained_length" / Int16ub, + "checksum" / Bytes(this.chained_length), + "count" / Int16ub, + "entries" / Array(this.count, Int32ub) + ) + + CopyEnablerObject = Struct( + "copy_enabler_type" / Bytes(16) + ) + + CopyCountRestrictionObject = Struct( + "count" / Int32ub + ) + + MoveObject = Struct( + "minimum_move_protection_level" / Int32ub + ) + + XMRObject = Struct( + "flags" / Int16ub, + "type" / Int16ub, + "length" / Int32ub, + "data" / Switch( + lambda this_: this_.type, + { + 0x0005: OutputProtectionLevelRestrictionObject, + 0x0008: AnalogVideoOutputConfigurationRestriction, + 0x000a: ContentKeyObject, + 0x000b: SignatureObject, + 0x000d: RightsSettingsObject, + 0x0012: ExpirationRestrictionObject, + 0x0013: IssueDateObject, + 0x0016: MeteringRestrictionObject, + 0x001a: GracePeriodObject, + 0x0022: SourceIdObject, + 0x002a: ECCKeyObject, + 0x002c: PolicyMetadataObject, + 0x0029: DomainRestrictionObject, + 0x0030: ExpirationAfterFirstPlayRestrictionObject, + 0x0031: DigitalAudioOutputRestrictionObject, + 0x0032: RevInfoVersionObject, + 0x0033: EmbeddedLicenseSettingsObject, + 0x0034: SecurityLevelObject, + 0x0037: MoveObject, + 0x0039: PlayEnablerType, + 0x003a: CopyEnablerObject, + 0x003b: UplinkKIDObject, + 0x003d: CopyCountRestrictionObject, + 0x0050: RemovalDateObject, + 0x0051: AuxiliaryKeysObject, + 0x0052: UplinkKeyObject3, + 0x005a: SecureStopRestrictionObject, + 0x0059: DigitalVideoOutputRestrictionObject + }, + default=LazyBound(lambda: _XMRLicenseStructs.XMRObject) + ) + ) + + XmrLicense = Struct( + "signature" / Const(b"XMR\x00"), + "xmr_version" / Int32ub, + "rights_id" / Bytes(16), + "containers" / GreedyRange(XMRObject) + ) + + +class XMRLicense(_XMRLicenseStructs): + def __init__( + self, + parsed_license: Container, + license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense + ): + self.parsed = parsed_license + self._LICENSE = license_obj + + @classmethod + def loads(cls, data: Union[str, bytes]) -> XMRLicense: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + licence = _XMRLicenseStructs.XmrLicense + return cls( + parsed_license=licence.parse(data), + license_obj=licence + ) + + @classmethod + def load(cls, path: Union[Path, str]) -> XMRLicense: + if not isinstance(path, (Path, str)): + raise ValueError(f"Expecting Path object or path string, got {path!r}") + with Path(path).open(mode="rb") as f: + return cls.loads(f.read()) + + def dumps(self) -> bytes: + return self._LICENSE.build(self.parsed) + + def struct(self) -> _XMRLicenseStructs.XmrLicense: + return self._LICENSE + + def _locate(self, container: Container): + if container.flags == 2 or container.flags == 3: + return self._locate(container.data) + else: + return container + + def get_object(self, type_: int): + for obj in self.parsed.containers: + container = self._locate(obj) + if container.type == type_: + yield container.data + + def get_content_keys(self): + for content_key in self.get_object(10): + yield content_key diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..68b670d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,41 @@ +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +name = "pyplayready" +version = "0.1.6" +description = "pyplayready CDM (Content Decryption Module) implementation in Python." +license = "GPL-3.0-only" +authors = ["DevLARLEY"] +readme = "README.md" +repository = "https://github.com/ready-dl/pyplayready" +keywords = ["python", "drm", "playready", "microsoft"] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: End Users/Desktop", + "Natural Language :: English", + "Operating System :: OS Independent", + "Topic :: Multimedia :: Video", + "Topic :: Security :: Cryptography", + "Topic :: Software Development :: Libraries :: Python Modules" +] +include = [ + { path = "README.md", format = "sdist" }, + { path = "LICENSE", format = "sdist" }, +] + +[tool.poetry.urls] +"Issues" = "https://github.com/ready-dl/pyplayready/issues" + +[tool.poetry.dependencies] +python = ">=3.8,<4.0" +requests = "^2.32.3" +pycryptodome = "^3.21.0" +construct = "^2.10.70" +ECPy = "^1.2.5" +click = "^8.1.7" + +[tool.poetry.scripts] +pyplayready = "pyplayready.main:main" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b6d8bb8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +requests +pycryptodome +ecpy +construct +click \ No newline at end of file