Initial Commit

This commit is contained in:
BuildTools 2024-11-11 20:34:22 +01:00
parent de14f87c5c
commit dc9fc1e217
14 changed files with 1636 additions and 0 deletions

71
README.md Normal file
View File

@ -0,0 +1,71 @@
# pyplayready
All of this is already public. 100% of this code has been derived from the mspr_toolkit.
## Installation
```shell
pip install pyplayready
```
Run `pyplayready --help` to view available cli functions
## Devices
Run the command below to create a Playready Device (.prd) from a `bgroupcert.dat` and `zgpriv.dat`:
```shell
pyplayready create-device -c bgroupcert.dat -g zgpriv.dat
```
## Usage
```python
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.pssh import PSSH
import requests
device = Device.load("C:/Path/To/A/Device.prd")
cdm = Cdm.from_device(device)
pssh = PSSH(
"AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
"QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
"AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
"BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
"UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
"cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
"AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
"B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
"ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
"YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
request = cdm.get_license_challenge(pssh.wrm_headers[0])
response = requests.post(
url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",
headers={
'Content-Type': 'text/xml; charset=UTF-8',
},
data=request,
)
cdm.parse_license(response.text)
for key in cdm.get_keys():
print(f"{key.key_id.hex}:{key.key.hex()}")
```
## Disclaimer
1. This project requires a valid Microsoft Certificate and Group Key, which are not provided by this project.
2. Public test provisions are available and provided by Microsoft to use for testing projects such as this one.
3. This project does not condone piracy or any action against the terms of the DRM systems.
4. All efforts in this project have been the result of Reverse-Engineering, Publicly available research, and Trial & Error.
5. Do not use this program to decrypt or access any content for which you do not have the legal rights or explicit permission.
6. Unauthorized decryption or distribution of copyrighted materials is a violation of applicable laws and intellectual property rights.
7. This tool must not be used for any illegal activities, including but not limited to piracy, circumventing digital rights management (DRM), or unauthorized access to protected content.
8. The developers, contributors, and maintainers of this program are not responsible for any misuse or illegal activities performed using this software.
9. By using this program, you agree to comply with all applicable laws and regulations governing digital rights and copyright protections.
## Credits
+ [mspr_toolkit](https://security-explorations.com/materials/mspr_toolkit.zip)

1
pyplayready/__init__.py Normal file
View File

@ -0,0 +1 @@
__version__ = "0.0.1"

428
pyplayready/bcert.py Normal file
View File

@ -0,0 +1,428 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Union
from Crypto.Hash import SHA256
from Crypto.Signature import DSS
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
from construct import Int16ub, Array
from construct import Struct, this
from pyplayready.ecc_key import ECCKey
class _BCertStructs:
DrmBCertBasicInfo = Struct(
"cert_id" / Bytes(16),
"security_level" / Int32ub,
"flags" / Int32ub,
"cert_type" / Int32ub,
"public_key_digest" / Bytes(32),
"expiration_date" / Int32ub,
"client_id" / Bytes(16)
)
# TODO: untested
DrmBCertDomainInfo = Struct(
"service_id" / Bytes(16),
"account_id" / Bytes(16),
"revision_timestamp" / Int32ub,
"domain_url_length" / Int32ub,
"domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc)
)
# TODO: untested
DrmBCertPCInfo = Struct(
"security_version" / Int32ub
)
# TODO: untested
DrmBCertDeviceInfo = Struct(
"max_license" / Int32ub,
"max_header" / Int32ub,
"max_chain_depth" / Int32ub
)
DrmBCertFeatureInfo = Struct(
"feature_count" / Int32ub,
"features" / Array(this.feature_count, Int32ub)
)
DrmBCertKeyInfo = Struct(
"key_count" / Int32ub,
"cert_keys" / Array(this.key_count, Struct(
"type" / Int16ub,
"length" / Int16ub,
"flags" / Int32ub,
"key" / Bytes(this.length // 8),
"usages_count" / Int32ub,
"usages" / Array(this.usages_count, Int32ub)
))
)
DrmBCertManufacturerInfo = Struct(
"flags" / Int32ub,
"manufacturer_name_length" / Int32ub,
"manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc),
"model_name_length" / Int32ub,
"model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc),
"model_number_length" / Int32ub,
"model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc),
)
DrmBCertSignatureInfo = Struct(
"signature_type" / Int16ub,
"signature_size" / Int16ub,
"signature" / Bytes(this.signature_size),
"signature_key_size" / Int32ub,
"signature_key" / Bytes(this.signature_key_size // 8)
)
# TODO: untested
DrmBCertSilverlightInfo = Struct(
"security_version" / Int32ub,
"platform_identifier" / Int32ub
)
# TODO: untested
DrmBCertMeteringInfo = Struct(
"metering_id" / Bytes(16),
"metering_url_length" / Int32ub,
"metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc)
)
# TODO: untested
DrmBCertExtDataSignKeyInfo = Struct(
"type" / Int16ub,
"length" / Int16ub,
"flags" / Int32ub,
"key" / Bytes(this.length // 8)
)
# TODO: untested
BCertExtDataRecord = Struct(
"data_size" / Int32ub,
"data" / Bytes(this.data_size)
)
# TODO: untested
DrmBCertExtDataSignature = Struct(
"signature_type" / Int16ub,
"signature_size" / Int16ub,
"signature" / Bytes(this.signature_size)
)
# TODO: untested
BCertExtDataContainer = Struct(
"record_count" / Int32ub,
"records" / Array(this.record_count, BCertExtDataRecord),
"signature" / DrmBCertExtDataSignature
)
# TODO: untested
DrmBCertServerInfo = Struct(
"warning_days" / Int32ub
)
# TODO: untested
DrmBcertSecurityVersion = Struct(
"security_version" / Int32ub,
"platform_identifier" / Int32ub
)
Attribute = Struct(
"flags" / Int16ub,
"tag" / Int16ub,
"length" / Int32ub,
"attribute" / Switch(
lambda this_: this_.tag,
{
1: DrmBCertBasicInfo,
2: DrmBCertDomainInfo,
3: DrmBCertPCInfo,
4: DrmBCertDeviceInfo,
5: DrmBCertFeatureInfo,
6: DrmBCertKeyInfo,
7: DrmBCertManufacturerInfo,
8: DrmBCertSignatureInfo,
9: DrmBCertSilverlightInfo,
10: DrmBCertMeteringInfo,
11: DrmBCertExtDataSignKeyInfo,
12: BCertExtDataContainer,
13: DrmBCertExtDataSignature,
14: Bytes(this.length - 8),
15: DrmBCertServerInfo,
16: DrmBcertSecurityVersion,
17: DrmBcertSecurityVersion
},
default=Bytes(this.length - 8)
)
)
BCert = Struct(
"signature" / Const(b"CERT"),
"version" / Int32ub,
"total_length" / Int32ub,
"certificate_length" / Int32ub,
"attributes" / GreedyRange(Attribute)
)
BCertChain = Struct(
"signature" / Const(b"CHAI"),
"version" / Int32ub,
"total_length" / Int32ub,
"flags" / Int32ub,
"certificate_count" / Int32ub,
"certificates" / GreedyRange(BCert)
)
class Certificate(_BCertStructs):
def __init__(
self,
parsed_bcert: Container,
bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert
):
self.parsed = parsed_bcert
self._BCERT = bcert_obj
@classmethod
def new_key_cert(
cls,
cert_id: bytes,
security_level: int,
client_id: bytes,
signing_key: ECCKey,
encryption_key: ECCKey,
group_key: ECCKey,
parent: CertificateChain,
expiry: int = 0xFFFFFFFF,
max_license: int = 10240,
max_header: int = 15360,
max_chain_depth: int = 2
) -> Certificate:
if not cert_id:
raise ValueError("Certificate ID is required")
if not client_id:
raise ValueError("Client ID is required")
basic_info = Container(
cert_id=cert_id,
security_level=security_level,
flags=0,
cert_type=2,
public_key_digest=signing_key.public_sha256_digest(),
expiration_date=expiry,
client_id=client_id
)
basic_info_attribute = Container(
flags=1,
tag=1,
length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8,
attribute=basic_info
)
device_info = Container(
max_license=max_license,
max_header=max_header,
max_chain_depth=max_chain_depth
)
device_info_attribute = Container(
flags=1,
tag=4,
length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8,
attribute=device_info
)
feature = Container(
feature_count=1,
features=ListContainer([
4
])
)
feature_attribute = Container(
flags=1,
tag=5,
length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8,
attribute=feature
)
cert_key_sign = Container(
type=1,
length=512, # bits
flags=0,
key=signing_key.public_bytes(),
usages_count=1,
usages=ListContainer([
1
])
)
cert_key_encrypt = Container(
type=1,
length=512, # bits
flags=0,
key=encryption_key.public_bytes(),
usages_count=1,
usages=ListContainer([
2
])
)
key_info = Container(
key_count=2,
cert_keys=ListContainer([
cert_key_sign,
cert_key_encrypt
])
)
key_info_attribute = Container(
flags=1,
tag=6,
length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8,
attribute=key_info
)
manufacturer_info = parent.get_certificate(0).get_attribute(7)
new_bcert_container = Container(
signature=b"CERT",
version=1,
total_length=0, # filled at a later time
certificate_length=0, # filled at a later time
attributes=ListContainer([
basic_info_attribute,
device_info_attribute,
feature_attribute,
key_info_attribute,
manufacturer_info,
])
)
payload = _BCertStructs.BCert.build(new_bcert_container)
new_bcert_container.certificate_length = len(payload)
new_bcert_container.total_length = len(payload) + 144 # signature length
sign_payload = _BCertStructs.BCert.build(new_bcert_container)
hash_obj = SHA256.new(sign_payload)
signer = DSS.new(group_key.key, 'fips-186-3')
signature = signer.sign(hash_obj)
signature_info = Container(
signature_type=1,
signature_size=64,
signature=signature,
signature_key_size=512, # bits
signature_key=group_key.public_bytes()
)
signature_info_attribute = Container(
flags=1,
tag=8,
length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8,
attribute=signature_info
)
new_bcert_container.attributes.append(signature_info_attribute)
return cls(new_bcert_container)
@classmethod
def loads(cls, data: Union[str, bytes]) -> Certificate:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
cert = _BCertStructs.BCert
return cls(
parsed_bcert=cert.parse(data),
bcert_obj=cert
)
@classmethod
def load(cls, path: Union[Path, str]) -> Certificate:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def get_attribute(self, type_: int):
for attribute in self.parsed.attributes:
if attribute.tag == type_:
return attribute
def get_security_level(self) -> int:
basic_info_attribute = self.get_attribute(1).attribute
if basic_info_attribute:
return basic_info_attribute.security_level
@staticmethod
def _unpad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
def get_name(self):
manufacturer_info = self.get_attribute(7).attribute
if manufacturer_info:
return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
def dumps(self) -> bytes:
return self._BCERT.build(self.parsed)
def struct(self) -> _BCertStructs.BCert:
return self._BCERT
class CertificateChain(_BCertStructs):
def __init__(
self,
parsed_bcert_chain: Container,
bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain
):
self.parsed = parsed_bcert_chain
self._BCERT_CHAIN = bcert_chain_obj
@classmethod
def loads(cls, data: Union[str, bytes]) -> CertificateChain:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
cert_chain = _BCertStructs.BCertChain
return cls(
parsed_bcert_chain=cert_chain.parse(data),
bcert_chain_obj=cert_chain
)
@classmethod
def load(cls, path: Union[Path, str]) -> CertificateChain:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self._BCERT_CHAIN.build(self.parsed)
def struct(self) -> _BCertStructs.BCertChain:
return self._BCERT_CHAIN
def get_certificate(self, index: int) -> Certificate:
return Certificate(self.parsed.certificates[index])
def get_security_level(self) -> int:
# not sure if there's a better way than this
return self.get_certificate(0).get_security_level()
def get_name(self) -> str:
return self.get_certificate(0).get_name()
def append(self, bcert: Certificate) -> None:
self.parsed.certificate_count += 1
self.parsed.certificates.append(bcert.parsed)
self.parsed.total_length += len(bcert.dumps())
def prepend(self, bcert: Certificate) -> None:
self.parsed.certificate_count += 1
self.parsed.certificates.insert(0, bcert.parsed)
self.parsed.total_length += len(bcert.dumps())

216
pyplayready/cdm.py Normal file
View File

@ -0,0 +1,216 @@
from __future__ import annotations
import base64
import math
import time
from typing import List
from uuid import UUID
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Random import get_random_bytes
from Crypto.Signature import DSS
from Crypto.Util.Padding import pad
from ecpy.curves import Point, Curve
from pyplayready.bcert import CertificateChain
from pyplayready.ecc_key import ECCKey
from pyplayready.key import Key
from pyplayready.xml_key import XmlKey
from pyplayready.elgamal import ElGamal
from pyplayready.xmrlicense import XMRLicense
class Cdm:
def __init__(
self,
security_level: int,
certificate_chain: CertificateChain,
encryption_key: ECCKey,
signing_key: ECCKey,
client_version: str = "10.0.16384.10011"
):
self.security_level = security_level
self.certificate_chain = certificate_chain
self.encryption_key = encryption_key
self.signing_key = signing_key
self.client_version = client_version
self.curve = Curve.get_curve("secp256r1")
self.elgamal = ElGamal(self.curve)
self._wmrm_key = Point(
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
curve=self.curve
)
self._xml_key = XmlKey()
self._keys: List[Key] = []
@classmethod
def from_device(cls, device) -> Cdm:
"""Initialize a Playready CDM from a Playready Device (.prd) file"""
return cls(
security_level=device.security_level,
certificate_chain=device.group_certificate,
encryption_key=device.encryption_key,
signing_key=device.signing_key
)
def get_key_data(self):
point1, point2 = self.elgamal.encrypt(
message_point=self._xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def get_cipher_data(self):
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
cipher = AES.new(
key=self._xml_key.aes_key,
mode=AES.MODE_CBC,
iv=self._xml_key.aes_iv
)
ciphertext = cipher.encrypt(pad(
body.encode(),
AES.block_size
))
return self._xml_key.aes_iv + ciphertext
def _build_digest_content(
self,
content_header: str,
nonce: str,
wmrm_cipher: str,
cert_cipher: str
) -> str:
return (
'<LA xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols" Id="SignedData" xml:space="preserve">'
'<Version>1</Version>'
f'<ContentHeader>{content_header}</ContentHeader>'
'<CLIENTINFO>'
f'<CLIENTVERSION>{self.client_version}</CLIENTVERSION>'
'</CLIENTINFO>'
f'<LicenseNonce>{nonce}</LicenseNonce>'
f'<ClientTime>{math.floor(time.time())}</ClientTime>'
'<EncryptedData xmlns="http://www.w3.org/2001/04/xmlenc#" Type="http://www.w3.org/2001/04/xmlenc#Element">'
'<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes128-cbc"></EncryptionMethod>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<EncryptedKey xmlns="http://www.w3.org/2001/04/xmlenc#">'
'<EncryptionMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256"></EncryptionMethod>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<KeyName>WMRMServer</KeyName>'
'</KeyInfo>'
'<CipherData>'
f'<CipherValue>{wmrm_cipher}</CipherValue>'
'</CipherData>'
'</EncryptedKey>'
'</KeyInfo>'
'<CipherData>'
f'<CipherValue>{cert_cipher}</CipherValue>'
'</CipherData>'
'</EncryptedData>'
'</LA>'
)
@staticmethod
def _build_signed_info(digest_value: str) -> str:
return (
'<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"></CanonicalizationMethod>'
'<SignatureMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256"></SignatureMethod>'
'<Reference URI="#SignedData">'
'<DigestMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#sha256"></DigestMethod>'
f'<DigestValue>{digest_value}</DigestValue>'
'</Reference>'
'</SignedInfo>'
)
def get_license_challenge(self, content_header: str) -> str:
la_content = self._build_digest_content(
content_header=content_header,
nonce=base64.b64encode(get_random_bytes(16)).decode(),
wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
)
la_hash_obj = SHA256.new()
la_hash_obj.update(la_content.encode())
la_hash = la_hash_obj.digest()
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
signed_info_digest = SHA256.new(signed_info.encode())
signer = DSS.new(self.signing_key.key, 'fips-186-3')
signature = signer.sign(signed_info_digest)
# haven't found a better way to do this. xmltodict.unparse doesn't work
main_body = (
'<?xml version="1.0" encoding="utf-8"?>'
'<soap:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">'
'<soap:Body>'
'<AcquireLicense xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols">'
'<challenge>'
'<Challenge xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols/messages">'
+ la_content +
'<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">'
+ signed_info +
f'<SignatureValue>{base64.b64encode(signature).decode()}</SignatureValue>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<KeyValue>'
'<ECCKeyValue>'
f'<PublicKey>{base64.b64encode(self.signing_key.public_bytes()).decode()}</PublicKey>'
'</ECCKeyValue>'
'</KeyValue>'
'</KeyInfo>'
'</Signature>'
'</Challenge>'
'</challenge>'
'</AcquireLicense>'
'</soap:Body>'
'</soap:Envelope>'
)
return main_body
def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
point1 = Point(
x=int.from_bytes(encrypted_key[:32], 'big'),
y=int.from_bytes(encrypted_key[32:64], 'big'),
curve=self.curve
)
point2 = Point(
x=int.from_bytes(encrypted_key[64:96], 'big'),
y=int.from_bytes(encrypted_key[96:128], 'big'),
curve=self.curve
)
decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
return self.elgamal.to_bytes(decrypted.x)[16:32]
def parse_license(self, licence: str) -> None:
try:
root = ET.fromstring(licence)
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
for license_element in license_elements:
parsed_licence = XMRLicense.loads(license_element.text)
for key in parsed_licence.get_content_keys():
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256:
self._keys.append(Key(
key_id=UUID(bytes_le=key.key_id),
key_type=key.key_type,
cipher_type=key.cipher_type,
key_length=key.key_length,
key=self._decrypt_ecc256_key(key.encrypted_key)
))
except Exception as e:
raise Exception(f"Unable to parse license, {e}")
def get_keys(self) -> List[Key]:
return self._keys

105
pyplayready/device.py Normal file
View File

@ -0,0 +1,105 @@
from __future__ import annotations
import base64
from enum import IntEnum
from pathlib import Path
from typing import Union, Any
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
from pyplayready.bcert import CertificateChain
from pyplayready.ecc_key import ECCKey
class SecurityLevel(IntEnum):
SL150 = 150
SL2000 = 2000
SL3000 = 3000
class _DeviceStructs:
magic = Const(b"PRD")
v1 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key_length" / Int32ub,
"group_key" / Bytes(this.group_key_length),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length)
)
v2 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
)
class Device:
CURRENT_STRUCT = _DeviceStructs.v2
def __init__(
self,
*_: Any,
group_certificate: Union[str, bytes],
encryption_key: Union[str, bytes],
signing_key: Union[str, bytes],
**__: Any
):
if isinstance(group_certificate, str):
group_certificate = base64.b64decode(group_certificate)
if not isinstance(group_certificate, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
if isinstance(encryption_key, str):
encryption_key = base64.b64decode(encryption_key)
if not isinstance(encryption_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
if isinstance(signing_key, str):
signing_key = base64.b64decode(signing_key)
if not isinstance(signing_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
self.group_certificate = CertificateChain.loads(group_certificate)
self.encryption_key = ECCKey.loads(encryption_key)
self.signing_key = ECCKey.loads(signing_key)
self.security_level = self.group_certificate.get_security_level()
@classmethod
def loads(cls, data: Union[str, bytes]) -> Device:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
return cls(**cls.CURRENT_STRUCT.parse(data))
@classmethod
def load(cls, path: Union[Path, str]) -> Device:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self.CURRENT_STRUCT.build(dict(
version=2,
group_certificate_length=len(self.group_certificate.dumps()),
group_certificate=self.group_certificate.dumps(),
encryption_key=self.encryption_key.dumps(),
signing_key=self.signing_key.dumps()
))
def dump(self, path: Union[Path, str]) -> None:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps())
def get_name(self):
name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
return ''.join(char for char in name if char.isascii()).strip().lower().replace(" ", "_")

105
pyplayready/ecc_key.py Normal file
View File

@ -0,0 +1,105 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Union
from Crypto.Hash import SHA256
from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from ecpy.curves import Curve, Point
class ECCKey:
def __init__(
self,
key: EccKey
):
"""Represents a PlayReady ECC key"""
self.key = key
@classmethod
def generate(cls):
return cls(key=ECC.generate(curve='P-256'))
@classmethod
def construct(
cls,
private_key: Union[bytes, int],
public_key_x: Union[bytes, int],
public_key_y: Union[bytes, int]
):
if isinstance(private_key, bytes):
private_key = int.from_bytes(private_key, 'big')
if not isinstance(private_key, int):
raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
if isinstance(public_key_x, bytes):
public_key_x = int.from_bytes(public_key_x, 'big')
if not isinstance(public_key_x, int):
raise ValueError(f"Expecting Bytes or Int input, got {public_key_x!r}")
if isinstance(public_key_y, bytes):
public_key_y = int.from_bytes(public_key_y, 'big')
if not isinstance(public_key_y, int):
raise ValueError(f"Expecting Bytes or Int input, got {public_key_y!r}")
# The public is always derived from the private key; loading the other stuff won't work
key = ECC.construct(
curve='P-256',
d=private_key,
)
return cls(key=key)
@classmethod
def loads(cls, data: Union[str, bytes]) -> ECCKey:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
if len(data) not in [96, 32]:
raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
return cls.construct(
private_key=data[:32],
public_key_x=data[32:64],
public_key_y=data[64:96]
)
@classmethod
def load(cls, path: Union[Path, str]) -> ECCKey:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self):
return self.private_bytes() + self.public_bytes()
def dump(self, path: Union[Path, str]) -> None:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps())
def get_point(self, curve: Curve) -> Point:
return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
def private_bytes(self) -> bytes:
return self.key.d.to_bytes()
def private_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
hash_object.update(self.private_bytes())
return hash_object.digest()
def public_bytes(self) -> bytes:
return self.key.pointQ.x.to_bytes() + self.key.pointQ.y.to_bytes()
def public_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
hash_object.update(self.public_bytes())
return hash_object.digest()

36
pyplayready/elgamal.py Normal file
View File

@ -0,0 +1,36 @@
from typing import Tuple
from ecpy.curves import Curve, Point
import secrets
class ElGamal:
def __init__(self, curve: Curve):
self.curve = curve
@staticmethod
def to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')
def encrypt(
self,
message_point: Point,
public_key: Point
) -> Tuple[Point, Point]:
ephemeral_key = secrets.randbelow(self.curve.order)
point1 = ephemeral_key * self.curve.generator
point2 = message_point + (ephemeral_key * public_key)
return point1, point2
@staticmethod
def decrypt(
encrypted: Tuple[Point, Point],
private_key: int
) -> Point:
point1, point2 = encrypted
shared_secret = private_key * point1
decrypted_message = point2 - shared_secret
return decrypted_message

42
pyplayready/key.py Normal file
View File

@ -0,0 +1,42 @@
from enum import Enum
from uuid import UUID
class Key:
class KeyType(Enum):
Invalid = 0x0000
AES128CTR = 0x0001
RC4 = 0x0002
AES128ECB = 0x0003
Cocktail = 0x0004
UNKNOWN = 0xffff
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
class CipherType(Enum):
Invalid = 0x0000
RSA128 = 0x0001
ChainedLicense = 0x0002
ECC256 = 0x0003
ECCforScalableLicenses = 4
UNKNOWN = 0xffff
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
def __init__(
self,
key_id: UUID,
key_type: int,
cipher_type: int,
key_length: int,
key: bytes
):
self.key_id = key_id
self.key_type = self.KeyType(key_type)
self.cipher_type = self.CipherType(cipher_type)
self.key_length = key_length
self.key = key

230
pyplayready/main.py Normal file
View File

@ -0,0 +1,230 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from zlib import crc32
import click
import requests
from Crypto.Random import get_random_bytes
from pyplayready import __version__
from pyplayready.bcert import CertificateChain, Certificate
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.ecc_key import ECCKey
from pyplayready.pssh import PSSH
@click.group(invoke_without_command=True)
@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.")
def main(version: bool, debug: bool) -> None:
"""Python PlayReady CDM implementation"""
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
log = logging.getLogger()
current_year = datetime.now().year
copyright_years = f"2024-{current_year}"
log.info("pyplayready version %s Copyright (c) %s DevLARLEY", __version__, copyright_years)
log.info("https://github.com/ready-dl/pyplayready")
log.info("Run 'pyplayready --help' for help")
if version:
return
@main.command(name="license")
@click.argument("device_path", type=Path)
@click.argument("pssh", type=PSSH)
@click.argument("server", type=str)
def license_(device_path: Path, pssh: PSSH, server: str) -> None:
"""
Make a License Request to a server using a given PSSH
Will return a list of all keys within the returned license
Only works for standard license servers that don't use any license wrapping
"""
log = logging.getLogger("license")
device = Device.load(device_path)
log.info(f"Loaded Device: {device.get_name()}")
cdm = Cdm.from_device(device)
log.info("Loaded CDM")
challenge = cdm.get_license_challenge(pssh.wrm_headers[0])
log.info("Created License Request (Challenge)")
log.debug(challenge)
license_res = requests.post(
url=server,
headers={
'Content-Type': 'text/xml; charset=UTF-8',
},
data=challenge
)
if license_res.status_code != 200:
log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}")
return
licence = license_res.text
log.info("Got License Message")
log.debug(licence)
cdm.parse_license(licence)
log.info("License Parsed Successfully")
for key in cdm.get_keys():
log.info(f"{key.key_id.hex}:{key.key.hex()}")
@main.command()
@click.argument("device", type=Path)
@click.pass_context
def test(ctx: click.Context, device: Path) -> None:
"""
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
https://testweb.playready.microsoft.com/Content/Content2X
+ DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and
group certificate.
"""
pssh = PSSH(
"AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
"QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
"AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
"BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
"UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
"cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
"AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
"B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
"ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
"YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)"
ctx.invoke(
license_,
device_path=device,
pssh=pssh,
server=license_server
)
@main.command()
@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def create_device(
ctx: click.Context,
group_key: Path,
group_certificate: Path,
output: Optional[Path] = None
) -> None:
"""Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
if not group_key.is_file():
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
if not group_certificate.is_file():
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("create-device")
encryption_key = ECCKey.generate()
signing_key = ECCKey.generate()
certificate_chain = CertificateChain.load(group_certificate)
group_key = ECCKey.load(group_key)
new_certificate = Certificate.new_key_cert(
cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key,
encryption_key=encryption_key,
group_key=group_key,
parent=certificate_chain
)
certificate_chain.prepend(new_certificate)
device = Device(
group_certificate=certificate_chain.dumps(),
encryption_key=encryption_key.dumps(),
signing_key=signing_key.dumps()
)
prd_bin = device.dumps()
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
out_path = output
else:
out_dir = output or Path.cwd()
out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd"
if out_path.exists():
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
return
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(prd_bin)
log.info("Created Playready Device (.prd) file, %s", out_path.name)
log.info(" + Security Level: %s", device.security_level)
log.info(" + Group Certificate: %s (%s bytes)", bool(device.group_certificate.dumps()), len(device.group_certificate.dumps()))
log.info(" + Encryption Key: %s (%s bytes)", bool(device.encryption_key.dumps()), len(device.encryption_key.dumps()))
log.info(" + Signing Key: %s (%s bytes)", bool(device.signing_key.dumps()), len(device.signing_key.dumps()))
log.info(" + Saved to: %s", out_path.absolute())
@main.command()
@click.argument("prd_path", type=Path)
@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
@click.pass_context
def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
"""
Export a Playready Device (.prd) file to a Group Certificate, Encryption Key and Signing Key
If an output directory is not specified, it will be stored in the current working directory
"""
if not prd_path.is_file():
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("export-device")
log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem)
if not out_dir:
out_dir = Path.cwd()
out_path = out_dir / prd_path.stem
if out_path.exists():
if any(out_path.iterdir()):
log.error("Output directory is not empty, cannot overwrite.")
return
else:
log.warning("Output directory already exists, but is empty.")
else:
out_path.mkdir(parents=True)
device = Device.load(prd_path)
log.info(f"L{device.security_level} {device.get_name()}")
log.info(f"Saving to: {out_path}")
client_id_path = out_path / "bgroupcert.dat"
client_id_path.write_bytes(device.group_certificate.dumps())
log.info("Exported Group Certificate to bgroupcert.dat")
private_key_path = out_path / "zprivencr.dat"
private_key_path.write_bytes(device.encryption_key.dumps())
log.info("Exported Encryption Key as zprivencr.dat")
private_key_path = out_path / "zprivsig.dat"
private_key_path.write_bytes(device.signing_key.dumps())
log.info("Exported Signing Key as zprivsig.dat")

87
pyplayready/pssh.py Normal file
View File

@ -0,0 +1,87 @@
import base64
from typing import Union
from uuid import UUID
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container
class _PlayreadyPSSHStructs:
PSSHBox = Struct(
"length" / Int32ub,
"pssh" / Const(b"pssh"),
"fullbox" / Int32ub,
"system_id" / Bytes(16),
"data_length" / Int32ub,
"data" / Bytes(this.data_length)
)
PlayreadyObject = Struct(
"type" / Int16ul,
"length" / Int16ul,
"data" / Switch(
this.type,
{
1: PaddedString(this.length, "utf16")
},
default=Bytes(this.length)
)
)
PlayreadyHeader = Struct(
"length" / Int32ul,
"record_count" / Int16ul,
"records" / Array(this.record_count, PlayreadyObject)
)
class PSSH:
SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
def __init__(
self,
data: Union[str, bytes]
):
"""Represents a PlayReady PSSH"""
if not data:
raise ValueError("Data must not be empty")
if isinstance(data, str):
try:
data = base64.b64decode(data)
except Exception as e:
raise Exception(f"Could not decode data as Base64, {e}")
try:
if self._is_playready_pssh_box(data):
pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data)
if bool(self._is_utf_16(pssh_box.data)):
self.wrm_headers = [pssh_box.data.decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[6:])):
self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[10:])):
self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
else:
self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
elif bool(self._is_utf_16(data)):
self.wrm_headers = [data.decode("utf-16-le")]
elif bool(self._is_utf_16(data[6:])):
self.wrm_headers = [data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(data[10:])):
self.wrm_headers = [data[10:].decode("utf-16-le")]
else:
self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
except Exception:
raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader")
def _is_playready_pssh_box(self, data: bytes) -> bool:
return data[12:28] == self.SYSTEM_ID.bytes
@staticmethod
def _is_utf_16(data: bytes) -> bool:
return all(map(lambda i: data[i] == 0, range(1, len(data), 2)))
@staticmethod
def _get_wrm_headers(wrm_header: Container):
for record in wrm_header.records:
if record.type == 1:
yield record.data

18
pyplayready/xml_key.py Normal file
View File

@ -0,0 +1,18 @@
from ecpy.curves import Point, Curve
from pyplayready.ecc_key import ECCKey
from pyplayready.elgamal import ElGamal
class XmlKey:
def __init__(self):
self._shared_point = ECCKey.generate()
self.shared_key_x = self._shared_point.key.pointQ.x
self.shared_key_y = self._shared_point.key.pointQ.y
self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
self.aes_iv = self._shared_key_x_bytes[:16]
self.aes_key = self._shared_key_x_bytes[16:]
def get_point(self, curve: Curve) -> Point:
return Point(self.shared_key_x, self.shared_key_y, curve)

251
pyplayready/xmrlicense.py Normal file
View File

@ -0,0 +1,251 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Union
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
class _XMRLicenseStructs:
PlayEnablerType = Struct(
"player_enabler_type" / Bytes(16)
)
DomainRestrictionObject = Struct(
"account_id" / Bytes(16),
"revision" / Int32ub
)
IssueDateObject = Struct(
"issue_date" / Int32ub
)
RevInfoVersionObject = Struct(
"sequence" / Int32ub
)
SecurityLevelObject = Struct(
"minimum_security_level" / Int16ub
)
EmbeddedLicenseSettingsObject = Struct(
"indicator" / Int16ub
)
ECCKeyObject = Struct(
"curve_type" / Int16ub,
"key_length" / Int16ub,
"key" / Bytes(this.key_length)
)
SignatureObject = Struct(
"signature_type" / Int16ub,
"signature_data_length" / Int16ub,
"signature_data" / Bytes(this.signature_data_length)
)
ContentKeyObject = Struct(
"key_id" / Bytes(16),
"key_type" / Int16ub,
"cipher_type" / Int16ub,
"key_length" / Int16ub,
"encrypted_key" / Bytes(this.key_length)
)
RightsSettingsObject = Struct(
"rights" / Int16ub
)
OutputProtectionLevelRestrictionObject = Struct(
"minimum_compressed_digital_video_opl" / Int16ub,
"minimum_uncompressed_digital_video_opl" / Int16ub,
"minimum_analog_video_opl" / Int16ub,
"minimum_digital_compressed_audio_opl" / Int16ub,
"minimum_digital_uncompressed_audio_opl" / Int16ub,
)
ExpirationRestrictionObject = Struct(
"begin_date" / Int32ub,
"end_date" / Int32ub
)
RemovalDateObject = Struct(
"removal_date" / Int32ub
)
UplinkKIDObject = Struct(
"uplink_kid" / Bytes(16),
"chained_checksum_type" / Int16ub,
"chained_checksum_length" / Int16ub,
"chained_checksum" / Bytes(this.chained_checksum_length)
)
AnalogVideoOutputConfigurationRestriction = Struct(
"video_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
DigitalVideoOutputRestrictionObject = Struct(
"video_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
DigitalAudioOutputRestrictionObject = Struct(
"audio_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
PolicyMetadataObject = Struct(
"metadata_type" / Bytes(16),
"policy_data" / Bytes(this._.length)
)
SecureStopRestrictionObject = Struct(
"metering_id" / Bytes(16)
)
MeteringRestrictionObject = Struct(
"metering_id" / Bytes(16)
)
ExpirationAfterFirstPlayRestrictionObject = Struct(
"seconds" / Int32ub
)
GracePeriodObject = Struct(
"grace_period" / Int32ub
)
SourceIdObject = Struct(
"source_id" / Int32ub
)
AuxiliaryKey = Struct(
"location" / Int32ub,
"key" / Bytes(16)
)
AuxiliaryKeysObject = Struct(
"count" / Int16ub,
"auxiliary_keys" / Array(this.count, AuxiliaryKey)
)
UplinkKeyObject3 = Struct(
"uplink_key_id" / Bytes(16),
"chained_length" / Int16ub,
"checksum" / Bytes(this.chained_length),
"count" / Int16ub,
"entries" / Array(this.count, Int32ub)
)
CopyEnablerObject = Struct(
"copy_enabler_type" / Bytes(16)
)
CopyCountRestrictionObject = Struct(
"count" / Int32ub
)
MoveObject = Struct(
"minimum_move_protection_level" / Int32ub
)
XMRObject = Struct(
"flags" / Int16ub,
"type" / Int16ub,
"length" / Int32ub,
"data" / Switch(
lambda this_: this_.type,
{
0x0005: OutputProtectionLevelRestrictionObject,
0x0008: AnalogVideoOutputConfigurationRestriction,
0x000a: ContentKeyObject,
0x000b: SignatureObject,
0x000d: RightsSettingsObject,
0x0012: ExpirationRestrictionObject,
0x0013: IssueDateObject,
0x0016: MeteringRestrictionObject,
0x001a: GracePeriodObject,
0x0022: SourceIdObject,
0x002a: ECCKeyObject,
0x002c: PolicyMetadataObject,
0x0029: DomainRestrictionObject,
0x0030: ExpirationAfterFirstPlayRestrictionObject,
0x0031: DigitalAudioOutputRestrictionObject,
0x0032: RevInfoVersionObject,
0x0033: EmbeddedLicenseSettingsObject,
0x0034: SecurityLevelObject,
0x0037: MoveObject,
0x0039: PlayEnablerType,
0x003a: CopyEnablerObject,
0x003b: UplinkKIDObject,
0x003d: CopyCountRestrictionObject,
0x0050: RemovalDateObject,
0x0051: AuxiliaryKeysObject,
0x0052: UplinkKeyObject3,
0x005a: SecureStopRestrictionObject,
0x0059: DigitalVideoOutputRestrictionObject
},
default=LazyBound(lambda: _XMRLicenseStructs.XMRObject)
)
)
XmrLicense = Struct(
"signature" / Const(b"XMR\x00"),
"xmr_version" / Int32ub,
"rights_id" / Bytes(16),
"containers" / GreedyRange(XMRObject)
)
class XMRLicense(_XMRLicenseStructs):
def __init__(
self,
parsed_license: Container,
license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
):
self.parsed = parsed_license
self._LICENSE = license_obj
@classmethod
def loads(cls, data: Union[str, bytes]) -> XMRLicense:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
licence = _XMRLicenseStructs.XmrLicense
return cls(
parsed_license=licence.parse(data),
license_obj=licence
)
@classmethod
def load(cls, path: Union[Path, str]) -> XMRLicense:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self._LICENSE.build(self.parsed)
def struct(self) -> _XMRLicenseStructs.XmrLicense:
return self._LICENSE
def _locate(self, container: Container):
if container.flags == 2 or container.flags == 3:
return self._locate(container.data)
else:
return container
def get_object(self, type_: int):
for obj in self.parsed.containers:
container = self._locate(obj)
if container.type == type_:
yield container.data
def get_content_keys(self):
for content_key in self.get_object(10):
yield content_key

41
pyproject.toml Normal file
View File

@ -0,0 +1,41 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
version = "0.1.6"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "GPL-3.0-only"
authors = ["DevLARLEY"]
readme = "README.md"
repository = "https://github.com/ready-dl/pyplayready"
keywords = ["python", "drm", "playready", "microsoft"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Natural Language :: English",
"Operating System :: OS Independent",
"Topic :: Multimedia :: Video",
"Topic :: Security :: Cryptography",
"Topic :: Software Development :: Libraries :: Python Modules"
]
include = [
{ path = "README.md", format = "sdist" },
{ path = "LICENSE", format = "sdist" },
]
[tool.poetry.urls]
"Issues" = "https://github.com/ready-dl/pyplayready/issues"
[tool.poetry.dependencies]
python = ">=3.8,<4.0"
requests = "^2.32.3"
pycryptodome = "^3.21.0"
construct = "^2.10.70"
ECPy = "^1.2.5"
click = "^8.1.7"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
requests
pycryptodome
ecpy
construct
click