From 95982725c3eccf37643a6ef25cbb53623f2fb249 Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Thu, 21 Jul 2022 17:26:14 +0100 Subject: [PATCH] Cdm: Support providing Service Cert as any 3 schemas Some service's might provide the Service Certificate as a SignedDrmCertificate instead of a SignedMessage so I added support for supplying such format certificates. I also added support for supplying a DrmCertificate directly, though it's unlikely for a service to provide it raw without a signature like that. The Service Certificate is now also stored as just the DrmCertificate internally, as it will not be using the signature. --- pywidevine/cdm.py | 67 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/pywidevine/cdm.py b/pywidevine/cdm.py index c436c34..1d0699f 100644 --- a/pywidevine/cdm.py +++ b/pywidevine/cdm.py @@ -84,10 +84,10 @@ class Cdm: self.init_data = PSSH.get_as_box(pssh).init_data self.session_id = get_random_bytes(16) - self.service_certificate: Optional[SignedMessage] = None + self.service_certificate: Optional[DrmCertificate] = None self.context: dict[bytes, tuple[bytes, bytes]] = {} - def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage: + def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate: """ Set a Service Privacy Certificate for Privacy Mode. (optional but recommended) @@ -96,7 +96,7 @@ class Cdm: Some services have their own, but most use the common privacy cert, (common_privacy_cert). - Returns the parsed Signed Message if successful, otherwise raises a DecodeError. + Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError. The Service Certificate is used to encrypt Client IDs in Licenses. This is also known as Privacy Mode and may be required for some services or for some devices. @@ -106,13 +106,48 @@ class Cdm: certificate = base64.b64decode(certificate) # assuming base64 signed_message = SignedMessage() - try: - signed_message.ParseFromString(certificate) - except DecodeError as e: - raise DecodeError(f"Could not parse certificate as a Signed Message: {e}") + signed_drm_certificate = SignedDrmCertificate() + drm_certificate = DrmCertificate() - self.service_certificate = signed_message - return signed_message + # Note: A secure CDM would likely reject any Service Certificate that is + # not either a SignedMessage or a SignedDrmCertificate. This is because + # the DrmCertificate itself is not signed. This CDM does not verify the + # signatures as I'm not sure what HMAC key is used. At this stage of the + # CDM flow, we wouldn't have any mac_keys, and those might not work for + # verifying service certificate signatures (likely not). + + # All these 3 schemas can sort of parse each other in a minimal buggy way, + # so we have to parse down the full chain instead of relaying each step + + try: # SignedMessage input + signed_message.ParseFromString(certificate) + signed_drm_certificate.ParseFromString(signed_message.msg) + if not signed_drm_certificate.drm_certificate: + raise DecodeError() + drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) + self.service_certificate = drm_certificate + return self.service_certificate + except DecodeError: + pass + + try: # SignedDrmCertificate input + signed_drm_certificate.ParseFromString(certificate) + if not signed_drm_certificate.drm_certificate: + raise DecodeError() + drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) + self.service_certificate = drm_certificate + return self.service_certificate + except DecodeError: + pass + + try: # DrmCertificate input + drm_certificate.ParseFromString(certificate) + self.service_certificate = drm_certificate + return self.service_certificate + except DecodeError: + pass + + raise DecodeError("Could not parse certificate as a Service Certificate") def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes: """ @@ -253,7 +288,7 @@ class Cdm: @staticmethod def encrypt_client_id( client_id: ClientIdentification, - service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate], + service_certificate: DrmCertificate, key: bytes = None, iv: bytes = None ) -> EncryptedClientIdentification: @@ -261,18 +296,8 @@ class Cdm: privacy_key = key or get_random_bytes(16) privacy_iv = iv or get_random_bytes(16) - if isinstance(service_certificate, SignedMessage): - signed_service_certificate = SignedDrmCertificate() - signed_service_certificate.ParseFromString(service_certificate.msg) - service_certificate = signed_service_certificate - - if isinstance(service_certificate, SignedDrmCertificate): - service_service_drm_certificate = DrmCertificate() - service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate) - service_certificate = service_service_drm_certificate - if not isinstance(service_certificate, DrmCertificate): - raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}") + raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}") enc_client_id = EncryptedClientIdentification() enc_client_id.provider_id = service_certificate.provider_id