diff --git a/cdm/wks.py b/cdm/wks.py index 5a15e61..feda0f8 100644 --- a/cdm/wks.py +++ b/cdm/wks.py @@ -339,8 +339,6 @@ class Cdm: self.sessions = {} def open_session(self, init_data_b64, device, raw_init_data = None, offline=False): - self.logger.debug("open_session(init_data_b64={}, device={}".format(init_data_b64, device)) - self.logger.info("opening new cdm session") if device.session_id_type == 'android': # format: 16 random hexdigits, 2 digit counter, 14 0s rand_ascii = ''.join(random.choice('ABCDEF0123456789') for _ in range(16)) @@ -353,7 +351,6 @@ class Cdm: session_id = rand_bytes else: # other formats NYI - self.logger.error("device type is unusable") return 1 if raw_init_data and isinstance(raw_init_data, (bytes, bytearray)): # used for NF key exchange, where they don't provide a valid PSSH @@ -366,46 +363,31 @@ class Cdm: if init_data: new_session = Session(session_id, init_data, device, offline) else: - self.logger.error("unable to parse init data") return 1 self.sessions[session_id] = new_session - self.logger.info("session opened and init data parsed successfully") return session_id def _parse_init_data(self, init_data_b64): parsed_init_data = WidevineCencHeader() try: - self.logger.debug("trying to parse init_data directly") parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:]) except DecodeError: - self.logger.debug("unable to parse as-is, trying with removed pssh box header") try: id_bytes = parsed_init_data.ParseFromString(base64.b64decode(init_data_b64)[32:]) except DecodeError: - self.logger.error("unable to parse, unsupported init data format") return None - self.logger.debug("init_data:") - for line in text_format.MessageToString(parsed_init_data).splitlines(): - self.logger.debug(line) return parsed_init_data def close_session(self, session_id): - self.logger.debug("close_session(session_id={})".format(session_id)) - self.logger.info("closing cdm session") if session_id in self.sessions: self.sessions.pop(session_id) - self.logger.info("cdm session closed") return 0 else: - self.logger.info("session {} not found".format(session_id)) return 1 def set_service_certificate(self, session_id, cert_b64): - self.logger.debug("set_service_certificate(session_id={}, cert={})".format(session_id, cert_b64)) - self.logger.info("setting service certificate") if session_id not in self.sessions: - self.logger.error("session id doesn't exist") return 1 session = self.sessions[session_id] @@ -420,35 +402,24 @@ class Cdm: service_certificate = SignedDeviceCertificate() if message.Type: - self.logger.debug("service cert provided as signedmessage") try: service_certificate.ParseFromString(message.Msg) except DecodeError: - self.logger.error("failed to parse service certificate") return 1 else: - self.logger.debug("service cert provided as signeddevicecertificate") try: service_certificate.ParseFromString(base64.b64decode(cert_b64)) except DecodeError: - self.logger.error("failed to parse service certificate") return 1 - self.logger.debug("service certificate:") - for line in text_format.MessageToString(service_certificate).splitlines(): - self.logger.debug(line) - session.service_certificate = service_certificate session.privacy_mode = True return 0 def get_license_request(self, session_id): - self.logger.debug("get_license_request(session_id={})".format(session_id)) - self.logger.info("getting license request") if session_id not in self.sessions: - self.logger.error("session ID does not exist") return 1 session = self.sessions[session_id] @@ -461,17 +432,14 @@ class Cdm: client_id = ClientIdentification() if not os.path.exists(session.device_config.device_client_id_blob_filename): - self.logger.error("no client ID blob available for this device") return 1 with open(session.device_config.device_client_id_blob_filename, "rb") as f: try: cid_bytes = client_id.ParseFromString(f.read()) except DecodeError: - self.logger.error("client id failed to parse as protobuf") return 1 - self.logger.debug("building license request") if not self.raw_pssh: license_request.Type = SignedLicenseRequest.MessageType.Value('LICENSE_REQUEST') license_request.Msg.ContentId.CencId.Pssh.CopyFrom(session.init_data) @@ -493,20 +461,13 @@ class Cdm: if session.privacy_mode: if session.device_config.vmp: - self.logger.debug("vmp required, adding to client_id") - self.logger.debug("reading vmp hashes") vmp_hashes = FileHashes() with open(session.device_config.device_vmp_blob_filename, "rb") as f: try: vmp_bytes = vmp_hashes.ParseFromString(f.read()) except DecodeError: - self.logger.error("vmp hashes failed to parse as protobuf") return 1 client_id._FileHashes.CopyFrom(vmp_hashes) - self.logger.debug("privacy mode & service certificate loaded, encrypting client id") - self.logger.debug("unencrypted client id:") - for line in text_format.MessageToString(client_id).splitlines(): - self.logger.debug(line) cid_aes_key = get_random_bytes(16) cid_iv = get_random_bytes(16) @@ -536,10 +497,8 @@ class Cdm: key = RSA.importKey(open(session.device_config.device_private_key_filename).read()) session.device_key = key else: - self.logger.error("need device private key, other methods unimplemented") return 1 - self.logger.debug("signing license request") hash = SHA1.new(license_request.Msg.SerializeToString()) signature = pss.new(key).sign(hash) @@ -548,25 +507,16 @@ class Cdm: session.license_request = license_request - self.logger.debug("license request:") - for line in text_format.MessageToString(session.license_request).splitlines(): - self.logger.debug(line) - self.logger.info("license request created") - self.logger.debug("license request b64: {}".format(base64.b64encode(license_request.SerializeToString()))) return license_request.SerializeToString() def provide_license(self, session_id, license_b64): - self.logger.debug("provide_license(session_id={}, license_b64={})".format(session_id, license_b64)) - self.logger.info("decrypting provided license") if session_id not in self.sessions: - self.logger.error("session does not exist") return 1 session = self.sessions[session_id] if not session.license_request: - self.logger.error("generate a license request first!") return 1 license = SignedLicense() @@ -578,12 +528,6 @@ class Cdm: session.license = license - self.logger.debug("license:") - for line in text_format.MessageToString(license).splitlines(): - self.logger.debug(line) - - self.logger.debug("deriving keys from session key") - oaep_cipher = PKCS1_OAEP.new(session.device_key) session.session_key = oaep_cipher.decrypt(license.SessionKey) @@ -627,22 +571,14 @@ class Cdm: session.derived_keys['auth_1'] = auth_cmac_combined_1 session.derived_keys['auth_2'] = auth_cmac_combined_2 - self.logger.debug('verifying license signature') - lic_hmac = HMAC.new(session.derived_keys['auth_1'], digestmod=SHA256) lic_hmac.update(license.Msg.SerializeToString()) - self.logger.debug("calculated sig: {} actual sig: {}".format(lic_hmac.hexdigest(), binascii.hexlify(license.Signature))) - if lic_hmac.digest() != license.Signature: - self.logger.info("license signature doesn't match - writing bin so they can be debugged") with open("original_lic.bin", "wb") as f: f.write(base64.b64decode(license_b64)) with open("parsed_lic.bin", "wb") as f: f.write(license.SerializeToString()) - self.logger.info("continuing anyway") - - self.logger.debug("key count: {}".format(len(license.Msg.Key))) for key in license.Msg.Key: if key.Id: key_id = key.Id @@ -664,16 +600,11 @@ class Cdm: else: permissions = [] session.keys.append(Key(key_id, type, Padding.unpad(decrypted_key, 16), permissions)) - - self.logger.info("decrypted all keys") return 0 def get_keys(self, session_id): if session_id in self.sessions: return self.sessions[session_id].keys - else: - self.logger.error("session not found") - return 1 class WvDecrypt(object): WV_SYSTEM_ID = [