+ File structure re-design

+ Moved ECC functions to a dedicated Crypto class
This commit is contained in:
BuildTools 2024-12-06 19:17:04 +01:00
parent c910d1f349
commit e5782f5674
19 changed files with 713 additions and 641 deletions

View File

@ -33,7 +33,7 @@ An example code snippet:
```python
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.pssh import PSSH
from pyplayready.system.pssh import PSSH
import requests

View File

@ -1,13 +1,14 @@
from .bcert import *
from .cdm import *
from .device import *
from .ecc_key import *
from .elgamal import *
from .key import *
from .pssh import *
from .remotecdm import *
from .session import *
from .xml_key import *
from .xmrlicense import *
from pyplayready.cdm import *
from pyplayready.crypto.ecc_key import *
from pyplayready.crypto.elgamal import *
from pyplayready.device import *
from pyplayready.license.key import *
from pyplayready.license.xml_key import *
from pyplayready.license.xmrlicense import *
from pyplayready.remote.remotecdm import *
from pyplayready.system.bcert import *
from pyplayready.system.pssh import *
from pyplayready.system.session import *
__version__ = "0.4.3"
__version__ = "0.4.4"

View File

@ -10,23 +10,21 @@ import xml.etree.ElementTree as ET
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Random import get_random_bytes
from Crypto.Signature import DSS
from Crypto.Util.Padding import pad
from ecpy.curves import Point, Curve
from pyplayready.bcert import CertificateChain
from pyplayready.ecc_key import ECCKey
from pyplayready.key import Key
from pyplayready.xml_key import XmlKey
from pyplayready.elgamal import ElGamal
from pyplayready.xmrlicense import XMRLicense
from pyplayready.crypto import Crypto
from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.key import Key
from pyplayready.license.xml_key import XmlKey
from pyplayready.license.xmrlicense import XMRLicense
from pyplayready.exceptions import (InvalidSession, TooManySessions, InvalidLicense)
from pyplayready.session import Session
from pyplayready.system.session import Session
class Cdm:
MAX_NUM_OF_SESSIONS = 16
def __init__(
@ -45,13 +43,11 @@ class Cdm:
self.client_version = client_version
self.protocol_version = protocol_version
self.curve = Curve.get_curve("secp256r1")
self.elgamal = ElGamal(self.curve)
self.__crypto = Crypto()
self._wmrm_key = Point(
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
curve=self.curve
curve=Curve.get_curve("secp256r1")
)
self.__sessions: dict[bytes, Session] = {}
@ -98,11 +94,10 @@ class Cdm:
del self.__sessions[session_id]
def _get_key_data(self, session: Session) -> bytes:
point1, point2 = self.elgamal.encrypt(
message_point=session.xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
return self.__crypto.ecc256_encrypt(
public_key=self._wmrm_key,
plaintext=session.xml_key.get_point()
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def _get_cipher_data(self, session: Session) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
@ -190,10 +185,7 @@ class Cdm:
la_hash = la_hash_obj.digest()
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
signed_info_digest = SHA256.new(signed_info.encode())
signer = DSS.new(session.signing_key.key, 'fips-186-3')
signature = signer.sign(signed_info_digest)
signature = self.__crypto.ecc256_sign(session.signing_key, signed_info.encode())
# haven't found a better way to do this. xmltodict.unparse doesn't work
main_body = (
@ -224,23 +216,8 @@ class Cdm:
return main_body
def _decrypt_ecc256_key(self, session: Session, encrypted_key: bytes) -> bytes:
point1 = Point(
x=int.from_bytes(encrypted_key[:32], 'big'),
y=int.from_bytes(encrypted_key[32:64], 'big'),
curve=self.curve
)
point2 = Point(
x=int.from_bytes(encrypted_key[64:96], 'big'),
y=int.from_bytes(encrypted_key[96:128], 'big'),
curve=self.curve
)
decrypted = self.elgamal.decrypt((point1, point2), int(session.encryption_key.key.d))
return self.elgamal.to_bytes(decrypted.x)[16:32]
@staticmethod
def _verify_ecc_key(session: Session, licence: XMRLicense) -> bool:
def _verify_encryption_key(session: Session, licence: XMRLicense) -> bool:
ecc_keys = list(licence.get_object(42))
if not ecc_keys:
raise InvalidLicense("No ECC public key in license")
@ -258,20 +235,28 @@ class Cdm:
try:
root = ET.fromstring(licence)
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
for license_element in license_elements:
parsed_licence = XMRLicense.loads(license_element.text)
if not self._verify_ecc_key(session, parsed_licence):
if not self._verify_encryption_key(session, parsed_licence):
raise InvalidLicense("Public encryption key does not match")
for key in parsed_licence.get_content_keys():
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC_256:
for content_key in parsed_licence.get_content_keys():
if Key.CipherType(content_key.cipher_type) == Key.CipherType.ECC_256:
key = self.__crypto.ecc256_decrypt(
private_key=session.encryption_key,
ciphertext=content_key.encrypted_key
)[16:32]
else:
continue
session.keys.append(Key(
key_id=UUID(bytes_le=key.key_id),
key_type=key.key_type,
cipher_type=key.cipher_type,
key_length=key.key_length,
key=self._decrypt_ecc256_key(session, key.encrypted_key)
key_id=UUID(bytes_le=content_key.key_id),
key_type=content_key.key_type,
cipher_type=content_key.cipher_type,
key_length=content_key.key_length,
key=key
))
except InvalidLicense as e:
raise InvalidLicense(e)

View File

@ -0,0 +1,96 @@
from typing import Union, Tuple
from Crypto.Hash import SHA256
from Crypto.Hash.SHA256 import SHA256Hash
from Crypto.PublicKey.ECC import EccKey
from Crypto.Signature import DSS
from ecpy.curves import Point, Curve
from pyplayready.crypto.elgamal import ElGamal
from pyplayready.crypto.ecc_key import ECCKey
class Crypto:
def __init__(self, curve: str = "secp256r1"):
self.curve = Curve.get_curve(curve)
self.elgamal = ElGamal(self.curve)
def ecc256_encrypt(self, public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
if isinstance(public_key, ECCKey):
public_key = public_key.get_point(self.curve)
if not isinstance(public_key, Point):
raise ValueError(f"Expecting ECCKey or Point input, got {public_key!r}")
if isinstance(plaintext, bytes):
plaintext = Point(
x=int.from_bytes(plaintext[:32], 'big'),
y=int.from_bytes(plaintext[32:64], 'big'),
curve=self.curve
)
if not isinstance(plaintext, Point):
raise ValueError(f"Expecting Point or Bytes input, got {plaintext!r}")
point1, point2 = self.elgamal.encrypt(
message_point=plaintext,
public_key=public_key
)
return b''.join([
self.elgamal.to_bytes(point1.x),
self.elgamal.to_bytes(point1.y),
self.elgamal.to_bytes(point2.x),
self.elgamal.to_bytes(point2.y)
])
def ecc256_decrypt(self, private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
if isinstance(ciphertext, bytes):
ciphertext = (
Point(
x=int.from_bytes(ciphertext[:32], 'big'),
y=int.from_bytes(ciphertext[32:64], 'big'),
curve=self.curve
),
Point(
x=int.from_bytes(ciphertext[64:96], 'big'),
y=int.from_bytes(ciphertext[96:128], 'big'),
curve=self.curve
)
)
if not isinstance(ciphertext, Tuple):
raise ValueError(f"Expecting Tuple[Point, Point] or Bytes input, got {ciphertext!r}")
decrypted = self.elgamal.decrypt(ciphertext, int(private_key.key.d))
return self.elgamal.to_bytes(decrypted.x)
@staticmethod
def ecc256_sign(private_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes]) -> bytes:
if isinstance(private_key, ECCKey):
private_key = private_key.key
if not isinstance(private_key, EccKey):
raise ValueError(f"Expecting ECCKey or EccKey input, got {private_key!r}")
if isinstance(data, bytes):
data = SHA256.new(data)
if not isinstance(data, SHA256Hash):
raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
signer = DSS.new(private_key, 'fips-186-3')
return signer.sign(data)
@staticmethod
def ecc256_verify(public_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes], signature: bytes) -> bool:
if isinstance(public_key, ECCKey):
public_key = public_key.key
if not isinstance(public_key, EccKey):
raise ValueError(f"Expecting ECCKey or EccKey input, got {public_key!r}")
if isinstance(data, bytes):
data = SHA256.new(data)
if not isinstance(data, SHA256Hash):
raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
verifier = DSS.new(public_key, 'fips-186-3')
try:
verifier.verify(data, signature)
return True
except ValueError:
return False

View File

@ -11,7 +11,7 @@ from ecpy.curves import Curve, Point
class ECCKey:
"""Represents a PlayReady ECC key"""
"""Represents a PlayReady ECC key pair"""
def __init__(self, key: EccKey):
self.key = key

View File

@ -5,61 +5,21 @@ from enum import IntEnum
from pathlib import Path
from typing import Union, Any
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
from pyplayready.bcert import CertificateChain
from pyplayready.ecc_key import ECCKey
class SecurityLevel(IntEnum):
SL150 = 150
SL2000 = 2000
SL3000 = 3000
class _DeviceStructs:
magic = Const(b"PRD")
header = Struct(
"signature" / magic,
"version" / Int8ub,
)
# was never in production
v1 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key_length" / Int32ub,
"group_key" / Bytes(this.group_key_length),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length)
)
v2 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
)
v3 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key" / Bytes(96),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
)
from pyplayready.device.structs import DeviceStructs
from pyplayready.system.bcert import CertificateChain
from pyplayready.crypto.ecc_key import ECCKey
class Device:
"""Represents a PlayReady Device (.prd)"""
CURRENT_STRUCT = _DeviceStructs.v3
CURRENT_STRUCT = DeviceStructs.v3
CURRENT_VERSION = 3
class SecurityLevel(IntEnum):
SL150 = 150
SL2000 = 2000
SL3000 = 3000
def __init__(
self,
*_: Any,
@ -100,11 +60,11 @@ class Device:
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
prd_header = _DeviceStructs.header.parse(data)
prd_header = DeviceStructs.header.parse(data)
if prd_header.version == 2:
return cls(
group_key=None,
**_DeviceStructs.v2.parse(data)
**DeviceStructs.v2.parse(data)
)
return cls(**cls.CURRENT_STRUCT.parse(data))

View File

@ -0,0 +1,39 @@
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
class DeviceStructs:
magic = Const(b"PRD")
header = Struct(
"signature" / magic,
"version" / Int8ub,
)
# was never in production
v1 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key_length" / Int32ub,
"group_key" / Bytes(this.group_key_length),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length)
)
v2 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
)
v3 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key" / Bytes(96),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
)

View File

@ -1,13 +1,15 @@
from ecpy.curves import Point, Curve
from pyplayready.ecc_key import ECCKey
from pyplayready.elgamal import ElGamal
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.crypto.elgamal import ElGamal
class XmlKey:
"""Represents a PlayReady XMLKey"""
def __init__(self):
self.curve = Curve.get_curve("secp256r1")
self._shared_point = ECCKey.generate()
self.shared_key_x = self._shared_point.key.pointQ.x
self.shared_key_y = self._shared_point.key.pointQ.y
@ -16,5 +18,5 @@ class XmlKey:
self.aes_iv = self._shared_key_x_bytes[:16]
self.aes_key = self._shared_key_x_bytes[16:]
def get_point(self, curve: Curve) -> Point:
return Point(self.shared_key_x, self.shared_key_y, curve)
def get_point(self) -> Point:
return Point(self.shared_key_x, self.shared_key_y, self.curve)

View File

@ -249,5 +249,4 @@ class XMRLicense(_XMRLicenseStructs):
yield container.data
def get_content_keys(self):
for content_key in self.get_object(10):
yield content_key
yield from self.get_object(10)

View File

@ -8,12 +8,12 @@ import requests
from Crypto.Random import get_random_bytes
from pyplayready import __version__
from pyplayready.bcert import CertificateChain, Certificate
from pyplayready.system.bcert import CertificateChain, Certificate
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.ecc_key import ECCKey
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.exceptions import OutdatedDevice
from pyplayready.pssh import PSSH
from pyplayready.system.pssh import PSSH
@click.group(invoke_without_command=True)
@ -306,7 +306,7 @@ def serve_(config_path: Path, host: str, port: int) -> None:
Host as 127.0.0.1 may block remote access even if port-forwarded.
Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded.
"""
from pyplayready import serve
from pyplayready.remote import serve
import yaml
config = yaml.safe_load(config_path.read_text(encoding="utf8"))

View File

@ -6,7 +6,7 @@ import requests
from pyplayready.cdm import Cdm
from pyplayready.device import Device
from pyplayready.key import Key
from pyplayready.license.key import Key
from pyplayready.exceptions import (DeviceMismatch, InvalidInitData)

View File

@ -1,4 +1,3 @@
import base64
from pathlib import Path
from typing import Any, Optional, Union

View File

@ -3,6 +3,7 @@ import collections.abc
from Crypto.PublicKey import ECC
from pyplayready.crypto import Crypto
from pyplayready.exceptions import InvalidCertificateChain
# monkey patch for construct 2.8.8 compatibility
@ -13,13 +14,11 @@ import base64
from pathlib import Path
from typing import Union
from Crypto.Hash import SHA256
from Crypto.Signature import DSS
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
from construct import Int16ub, Array
from construct import Struct, this
from pyplayready.ecc_key import ECCKey
from pyplayready.crypto.ecc_key import ECCKey
class _BCertStructs:
@ -249,7 +248,7 @@ class Certificate(_BCertStructs):
# 2, # Receiver
# 3, # SharedCertificate
4, # SecureClock
5, # AntiRollBackClock
# 5, # AntiRollBackClock
# 6, # ReservedMetering
# 7, # ReservedLicSync
# 8, # ReservedSymOpt
@ -323,10 +322,7 @@ class Certificate(_BCertStructs):
new_bcert_container.total_length = len(payload) + 144 # signature length
sign_payload = _BCertStructs.BCert.build(new_bcert_container)
hash_obj = SHA256.new(sign_payload)
signer = DSS.new(group_key.key, 'fips-186-3')
signature = signer.sign(hash_obj)
signature = Crypto.ecc256_sign(group_key, sign_payload)
signature_info = Container(
signature_type=1,
@ -403,14 +399,11 @@ class Certificate(_BCertStructs):
point_y=int.from_bytes(raw_signature_key[32:], 'big')
)
hash_obj = SHA256.new(sign_payload)
verifier = DSS.new(signature_key, 'fips-186-3')
try:
verifier.verify(hash_obj, signature_attribute.signature)
return True
except ValueError:
return False
return Crypto.ecc256_verify(
public_key=signature_key,
data=sign_payload,
signature=signature_attribute.signature
)
class CertificateChain(_BCertStructs):

View File

@ -5,7 +5,7 @@ from uuid import UUID
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, Switch, Int32ub, Const, Container, ConstructError
from pyplayready.exceptions import InvalidPssh
from pyplayready.wrmheader import WRMHeader
from pyplayready.system.wrmheader import WRMHeader
class _PlayreadyPSSHStructs:

View File

@ -1,10 +1,8 @@
from typing import Optional
from Crypto.Random import get_random_bytes
from pyplayready.key import Key
from pyplayready.ecc_key import ECCKey
from pyplayready.xml_key import XmlKey
from pyplayready.license.key import Key
from pyplayready.crypto.ecc_key import ECCKey
from pyplayready.license.xml_key import XmlKey
class Session:

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
version = "0.4.3"
version = "0.4.4"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"]