Move Migration Code to Device.migrate()
Also now more effectively migrates using the v1 Structure data. Also fixes the migration error of possibly leaving behind VMP data. Will warn you if VMP data is already in the Client ID (if its different).
This commit is contained in:
parent
a729648a34
commit
1442c945cc
|
@ -1,11 +1,12 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from construct import BitStruct, Bytes, Const
|
||||
from construct import BitStruct, Bytes, Const, ConstructError
|
||||
from construct import Enum as CEnum
|
||||
from construct import Int8ub, Int16ub
|
||||
from construct import Optional as COptional
|
||||
|
@ -22,6 +23,11 @@ class _Types(Enum):
|
|||
|
||||
|
||||
class _Structures:
|
||||
header = Struct(
|
||||
"signature" / Const(b"WVD"),
|
||||
"version" / Const(Int8ub, 1)
|
||||
)
|
||||
|
||||
v2 = Struct(
|
||||
"signature" / Const(b"WVD"),
|
||||
"version" / Const(Int8ub, 2),
|
||||
|
@ -161,5 +167,53 @@ class Device:
|
|||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(self.dumps())
|
||||
|
||||
@classmethod
|
||||
def migrate(cls, data: Union[bytes, str]) -> Device:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
header = _Structures.header.parse(data)
|
||||
if header.version == 2:
|
||||
raise ValueError("Device Data is already migrated to the latest version.")
|
||||
if header.version == 0 or header.version > 2:
|
||||
# we have never used version 0, likely data that just so happened to use the WVD magic
|
||||
raise ValueError("Device Data does not seem to be a WVD file (v0).")
|
||||
|
||||
if header.version == 1: # v1 to v2
|
||||
data = _Structures.v1.parse(data)
|
||||
data.version = 2 # update version to 2 to allow loading
|
||||
|
||||
vmp = FileHashes()
|
||||
if data.vmp:
|
||||
try:
|
||||
vmp.ParseFromString(data.vmp)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
data.vmp = vmp
|
||||
|
||||
client_id = ClientIdentification()
|
||||
try:
|
||||
client_id.ParseFromString(data.client_id)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
|
||||
new_vmp_data = data.vmp.SerializeToString()
|
||||
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
|
||||
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
|
||||
client_id.vmp_data = new_vmp_data
|
||||
data.client_id = client_id.SerializeToString()
|
||||
|
||||
try:
|
||||
data = _Structures.v2.build(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Migration failed, {e}")
|
||||
|
||||
try:
|
||||
return cls.loads(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
|
||||
|
||||
|
||||
__ALL__ = (Device,)
|
||||
|
|
|
@ -246,33 +246,13 @@ def migrate(ctx: click.Context, device: Path) -> None:
|
|||
|
||||
log = logging.getLogger("migrate")
|
||||
|
||||
data = bytearray(device.read_bytes())
|
||||
if not data.startswith(b"WVD"):
|
||||
raise click.UsageError("device: Data does not seem to be a WVD file (magic).", ctx)
|
||||
|
||||
version = data[3]
|
||||
if version == 0:
|
||||
# we have never used version 0, likely data that just so happened to use the WVD magic
|
||||
raise click.UsageError("device: Data does not seem to be a WVD file (v0).", ctx)
|
||||
if version == 2:
|
||||
raise click.UsageError("device: Data is already migrated to the latest version.", ctx)
|
||||
|
||||
success_message = ""
|
||||
|
||||
# v1 to v2
|
||||
if version == 1:
|
||||
data[3] = 2 # set version to 2 to allow loading
|
||||
data[6] = 0 # blank flags as there's no valid flags that aren't deprecated
|
||||
# we can now load it, and loading will ignore the now-removed vmp data and length fields
|
||||
success_message = "Successfully migrated from Version 1 to Version 2."
|
||||
|
||||
try:
|
||||
new_device = Device.loads(bytes(data))
|
||||
new_device = Device.migrate(device.read_bytes())
|
||||
except ConstructError as e:
|
||||
raise click.UsageError(f"device: Data seems to be corrupt or invalid, {e}", ctx)
|
||||
raise click.UsageError(str(e), ctx)
|
||||
|
||||
# save
|
||||
log.debug(new_device)
|
||||
new_device.dump(device)
|
||||
|
||||
log.info(success_message)
|
||||
log.info("Successfully migrated the Widevine Device (.wvd) file!")
|
||||
|
|
Loading…
Reference in New Issue