RemoteCDM Improvements

This commit is contained in:
Erevoc 2024-11-15 19:25:50 +02:00
parent 761e879ba7
commit 1e01ca9e8d
5 changed files with 24 additions and 24 deletions

View File

@ -66,6 +66,8 @@ cdm.parse_license(session_id, response.text)
for key in cdm.get_keys(session_id):
print(f"{key.key_id.hex}:{key.key.hex()}")
cdm.close(session_id)
```
## Disclaimer

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import base64
import math
import time
from typing import List
from typing import List, Union
from uuid import UUID
import xml.etree.ElementTree as ET
@ -32,9 +32,9 @@ class Cdm:
def __init__(
self,
security_level: int,
certificate_chain: CertificateChain,
encryption_key: ECCKey,
signing_key: ECCKey,
certificate_chain: Union[CertificateChain, None],
encryption_key: Union[ECCKey, None],
signing_key: Union[ECCKey, None],
client_version: str = "10.0.16384.10011",
protocol_version: int = 1
):

View File

@ -52,6 +52,9 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
cdm = Cdm.from_device(device)
log.info("Loaded CDM")
session_id = cdm.open()
log.info("Opened Session")
challenge = cdm.get_license_challenge(pssh.get_wrm_headers(downgrade_to_v4=True)[0])
log.info("Created License Request (Challenge)")
log.debug(challenge)
@ -78,6 +81,9 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
for key in cdm.get_keys():
log.info(f"{key.key_id.hex}:{key.key.hex()}")
cdm.close(session_id)
log.info("Clossed Session")
@main.command()
@click.argument("device", type=Path)
@ -232,7 +238,7 @@ def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] =
@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.")
@click.argument("config_path", type=Path)
@click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.")
@click.option("-p", "--port", type=int, default=8786, help="Port to serve from.")
@click.option("-p", "--port", type=int, default=7723, help="Port to serve from.")
def serve_(config_path: Path, host: str, port: int) -> None:
"""
Serve your local CDM and Playready Devices Remotely.

View File

@ -5,9 +5,7 @@ import re
import requests
from pyplayready.cdm import Cdm
from pyplayready.bcert import CertificateChain
from pyplayready.device import Device
from pyplayready.ecc_key import ECCKey
from pyplayready.key import Key
from pyplayready.exceptions import (DeviceMismatch, InvalidInitData)
@ -49,7 +47,7 @@ class RemoteCdm(Cdm):
self.device_name = device_name
# spoof certificate_chain and ecc_key just so we can construct via super call
super().__init__(security_level, CertificateChain, ECCKey, ECCKey)
super().__init__(security_level, None, None, None)
self.__session = requests.Session()
self.__session.headers.update({
@ -97,20 +95,18 @@ class RemoteCdm(Cdm):
def get_license_challenge(
self,
session_id: bytes,
pssh: str,
downgrade: str
wrm_header: str,
) -> str:
if not pssh:
raise InvalidInitData("A pssh must be provided.")
if not isinstance(pssh, str):
raise InvalidInitData(f"Expected pssh to be a {str}, not {pssh!r}")
if not wrm_header:
raise InvalidInitData("A wrm_header must be provided.")
if not isinstance(wrm_header, str):
raise InvalidInitData(f"Expected wrm_header to be a {str}, not {wrm_header!r}")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/get_license_challenge",
json={
"session_id": session_id.hex(),
"init_data": pssh,
"downgrade": downgrade,
"init_data": wrm_header,
}
).json()
if r["status"] != 200:

View File

@ -115,7 +115,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
device_name = request.match_info["device"]
body = await request.json()
for required_field in ("session_id", "init_data", "downgrade"):
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
return web.json_response({
"status": 400,
@ -125,11 +125,6 @@ async def get_license_challenge(request: web.Request) -> web.Response:
# get session id
session_id = bytes.fromhex(body["session_id"])
# get downgrade
downgrade = False
if body['downgrade'] == 'true':
downgrade = True
# get cdm
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
@ -139,13 +134,14 @@ async def get_license_challenge(request: web.Request) -> web.Response:
}, status=400)
# get init data
init_data = PSSH(body["init_data"]).get_wrm_headers(downgrade_to_v4=downgrade)
# init_data = PSSH(body["init_data"]).get_wrm_headers(downgrade_to_v4=downgrade)
init_data = body["init_data"]
# get challenge
try:
license_request = cdm.get_license_challenge(
session_id=session_id,
content_header=init_data[0],
content_header=init_data,
)
except InvalidSession:
return web.json_response({