+ v3 PRD device format

+ Reprovisioning CLI function for v3
This commit is contained in:
BuildTools 2024-11-30 12:55:39 +01:00
parent 9b25aafe07
commit 78fb7be3b7
7 changed files with 179 additions and 55 deletions

View File

@ -10,4 +10,4 @@ from .session import *
from .xml_key import * from .xml_key import *
from .xmrlicense import * from .xmrlicense import *
__version__ = "0.3.9" __version__ = "0.4.0"

View File

@ -1,6 +1,10 @@
from __future__ import annotations from __future__ import annotations
import collections.abc import collections.abc
from Crypto.PublicKey import ECC
from pyplayready.exceptions import InvalidCertificateChain
# monkey patch for construct 2.8.8 compatibility # monkey patch for construct 2.8.8 compatibility
if not hasattr(collections, 'Sequence'): if not hasattr(collections, 'Sequence'):
collections.Sequence = collections.abc.Sequence collections.Sequence = collections.abc.Sequence
@ -51,7 +55,7 @@ class _BCertStructs:
) )
DrmBCertFeatureInfo = Struct( DrmBCertFeatureInfo = Struct(
"feature_count" / Int32ub, "feature_count" / Int32ub, # max. 32
"features" / Array(this.feature_count, Int32ub) "features" / Array(this.feature_count, Int32ub)
) )
@ -100,8 +104,8 @@ class _BCertStructs:
# TODO: untested # TODO: untested
DrmBCertExtDataSignKeyInfo = Struct( DrmBCertExtDataSignKeyInfo = Struct(
"type" / Int16ub, "key_type" / Int16ub,
"length" / Int16ub, "key_length" / Int16ub,
"flags" / Int32ub, "flags" / Int32ub,
"key" / Bytes(this.length // 8) "key" / Bytes(this.length // 8)
) )
@ -121,7 +125,7 @@ class _BCertStructs:
# TODO: untested # TODO: untested
BCertExtDataContainer = Struct( BCertExtDataContainer = Struct(
"record_count" / Int32ub, "record_count" / Int32ub, # always 1
"records" / Array(this.record_count, BCertExtDataRecord), "records" / Array(this.record_count, BCertExtDataRecord),
"signature" / DrmBCertExtDataSignature "signature" / DrmBCertExtDataSignature
) )
@ -380,6 +384,26 @@ class Certificate(_BCertStructs):
def struct(self) -> _BCertStructs.BCert: def struct(self) -> _BCertStructs.BCert:
return self._BCERT return self._BCERT
def verify_signature(self):
sign_payload = self.dumps()[:-144]
signature_attribute = self.get_attribute(8).attribute
raw_signature_key = signature_attribute.signature_key
signature_key = ECC.construct(
curve='P-256',
point_x=int.from_bytes(raw_signature_key[:32], 'big'),
point_y=int.from_bytes(raw_signature_key[32:], 'big')
)
hash_obj = SHA256.new(sign_payload)
verifier = DSS.new(signature_key, 'fips-186-3')
try:
verifier.verify(hash_obj, signature_attribute.signature)
return True
except ValueError:
return False
class CertificateChain(_BCertStructs): class CertificateChain(_BCertStructs):
"""Represents a BCertChain""" """Represents a BCertChain"""
@ -437,3 +461,25 @@ class CertificateChain(_BCertStructs):
self.parsed.certificate_count += 1 self.parsed.certificate_count += 1
self.parsed.certificates.insert(0, bcert.parsed) self.parsed.certificates.insert(0, bcert.parsed)
self.parsed.total_length += len(bcert.dumps()) self.parsed.total_length += len(bcert.dumps())
def remove(self, index: int) -> None:
if self.parsed.certificate_count <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count:
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
self.parsed.certificate_count -= 1
bcert = Certificate(self.parsed.certificates[index])
self.parsed.total_length -= len(bcert.dumps())
self.parsed.certificates.pop(index)
def get(self, index: int) -> Certificate:
if self.parsed.certificate_count <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count:
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
return Certificate(self.parsed.certificates[index])
def count(self) -> int:
return self.parsed.certificate_count

View File

@ -20,6 +20,12 @@ class SecurityLevel(IntEnum):
class _DeviceStructs: class _DeviceStructs:
magic = Const(b"PRD") magic = Const(b"PRD")
header = Struct(
"signature" / magic,
"version" / Int8ub,
)
# was never in production
v1 = Struct( v1 = Struct(
"signature" / magic, "signature" / magic,
"version" / Int8ub, "version" / Int8ub,
@ -38,36 +44,53 @@ class _DeviceStructs:
"signing_key" / Bytes(96), "signing_key" / Bytes(96),
) )
v3 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key" / Bytes(96),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
)
class Device: class Device:
"""Represents a PlayReady Device (.prd)""" """Represents a PlayReady Device (.prd)"""
CURRENT_STRUCT = _DeviceStructs.v2 CURRENT_STRUCT = _DeviceStructs.v3
CURRENT_VERSION = 3
def __init__( def __init__(
self, self,
*_: Any, *_: Any,
group_certificate: Union[str, bytes], group_key: Union[str, bytes, None],
encryption_key: Union[str, bytes], encryption_key: Union[str, bytes],
signing_key: Union[str, bytes], signing_key: Union[str, bytes],
group_certificate: Union[str, bytes],
**__: Any **__: Any
): ):
if isinstance(group_certificate, str): if isinstance(group_key, str):
group_certificate = base64.b64decode(group_certificate) group_key = base64.b64decode(group_key)
if not isinstance(group_certificate, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
if isinstance(encryption_key, str): if isinstance(encryption_key, str):
encryption_key = base64.b64decode(encryption_key) encryption_key = base64.b64decode(encryption_key)
if not isinstance(encryption_key, bytes): if not isinstance(encryption_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}") raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
if isinstance(signing_key, str): if isinstance(signing_key, str):
signing_key = base64.b64decode(signing_key) signing_key = base64.b64decode(signing_key)
if not isinstance(signing_key, bytes): if not isinstance(signing_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}") raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
self.group_certificate = CertificateChain.loads(group_certificate) if isinstance(group_certificate, str):
group_certificate = base64.b64decode(group_certificate)
if not isinstance(group_certificate, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
self.group_key = None if group_key is None else ECCKey.loads(group_key)
self.encryption_key = ECCKey.loads(encryption_key) self.encryption_key = ECCKey.loads(encryption_key)
self.signing_key = ECCKey.loads(signing_key) self.signing_key = ECCKey.loads(signing_key)
self.group_certificate = CertificateChain.loads(group_certificate)
self.security_level = self.group_certificate.get_security_level() self.security_level = self.group_certificate.get_security_level()
@classmethod @classmethod
@ -76,6 +99,14 @@ class Device:
data = base64.b64decode(data) data = base64.b64decode(data)
if not isinstance(data, bytes): if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
prd_header = _DeviceStructs.header.parse(data)
if prd_header.version == 2:
return cls(
group_key=None,
**_DeviceStructs.v2.parse(data)
)
return cls(**cls.CURRENT_STRUCT.parse(data)) return cls(**cls.CURRENT_STRUCT.parse(data))
@classmethod @classmethod
@ -87,11 +118,12 @@ class Device:
def dumps(self) -> bytes: def dumps(self) -> bytes:
return self.CURRENT_STRUCT.build(dict( return self.CURRENT_STRUCT.build(dict(
version=2, version=self.CURRENT_VERSION,
group_key=self.group_key.dumps(),
encryption_key=self.encryption_key.dumps(),
signing_key=self.signing_key.dumps(),
group_certificate_length=len(self.group_certificate.dumps()), group_certificate_length=len(self.group_certificate.dumps()),
group_certificate=self.group_certificate.dumps(), group_certificate=self.group_certificate.dumps(),
encryption_key=self.encryption_key.dumps(),
signing_key=self.signing_key.dumps()
)) ))
def dump(self, path: Union[Path, str]) -> None: def dump(self, path: Union[Path, str]) -> None:
@ -101,6 +133,6 @@ class Device:
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps()) path.write_bytes(self.dumps())
def get_name(self): def get_name(self) -> str:
name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}" name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_") return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_")

View File

@ -22,28 +22,13 @@ class ECCKey:
return cls(key=ECC.generate(curve='P-256')) return cls(key=ECC.generate(curve='P-256'))
@classmethod @classmethod
def construct( def construct(cls, private_key: Union[bytes, int]):
cls,
private_key: Union[bytes, int],
public_key_x: Union[bytes, int],
public_key_y: Union[bytes, int]
):
"""Construct an ECC key pair from private/public bytes/ints""" """Construct an ECC key pair from private/public bytes/ints"""
if isinstance(private_key, bytes): if isinstance(private_key, bytes):
private_key = int.from_bytes(private_key, 'big') private_key = int.from_bytes(private_key, 'big')
if not isinstance(private_key, int): if not isinstance(private_key, int):
raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}") raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
if isinstance(public_key_x, bytes):
public_key_x = int.from_bytes(public_key_x, 'big')
if not isinstance(public_key_x, int):
raise ValueError(f"Expecting Bytes or Int input, got {public_key_x!r}")
if isinstance(public_key_y, bytes):
public_key_y = int.from_bytes(public_key_y, 'big')
if not isinstance(public_key_y, int):
raise ValueError(f"Expecting Bytes or Int input, got {public_key_y!r}")
# The public is always derived from the private key; loading the other stuff won't work # The public is always derived from the private key; loading the other stuff won't work
key = ECC.construct( key = ECC.construct(
curve='P-256', curve='P-256',
@ -62,11 +47,7 @@ class ECCKey:
if len(data) not in [96, 32]: if len(data) not in [96, 32]:
raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}") raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
return cls.construct( return cls.construct(private_key=data[:32])
private_key=data[:32],
public_key_x=data[32:64],
public_key_y=data[64:96]
)
@classmethod @classmethod
def load(cls, path: Union[Path, str]) -> ECCKey: def load(cls, path: Union[Path, str]) -> ECCKey:

View File

@ -24,3 +24,11 @@ class DeviceMismatch(PyPlayreadyException):
class InvalidLicense(PyPlayreadyException): class InvalidLicense(PyPlayreadyException):
"""Unable to parse XMR License.""" """Unable to parse XMR License."""
class InvalidCertificateChain(PyPlayreadyException):
"""The BCert is not correctly formatted."""
class OutdatedDevice(PyPlayreadyException):
"""The PlayReady Device is outdated and does not support a specific operation."""

View File

@ -12,6 +12,7 @@ from pyplayready.bcert import CertificateChain, Certificate
from pyplayready.cdm import Cdm from pyplayready.cdm import Cdm
from pyplayready.device import Device from pyplayready.device import Device
from pyplayready.ecc_key import ECCKey from pyplayready.ecc_key import ECCKey
from pyplayready.exceptions import OutdatedDevice
from pyplayready.pssh import PSSH from pyplayready.pssh import PSSH
@ -144,8 +145,8 @@ def create_device(
encryption_key = ECCKey.generate() encryption_key = ECCKey.generate()
signing_key = ECCKey.generate() signing_key = ECCKey.generate()
certificate_chain = CertificateChain.load(group_certificate)
group_key = ECCKey.load(group_key) group_key = ECCKey.load(group_key)
certificate_chain = CertificateChain.load(group_certificate)
new_certificate = Certificate.new_leaf_cert( new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16), cert_id=get_random_bytes(16),
@ -159,13 +160,12 @@ def create_device(
certificate_chain.prepend(new_certificate) certificate_chain.prepend(new_certificate)
device = Device( device = Device(
group_certificate=certificate_chain.dumps(), group_key=group_key.dumps(),
encryption_key=encryption_key.dumps(), encryption_key=encryption_key.dumps(),
signing_key=signing_key.dumps() signing_key=signing_key.dumps(),
group_certificate=certificate_chain.dumps(),
) )
prd_bin = device.dumps()
if output and output.suffix: if output and output.suffix:
if output.suffix.lower() != ".prd": if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
@ -179,23 +179,78 @@ def create_device(
return return
out_path.parent.mkdir(parents=True, exist_ok=True) out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(prd_bin) out_path.write_bytes(device.dumps())
log.info("Created Playready Device (.prd) file, %s", out_path.name) log.info("Created Playready Device (.prd) file, %s", out_path.name)
log.info(" + Security Level: %s", device.security_level) log.info(" + Security Level: %s", device.security_level)
log.info(" + Group Certificate: %s (%s bytes)", bool(device.group_certificate.dumps()), len(device.group_certificate.dumps())) log.info(" + Group Key: %s bytes", len(device.group_key.dumps()))
log.info(" + Encryption Key: %s (%s bytes)", bool(device.encryption_key.dumps()), len(device.encryption_key.dumps())) log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
log.info(" + Signing Key: %s (%s bytes)", bool(device.signing_key.dumps()), len(device.signing_key.dumps())) log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
log.info(" + Saved to: %s", out_path.absolute()) log.info(" + Saved to: %s", out_path.absolute())
@main.command()
@click.argument("prd_path", type=Path)
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None:
"""
Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys.
Will override the device if an output path or directory is not specified
Only works on PRD Devices of v3 or higher
"""
if not prd_path.is_file():
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("reprovision-device")
log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name)
device = Device.load(prd_path)
if device.group_key is None:
raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher")
device.group_certificate.remove(0)
encryption_key = ECCKey.generate()
signing_key = ECCKey.generate()
device.encryption_key = encryption_key
device.signing_key = signing_key
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=device.group_certificate.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key,
encryption_key=encryption_key,
group_key=device.group_key,
parent=device.group_certificate
)
device.group_certificate.prepend(new_certificate)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
out_path = output
else:
out_path = prd_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps())
log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
@main.command() @main.command()
@click.argument("prd_path", type=Path) @click.argument("prd_path", type=Path)
@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory") @click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
@click.pass_context @click.pass_context
def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None: def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
""" """
Export a Playready Device (.prd) file to a Group Certificate, Encryption Key and Signing Key Export a Playready Device (.prd) file to a Group Key, Encryption Key, Signing Key and Group Certificate
If an output directory is not specified, it will be stored in the current working directory If an output directory is not specified, it will be stored in the current working directory
""" """
if not prd_path.is_file(): if not prd_path.is_file():
@ -222,9 +277,9 @@ def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] =
log.info(f"L{device.security_level} {device.get_name()}") log.info(f"L{device.security_level} {device.get_name()}")
log.info(f"Saving to: {out_path}") log.info(f"Saving to: {out_path}")
client_id_path = out_path / "bgroupcert.dat" group_key_path = out_path / "zgpriv.dat"
client_id_path.write_bytes(device.group_certificate.dumps()) group_key_path.write_bytes(device.group_key.dumps())
log.info("Exported Group Certificate to bgroupcert.dat") log.info("Exported Group Key as zgpriv.dat")
private_key_path = out_path / "zprivencr.dat" private_key_path = out_path / "zprivencr.dat"
private_key_path.write_bytes(device.encryption_key.dumps()) private_key_path.write_bytes(device.encryption_key.dumps())
@ -234,6 +289,10 @@ def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] =
private_key_path.write_bytes(device.signing_key.dumps()) private_key_path.write_bytes(device.signing_key.dumps())
log.info("Exported Signing Key as zprivsig.dat") log.info("Exported Signing Key as zprivsig.dat")
client_id_path = out_path / "bgroupcert.dat"
client_id_path.write_bytes(device.group_certificate.dumps())
log.info("Exported Group Certificate to bgroupcert.dat")
@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.") @main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.")
@click.argument("config_path", type=Path) @click.argument("config_path", type=Path)
@ -243,16 +302,14 @@ def serve_(config_path: Path, host: str, port: int) -> None:
""" """
Serve your local CDM and Playready Devices Remotely. Serve your local CDM and Playready Devices Remotely.
\b
[CONFIG] is a path to a serve config file. [CONFIG] is a path to a serve config file.
See `serve.example.yml` for an example config file. See `serve.example.yml` for an example config file.
\b
Host as 127.0.0.1 may block remote access even if port-forwarded. Host as 127.0.0.1 may block remote access even if port-forwarded.
Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded. Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded.
""" """
from pyplayready import serve # isort:skip from pyplayready import serve
import yaml # isort:skip import yaml
config = yaml.safe_load(config_path.read_text(encoding="utf8")) config = yaml.safe_load(config_path.read_text(encoding="utf8"))
serve.run(config, host, port) serve.run(config, host, port)

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "pyplayready" name = "pyplayready"
version = "0.3.9" version = "0.4.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python." description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "CC BY-NC-ND 4.0" license = "CC BY-NC-ND 4.0"
authors = ["DevLARLEY, Erevoc", "DevataDev"] authors = ["DevLARLEY, Erevoc", "DevataDev"]