+ Added encryption key verification

+ Fixed <Python 3.10 CLI command incompatibility
+ Small fixes
This commit is contained in:
BuildTools 2024-11-15 20:18:00 +01:00
parent 859d78e175
commit 02f4cfa90e
9 changed files with 48 additions and 34 deletions

View File

@ -10,4 +10,4 @@ from .session import *
from .xml_key import * from .xml_key import *
from .xmrlicense import * from .xmrlicense import *
__version__ = "0.1.0" __version__ = "0.3.1"

View File

@ -189,7 +189,7 @@ class Certificate(_BCertStructs):
self._BCERT = bcert_obj self._BCERT = bcert_obj
@classmethod @classmethod
def new_key_cert( def new_leaf_cert(
cls, cls,
cert_id: bytes, cert_id: bytes,
security_level: int, security_level: int,

View File

@ -21,7 +21,7 @@ from pyplayready.xml_key import XmlKey
from pyplayready.elgamal import ElGamal from pyplayready.elgamal import ElGamal
from pyplayready.xmrlicense import XMRLicense from pyplayready.xmrlicense import XMRLicense
from pyplayready.exceptions import (InvalidSession, TooManySessions) from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense)
from pyplayready.session import Session from pyplayready.session import Session
@ -78,7 +78,7 @@ class Cdm:
session = Session(len(self.__sessions) + 1) session = Session(len(self.__sessions) + 1)
self.__sessions[session.id] = session self.__sessions[session.id] = session
session._xml_key = XmlKey() session.xml_key = XmlKey()
return session.id return session.id
@ -97,21 +97,21 @@ class Cdm:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.") raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
del self.__sessions[session_id] del self.__sessions[session_id]
def get_key_data(self, session: Session) -> bytes: def _get_key_data(self, session: Session) -> bytes:
point1, point2 = self.elgamal.encrypt( point1, point2 = self.elgamal.encrypt(
message_point= session._xml_key.get_point(self.elgamal.curve), message_point=session.xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key public_key=self._wmrm_key
) )
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y) return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def get_cipher_data(self, session: Session) -> bytes: def _get_cipher_data(self, session: Session) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>" body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
cipher = AES.new( cipher = AES.new(
key=session._xml_key.aes_key, key=session.xml_key.aes_key,
mode=AES.MODE_CBC, mode=AES.MODE_CBC,
iv=session._xml_key.aes_iv iv=session.xml_key.aes_iv
) )
ciphertext = cipher.encrypt(pad( ciphertext = cipher.encrypt(pad(
@ -119,7 +119,7 @@ class Cdm:
AES.block_size AES.block_size
)) ))
return session._xml_key.aes_iv + ciphertext return session.xml_key.aes_iv + ciphertext
def _build_digest_content( def _build_digest_content(
self, self,
@ -181,8 +181,8 @@ class Cdm:
la_content = self._build_digest_content( la_content = self._build_digest_content(
content_header=content_header, content_header=content_header,
nonce=base64.b64encode(get_random_bytes(16)).decode(), nonce=base64.b64encode(get_random_bytes(16)).decode(),
wmrm_cipher=base64.b64encode(self.get_key_data(session)).decode(), wmrm_cipher=base64.b64encode(self._get_key_data(session)).decode(),
cert_cipher=base64.b64encode(self.get_cipher_data(session)).decode() cert_cipher=base64.b64encode(self._get_cipher_data(session)).decode()
) )
la_hash_obj = SHA256.new() la_hash_obj = SHA256.new()
@ -239,6 +239,14 @@ class Cdm:
decrypted = self.elgamal.decrypt((point1, point2), int(session.encryption_key.key.d)) decrypted = self.elgamal.decrypt((point1, point2), int(session.encryption_key.key.d))
return self.elgamal.to_bytes(decrypted.x)[16:32] return self.elgamal.to_bytes(decrypted.x)[16:32]
@staticmethod
def _verify_ecc_key(session: Session, licence: XMRLicense) -> bool:
ecc_keys = list(licence.get_object(42))
if not ecc_keys:
raise InvalidLicense("No ECC public key in license")
return ecc_keys[0].key == session.encryption_key.public_bytes()
def parse_license(self, session_id: bytes, licence: str) -> None: def parse_license(self, session_id: bytes, licence: str) -> None:
session = self.__sessions.get(session_id) session = self.__sessions.get(session_id)
if not session: if not session:
@ -249,6 +257,10 @@ class Cdm:
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License") license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
for license_element in license_elements: for license_element in license_elements:
parsed_licence = XMRLicense.loads(license_element.text) parsed_licence = XMRLicense.loads(license_element.text)
if not self._verify_ecc_key(session, parsed_licence):
raise InvalidLicense("Public encryption key does not match")
for key in parsed_licence.get_content_keys(): for key in parsed_licence.get_content_keys():
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256: if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256:
session.keys.append(Key( session.keys.append(Key(

View File

@ -1,18 +1,22 @@
class PyPlayredyException(Exception): class PyPlayreadyException(Exception):
"""Exceptions used by pyplayready.""" """Exceptions used by pyplayready."""
class TooManySessions(PyPlayredyException): class TooManySessions(PyPlayreadyException):
"""Too many Sessions are open.""" """Too many Sessions are open."""
class InvalidSession(PyPlayredyException): class InvalidSession(PyPlayreadyException):
"""No Session is open with the specified identifier.""" """No Session is open with the specified identifier."""
class InvalidInitData(PyPlayredyException): class InvalidInitData(PyPlayreadyException):
"""The Playready Cenc Header Data is invalid or empty.""" """The Playready Cenc Header Data is invalid or empty."""
class DeviceMismatch(PyPlayredyException): class DeviceMismatch(PyPlayreadyException):
"""The Remote CDMs Device information and the APIs Device information did not match.""" """The Remote CDMs Device information and the APIs Device information did not match."""
class InvalidLicense(PyPlayreadyException):
"""Unable to parse XMR License."""

View File

@ -26,7 +26,7 @@ def main(version: bool, debug: bool) -> None:
current_year = datetime.now().year current_year = datetime.now().year
copyright_years = f"2024-{current_year}" copyright_years = f"2024-{current_year}"
log.info("pyplayready version %s Copyright (c) %s DevLARLEY", __version__, copyright_years) log.info("pyplayready version %s Copyright (c) %s DevLARLEY, Erevoc", __version__, copyright_years)
log.info("https://github.com/ready-dl/pyplayready") log.info("https://github.com/ready-dl/pyplayready")
log.info("Run 'pyplayready --help' for help") log.info("Run 'pyplayready --help' for help")
if version: if version:
@ -147,7 +147,7 @@ def create_device(
certificate_chain = CertificateChain.load(group_certificate) certificate_chain = CertificateChain.load(group_certificate)
group_key = ECCKey.load(group_key) group_key = ECCKey.load(group_key)
new_certificate = Certificate.new_key_cert( new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16), cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(), security_level=certificate_chain.get_security_level(),
client_id=get_random_bytes(16), client_id=get_random_bytes(16),

View File

@ -1,5 +1,4 @@
import base64 import base64
import sys
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Union from typing import Any, Optional, Union

View File

@ -11,7 +11,7 @@ class Session:
def __init__(self, number: int): def __init__(self, number: int):
self.number = number self.number = number
self.id = get_random_bytes(16) self.id = get_random_bytes(16)
self._xml_key = XmlKey() self.xml_key = XmlKey()
self.signing_key: ECCKey = None self.signing_key: ECCKey = None
self.encryption_key: ECCKey = None self.encryption_key: ECCKey = None
self.keys: list[Key] = [] self.keys: list[Key] = []

View File

@ -147,15 +147,14 @@ class WRMHeader:
if not data: if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
match self.version: if self.version == self.Version.VERSION_4_0_0_0:
case self.Version.VERSION_4_0_0_0: return self._read_v4_0_0_0(data)
return self._read_v4_0_0_0(data) elif self.version == self.Version.VERSION_4_1_0_0:
case self.Version.VERSION_4_1_0_0: return self._read_v4_1_0_0(data)
return self._read_v4_1_0_0(data) elif self.version == self.Version.VERSION_4_2_0_0:
case self.Version.VERSION_4_2_0_0: return self._read_v4_2_0_0(data)
return self._read_v4_2_0_0(data) elif self.version == self.Version.VERSION_4_3_0_0:
case self.Version.VERSION_4_3_0_0: return self._read_v4_3_0_0(data)
return self._read_v4_3_0_0(data)
@staticmethod @staticmethod
def _build_v4_0_0_0_wrm_header( def _build_v4_0_0_0_wrm_header(

View File

@ -4,10 +4,10 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "pyplayready" name = "pyplayready"
version = "0.1.0" version = "0.3.1"
description = "pyplayready CDM (Content Decryption Module) implementation in Python." description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "GPL-3.0-only" license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY"] authors = ["DevLARLEY, Erevoc"]
readme = "README.md" readme = "README.md"
repository = "https://github.com/ready-dl/pyplayready" repository = "https://github.com/ready-dl/pyplayready"
keywords = ["python", "drm", "playready", "microsoft"] keywords = ["python", "drm", "playready", "microsoft"]