RemoteCdm: Implement /set_service_certificate

This commit is contained in:
rlaphoenix 2022-08-04 05:40:59 +01:00
parent 5788dde7b1
commit 3d794ad659
1 changed files with 28 additions and 9 deletions

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import base64 import base64
import binascii import binascii
from typing import Union from typing import Union, Optional
import requests import requests
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
@ -117,7 +117,32 @@ class RemoteCdm(Cdm):
raise ValueError(f"Cannot Close CDM Session, {r.text} [{r.status_code}]") raise ValueError(f"Cannot Close CDM Session, {r.text} [{r.status_code}]")
del self._sessions[session_id] del self._sessions[session_id]
# TODO: Implement set_service_certificate with /service_cert API schema def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if certificate is None:
certificate_b64 = None
elif isinstance(certificate, str):
certificate_b64 = certificate # assuming base64
elif isinstance(certificate, bytes):
certificate_b64 = base64.b64encode(certificate).decode()
else:
raise DecodeError(f"Expecting Certificate to be base64 or bytes, not {certificate!r}")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/set_service_certificate",
json={
"session_id": session_id.hex(),
"certificate": certificate_b64
}
)
if r.status_code != 200:
raise ValueError(f"Cannot Set CDMs Service Certificate, {r.text} [{r.status_code}]")
r = r.json()["data"]
return r["provider_id"]
def get_license_challenge( def get_license_challenge(
self, self,
@ -149,17 +174,11 @@ class RemoteCdm(Cdm):
except ValueError: except ValueError:
raise InvalidLicenseType(f"License Type {type_!r} is invalid") raise InvalidLicenseType(f"License Type {type_!r} is invalid")
if session.service_certificate:
service_certificate_b64 = base64.b64encode(session.service_certificate.SerializeToString()).decode()
else:
service_certificate_b64 = None
r = self.__session.post( r = self.__session.post(
url=f"{self.host}/{self.device_name}/challenge/{type_}", url=f"{self.host}/{self.device_name}/challenge/{type_}",
json={ json={
"session_id": session_id.hex(), "session_id": session_id.hex(),
"init_data": base64.b64encode(init_data).decode(), "init_data": base64.b64encode(init_data).decode()
"service_certificate": service_certificate_b64
} }
) )
if r.status_code != 200: if r.status_code != 200: