diff --git a/pywidevine/device.py b/pywidevine/device.py index d5d85bb..ccbeb1f 100644 --- a/pywidevine/device.py +++ b/pywidevine/device.py @@ -1,11 +1,12 @@ from __future__ import annotations import base64 +import logging from enum import Enum from pathlib import Path from typing import Any, Optional, Union -from construct import BitStruct, Bytes, Const +from construct import BitStruct, Bytes, Const, ConstructError from construct import Enum as CEnum from construct import Int8ub, Int16ub from construct import Optional as COptional @@ -22,6 +23,11 @@ class _Types(Enum): class _Structures: + header = Struct( + "signature" / Const(b"WVD"), + "version" / Const(Int8ub, 1) + ) + v2 = Struct( "signature" / Const(b"WVD"), "version" / Const(Int8ub, 2), @@ -161,5 +167,53 @@ class Device: path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(self.dumps()) + @classmethod + def migrate(cls, data: Union[bytes, str]) -> Device: + if isinstance(data, str): + data = base64.b64decode(data) + if not isinstance(data, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + header = _Structures.header.parse(data) + if header.version == 2: + raise ValueError("Device Data is already migrated to the latest version.") + if header.version == 0 or header.version > 2: + # we have never used version 0, likely data that just so happened to use the WVD magic + raise ValueError("Device Data does not seem to be a WVD file (v0).") + + if header.version == 1: # v1 to v2 + data = _Structures.v1.parse(data) + data.version = 2 # update version to 2 to allow loading + + vmp = FileHashes() + if data.vmp: + try: + vmp.ParseFromString(data.vmp) + except DecodeError as e: + raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}") + data.vmp = vmp + + client_id = ClientIdentification() + try: + client_id.ParseFromString(data.client_id) + except DecodeError as e: + raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}") + + new_vmp_data = data.vmp.SerializeToString() + if client_id.vmp_data and client_id.vmp_data != new_vmp_data: + logging.getLogger("migrate").warning("Client ID already has Verified Media Path data") + client_id.vmp_data = new_vmp_data + data.client_id = client_id.SerializeToString() + + try: + data = _Structures.v2.build(data) + except ConstructError as e: + raise ValueError(f"Migration failed, {e}") + + try: + return cls.loads(data) + except ConstructError as e: + raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}") + __ALL__ = (Device,) diff --git a/pywidevine/main.py b/pywidevine/main.py index 20399b5..04cd12e 100644 --- a/pywidevine/main.py +++ b/pywidevine/main.py @@ -246,33 +246,13 @@ def migrate(ctx: click.Context, device: Path) -> None: log = logging.getLogger("migrate") - data = bytearray(device.read_bytes()) - if not data.startswith(b"WVD"): - raise click.UsageError("device: Data does not seem to be a WVD file (magic).", ctx) - - version = data[3] - if version == 0: - # we have never used version 0, likely data that just so happened to use the WVD magic - raise click.UsageError("device: Data does not seem to be a WVD file (v0).", ctx) - if version == 2: - raise click.UsageError("device: Data is already migrated to the latest version.", ctx) - - success_message = "" - - # v1 to v2 - if version == 1: - data[3] = 2 # set version to 2 to allow loading - data[6] = 0 # blank flags as there's no valid flags that aren't deprecated - # we can now load it, and loading will ignore the now-removed vmp data and length fields - success_message = "Successfully migrated from Version 1 to Version 2." - try: - new_device = Device.loads(bytes(data)) + new_device = Device.migrate(device.read_bytes()) except ConstructError as e: - raise click.UsageError(f"device: Data seems to be corrupt or invalid, {e}", ctx) + raise click.UsageError(str(e), ctx) # save log.debug(new_device) new_device.dump(device) - log.info(success_message) + log.info("Successfully migrated the Widevine Device (.wvd) file!")