From 70e47800df80979c009940dc2d8936d6eabfe6c0 Mon Sep 17 00:00:00 2001 From: BuildTools Date: Thu, 14 Nov 2024 18:19:38 +0100 Subject: [PATCH] + Option to downgrade WRMHEADERs to v4.0.0.0 + Removed CRC32 checksum + Several small fixed --- README.md | 5 +- pyplayready/__init__.py | 2 +- pyplayready/cdm.py | 10 +-- pyplayready/main.py | 5 +- pyplayready/pssh.py | 30 +++++-- pyplayready/wrmheader.py | 188 +++++++++++++++++++++++++++++++++++++++ pyproject.toml | 3 +- requirements.txt | 3 +- 8 files changed, 225 insertions(+), 21 deletions(-) create mode 100644 pyplayready/wrmheader.py diff --git a/README.md b/README.md index eeecadb..5b8cf6d 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ pyplayready test DEVICE.prd ## Usage An example code snippet: + ```python from pyplayready.cdm import Cdm from pyplayready.device import Device @@ -48,7 +49,9 @@ pssh = PSSH( "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" ) -request = cdm.get_license_challenge(pssh.wrm_headers[0]) +# set to `True` if your device doesn't support scalable licenses (this projects also doesn't yet) to downgrade the WRMHEADERs to v4.0.0.0 +wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False) +request = cdm.get_license_challenge(wrm_headers[0]) response = requests.post( url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)", diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py index d37927b..ea3b145 100644 --- a/pyplayready/__init__.py +++ b/pyplayready/__init__.py @@ -8,4 +8,4 @@ from .pssh import * from .xml_key import * from .xmrlicense import * -__version__ = "0.0.2" +__version__ = "0.1.0" diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py index 78bf9c4..ff6cbb9 100644 --- a/pyplayready/cdm.py +++ b/pyplayready/cdm.py @@ -30,14 +30,14 @@ class Cdm: encryption_key: ECCKey, signing_key: ECCKey, client_version: str = "10.0.16384.10011", - la_version: int = 1 + protocol_version: int = 1 ): self.security_level = security_level self.certificate_chain = certificate_chain self.encryption_key = encryption_key self.signing_key = signing_key self.client_version = client_version - self.la_version = la_version + self.protocol_version = protocol_version self.curve = Curve.get_curve("secp256r1") self.elgamal = ElGamal(self.curve) @@ -61,14 +61,14 @@ class Cdm: signing_key=device.signing_key ) - def get_key_data(self): + def get_key_data(self) -> bytes: point1, point2 = self.elgamal.encrypt( message_point=self._xml_key.get_point(self.elgamal.curve), public_key=self._wmrm_key ) return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y) - def get_cipher_data(self): + def get_cipher_data(self) -> bytes: b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() body = f"{b64_chain}" @@ -94,7 +94,7 @@ class Cdm: ) -> str: return ( '' - f'{self.la_version}' + f'{self.protocol_version}' f'{content_header}' '' f'{self.client_version}' diff --git a/pyplayready/main.py b/pyplayready/main.py index 70879f6..4af1f6a 100644 --- a/pyplayready/main.py +++ b/pyplayready/main.py @@ -2,7 +2,6 @@ import logging from datetime import datetime from pathlib import Path from typing import Optional -from zlib import crc32 import click import requests @@ -53,7 +52,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None: cdm = Cdm.from_device(device) log.info("Loaded CDM") - challenge = cdm.get_license_challenge(pssh.wrm_headers[0]) + challenge = cdm.get_license_challenge(pssh.get_wrm_headers(downgrade_to_v4=True)[0]) log.info("Created License Request (Challenge)") log.debug(challenge) @@ -167,7 +166,7 @@ def create_device( out_path = output else: out_dir = output or Path.cwd() - out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd" + out_path = out_dir / f"{device.get_name()}.prd" if out_path.exists(): log.error(f"A file already exists at the path '{out_path}', cannot overwrite.") diff --git a/pyplayready/pssh.py b/pyplayready/pssh.py index 1793978..ec3e919 100644 --- a/pyplayready/pssh.py +++ b/pyplayready/pssh.py @@ -4,6 +4,8 @@ from uuid import UUID from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container +from pyplayready.wrmheader import WRMHeader + class _PlayreadyPSSHStructs: PSSHBox = Struct( @@ -55,24 +57,34 @@ class PSSH: if self._is_playready_pssh_box(data): pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data) if bool(self._is_utf_16(pssh_box.data)): - self.wrm_headers = [pssh_box.data.decode("utf-16-le")] + self._wrm_headers = [pssh_box.data.decode("utf-16-le")] elif bool(self._is_utf_16(pssh_box.data[6:])): - self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")] + self._wrm_headers = [pssh_box.data[6:].decode("utf-16-le")] elif bool(self._is_utf_16(pssh_box.data[10:])): - self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")] + self._wrm_headers = [pssh_box.data[10:].decode("utf-16-le")] else: - self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data))) + self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data))) elif bool(self._is_utf_16(data)): - self.wrm_headers = [data.decode("utf-16-le")] + self._wrm_headers = [data.decode("utf-16-le")] elif bool(self._is_utf_16(data[6:])): - self.wrm_headers = [data[6:].decode("utf-16-le")] + self._wrm_headers = [data[6:].decode("utf-16-le")] elif bool(self._is_utf_16(data[10:])): - self.wrm_headers = [data[10:].decode("utf-16-le")] + self._wrm_headers = [data[10:].decode("utf-16-le")] else: - self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data))) + self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data))) except Exception: raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader") + @staticmethod + def _downgrade(wrm_header: str) -> str: + return WRMHeader(wrm_header).to_v4_0_0_0() + + def get_wrm_headers(self, downgrade_to_v4: bool = False): + return list(map( + self._downgrade if downgrade_to_v4 else (lambda _: _), + self._wrm_headers + )) + def _is_playready_pssh_box(self, data: bytes) -> bool: return data[12:28] == self.SYSTEM_ID.bytes @@ -81,7 +93,7 @@ class PSSH: return all(map(lambda i: data[i] == 0, range(1, len(data), 2))) @staticmethod - def _get_wrm_headers(wrm_header: Container): + def _read_wrm_headers(wrm_header: Container): for record in wrm_header.records: if record.type == 1: yield record.data diff --git a/pyplayready/wrmheader.py b/pyplayready/wrmheader.py new file mode 100644 index 0000000..7a6a325 --- /dev/null +++ b/pyplayready/wrmheader.py @@ -0,0 +1,188 @@ +import base64 +from enum import Enum +from typing import Optional, List, Union, Tuple + +import xmltodict + + +class WRMHeader: + class SignedKeyID: + def __init__( + self, + alg_id: str, + value: str, + checksum: str + ): + self.alg_id = alg_id + self.value = value + self.checksum = checksum + + class Version(Enum): + VERSION_4_0_0_0 = "4.0.0.0" + VERSION_4_1_0_0 = "4.1.0.0" + VERSION_4_2_0_0 = "4.2.0.0" + VERSION_4_3_0_0 = "4.3.0.0" + UNKNOWN = "UNKNOWN" + + @classmethod + def _missing_(cls, value): + return cls.UNKNOWN + + _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]] + + def __init__( + self, + data: Union[str, bytes] + ): + """Represents a PlayReady WRM Header""" + if not data: + raise ValueError("Data must not be empty") + + if isinstance(data, str): + try: + data = base64.b64decode(data).decode() + except Exception: + data = data.encode() + + self._raw_data: bytes = data + self._parsed = xmltodict.parse(self._raw_data) + + self._header = self._parsed.get('WRMHEADER') + if not self._header: + raise ValueError("Data is not a valid WRMHEADER") + + self.version = self.Version(self._header.get('@version')) + + @staticmethod + def _ensure_list(element: Union[dict, list]) -> List: + if isinstance(element, dict): + return [element] + return element + + def to_v4_0_0_0(self) -> str: + """Will ignore any remaining Key IDs if there's more than just one""" + return self._build_v4_0_0_0_wrm_header(*self.read_attributes()) + + @staticmethod + def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE: + protect_info = data.get("PROTECTINFO") + + return ( + [WRMHeader.SignedKeyID( + alg_id=protect_info["ALGID"], + value=data["KID"], + checksum=data.get("CHECKSUM") + )], + data.get("LA_URL"), + data.get("LUI_URL"), + data.get("DS_ID") + ) + + @staticmethod + def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE: + protect_info = data.get("PROTECTINFO") + + key_ids = [] + if protect_info: + kid = protect_info["KID"] + if kid: + key_ids = [WRMHeader.SignedKeyID( + alg_id=kid["@ALGID"], + value=kid["@VALUE"], + checksum=kid.get("@CHECKSUM") + )] + + return ( + key_ids, + data.get("LA_URL"), + data.get("LUI_URL"), + data.get("DS_ID") + ) + + @staticmethod + def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE: + protect_info = data.get("PROTECTINFO") + + key_ids = [] + if protect_info: + kids = protect_info["KIDS"] + if kids: + for kid in WRMHeader._ensure_list(kids["KID"]): + key_ids.append(WRMHeader.SignedKeyID( + alg_id=kid["@ALGID"], + value=kid["@VALUE"], + checksum=kid.get("@CHECKSUM") + )) + + return ( + key_ids, + data.get("LA_URL"), + data.get("LUI_URL"), + data.get("DS_ID") + ) + + @staticmethod + def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE: + protect_info = data.get("PROTECTINFO") + + key_ids = [] + if protect_info: + kids = protect_info["KIDS"] + for kid in WRMHeader._ensure_list(kids["KID"]): + key_ids.append(WRMHeader.SignedKeyID( + alg_id=kid.get("@ALGID"), + value=kid["@VALUE"], + checksum=kid.get("@CHECKSUM") + )) + + return ( + key_ids, + data.get("LA_URL"), + data.get("LUI_URL"), + data.get("DS_ID") + ) + + def read_attributes(self) -> _RETURN_STRUCTURE: + data = self._header.get("DATA") + if not data: + raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") + + match self.version: + case self.Version.VERSION_4_0_0_0: + return self._read_v4_0_0_0(data) + case self.Version.VERSION_4_1_0_0: + return self._read_v4_1_0_0(data) + case self.Version.VERSION_4_2_0_0: + return self._read_v4_2_0_0(data) + case self.Version.VERSION_4_3_0_0: + return self._read_v4_3_0_0(data) + + @staticmethod + def _build_v4_0_0_0_wrm_header( + key_ids: List[SignedKeyID], + la_url: Optional[str], + lui_url: Optional[str], + ds_id: Optional[str] + ) -> str: + if len(key_ids) == 0: + raise Exception("No Key IDs available") + + key_id = key_ids[0] + return ( + '' + '' + '' + '16' + 'AESCTR' + '' + f'{key_id.value}' + + (f'{la_url}' if la_url else '') + + (f'{lui_url}' if lui_url else '') + + (f'{ds_id}' if ds_id else '') + + (f'{key_id.checksum}' if key_id.checksum else '') + + '' + '' + ) + + def dumps(self) -> str: + return self._raw_data.decode() diff --git a/pyproject.toml b/pyproject.toml index 6a11328..43f17a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyplayready" -version = "0.0.2" +version = "0.1.0" description = "pyplayready CDM (Content Decryption Module) implementation in Python." license = "GPL-3.0-only" authors = ["DevLARLEY"] @@ -36,6 +36,7 @@ pycryptodome = "^3.21.0" construct = "^2.10.70" ECPy = "^1.2.5" click = "^8.1.7" +xmltodict = "^0.14.2" [tool.poetry.scripts] pyplayready = "pyplayready.main:main" diff --git a/requirements.txt b/requirements.txt index b6d8bb8..308c138 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ requests pycryptodome ecpy construct -click \ No newline at end of file +click +xmltodict \ No newline at end of file