Improve and simplify creation of protobuffer objects

This commit is contained in:
rlaphoenix 2023-11-08 22:08:31 +00:00
parent 0e6aa1d5e8
commit c362192c11
6 changed files with 71 additions and 73 deletions

View File

@ -23,8 +23,8 @@ from pywidevine.exceptions import (InvalidContext, InvalidInitData, InvalidLicen
InvalidSession, NoKeysLoaded, SignatureMismatch, TooManySessions)
from pywidevine.key import Key
from pywidevine.license_protocol_pb2 import (ClientIdentification, DrmCertificate, EncryptedClientIdentification,
License, LicenseRequest, LicenseType, ProtocolVersion,
SignedDrmCertificate, SignedMessage)
License, LicenseRequest, LicenseType, SignedDrmCertificate,
SignedMessage)
from pywidevine.pssh import PSSH
from pywidevine.session import Session
from pywidevine.utils import get_binary_path
@ -263,7 +263,7 @@ class Cdm:
self,
session_id: bytes,
pssh: PSSH,
type_: Union[int, str] = LicenseType.Value("STREAMING"),
license_type: str = "STREAMING",
privacy_mode: bool = True
) -> bytes:
"""
@ -272,8 +272,10 @@ class Cdm:
Parameters:
session_id: Session identifier.
pssh: PSSH Object to get the init data from.
type_: Type of License you wish to exchange, often `STREAMING`. The `OFFLINE`
Licenses are for Offline licensing of Downloaded content.
license_type: Type of License you wish to exchange, often `STREAMING`.
- "STREAMING": Normal one-time-use license.
- "OFFLINE": Offline-use licence, usually for Downloaded content.
- "AUTOMATIC": License type decision is left to provider.
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
privacy certificate is not set yet, this does nothing.
@ -296,13 +298,13 @@ class Cdm:
if not isinstance(pssh, PSSH):
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
try:
if isinstance(type_, str):
type_ = LicenseType.Value(type_)
elif not isinstance(type_, int):
raise ValueError()
except ValueError:
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
if not isinstance(license_type, str):
raise InvalidLicenseType(f"Expected license_type to be a {str}, not {license_type!r}")
if license_type not in LicenseType.keys():
raise InvalidLicenseType(
f"Invalid license_type value of '{license_type}'. "
f"Available values: {LicenseType.keys()}"
)
if self.device_type == Device.Types.ANDROID:
# OEMCrypto's request_id seems to be in AES CTR Counter block form with no suffix
@ -316,35 +318,36 @@ class Cdm:
else:
request_id = get_random_bytes(16)
license_request = LicenseRequest()
license_request.type = LicenseRequest.RequestType.Value("NEW")
license_request.request_time = int(time.time())
license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1")
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
# pssh_data may be either a WidevineCencHeader or custom data
# we have to assume the pssh.init_data value is valid, we cannot test
license_request.content_id.widevine_pssh_data.pssh_data.append(pssh.init_data)
license_request.content_id.widevine_pssh_data.license_type = type_
license_request.content_id.widevine_pssh_data.request_id = request_id
if session.service_certificate and privacy_mode:
# encrypt the client id for privacy mode
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
license_request = LicenseRequest(
client_id=(
self.__client_id
) if not (session.service_certificate and privacy_mode) else None,
encrypted_client_id=self.encrypt_client_id(
client_id=self.__client_id,
service_certificate=session.service_certificate
))
else:
license_request.client_id.CopyFrom(self.__client_id)
) if session.service_certificate and privacy_mode else None,
content_id=LicenseRequest.ContentIdentification(
widevine_pssh_data=LicenseRequest.ContentIdentification.WidevinePsshData(
pssh_data=[pssh.init_data], # either a WidevineCencHeader or custom data
license_type=license_type,
request_id=request_id
)
),
type="NEW",
request_time=int(time.time()),
protocol_version="VERSION_2_1",
key_control_nonce=random.randrange(1, 2 ** 31),
).SerializeToString()
license_message = SignedMessage()
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
license_message.msg = license_request.SerializeToString()
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
signed_license_request = SignedMessage(
type="LICENSE_REQUEST",
msg=license_request,
signature=self.__signer.sign(SHA1.new(license_request))
).SerializeToString()
session.context[request_id] = self.derive_context(license_message.msg)
session.context[request_id] = self.derive_context(license_request)
return license_message.SerializeToString()
return signed_license_request
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
"""
@ -394,7 +397,7 @@ class Cdm:
if not isinstance(license_message, SignedMessage):
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
if license_message.type != SignedMessage.MessageType.LICENSE:
if license_message.type != SignedMessage.MessageType.Value("LICENSE"):
raise InvalidLicenseMessage(
f"Expecting a LICENSE message, not a "
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
@ -568,20 +571,19 @@ class Cdm:
if not isinstance(service_certificate, DrmCertificate):
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
enc_client_id = EncryptedClientIdentification()
enc_client_id.provider_id = service_certificate.provider_id
enc_client_id.service_certificate_serial_number = service_certificate.serial_number
enc_client_id.encrypted_client_id = AES. \
new(privacy_key, AES.MODE_CBC, privacy_iv). \
encrypt(Padding.pad(client_id.SerializeToString(), 16))
enc_client_id.encrypted_privacy_key = PKCS1_OAEP. \
new(RSA.importKey(service_certificate.public_key)). \
encrypted_client_id = EncryptedClientIdentification(
provider_id=service_certificate.provider_id,
service_certificate_serial_number=service_certificate.serial_number,
encrypted_client_id=AES.
new(privacy_key, AES.MODE_CBC, privacy_iv).
encrypt(Padding.pad(client_id.SerializeToString(), 16)),
encrypted_client_id_iv=privacy_iv,
encrypted_privacy_key=PKCS1_OAEP.
new(RSA.importKey(service_certificate.public_key)).
encrypt(privacy_key)
enc_client_id.encrypted_client_id_iv = privacy_iv
)
return enc_client_id
return encrypted_client_id
@staticmethod
def derive_context(message: bytes) -> tuple[bytes, bytes]:

View File

@ -27,7 +27,7 @@ class Key:
def from_key_container(cls, key: License.KeyContainer, enc_key: bytes) -> Key:
"""Load Key from a KeyContainer object."""
permissions = []
if key.type == License.KeyContainer.KeyType.OPERATOR_SESSION:
if key.type == License.KeyContainer.KeyType.Value("OPERATOR_SESSION"):
for descriptor, value in key.operator_session_key_permissions.ListFields():
if value == 1:
permissions.append(descriptor.name)

View File

@ -39,12 +39,12 @@ def main(version: bool, debug: bool) -> None:
@click.argument("device_path", type=Path)
@click.argument("pssh", type=PSSH)
@click.argument("server", type=str)
@click.option("-t", "--type", "type_", type=click.Choice(LicenseType.keys(), case_sensitive=False),
@click.option("-t", "--type", "license_type", type=click.Choice(LicenseType.keys(), case_sensitive=False),
default="STREAMING",
help="License Type to Request.")
@click.option("-p", "--privacy", is_flag=True, default=False,
help="Use Privacy Mode, off by default.")
def license_(device_path: Path, pssh: PSSH, server: str, type_: str, privacy: bool) -> None:
def license_(device_path: Path, pssh: PSSH, server: str, license_type: str, privacy: bool) -> None:
"""
Make a License Request for PSSH to SERVER using DEVICE.
It will return a list of all keys within the returned license.
@ -96,7 +96,6 @@ def license_(device_path: Path, pssh: PSSH, server: str, type_: str, privacy: bo
log.debug(service_cert)
# get license challenge
license_type = LicenseType.Value(type_)
challenge = cdm.get_license_challenge(session_id, pssh, license_type, privacy_mode=True)
log.info("[+] Created License Request Message (Challenge)")
log.debug(challenge)
@ -150,7 +149,7 @@ def test(ctx: click.Context, device: Path, privacy: bool) -> None:
# Specify OFFLINE if it's a PSSH for a download/offline mode title, e.g., the
# Download feature on Netflix Apps. Otherwise, use STREAMING or AUTOMATIC.
license_type = LicenseType.STREAMING
license_type = "STREAMING"
# this runs the `cdm license` CLI-command code with the data we set above
# it will print information as it goes to the terminal
@ -159,7 +158,7 @@ def test(ctx: click.Context, device: Path, privacy: bool) -> None:
device_path=device,
pssh=pssh,
server=license_server,
type_=LicenseType.Name(license_type),
license_type=license_type,
privacy=privacy
)

View File

@ -307,9 +307,10 @@ class PSSH:
if self.system_id == PSSH.SystemId.Widevine:
raise ValueError("This is already a Widevine PSSH")
widevine_pssh_data = WidevinePsshData()
widevine_pssh_data.algorithm = WidevinePsshData.Algorithm.Value("AESCTR")
widevine_pssh_data.key_ids[:] = [x.bytes for x in self.key_ids]
widevine_pssh_data = WidevinePsshData(
key_ids=[x.bytes for x in self.key_ids],
algorithm="AESCTR"
)
if self.version == 1:
# ensure both cenc header and box has same Key IDs

View File

@ -185,7 +185,7 @@ class RemoteCdm(Cdm):
self,
session_id: bytes,
pssh: PSSH,
type_: Union[int, str] = LicenseType.STREAMING,
license_type: str = "STREAMING",
privacy_mode: bool = True
) -> bytes:
if not pssh:
@ -193,20 +193,16 @@ class RemoteCdm(Cdm):
if not isinstance(pssh, PSSH):
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
try:
if isinstance(type_, int):
type_ = LicenseType.Name(int(type_))
elif isinstance(type_, str):
type_ = LicenseType.Name(LicenseType.Value(type_))
elif isinstance(type_, LicenseType):
type_ = LicenseType.Name(type_)
else:
raise InvalidLicenseType()
except ValueError:
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
if not isinstance(license_type, str):
raise InvalidLicenseType(f"Expected license_type to be a {str}, not {license_type!r}")
if license_type not in LicenseType.keys():
raise InvalidLicenseType(
f"Invalid license_type value of '{license_type}'. "
f"Available values: {LicenseType.keys()}"
)
r = self.__session.post(
url=f"{self.host}/{self.device_name}/get_license_challenge/{type_}",
url=f"{self.host}/{self.device_name}/get_license_challenge/{license_type}",
json={
"session_id": session_id.hex(),
"init_data": pssh.dumps(),
@ -251,7 +247,7 @@ class RemoteCdm(Cdm):
if not isinstance(license_message, SignedMessage):
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
if license_message.type != SignedMessage.MessageType.LICENSE:
if license_message.type != SignedMessage.MessageType.Value("LICENSE"):
raise InvalidLicenseMessage(
f"Expecting a LICENSE message, not a "
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."

View File

@ -270,7 +270,7 @@ async def get_license_challenge(request: web.Request) -> web.Response:
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
type_=license_type,
license_type=license_type,
privacy_mode=privacy_mode
)
except InvalidSession: