+ Added license chain verification (during provisioning)

+ Added aescbc (symmetric/scalable) license support
+ Added license integrity verification using CI
+ Added matching of group key and certificate chain during provisioning
+ Removed wrm header downgrading
This commit is contained in:
titus 2025-01-04 19:18:54 +01:00
parent 91db649c9c
commit 3472fa3f27
13 changed files with 166 additions and 130 deletions

View File

@ -55,8 +55,7 @@ pssh = PSSH(
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
) )
# set to `True` if your device doesn't support scalable licenses (this projects also doesn't yet) to downgrade the WRMHEADERs to v4.0.0.0 wrm_headers = pssh.get_wrm_headers()
wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
request = cdm.get_license_challenge(session_id, wrm_headers[0]) request = cdm.get_license_challenge(session_id, wrm_headers[0])
response = requests.post( response = requests.post(

View File

@ -11,4 +11,4 @@ from pyplayready.system.pssh import *
from pyplayready.system.session import * from pyplayready.system.session import *
__version__ = "0.4.5" __version__ = "0.5.0"

View File

@ -18,7 +18,6 @@ from pyplayready.crypto import Crypto
from pyplayready.system.bcert import CertificateChain from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.key import Key from pyplayready.license.key import Key
from pyplayready.license.xml_key import XmlKey
from pyplayready.license.xmrlicense import XMRLicense from pyplayready.license.xmrlicense import XMRLicense
from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense) from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense)
from pyplayready.system.session import Session from pyplayready.system.session import Session
@ -49,6 +48,7 @@ class Cdm:
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562, y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
curve=Curve.get_curve("secp256r1") curve=Curve.get_curve("secp256r1")
) )
self._rgbMagicConstantZero = bytes([0x7e, 0xe9, 0xed, 0x4a, 0xf7, 0x73, 0x22, 0x4f, 0x00, 0xb8, 0xea, 0x7e, 0xfb, 0x02, 0x7c, 0xbb])
self.__sessions: dict[bytes, Session] = {} self.__sessions: dict[bytes, Session] = {}
@ -74,7 +74,6 @@ class Cdm:
session = Session(len(self.__sessions) + 1) session = Session(len(self.__sessions) + 1)
self.__sessions[session.id] = session self.__sessions[session.id] = session
session.xml_key = XmlKey()
return session.id return session.id
@ -101,7 +100,17 @@ class Cdm:
def _get_cipher_data(self, session: Session) -> bytes: def _get_cipher_data(self, session: Session) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>" body = (
"<Data>"
f"<CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains>"
"<Features>"
'<Feature Name="AESCBC">""</Feature>'
"<REE>"
"<AESCBCS></AESCBCS>"
"</REE>"
"</Features>"
"</Data>"
)
cipher = AES.new( cipher = AES.new(
key=session.xml_key.aes_key, key=session.xml_key.aes_key,
@ -242,21 +251,52 @@ class Cdm:
if not self._verify_encryption_key(session, parsed_licence): if not self._verify_encryption_key(session, parsed_licence):
raise InvalidLicense("Public encryption key does not match") raise InvalidLicense("Public encryption key does not match")
is_scalable = bool(next(parsed_licence.get_object(81), None))
for content_key in parsed_licence.get_content_keys(): for content_key in parsed_licence.get_content_keys():
if Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256: cipher_type = Key.CipherType(content_key.cipher_type)
key = self.__crypto.ecc256_decrypt(
private_key=session.encryption_key, if not cipher_type in (Key.CipherType.ECC_256, Key.CipherType.ECC_256_WITH_KZ, Key.CipherType.ECC_256_VIA_SYMMETRIC):
ciphertext=content_key.encrypted_key raise InvalidLicense(f"Invalid cipher type {cipher_type}")
)[16:32]
else: via_symmetric = Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256_VIA_SYMMETRIC
continue
decrypted = self.__crypto.ecc256_decrypt(
private_key=session.encryption_key,
ciphertext=content_key.encrypted_key
)
ci, ck = decrypted[:16], decrypted[16:32]
if is_scalable:
ci, ck = decrypted[::2][:16], decrypted[1::2][:16]
if via_symmetric:
embedded_root_license = content_key.encrypted_key[:144]
embedded_leaf_license = content_key.encrypted_key[144:]
rgb_key = bytes(ck[i] ^ self._rgbMagicConstantZero[i] for i in range(16))
content_key_prime = AES.new(ck, AES.MODE_ECB).encrypt(rgb_key)
aux_key = next(parsed_licence.get_object(81))["auxiliary_keys"][0]["key"]
derived_aux_key = AES.new(content_key_prime, AES.MODE_ECB).encrypt(aux_key)
uplink_x_key = bytes(bytearray(16)[i] ^ derived_aux_key[i] for i in range(16))
secondary_key = AES.new(ck, AES.MODE_ECB).encrypt(embedded_root_license[128:])
embedded_leaf_license = AES.new(uplink_x_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
embedded_leaf_license = AES.new(secondary_key, AES.MODE_ECB).encrypt(embedded_leaf_license)
ci, ck = embedded_leaf_license[:16], embedded_leaf_license[16:]
if not parsed_licence.check_signature(ci):
raise InvalidLicense("License integrity signature does not match")
session.keys.append(Key( session.keys.append(Key(
key_id=UUID(bytes_le=content_key.key_id), key_id=UUID(bytes_le=content_key.key_id),
key_type=content_key.key_type, key_type=content_key.key_type,
cipher_type=content_key.cipher_type, cipher_type=content_key.cipher_type,
key_length=content_key.key_length, key_length=content_key.key_length,
key=key key=ck
)) ))
except InvalidLicense as e: except InvalidLicense as e:
raise InvalidLicense(e) raise InvalidLicense(e)

View File

@ -3,16 +3,16 @@ from __future__ import annotations
import base64 import base64
from enum import IntEnum from enum import IntEnum
from pathlib import Path from pathlib import Path
from typing import Union, Any from typing import Union, Any, Optional
from pyplayready.device.structs import DeviceStructs from pyplayready.device.structs import DeviceStructs
from pyplayready.exceptions import OutdatedDevice
from pyplayready.system.bcert import CertificateChain from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey from pyplayready.crypto.ecc_key import ECCKey
class Device: class Device:
"""Represents a PlayReady Device (.prd)""" """Represents a PlayReady Device (.prd)"""
CURRENT_STRUCT = DeviceStructs.v3
CURRENT_VERSION = 3 CURRENT_VERSION = 3
class SecurityLevel(IntEnum): class SecurityLevel(IntEnum):
@ -23,7 +23,7 @@ class Device:
def __init__( def __init__(
self, self,
*_: Any, *_: Any,
group_key: Union[str, bytes, None], group_key: Optional[str, bytes, None],
encryption_key: Union[str, bytes], encryption_key: Union[str, bytes],
signing_key: Union[str, bytes], signing_key: Union[str, bytes],
group_certificate: Union[str, bytes], group_certificate: Union[str, bytes],
@ -60,14 +60,11 @@ class Device:
if not isinstance(data, bytes): if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
prd_header = DeviceStructs.header.parse(data) parsed = DeviceStructs.prd.parse(data)
if prd_header.version == 2: return cls(**{
return cls( **parsed,
group_key=None, 'group_key': parsed.get('group_key', None)
**DeviceStructs.v2.parse(data) })
)
return cls(**cls.CURRENT_STRUCT.parse(data))
@classmethod @classmethod
def load(cls, path: Union[Path, str]) -> Device: def load(cls, path: Union[Path, str]) -> Device:
@ -77,7 +74,10 @@ class Device:
return cls.loads(f.read()) return cls.loads(f.read())
def dumps(self) -> bytes: def dumps(self) -> bytes:
return self.CURRENT_STRUCT.build(dict( if not self.group_key:
raise OutdatedDevice("Cannot dump a v2 device, re-create it or use a Device with a version of 3 or higher")
return DeviceStructs.prd.build(dict(
version=self.CURRENT_VERSION, version=self.CURRENT_VERSION,
group_key=self.group_key.dumps(), group_key=self.group_key.dumps(),
encryption_key=self.encryption_key.dumps(), encryption_key=self.encryption_key.dumps(),

View File

@ -1,18 +1,11 @@
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub from construct import Struct, Const, Int8ub, Bytes, this, Int32ub, Switch, Embedded
class DeviceStructs: class DeviceStructs:
magic = Const(b"PRD") magic = Const(b"PRD")
header = Struct(
"signature" / magic,
"version" / Int8ub,
)
# was never in production # was never in production
v1 = Struct( v1 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key_length" / Int32ub, "group_key_length" / Int32ub,
"group_key" / Bytes(this.group_key_length), "group_key" / Bytes(this.group_key_length),
"group_certificate_length" / Int32ub, "group_certificate_length" / Int32ub,
@ -20,8 +13,6 @@ class DeviceStructs:
) )
v2 = Struct( v2 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_certificate_length" / Int32ub, "group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length), "group_certificate" / Bytes(this.group_certificate_length),
"encryption_key" / Bytes(96), "encryption_key" / Bytes(96),
@ -29,11 +20,22 @@ class DeviceStructs:
) )
v3 = Struct( v3 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key" / Bytes(96), "group_key" / Bytes(96),
"encryption_key" / Bytes(96), "encryption_key" / Bytes(96),
"signing_key" / Bytes(96), "signing_key" / Bytes(96),
"group_certificate_length" / Int32ub, "group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length), "group_certificate" / Bytes(this.group_certificate_length),
) )
prd = Struct(
"signature" / magic,
"version" / Int8ub,
Embedded(Switch(
lambda ctx: ctx.version,
{
1: v1,
2: v2,
3: v3
}
))
)

View File

@ -26,9 +26,13 @@ class InvalidLicense(PyPlayreadyException):
"""Unable to parse XMR License.""" """Unable to parse XMR License."""
class InvalidCertificateChain(PyPlayreadyException): class InvalidCertificate(PyPlayreadyException):
"""The BCert is not correctly formatted.""" """The BCert is not correctly formatted."""
class InvalidCertificateChain(PyPlayreadyException):
"""The BCertChain is not correctly formatted."""
class OutdatedDevice(PyPlayreadyException): class OutdatedDevice(PyPlayreadyException):
"""The PlayReady Device is outdated and does not support a specific operation.""" """The PlayReady Device is outdated and does not support a specific operation."""

View File

@ -1,9 +1,10 @@
from __future__ import annotations from __future__ import annotations
import base64 import base64
from pathlib import Path
from typing import Union from typing import Union
from Crypto.Cipher import AES
from Crypto.Hash import CMAC
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
@ -98,7 +99,7 @@ class _XMRLicenseStructs:
PolicyMetadataObject = Struct( PolicyMetadataObject = Struct(
"metadata_type" / Bytes(16), "metadata_type" / Bytes(16),
"policy_data" / Bytes(this._.length) "policy_data" / Bytes(this._.length - 24)
) )
SecureStopRestrictionObject = Struct( SecureStopRestrictionObject = Struct(
@ -223,13 +224,6 @@ class XMRLicense(_XMRLicenseStructs):
license_obj=licence license_obj=licence
) )
@classmethod
def load(cls, path: Union[Path, str]) -> XMRLicense:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes: def dumps(self) -> bytes:
return self._license_obj.build(self.parsed) return self._license_obj.build(self.parsed)
@ -250,3 +244,11 @@ class XMRLicense(_XMRLicenseStructs):
def get_content_keys(self): def get_content_keys(self):
yield from self.get_object(10) yield from self.get_object(10)
def check_signature(self, integrity_key: bytes) -> bool:
cmac = CMAC.new(integrity_key, ciphermod=AES)
signature_data = next(self.get_object(11))
cmac.update(self.dumps()[:-(signature_data.signature_data_length + 12)])
return signature_data.signature_data == cmac.digest()

View File

@ -7,7 +7,7 @@ import click
import requests import requests
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
from pyplayready import __version__ from pyplayready import __version__, InvalidCertificateChain
from pyplayready.system.bcert import CertificateChain, Certificate from pyplayready.system.bcert import CertificateChain, Certificate
from pyplayready.cdm import Cdm from pyplayready.cdm import Cdm
from pyplayready.device import Device from pyplayready.device import Device
@ -56,7 +56,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
session_id = cdm.open() session_id = cdm.open()
log.info("Opened Session") log.info("Opened Session")
challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0]) challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers()[0])
log.info("Created License Request (Challenge)") log.info("Created License Request (Challenge)")
log.debug(challenge) log.debug(challenge)
@ -69,7 +69,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
) )
if license_res.status_code != 200: if license_res.status_code != 200:
log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}") log.error(f"Failed to send challenge [{license_res.status_code}]: {license_res.text}")
return return
licence = license_res.text licence = license_res.text
@ -88,8 +88,10 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
@main.command() @main.command()
@click.argument("device", type=Path) @click.argument("device", type=Path)
@click.option("-c", "--ckt", type=click.Choice(["aesctr", "aescbc"], case_sensitive=False), default="aesctr", help="Content Key Encryption Type")
@click.option("-sl", "--security_level", type=click.Choice(["150", "2000", "3000"], case_sensitive=False), default="2000", help="Minimum Security Level")
@click.pass_context @click.pass_context
def test(ctx: click.Context, device: Path) -> None: def test(ctx: click.Context, device: Path, ckt: str, security_level: str) -> None:
""" """
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server. Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
https://testweb.playready.microsoft.com/Content/Content2X https://testweb.playready.microsoft.com/Content/Content2X
@ -113,7 +115,7 @@ def test(ctx: click.Context, device: Path) -> None:
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
) )
license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)" license_server = f"https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:{security_level},ckt:{ckt})"
ctx.invoke( ctx.invoke(
license_, license_,
@ -148,6 +150,9 @@ def create_device(
group_key = ECCKey.load(group_key) group_key = ECCKey.load(group_key)
certificate_chain = CertificateChain.load(group_certificate) certificate_chain = CertificateChain.load(group_certificate)
if certificate_chain.get(0).get_issuer_key() != group_key.public_bytes():
raise InvalidCertificateChain("Group key does not match this certificate")
new_certificate = Certificate.new_leaf_cert( new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16), cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(), security_level=certificate_chain.get_security_level(),
@ -159,6 +164,8 @@ def create_device(
) )
certificate_chain.prepend(new_certificate) certificate_chain.prepend(new_certificate)
certificate_chain.verify()
device = Device( device = Device(
group_key=group_key.dumps(), group_key=group_key.dumps(),
encryption_key=encryption_key.dumps(), encryption_key=encryption_key.dumps(),

View File

@ -136,7 +136,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
if not init_data.startswith("<WRMHEADER"): if not init_data.startswith("<WRMHEADER"):
try: try:
pssh = PSSH(init_data) pssh = PSSH(init_data)
wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=True) wrm_headers = pssh.get_wrm_headers()
if wrm_headers: if wrm_headers:
init_data = wrm_headers[0] init_data = wrm_headers[0]
except InvalidPssh as e: except InvalidPssh as e:

View File

@ -4,7 +4,7 @@ import collections.abc
from Crypto.PublicKey import ECC from Crypto.PublicKey import ECC
from pyplayready.crypto import Crypto from pyplayready.crypto import Crypto
from pyplayready.exceptions import InvalidCertificateChain from pyplayready.exceptions import InvalidCertificateChain, InvalidCertificate
# monkey patch for construct 2.8.8 compatibility # monkey patch for construct 2.8.8 compatibility
if not hasattr(collections, 'Sequence'): if not hasattr(collections, 'Sequence'):
@ -106,7 +106,7 @@ class _BCertStructs:
"key_type" / Int16ub, "key_type" / Int16ub,
"key_length" / Int16ub, "key_length" / Int16ub,
"flags" / Int32ub, "flags" / Int32ub,
"key" / Bytes(this.length // 8) "key" / Bytes(this.key_length // 8)
) )
# TODO: untested # TODO: untested
@ -208,10 +208,7 @@ class Certificate(_BCertStructs):
encryption_key: ECCKey, encryption_key: ECCKey,
group_key: ECCKey, group_key: ECCKey,
parent: CertificateChain, parent: CertificateChain,
expiry: int = 0xFFFFFFFF, expiry: int = 0xFFFFFFFF
max_license: int = 10240,
max_header: int = 15360,
max_chain_depth: int = 2
) -> Certificate: ) -> Certificate:
basic_info = Container( basic_info = Container(
cert_id=cert_id, cert_id=cert_id,
@ -230,9 +227,9 @@ class Certificate(_BCertStructs):
) )
device_info = Container( device_info = Container(
max_license=max_license, max_license=10240,
max_header=max_header, max_header=15360,
max_chain_depth=max_chain_depth max_chain_depth=2
) )
device_info_attribute = Container( device_info_attribute = Container(
flags=1, flags=1,
@ -301,7 +298,7 @@ class Certificate(_BCertStructs):
attribute=key_info attribute=key_info
) )
manufacturer_info = parent.get_certificate(0).get_attribute(7) manufacturer_info = parent.get(0).get_attribute(7)
new_bcert_container = Container( new_bcert_container = Container(
signature=b"CERT", signature=b"CERT",
@ -354,13 +351,6 @@ class Certificate(_BCertStructs):
bcert_obj=cert bcert_obj=cert
) )
@classmethod
def load(cls, path: Union[Path, str]) -> Certificate:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def get_attribute(self, type_: int): def get_attribute(self, type_: int):
for attribute in self.parsed.attributes: for attribute in self.parsed.attributes:
if attribute.tag == type_: if attribute.tag == type_:
@ -380,35 +370,54 @@ class Certificate(_BCertStructs):
if manufacturer_info: if manufacturer_info:
return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}" return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
def get_issuer_key(self) -> Union[bytes, None]:
key_info_object = self.get_attribute(6)
if not key_info_object:
return
key_info_attribute = key_info_object.attribute
return next(map(lambda key: key.key, filter(lambda key: 6 in key.usages, key_info_attribute.cert_keys)), None)
def dumps(self) -> bytes: def dumps(self) -> bytes:
return self._BCERT.build(self.parsed) return self._BCERT.build(self.parsed)
def struct(self) -> _BCertStructs.BCert: def struct(self) -> _BCertStructs.BCert:
return self._BCERT return self._BCERT
def verify_signature(self): def verify(self, public_key: bytes, index: int):
signature_object = self.get_attribute(8) signature_object = self.get_attribute(8)
if not signature_object:
raise InvalidCertificate(f"No signature object found in certificate {index}")
signature_attribute = signature_object.attribute signature_attribute = signature_object.attribute
sign_payload = self.dumps()[:-signature_object.length]
raw_signature_key = signature_attribute.signature_key raw_signature_key = signature_attribute.signature_key
if public_key != raw_signature_key:
raise InvalidCertificate(f"Signature keys of certificate {index} do not match")
signature_key = ECC.construct( signature_key = ECC.construct(
curve='P-256', curve='P-256',
point_x=int.from_bytes(raw_signature_key[:32], 'big'), point_x=int.from_bytes(raw_signature_key[:32], 'big'),
point_y=int.from_bytes(raw_signature_key[32:], 'big') point_y=int.from_bytes(raw_signature_key[32:], 'big')
) )
return Crypto.ecc256_verify( sign_payload = self.dumps()[:-signature_object.length]
if not Crypto.ecc256_verify(
public_key=signature_key, public_key=signature_key,
data=sign_payload, data=sign_payload,
signature=signature_attribute.signature signature=signature_attribute.signature
) ):
raise InvalidCertificate(f"Signature of certificate {index} is not authentic")
return self.get_issuer_key()
class CertificateChain(_BCertStructs): class CertificateChain(_BCertStructs):
"""Represents a BCertChain""" """Represents a BCertChain"""
ECC256MSBCertRootIssuerPubKey = bytes.fromhex("864d61cff2256e422c568b3c28001cfb3e1527658584ba0521b79b1828d936de1d826a8fc3e6e7fa7a90d5ca2946f1f64a2efb9f5dcffe7e434eb44293fac5ab")
def __init__( def __init__(
self, self,
parsed_bcert_chain: Container, parsed_bcert_chain: Container,
@ -443,15 +452,27 @@ class CertificateChain(_BCertStructs):
def struct(self) -> _BCertStructs.BCertChain: def struct(self) -> _BCertStructs.BCertChain:
return self._BCERT_CHAIN return self._BCERT_CHAIN
def get_certificate(self, index: int) -> Certificate:
return Certificate(self.parsed.certificates[index])
def get_security_level(self) -> int: def get_security_level(self) -> int:
# not sure if there's a better way than this # not sure if there's a better way than this
return self.get_certificate(0).get_security_level() return self.get(0).get_security_level()
def get_name(self) -> str: def get_name(self) -> str:
return self.get_certificate(0).get_name() return self.get(0).get_name()
def verify(self) -> bool:
issuer_key = self.ECC256MSBCertRootIssuerPubKey
try:
for i in reversed(range(self.count())):
certificate = self.get(i)
issuer_key = certificate.verify(issuer_key, i)
if not issuer_key and i != 0:
raise InvalidCertificate(f"Certificate {i} is not valid")
except InvalidCertificate as e:
raise InvalidCertificateChain(e)
return True
def append(self, bcert: Certificate) -> None: def append(self, bcert: Certificate) -> None:
self.parsed.certificate_count += 1 self.parsed.certificate_count += 1
@ -464,21 +485,20 @@ class CertificateChain(_BCertStructs):
self.parsed.total_length += len(bcert.dumps()) self.parsed.total_length += len(bcert.dumps())
def remove(self, index: int) -> None: def remove(self, index: int) -> None:
if self.parsed.certificate_count <= 0: if self.count() <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates") raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count: if index >= self.count():
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") raise IndexError(f"No Certificate at index {index}, {self.count()} total")
self.parsed.certificate_count -= 1 self.parsed.certificate_count -= 1
bcert = Certificate(self.parsed.certificates[index]) self.parsed.total_length -= len(self.get(index).dumps())
self.parsed.total_length -= len(bcert.dumps())
self.parsed.certificates.pop(index) self.parsed.certificates.pop(index)
def get(self, index: int) -> Certificate: def get(self, index: int) -> Certificate:
if self.parsed.certificate_count <= 0: if self.count() <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates") raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count: if index >= self.count():
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") raise IndexError(f"No Certificate at index {index}, {self.count()} total")
return Certificate(self.parsed.certificates[index]) return Certificate(self.parsed.certificates[index])

View File

@ -86,13 +86,11 @@ class PSSH(_PlayreadyPSSHStructs):
) )
)) ))
def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]: def get_wrm_headers(self) -> List[str]:
""" """
Return a list of all WRM Headers in the PSSH as plaintext strings Return a list of all WRM Headers in the PSSH as plaintext strings
downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR
""" """
return list(map( return list(map(
lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(), lambda wrm_header: wrm_header.dumps(),
self.wrm_headers self.wrm_headers
)) ))

View File

@ -63,14 +63,6 @@ class WRMHeader:
return [element] return [element]
return element return element
def to_v4_0_0_0(self) -> str:
"""
Build a v4.0.0.0 WRM header from any possible WRM Header version
Note: Will ignore any remaining Key IDs if there's more than just one
"""
return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
@staticmethod @staticmethod
def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE: def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO") protect_info = data.get("PROTECTINFO")
@ -156,7 +148,6 @@ class WRMHeader:
Returns a tuple structured like this: Tuple[List[SignedKeyID], <LA_URL>, <LUI_URL>, <DS_ID>] Returns a tuple structured like this: Tuple[List[SignedKeyID], <LA_URL>, <LUI_URL>, <DS_ID>]
""" """
data = self._header.get("DATA") data = self._header.get("DATA")
if not data: if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
@ -170,32 +161,5 @@ class WRMHeader:
elif self.version == self.Version.VERSION_4_3_0_0: elif self.version == self.Version.VERSION_4_3_0_0:
return self._read_v4_3_0_0(data) return self._read_v4_3_0_0(data)
@staticmethod
def _build_v4_0_0_0_wrm_header(
key_ids: List[SignedKeyID],
la_url: Optional[str],
lui_url: Optional[str],
ds_id: Optional[str]
) -> str:
if len(key_ids) == 0:
raise Exception("No Key IDs available")
key_id = key_ids[0]
return (
'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="4.0.0.0">'
'<DATA>'
'<PROTECTINFO>'
'<KEYLEN>16</KEYLEN>'
'<ALGID>AESCTR</ALGID>'
'</PROTECTINFO>'
f'<KID>{key_id.value}</KID>' +
(f'<LA_URL>{la_url}</LA_URL>' if la_url else '') +
(f'<LUI_URL>{lui_url}</LUI_URL>' if lui_url else '') +
(f'<DS_ID>{ds_id}</DS_ID>' if ds_id else '') +
(f'<CHECKSUM>{key_id.checksum}</CHECKSUM>' if key_id.checksum else '') +
'</DATA>'
'</WRMHEADER>'
)
def dumps(self) -> str: def dumps(self) -> str:
return self._raw_data.decode("utf-16-le") return self._raw_data.decode("utf-16-le")

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "pyplayready" name = "pyplayready"
version = "0.4.5" version = "0.5.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python." description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0" license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"] authors = ["DevLARLEY, Erevoc", "DevataDev"]