Store Service Certificate in session as SignedDrmCertificate

This is for less effort to use the Service Certificate later on. We have no reason to keep the SignedMessage shell as it's just a way to send it as a message from License Acquisition APIs.
This commit is contained in:
rlaphoenix 2022-09-28 07:43:36 +01:00
parent 42b825dcd5
commit 74f960aeba
3 changed files with 14 additions and 31 deletions

View File

@ -186,10 +186,8 @@ class Cdm:
if certificate is None: if certificate is None:
if session.service_certificate: if session.service_certificate:
signed_drm_certificate = SignedDrmCertificate()
signed_drm_certificate.ParseFromString(session.service_certificate)
drm_certificate = DrmCertificate() drm_certificate = DrmCertificate()
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
provider_id = drm_certificate.provider_id provider_id = drm_certificate.provider_id
else: else:
provider_id = None provider_id = None
@ -216,10 +214,6 @@ class Cdm:
signed_drm_certificate.ParseFromString(certificate) signed_drm_certificate.ParseFromString(certificate)
if signed_drm_certificate.SerializeToString() != certificate: if signed_drm_certificate.SerializeToString() != certificate:
raise DecodeError("partial parse") raise DecodeError("partial parse")
# Craft a SignedMessage as it's stored as a SignedMessage
signed_message.Clear()
signed_message.msg = signed_drm_certificate.SerializeToString()
# we don't need to sign this message, this is normal
except DecodeError as e: except DecodeError as e:
# could be a direct unsigned DrmCertificate, but reject those anyway # could be a direct unsigned DrmCertificate, but reject those anyway
raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}") raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}")
@ -241,10 +235,12 @@ class Cdm:
except DecodeError as e: except DecodeError as e:
raise DecodeError(f"Could not parse signed certificate's message as a DrmCertificate, {e}") raise DecodeError(f"Could not parse signed certificate's message as a DrmCertificate, {e}")
session.service_certificate = signed_message # must be stored as a SignedDrmCertificate as the signature needs to be kept for RemoteCdm
# if we store as DrmCertificate (no signature) then RemoteCdm cannot verify the Certificate
session.service_certificate = signed_drm_certificate
return drm_certificate.provider_id return drm_certificate.provider_id
def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]: def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]:
""" """
Get the currently set Service Privacy Certificate of the Session. Get the currently set Service Privacy Certificate of the Session.
@ -553,7 +549,7 @@ class Cdm:
@staticmethod @staticmethod
def encrypt_client_id( def encrypt_client_id(
client_id: ClientIdentification, client_id: ClientIdentification,
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate], service_certificate: Union[SignedDrmCertificate, DrmCertificate],
key: bytes = None, key: bytes = None,
iv: bytes = None iv: bytes = None
) -> EncryptedClientIdentification: ) -> EncryptedClientIdentification:
@ -561,10 +557,6 @@ class Cdm:
privacy_key = key or get_random_bytes(16) privacy_key = key or get_random_bytes(16)
privacy_iv = iv or get_random_bytes(16) privacy_iv = iv or get_random_bytes(16)
if isinstance(service_certificate, SignedMessage):
signed_drm_certificate = SignedDrmCertificate()
signed_drm_certificate.ParseFromString(service_certificate.msg)
service_certificate = signed_drm_certificate
if isinstance(service_certificate, SignedDrmCertificate): if isinstance(service_certificate, SignedDrmCertificate):
drm_certificate = DrmCertificate() drm_certificate = DrmCertificate()
drm_certificate.ParseFromString(service_certificate.drm_certificate) drm_certificate.ParseFromString(service_certificate.drm_certificate)

View File

@ -143,7 +143,7 @@ class RemoteCdm(Cdm):
return r["provider_id"] return r["provider_id"]
def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]: def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]:
r = self.__session.post( r = self.__session.post(
url=f"{self.host}/{self.device_name}/get_service_certificate", url=f"{self.host}/{self.device_name}/get_service_certificate",
json={ json={
@ -159,21 +159,12 @@ class RemoteCdm(Cdm):
return None return None
service_certificate = base64.b64decode(service_certificate) service_certificate = base64.b64decode(service_certificate)
signed_message = SignedMessage()
signed_drm_certificate = SignedDrmCertificate() signed_drm_certificate = SignedDrmCertificate()
try: try:
signed_message.ParseFromString(service_certificate)
if signed_message.SerializeToString() == service_certificate:
signed_drm_certificate.ParseFromString(signed_message.msg)
else:
signed_drm_certificate.ParseFromString(service_certificate) signed_drm_certificate.ParseFromString(service_certificate)
if signed_drm_certificate.SerializeToString() != service_certificate: if signed_drm_certificate.SerializeToString() != service_certificate:
raise DecodeError("partial parse") raise DecodeError("partial parse")
# Craft a SignedMessage as it's stored as a SignedMessage
signed_message.Clear()
signed_message.msg = signed_drm_certificate.SerializeToString()
# we don't need to sign this message, this is normal
except DecodeError as e: except DecodeError as e:
# could be a direct unsigned DrmCertificate, but reject those anyway # could be a direct unsigned DrmCertificate, but reject those anyway
raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}") raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}")
@ -187,8 +178,8 @@ class RemoteCdm(Cdm):
) )
except (ValueError, TypeError): except (ValueError, TypeError):
raise SignatureMismatch("Signature Mismatch on SignedDrmCertificate, rejecting certificate") raise SignatureMismatch("Signature Mismatch on SignedDrmCertificate, rejecting certificate")
else:
return signed_message return signed_drm_certificate
def get_license_challenge( def get_license_challenge(
self, self,

View File

@ -3,13 +3,13 @@ from typing import Optional
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
from pywidevine.key import Key from pywidevine.key import Key
from pywidevine.license_protocol_pb2 import SignedMessage from pywidevine.license_protocol_pb2 import SignedDrmCertificate
class Session: class Session:
def __init__(self, number: int): def __init__(self, number: int):
self.number = number self.number = number
self.id = get_random_bytes(16) self.id = get_random_bytes(16)
self.service_certificate: Optional[SignedMessage] = None self.service_certificate: Optional[SignedDrmCertificate] = None
self.context: dict[bytes, tuple[bytes, bytes]] = {} self.context: dict[bytes, tuple[bytes, bytes]] = {}
self.keys: list[Key] = [] self.keys: list[Key] = []