From c362192c11f2e042204c21b85da03fd4a4aa2c42 Mon Sep 17 00:00:00 2001 From: rlaphoenix Date: Wed, 8 Nov 2023 22:08:31 +0000 Subject: [PATCH] Improve and simplify creation of protobuffer objects --- pywidevine/cdm.py | 100 ++++++++++++++++++++-------------------- pywidevine/key.py | 2 +- pywidevine/main.py | 9 ++-- pywidevine/pssh.py | 7 +-- pywidevine/remotecdm.py | 24 ++++------ pywidevine/serve.py | 2 +- 6 files changed, 71 insertions(+), 73 deletions(-) diff --git a/pywidevine/cdm.py b/pywidevine/cdm.py index 048c89c..11b8aa0 100644 --- a/pywidevine/cdm.py +++ b/pywidevine/cdm.py @@ -23,8 +23,8 @@ from pywidevine.exceptions import (InvalidContext, InvalidInitData, InvalidLicen InvalidSession, NoKeysLoaded, SignatureMismatch, TooManySessions) from pywidevine.key import Key from pywidevine.license_protocol_pb2 import (ClientIdentification, DrmCertificate, EncryptedClientIdentification, - License, LicenseRequest, LicenseType, ProtocolVersion, - SignedDrmCertificate, SignedMessage) + License, LicenseRequest, LicenseType, SignedDrmCertificate, + SignedMessage) from pywidevine.pssh import PSSH from pywidevine.session import Session from pywidevine.utils import get_binary_path @@ -263,7 +263,7 @@ class Cdm: self, session_id: bytes, pssh: PSSH, - type_: Union[int, str] = LicenseType.Value("STREAMING"), + license_type: str = "STREAMING", privacy_mode: bool = True ) -> bytes: """ @@ -272,8 +272,10 @@ class Cdm: Parameters: session_id: Session identifier. pssh: PSSH Object to get the init data from. - type_: Type of License you wish to exchange, often `STREAMING`. The `OFFLINE` - Licenses are for Offline licensing of Downloaded content. + license_type: Type of License you wish to exchange, often `STREAMING`. + - "STREAMING": Normal one-time-use license. + - "OFFLINE": Offline-use licence, usually for Downloaded content. + - "AUTOMATIC": License type decision is left to provider. privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the privacy certificate is not set yet, this does nothing. @@ -296,13 +298,13 @@ class Cdm: if not isinstance(pssh, PSSH): raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}") - try: - if isinstance(type_, str): - type_ = LicenseType.Value(type_) - elif not isinstance(type_, int): - raise ValueError() - except ValueError: - raise InvalidLicenseType(f"License Type {type_!r} is invalid") + if not isinstance(license_type, str): + raise InvalidLicenseType(f"Expected license_type to be a {str}, not {license_type!r}") + if license_type not in LicenseType.keys(): + raise InvalidLicenseType( + f"Invalid license_type value of '{license_type}'. " + f"Available values: {LicenseType.keys()}" + ) if self.device_type == Device.Types.ANDROID: # OEMCrypto's request_id seems to be in AES CTR Counter block form with no suffix @@ -316,35 +318,36 @@ class Cdm: else: request_id = get_random_bytes(16) - license_request = LicenseRequest() - license_request.type = LicenseRequest.RequestType.Value("NEW") - license_request.request_time = int(time.time()) - license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1") - license_request.key_control_nonce = random.randrange(1, 2 ** 31) - - # pssh_data may be either a WidevineCencHeader or custom data - # we have to assume the pssh.init_data value is valid, we cannot test - license_request.content_id.widevine_pssh_data.pssh_data.append(pssh.init_data) - license_request.content_id.widevine_pssh_data.license_type = type_ - license_request.content_id.widevine_pssh_data.request_id = request_id - - if session.service_certificate and privacy_mode: - # encrypt the client id for privacy mode - license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id( + license_request = LicenseRequest( + client_id=( + self.__client_id + ) if not (session.service_certificate and privacy_mode) else None, + encrypted_client_id=self.encrypt_client_id( client_id=self.__client_id, service_certificate=session.service_certificate - )) - else: - license_request.client_id.CopyFrom(self.__client_id) + ) if session.service_certificate and privacy_mode else None, + content_id=LicenseRequest.ContentIdentification( + widevine_pssh_data=LicenseRequest.ContentIdentification.WidevinePsshData( + pssh_data=[pssh.init_data], # either a WidevineCencHeader or custom data + license_type=license_type, + request_id=request_id + ) + ), + type="NEW", + request_time=int(time.time()), + protocol_version="VERSION_2_1", + key_control_nonce=random.randrange(1, 2 ** 31), + ).SerializeToString() - license_message = SignedMessage() - license_message.type = SignedMessage.MessageType.LICENSE_REQUEST - license_message.msg = license_request.SerializeToString() - license_message.signature = self.__signer.sign(SHA1.new(license_message.msg)) + signed_license_request = SignedMessage( + type="LICENSE_REQUEST", + msg=license_request, + signature=self.__signer.sign(SHA1.new(license_request)) + ).SerializeToString() - session.context[request_id] = self.derive_context(license_message.msg) + session.context[request_id] = self.derive_context(license_request) - return license_message.SerializeToString() + return signed_license_request def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: """ @@ -394,7 +397,7 @@ class Cdm: if not isinstance(license_message, SignedMessage): raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}") - if license_message.type != SignedMessage.MessageType.LICENSE: + if license_message.type != SignedMessage.MessageType.Value("LICENSE"): raise InvalidLicenseMessage( f"Expecting a LICENSE message, not a " f"'{SignedMessage.MessageType.Name(license_message.type)}' message." @@ -568,20 +571,19 @@ class Cdm: if not isinstance(service_certificate, DrmCertificate): raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}") - enc_client_id = EncryptedClientIdentification() - enc_client_id.provider_id = service_certificate.provider_id - enc_client_id.service_certificate_serial_number = service_certificate.serial_number - - enc_client_id.encrypted_client_id = AES. \ - new(privacy_key, AES.MODE_CBC, privacy_iv). \ - encrypt(Padding.pad(client_id.SerializeToString(), 16)) - - enc_client_id.encrypted_privacy_key = PKCS1_OAEP. \ - new(RSA.importKey(service_certificate.public_key)). \ + encrypted_client_id = EncryptedClientIdentification( + provider_id=service_certificate.provider_id, + service_certificate_serial_number=service_certificate.serial_number, + encrypted_client_id=AES. + new(privacy_key, AES.MODE_CBC, privacy_iv). + encrypt(Padding.pad(client_id.SerializeToString(), 16)), + encrypted_client_id_iv=privacy_iv, + encrypted_privacy_key=PKCS1_OAEP. + new(RSA.importKey(service_certificate.public_key)). encrypt(privacy_key) - enc_client_id.encrypted_client_id_iv = privacy_iv + ) - return enc_client_id + return encrypted_client_id @staticmethod def derive_context(message: bytes) -> tuple[bytes, bytes]: diff --git a/pywidevine/key.py b/pywidevine/key.py index e267d6e..6b7ff90 100644 --- a/pywidevine/key.py +++ b/pywidevine/key.py @@ -27,7 +27,7 @@ class Key: def from_key_container(cls, key: License.KeyContainer, enc_key: bytes) -> Key: """Load Key from a KeyContainer object.""" permissions = [] - if key.type == License.KeyContainer.KeyType.OPERATOR_SESSION: + if key.type == License.KeyContainer.KeyType.Value("OPERATOR_SESSION"): for descriptor, value in key.operator_session_key_permissions.ListFields(): if value == 1: permissions.append(descriptor.name) diff --git a/pywidevine/main.py b/pywidevine/main.py index 09c29d2..702dde2 100644 --- a/pywidevine/main.py +++ b/pywidevine/main.py @@ -39,12 +39,12 @@ def main(version: bool, debug: bool) -> None: @click.argument("device_path", type=Path) @click.argument("pssh", type=PSSH) @click.argument("server", type=str) -@click.option("-t", "--type", "type_", type=click.Choice(LicenseType.keys(), case_sensitive=False), +@click.option("-t", "--type", "license_type", type=click.Choice(LicenseType.keys(), case_sensitive=False), default="STREAMING", help="License Type to Request.") @click.option("-p", "--privacy", is_flag=True, default=False, help="Use Privacy Mode, off by default.") -def license_(device_path: Path, pssh: PSSH, server: str, type_: str, privacy: bool) -> None: +def license_(device_path: Path, pssh: PSSH, server: str, license_type: str, privacy: bool) -> None: """ Make a License Request for PSSH to SERVER using DEVICE. It will return a list of all keys within the returned license. @@ -96,7 +96,6 @@ def license_(device_path: Path, pssh: PSSH, server: str, type_: str, privacy: bo log.debug(service_cert) # get license challenge - license_type = LicenseType.Value(type_) challenge = cdm.get_license_challenge(session_id, pssh, license_type, privacy_mode=True) log.info("[+] Created License Request Message (Challenge)") log.debug(challenge) @@ -150,7 +149,7 @@ def test(ctx: click.Context, device: Path, privacy: bool) -> None: # Specify OFFLINE if it's a PSSH for a download/offline mode title, e.g., the # Download feature on Netflix Apps. Otherwise, use STREAMING or AUTOMATIC. - license_type = LicenseType.STREAMING + license_type = "STREAMING" # this runs the `cdm license` CLI-command code with the data we set above # it will print information as it goes to the terminal @@ -159,7 +158,7 @@ def test(ctx: click.Context, device: Path, privacy: bool) -> None: device_path=device, pssh=pssh, server=license_server, - type_=LicenseType.Name(license_type), + license_type=license_type, privacy=privacy ) diff --git a/pywidevine/pssh.py b/pywidevine/pssh.py index 14096e8..4eb4b61 100644 --- a/pywidevine/pssh.py +++ b/pywidevine/pssh.py @@ -307,9 +307,10 @@ class PSSH: if self.system_id == PSSH.SystemId.Widevine: raise ValueError("This is already a Widevine PSSH") - widevine_pssh_data = WidevinePsshData() - widevine_pssh_data.algorithm = WidevinePsshData.Algorithm.Value("AESCTR") - widevine_pssh_data.key_ids[:] = [x.bytes for x in self.key_ids] + widevine_pssh_data = WidevinePsshData( + key_ids=[x.bytes for x in self.key_ids], + algorithm="AESCTR" + ) if self.version == 1: # ensure both cenc header and box has same Key IDs diff --git a/pywidevine/remotecdm.py b/pywidevine/remotecdm.py index 35fa15d..c3d9395 100644 --- a/pywidevine/remotecdm.py +++ b/pywidevine/remotecdm.py @@ -185,7 +185,7 @@ class RemoteCdm(Cdm): self, session_id: bytes, pssh: PSSH, - type_: Union[int, str] = LicenseType.STREAMING, + license_type: str = "STREAMING", privacy_mode: bool = True ) -> bytes: if not pssh: @@ -193,20 +193,16 @@ class RemoteCdm(Cdm): if not isinstance(pssh, PSSH): raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}") - try: - if isinstance(type_, int): - type_ = LicenseType.Name(int(type_)) - elif isinstance(type_, str): - type_ = LicenseType.Name(LicenseType.Value(type_)) - elif isinstance(type_, LicenseType): - type_ = LicenseType.Name(type_) - else: - raise InvalidLicenseType() - except ValueError: - raise InvalidLicenseType(f"License Type {type_!r} is invalid") + if not isinstance(license_type, str): + raise InvalidLicenseType(f"Expected license_type to be a {str}, not {license_type!r}") + if license_type not in LicenseType.keys(): + raise InvalidLicenseType( + f"Invalid license_type value of '{license_type}'. " + f"Available values: {LicenseType.keys()}" + ) r = self.__session.post( - url=f"{self.host}/{self.device_name}/get_license_challenge/{type_}", + url=f"{self.host}/{self.device_name}/get_license_challenge/{license_type}", json={ "session_id": session_id.hex(), "init_data": pssh.dumps(), @@ -251,7 +247,7 @@ class RemoteCdm(Cdm): if not isinstance(license_message, SignedMessage): raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}") - if license_message.type != SignedMessage.MessageType.LICENSE: + if license_message.type != SignedMessage.MessageType.Value("LICENSE"): raise InvalidLicenseMessage( f"Expecting a LICENSE message, not a " f"'{SignedMessage.MessageType.Name(license_message.type)}' message." diff --git a/pywidevine/serve.py b/pywidevine/serve.py index 37a0c35..b5d5cc2 100644 --- a/pywidevine/serve.py +++ b/pywidevine/serve.py @@ -270,7 +270,7 @@ async def get_license_challenge(request: web.Request) -> web.Response: license_request = cdm.get_license_challenge( session_id=session_id, pssh=init_data, - type_=license_type, + license_type=license_type, privacy_mode=privacy_mode ) except InvalidSession: