diff --git a/pywidevine/cdm.py b/pywidevine/cdm.py index 84e812c..6191d74 100644 --- a/pywidevine/cdm.py +++ b/pywidevine/cdm.py @@ -232,6 +232,24 @@ class Cdm: drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate) return drm_certificate.provider_id + def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]: + """ + Get the currently set Service Privacy Certificate of the Session. + + Parameters: + session_id: Session identifier. + + Raises: + InvalidSession: If the Session identifier is invalid. + + Returns the Service Certificate if one is set, otherwise None. + """ + session = self.__sessions.get(session_id) + if not session: + raise InvalidSession(f"Session identifier {session_id!r} is invalid.") + + return session.service_certificate + def get_license_challenge( self, session_id: bytes, diff --git a/pywidevine/remotecdm.py b/pywidevine/remotecdm.py index 27fe971..2d56760 100644 --- a/pywidevine/remotecdm.py +++ b/pywidevine/remotecdm.py @@ -6,14 +6,18 @@ import re from typing import Union, Optional import requests +from Crypto.Hash import SHA1 from Crypto.PublicKey import RSA +from Crypto.Signature import pss from google.protobuf.message import DecodeError from pywidevine.cdm import Cdm from pywidevine.device import Device -from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch +from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch, \ + SignatureMismatch from pywidevine.key import Key -from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification +from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification, \ + SignedDrmCertificate from pywidevine.pssh import PSSH @@ -139,6 +143,53 @@ class RemoteCdm(Cdm): return r["provider_id"] + def get_service_certificate(self, session_id: bytes) -> Optional[SignedMessage]: + r = self.__session.post( + url=f"{self.host}/{self.device_name}/get_service_certificate", + json={ + "session_id": session_id.hex() + } + ).json() + if r["status"] != 200: + raise ValueError(f"Cannot Get CDMs Service Certificate, {r['message']} [{r['status']}]") + r = r["data"] + + service_certificate = r["service_certificate"] + if not service_certificate: + return None + + service_certificate = base64.b64decode(service_certificate) + signed_message = SignedMessage() + signed_drm_certificate = SignedDrmCertificate() + + try: + signed_message.ParseFromString(service_certificate) + if signed_message.SerializeToString() == service_certificate: + signed_drm_certificate.ParseFromString(signed_message.msg) + else: + signed_drm_certificate.ParseFromString(service_certificate) + if signed_drm_certificate.SerializeToString() != service_certificate: + raise DecodeError("partial parse") + # Craft a SignedMessage as it's stored as a SignedMessage + signed_message.Clear() + signed_message.msg = signed_drm_certificate.SerializeToString() + # we don't need to sign this message, this is normal + except DecodeError as e: + # could be a direct unsigned DrmCertificate, but reject those anyway + raise DecodeError(f"Could not parse certificate as a SignedDrmCertificate, {e}") + + try: + pss. \ + new(RSA.import_key(self.root_cert.public_key)). \ + verify( + msg_hash=SHA1.new(signed_drm_certificate.drm_certificate), + signature=signed_drm_certificate.signature + ) + except (ValueError, TypeError): + raise SignatureMismatch("Signature Mismatch on SignedDrmCertificate, rejecting certificate") + else: + return signed_message + def get_license_challenge( self, session_id: bytes, diff --git a/pywidevine/serve.py b/pywidevine/serve.py index 90ed00b..3111e60 100644 --- a/pywidevine/serve.py +++ b/pywidevine/serve.py @@ -177,6 +177,51 @@ async def set_service_certificate(request: web.Request) -> web.Response: }) +@routes.post("/{device}/get_service_certificate") +async def get_service_certificate(request: web.Request) -> web.Response: + secret_key = request.headers["X-Secret-Key"] + device_name = request.match_info["device"] + + body = await request.json() + for required_field in ("session_id",): + if not body.get(required_field): + return web.json_response({ + "status": 400, + "message": f"Missing required field '{required_field}' in JSON body." + }, status=400) + + # get session id + session_id = bytes.fromhex(body["session_id"]) + + # get cdm + cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name)) + if not cdm: + return web.json_response({ + "status": 400, + "message": f"No Cdm session for {device_name} has been opened yet. No session to use." + }, status=400) + + # get service certificate + try: + service_certificate = cdm.get_service_certificate(session_id) + except InvalidSession: + return web.json_response({ + "status": 400, + "message": f"Invalid Session ID '{session_id.hex()}', it may have expired." + }, status=400) + + if service_certificate: + service_certificate = base64.b64encode(service_certificate.SerializeToString()).decode() + + return web.json_response({ + "status": 200, + "message": "Successfully got the Service Certificate.", + "data": { + "service_certificate": service_certificate + } + }) + + @routes.post("/{device}/get_license_challenge/{license_type}") async def get_license_challenge(request: web.Request) -> web.Response: secret_key = request.headers["X-Secret-Key"]