+ Option to downgrade WRMHEADERs to v4.0.0.0

+ Removed CRC32 checksum
+ Several small fixed
This commit is contained in:
BuildTools 2024-11-14 18:19:38 +01:00
parent 042f8c2204
commit 70e47800df
8 changed files with 225 additions and 21 deletions

View File

@ -24,6 +24,7 @@ pyplayready test DEVICE.prd
## Usage
An example code snippet:
```python
from pyplayready.cdm import Cdm
from pyplayready.device import Device
@ -48,7 +49,9 @@ pssh = PSSH(
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
request = cdm.get_license_challenge(pssh.wrm_headers[0])
# set to `True` if your device doesn't support scalable licenses (this projects also doesn't yet) to downgrade the WRMHEADERs to v4.0.0.0
wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
request = cdm.get_license_challenge(wrm_headers[0])
response = requests.post(
url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",

View File

@ -8,4 +8,4 @@ from .pssh import *
from .xml_key import *
from .xmrlicense import *
__version__ = "0.0.2"
__version__ = "0.1.0"

View File

@ -30,14 +30,14 @@ class Cdm:
encryption_key: ECCKey,
signing_key: ECCKey,
client_version: str = "10.0.16384.10011",
la_version: int = 1
protocol_version: int = 1
):
self.security_level = security_level
self.certificate_chain = certificate_chain
self.encryption_key = encryption_key
self.signing_key = signing_key
self.client_version = client_version
self.la_version = la_version
self.protocol_version = protocol_version
self.curve = Curve.get_curve("secp256r1")
self.elgamal = ElGamal(self.curve)
@ -61,14 +61,14 @@ class Cdm:
signing_key=device.signing_key
)
def get_key_data(self):
def get_key_data(self) -> bytes:
point1, point2 = self.elgamal.encrypt(
message_point=self._xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def get_cipher_data(self):
def get_cipher_data(self) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
@ -94,7 +94,7 @@ class Cdm:
) -> str:
return (
'<LA xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols" Id="SignedData" xml:space="preserve">'
f'<Version>{self.la_version}</Version>'
f'<Version>{self.protocol_version}</Version>'
f'<ContentHeader>{content_header}</ContentHeader>'
'<CLIENTINFO>'
f'<CLIENTVERSION>{self.client_version}</CLIENTVERSION>'

View File

@ -2,7 +2,6 @@ import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from zlib import crc32
import click
import requests
@ -53,7 +52,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
cdm = Cdm.from_device(device)
log.info("Loaded CDM")
challenge = cdm.get_license_challenge(pssh.wrm_headers[0])
challenge = cdm.get_license_challenge(pssh.get_wrm_headers(downgrade_to_v4=True)[0])
log.info("Created License Request (Challenge)")
log.debug(challenge)
@ -167,7 +166,7 @@ def create_device(
out_path = output
else:
out_dir = output or Path.cwd()
out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd"
out_path = out_dir / f"{device.get_name()}.prd"
if out_path.exists():
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")

View File

@ -4,6 +4,8 @@ from uuid import UUID
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container
from pyplayready.wrmheader import WRMHeader
class _PlayreadyPSSHStructs:
PSSHBox = Struct(
@ -55,24 +57,34 @@ class PSSH:
if self._is_playready_pssh_box(data):
pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data)
if bool(self._is_utf_16(pssh_box.data)):
self.wrm_headers = [pssh_box.data.decode("utf-16-le")]
self._wrm_headers = [pssh_box.data.decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[6:])):
self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
self._wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[10:])):
self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
self._wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
else:
self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
elif bool(self._is_utf_16(data)):
self.wrm_headers = [data.decode("utf-16-le")]
self._wrm_headers = [data.decode("utf-16-le")]
elif bool(self._is_utf_16(data[6:])):
self.wrm_headers = [data[6:].decode("utf-16-le")]
self._wrm_headers = [data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(data[10:])):
self.wrm_headers = [data[10:].decode("utf-16-le")]
self._wrm_headers = [data[10:].decode("utf-16-le")]
else:
self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
except Exception:
raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader")
@staticmethod
def _downgrade(wrm_header: str) -> str:
return WRMHeader(wrm_header).to_v4_0_0_0()
def get_wrm_headers(self, downgrade_to_v4: bool = False):
return list(map(
self._downgrade if downgrade_to_v4 else (lambda _: _),
self._wrm_headers
))
def _is_playready_pssh_box(self, data: bytes) -> bool:
return data[12:28] == self.SYSTEM_ID.bytes
@ -81,7 +93,7 @@ class PSSH:
return all(map(lambda i: data[i] == 0, range(1, len(data), 2)))
@staticmethod
def _get_wrm_headers(wrm_header: Container):
def _read_wrm_headers(wrm_header: Container):
for record in wrm_header.records:
if record.type == 1:
yield record.data

188
pyplayready/wrmheader.py Normal file
View File

@ -0,0 +1,188 @@
import base64
from enum import Enum
from typing import Optional, List, Union, Tuple
import xmltodict
class WRMHeader:
class SignedKeyID:
def __init__(
self,
alg_id: str,
value: str,
checksum: str
):
self.alg_id = alg_id
self.value = value
self.checksum = checksum
class Version(Enum):
VERSION_4_0_0_0 = "4.0.0.0"
VERSION_4_1_0_0 = "4.1.0.0"
VERSION_4_2_0_0 = "4.2.0.0"
VERSION_4_3_0_0 = "4.3.0.0"
UNKNOWN = "UNKNOWN"
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
_RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
def __init__(
self,
data: Union[str, bytes]
):
"""Represents a PlayReady WRM Header"""
if not data:
raise ValueError("Data must not be empty")
if isinstance(data, str):
try:
data = base64.b64decode(data).decode()
except Exception:
data = data.encode()
self._raw_data: bytes = data
self._parsed = xmltodict.parse(self._raw_data)
self._header = self._parsed.get('WRMHEADER')
if not self._header:
raise ValueError("Data is not a valid WRMHEADER")
self.version = self.Version(self._header.get('@version'))
@staticmethod
def _ensure_list(element: Union[dict, list]) -> List:
if isinstance(element, dict):
return [element]
return element
def to_v4_0_0_0(self) -> str:
"""Will ignore any remaining Key IDs if there's more than just one"""
return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
@staticmethod
def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
return (
[WRMHeader.SignedKeyID(
alg_id=protect_info["ALGID"],
value=data["KID"],
checksum=data.get("CHECKSUM")
)],
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kid = protect_info["KID"]
if kid:
key_ids = [WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
)]
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
if kids:
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid.get("@ALGID"),
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
def read_attributes(self) -> _RETURN_STRUCTURE:
data = self._header.get("DATA")
if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
match self.version:
case self.Version.VERSION_4_0_0_0:
return self._read_v4_0_0_0(data)
case self.Version.VERSION_4_1_0_0:
return self._read_v4_1_0_0(data)
case self.Version.VERSION_4_2_0_0:
return self._read_v4_2_0_0(data)
case self.Version.VERSION_4_3_0_0:
return self._read_v4_3_0_0(data)
@staticmethod
def _build_v4_0_0_0_wrm_header(
key_ids: List[SignedKeyID],
la_url: Optional[str],
lui_url: Optional[str],
ds_id: Optional[str]
) -> str:
if len(key_ids) == 0:
raise Exception("No Key IDs available")
key_id = key_ids[0]
return (
'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="4.0.0.0">'
'<DATA>'
'<PROTECTINFO>'
'<KEYLEN>16</KEYLEN>'
'<ALGID>AESCTR</ALGID>'
'</PROTECTINFO>'
f'<KID>{key_id.value}</KID>' +
(f'<LA_URL>{la_url}</LA_URL>' if la_url else '') +
(f'<LUI_URL>{lui_url}</LUI_URL>' if lui_url else '') +
(f'<DS_ID>{ds_id}</DS_ID>' if ds_id else '') +
(f'<CHECKSUM>{key_id.checksum}</CHECKSUM>' if key_id.checksum else '') +
'</DATA>'
'</WRMHEADER>'
)
def dumps(self) -> str:
return self._raw_data.decode()

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
version = "0.0.2"
version = "0.1.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "GPL-3.0-only"
authors = ["DevLARLEY"]
@ -36,6 +36,7 @@ pycryptodome = "^3.21.0"
construct = "^2.10.70"
ECPy = "^1.2.5"
click = "^8.1.7"
xmltodict = "^0.14.2"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"

View File

@ -2,4 +2,5 @@ requests
pycryptodome
ecpy
construct
click
click
xmltodict