Add files via upload

has been cleaned up a little
This commit is contained in:
Sasuke-Duck UwU 2023-11-16 06:04:39 -05:00 committed by GitHub
parent 87126cf4e1
commit ef83af4806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 0 additions and 69 deletions

View File

@ -339,8 +339,6 @@ class Cdm:
self.sessions = {} self.sessions = {}
def open_session(self, init_data_b64, device, raw_init_data = None, offline=False): def open_session(self, init_data_b64, device, raw_init_data = None, offline=False):
self.logger.debug("open_session(init_data_b64={}, device={}".format(init_data_b64, device))
self.logger.info("opening new cdm session")
if device.session_id_type == 'android': if device.session_id_type == 'android':
# format: 16 random hexdigits, 2 digit counter, 14 0s # format: 16 random hexdigits, 2 digit counter, 14 0s
rand_ascii = ''.join(random.choice('ABCDEF0123456789') for _ in range(16)) rand_ascii = ''.join(random.choice('ABCDEF0123456789') for _ in range(16))
@ -353,7 +351,6 @@ class Cdm:
session_id = rand_bytes session_id = rand_bytes
else: else:
# other formats NYI # other formats NYI
self.logger.error("device type is unusable")
return 1 return 1
if raw_init_data and isinstance(raw_init_data, (bytes, bytearray)): if raw_init_data and isinstance(raw_init_data, (bytes, bytearray)):
# used for NF key exchange, where they don't provide a valid PSSH # used for NF key exchange, where they don't provide a valid PSSH
@ -366,46 +363,31 @@ class Cdm:
if init_data: if init_data:
new_session = Session(session_id, init_data, device, offline) new_session = Session(session_id, init_data, device, offline)
else: else:
self.logger.error("unable to parse init data")
return 1 return 1
self.sessions[session_id] = new_session self.sessions[session_id] = new_session
self.logger.info("session opened and init data parsed successfully")
return session_id return session_id
def _parse_init_data(self, init_data_b64): def _parse_init_data(self, init_data_b64):
parsed_init_data = WidevineCencHeader() parsed_init_data = WidevineCencHeader()
try: try:
self.logger.debug("trying to parse init_data directly")
parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:]) parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:])
except DecodeError: except DecodeError:
self.logger.debug("unable to parse as-is, trying with removed pssh box header")
try: try:
id_bytes = parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:]) id_bytes = parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:])
except DecodeError: except DecodeError:
self.logger.error("unable to parse, unsupported init data format")
return None return None
self.logger.debug("init_data:")
for line in text_format.MessageToString(parsed_init_data).splitlines():
self.logger.debug(line)
return parsed_init_data return parsed_init_data
def close_session(self, session_id): def close_session(self, session_id):
self.logger.debug("close_session(session_id={})".format(session_id))
self.logger.info("closing cdm session")
if session_id in self.sessions: if session_id in self.sessions:
self.sessions.pop(session_id) self.sessions.pop(session_id)
self.logger.info("cdm session closed")
return 0 return 0
else: else:
self.logger.info("session {} not found".format(session_id))
return 1 return 1
def set_service_certificate(self, session_id, cert_b64): def set_service_certificate(self, session_id, cert_b64):
self.logger.debug("set_service_certificate(session_id={}, cert={})".format(session_id, cert_b64))
self.logger.info("setting service certificate")
if session_id not in self.sessions: if session_id not in self.sessions:
self.logger.error("session id doesn't exist")
return 1 return 1
session = self.sessions[session_id] session = self.sessions[session_id]
@ -420,35 +402,24 @@ class Cdm:
service_certificate = SignedDeviceCertificate() service_certificate = SignedDeviceCertificate()
if message.Type: if message.Type:
self.logger.debug("service cert provided as signedmessage")
try: try:
service_certificate.ParseFromString(message.Msg) service_certificate.ParseFromString(message.Msg)
except DecodeError: except DecodeError:
self.logger.error("failed to parse service certificate")
return 1 return 1
else: else:
self.logger.debug("service cert provided as signeddevicecertificate")
try: try:
service_certificate.ParseFromString(base64.b64decode(cert_b64)) service_certificate.ParseFromString(base64.b64decode(cert_b64))
except DecodeError: except DecodeError:
self.logger.error("failed to parse service certificate")
return 1 return 1
self.logger.debug("service certificate:")
for line in text_format.MessageToString(service_certificate).splitlines():
self.logger.debug(line)
session.service_certificate = service_certificate session.service_certificate = service_certificate
session.privacy_mode = True session.privacy_mode = True
return 0 return 0
def get_license_request(self, session_id): def get_license_request(self, session_id):
self.logger.debug("get_license_request(session_id={})".format(session_id))
self.logger.info("getting license request")
if session_id not in self.sessions: if session_id not in self.sessions:
self.logger.error("session ID does not exist")
return 1 return 1
session = self.sessions[session_id] session = self.sessions[session_id]
@ -461,17 +432,14 @@ class Cdm:
client_id = ClientIdentification() client_id = ClientIdentification()
if not os.path.exists(session.device_config.device_client_id_blob_filename): if not os.path.exists(session.device_config.device_client_id_blob_filename):
self.logger.error("no client ID blob available for this device")
return 1 return 1
with open(session.device_config.device_client_id_blob_filename, "rb") as f: with open(session.device_config.device_client_id_blob_filename, "rb") as f:
try: try:
cid_bytes = client_id.ParseFromString(f.read()) cid_bytes = client_id.ParseFromString(f.read())
except DecodeError: except DecodeError:
self.logger.error("client id failed to parse as protobuf")
return 1 return 1
self.logger.debug("building license request")
if not self.raw_pssh: if not self.raw_pssh:
license_request.Type = SignedLicenseRequest.MessageType.Value('LICENSE_REQUEST') license_request.Type = SignedLicenseRequest.MessageType.Value('LICENSE_REQUEST')
license_request.Msg.ContentId.CencId.Pssh.CopyFrom(session.init_data) license_request.Msg.ContentId.CencId.Pssh.CopyFrom(session.init_data)
@ -493,20 +461,13 @@ class Cdm:
if session.privacy_mode: if session.privacy_mode:
if session.device_config.vmp: if session.device_config.vmp:
self.logger.debug("vmp required, adding to client_id")
self.logger.debug("reading vmp hashes")
vmp_hashes = FileHashes() vmp_hashes = FileHashes()
with open(session.device_config.device_vmp_blob_filename, "rb") as f: with open(session.device_config.device_vmp_blob_filename, "rb") as f:
try: try:
vmp_bytes = vmp_hashes.ParseFromString(f.read()) vmp_bytes = vmp_hashes.ParseFromString(f.read())
except DecodeError: except DecodeError:
self.logger.error("vmp hashes failed to parse as protobuf")
return 1 return 1
client_id._FileHashes.CopyFrom(vmp_hashes) client_id._FileHashes.CopyFrom(vmp_hashes)
self.logger.debug("privacy mode & service certificate loaded, encrypting client id")
self.logger.debug("unencrypted client id:")
for line in text_format.MessageToString(client_id).splitlines():
self.logger.debug(line)
cid_aes_key = get_random_bytes(16) cid_aes_key = get_random_bytes(16)
cid_iv = get_random_bytes(16) cid_iv = get_random_bytes(16)
@ -536,10 +497,8 @@ class Cdm:
key = RSA.importKey(open(session.device_config.device_private_key_filename).read()) key = RSA.importKey(open(session.device_config.device_private_key_filename).read())
session.device_key = key session.device_key = key
else: else:
self.logger.error("need device private key, other methods unimplemented")
return 1 return 1
self.logger.debug("signing license request")
hash = SHA1.new(license_request.Msg.SerializeToString()) hash = SHA1.new(license_request.Msg.SerializeToString())
signature = pss.new(key).sign(hash) signature = pss.new(key).sign(hash)
@ -548,25 +507,16 @@ class Cdm:
session.license_request = license_request session.license_request = license_request
self.logger.debug("license request:")
for line in text_format.MessageToString(session.license_request).splitlines():
self.logger.debug(line)
self.logger.info("license request created")
self.logger.debug("license request b64: {}".format(base64.b64encode(license_request.SerializeToString())))
return license_request.SerializeToString() return license_request.SerializeToString()
def provide_license(self, session_id, license_b64): def provide_license(self, session_id, license_b64):
self.logger.debug("provide_license(session_id={}, license_b64={})".format(session_id, license_b64))
self.logger.info("decrypting provided license")
if session_id not in self.sessions: if session_id not in self.sessions:
self.logger.error("session does not exist")
return 1 return 1
session = self.sessions[session_id] session = self.sessions[session_id]
if not session.license_request: if not session.license_request:
self.logger.error("generate a license request first!")
return 1 return 1
license = SignedLicense() license = SignedLicense()
@ -578,12 +528,6 @@ class Cdm:
session.license = license session.license = license
self.logger.debug("license:")
for line in text_format.MessageToString(license).splitlines():
self.logger.debug(line)
self.logger.debug("deriving keys from session key")
oaep_cipher = PKCS1_OAEP.new(session.device_key) oaep_cipher = PKCS1_OAEP.new(session.device_key)
session.session_key = oaep_cipher.decrypt(license.SessionKey) session.session_key = oaep_cipher.decrypt(license.SessionKey)
@ -627,22 +571,14 @@ class Cdm:
session.derived_keys['auth_1'] = auth_cmac_combined_1 session.derived_keys['auth_1'] = auth_cmac_combined_1
session.derived_keys['auth_2'] = auth_cmac_combined_2 session.derived_keys['auth_2'] = auth_cmac_combined_2
self.logger.debug('verifying license signature')
lic_hmac = HMAC.new(session.derived_keys['auth_1'], digestmod=SHA256) lic_hmac = HMAC.new(session.derived_keys['auth_1'], digestmod=SHA256)
lic_hmac.update(license.Msg.SerializeToString()) lic_hmac.update(license.Msg.SerializeToString())
self.logger.debug("calculated sig: {} actual sig: {}".format(lic_hmac.hexdigest(), binascii.hexlify(license.Signature)))
if lic_hmac.digest() != license.Signature: if lic_hmac.digest() != license.Signature:
self.logger.info("license signature doesn't match - writing bin so they can be debugged")
with open("original_lic.bin", "wb") as f: with open("original_lic.bin", "wb") as f:
f.write(base64.b64decode(license_b64)) f.write(base64.b64decode(license_b64))
with open("parsed_lic.bin", "wb") as f: with open("parsed_lic.bin", "wb") as f:
f.write(license.SerializeToString()) f.write(license.SerializeToString())
self.logger.info("continuing anyway")
self.logger.debug("key count: {}".format(len(license.Msg.Key)))
for key in license.Msg.Key: for key in license.Msg.Key:
if key.Id: if key.Id:
key_id = key.Id key_id = key.Id
@ -664,16 +600,11 @@ class Cdm:
else: else:
permissions = [] permissions = []
session.keys.append(Key(key_id, type, Padding.unpad(decrypted_key, 16), permissions)) session.keys.append(Key(key_id, type, Padding.unpad(decrypted_key, 16), permissions))
self.logger.info("decrypted all keys")
return 0 return 0
def get_keys(self, session_id): def get_keys(self, session_id):
if session_id in self.sessions: if session_id in self.sessions:
return self.sessions[session_id].keys return self.sessions[session_id].keys
else:
self.logger.error("session not found")
return 1
class WvDecrypt(object): class WvDecrypt(object):
WV_SYSTEM_ID = [ WV_SYSTEM_ID = [