Add a utility script used to generate/parse PSSH boxes.
This script can be used to parse and generate PSSH boxes. This serves two purposes: * Parse PSSH boxes into a human readable format. * Take several arguments to generate PSSH boxes. This is also able to parse Widevine and PlayReady PSSH data and will print the parsed data instead of the data as binary. This can also generate Widevine PSSH data instead of having the PSSH data passed as an argument. Change-Id: I245bd9abf79a259f6fda01d1fc0489b4a4a73db5
This commit is contained in:
parent
a9e5a2ff4f
commit
c9645bc21b
|
@ -0,0 +1,29 @@
|
|||
pssh-box - Utility to generate and print PSSH boxes
|
||||
===================================================
|
||||
|
||||
## Installation
|
||||
|
||||
To use this script you must first install the Python ProtoBuf library. If you
|
||||
have it installed already, you can just use the script directly. These
|
||||
instructions describe how to compile the ProtoBuf library so this script can
|
||||
run. This will not install ProtoBuf globally; it will only compile it.
|
||||
|
||||
1) You need Python 2.6 or newer.
|
||||
|
||||
2) Install `setuptools`. This is installed by default when you
|
||||
install `pip`. If you don't have it, when you run `setup.py` it will install
|
||||
it locally. If you want to install manually, see:
|
||||
|
||||
```
|
||||
https://packaging.python.org/en/latest/installing.html#setup-for-installing-packages
|
||||
```
|
||||
|
||||
3) Build the packager, which will build `protoc` in `out/{Debug,Release}`.
|
||||
|
||||
4) Run `setup.py`. You will need to have `protoc` in PATH, which was build in
|
||||
the previous step:
|
||||
|
||||
```bash
|
||||
cd packager/third_party/protobuf/python
|
||||
PATH=../../../../out/Debug/:"$PATH" python setup.py build
|
||||
```
|
|
@ -0,0 +1,407 @@
|
|||
#!/usr/bin/python
|
||||
# Copyright 2016 Google Inc. All rights reserved.
|
||||
#
|
||||
# Use of this source code is governed by a BSD-style
|
||||
# license that can be found in the LICENSE file or at
|
||||
# https://developers.google.com/open-source/licenses/bsd
|
||||
|
||||
"""A utility to parse and generate PSSH boxes."""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import itertools
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Append the local protobuf location. Use a path relative to the tools/pssh
|
||||
# folder where this file should be found. This allows the file to be executed
|
||||
# from any directory.
|
||||
_pssh_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
sys.path.append(os.path.join(_pssh_dir, '../../third_party/protobuf/python'))
|
||||
# Import the widevine protobuf. Use either Release or Debug.
|
||||
_proto_path_format = os.path.join(
|
||||
_pssh_dir, '../../../out/%s/pyproto/packager/media/base')
|
||||
if os.path.isdir(_proto_path_format % 'Release'):
|
||||
sys.path.append(_proto_path_format % 'Release')
|
||||
else:
|
||||
sys.path.append(_proto_path_format % 'Debug')
|
||||
try:
|
||||
import widevine_pssh_data_pb2 # pylint: disable=g-import-not-at-top
|
||||
except ImportError:
|
||||
print >> sys.stderr, 'Cannot find proto file, make sure to build first'
|
||||
raise
|
||||
|
||||
|
||||
COMMON_SYSTEM_ID = base64.b16decode('1077EFECC0B24D02ACE33C1E52E2FB4B')
|
||||
WIDEVINE_SYSTEM_ID = base64.b16decode('EDEF8BA979D64ACEA3C827DCD51D21ED')
|
||||
PLAYREADY_SYSTEM_ID = base64.b16decode('9A04F07998404286AB92E65BE0885F95')
|
||||
|
||||
|
||||
class BinaryReader(object):
|
||||
"""A helper class used to read binary data from an binary string."""
|
||||
|
||||
def __init__(self, data, little_endian):
|
||||
self.data = data
|
||||
self.little_endian = little_endian
|
||||
self.position = 0
|
||||
|
||||
def has_data(self):
|
||||
"""Returns whether the reader has any data left to read."""
|
||||
return self.position < len(self.data)
|
||||
|
||||
def read_bytes(self, count):
|
||||
"""Reads the given number of bytes into an array."""
|
||||
if len(self.data) < self.position + count:
|
||||
raise Exception('Invalid PSSH box, not enough data')
|
||||
ret = self.data[self.position:self.position+count]
|
||||
self.position += count
|
||||
return ret
|
||||
|
||||
def read_int(self, size):
|
||||
"""Reads an integer of the given size (in bytes)."""
|
||||
data = self.read_bytes(size)
|
||||
ret = 0
|
||||
for i in range(0, size):
|
||||
if self.little_endian:
|
||||
ret |= (ord(data[i]) << (8 * i))
|
||||
else:
|
||||
ret |= (ord(data[i]) << (8 * (size - i - 1)))
|
||||
return ret
|
||||
|
||||
|
||||
class Pssh(object):
|
||||
"""Defines a PSSH box and related functions."""
|
||||
|
||||
def __init__(self, version, system_id, key_ids, pssh_data):
|
||||
"""Parses a PSSH box from the given data.
|
||||
|
||||
Args:
|
||||
version: The version number of the box
|
||||
system_id: A binary string of the System ID
|
||||
key_ids: An array of binary strings for the key IDs
|
||||
pssh_data: A binary string of the PSSH data
|
||||
"""
|
||||
self.version = version
|
||||
self.system_id = system_id
|
||||
self.key_ids = key_ids or []
|
||||
self.pssh_data = pssh_data or ''
|
||||
|
||||
def binary_string(self):
|
||||
"""Converts the PSSH box to a binary string."""
|
||||
ret = b'pssh' + _create_bin_int(self.version << 24)
|
||||
ret += self.system_id
|
||||
if self.version == 1:
|
||||
ret += _create_bin_int(len(self.key_ids))
|
||||
for key in self.key_ids:
|
||||
ret += key
|
||||
ret += _create_bin_int(len(self.pssh_data))
|
||||
ret += self.pssh_data
|
||||
return _create_bin_int(len(ret) + 4) + ret
|
||||
|
||||
def human_string(self):
|
||||
"""Converts the PSSH box to a human readable string."""
|
||||
system_name = ''
|
||||
convert_data = None
|
||||
if self.system_id == WIDEVINE_SYSTEM_ID:
|
||||
system_name = 'Widevine'
|
||||
convert_data = _parse_widevine_data
|
||||
elif self.system_id == PLAYREADY_SYSTEM_ID:
|
||||
system_name = 'PlayReady'
|
||||
convert_data = _parse_playready_data
|
||||
elif self.system_id == COMMON_SYSTEM_ID:
|
||||
system_name = 'Common'
|
||||
|
||||
lines = [
|
||||
'PSSH Box v%d' % self.version,
|
||||
' System ID: %s %s' % (system_name, _create_uuid(self.system_id))
|
||||
]
|
||||
if self.version == 1:
|
||||
lines.append(' Key IDs (%d):' % len(self.key_ids))
|
||||
lines.extend([' ' + _create_uuid(key) for key in self.key_ids])
|
||||
|
||||
lines.append(' PSSH Data (size: %d):' % len(self.pssh_data))
|
||||
if self.pssh_data:
|
||||
if convert_data:
|
||||
lines.append(' ' + system_name + ' Data:')
|
||||
try:
|
||||
extra = convert_data(self.pssh_data)
|
||||
lines.extend([' ' + x for x in extra])
|
||||
# pylint: disable=broad-except
|
||||
except Exception as e:
|
||||
lines.append(' ERROR: ' + e.message)
|
||||
else:
|
||||
lines.extend([
|
||||
' Raw Data (base64):',
|
||||
' ' + base64.b64encode(self.pssh_data)
|
||||
])
|
||||
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
def _split_list_on(elems, sep):
|
||||
"""Splits the given list on the given separator."""
|
||||
return [list(g) for k, g in itertools.groupby(elems, lambda x: x == sep)
|
||||
if not k]
|
||||
|
||||
|
||||
def _create_bin_int(value):
|
||||
"""Creates a 4-byte binary string from the given integer."""
|
||||
return (chr(value >> 24) + chr((value >> 16) & 0xff) +
|
||||
chr((value >> 8) & 0xff) + chr(value & 0xff))
|
||||
|
||||
|
||||
def _create_uuid(data):
|
||||
"""Creates a human readable UUID string from the given binary string."""
|
||||
ret = base64.b16encode(data).lower()
|
||||
return (ret[:8] + '-' + ret[8:12] + '-' + ret[12:16] + '-' + ret[16:20] +
|
||||
'-' + ret[20:])
|
||||
|
||||
|
||||
def _generate_widevine_data(key_ids, content_id, provider):
|
||||
wv = widevine_pssh_data_pb2.WidevinePsshData()
|
||||
wv.key_id.extend(key_ids)
|
||||
wv.provider = provider or ''
|
||||
wv.content_id = content_id
|
||||
return wv.SerializeToString()
|
||||
|
||||
|
||||
def _parse_widevine_data(data):
|
||||
"""Parses Widevine PSSH box from the given binary string."""
|
||||
wv = widevine_pssh_data_pb2.WidevinePsshData()
|
||||
wv.ParseFromString(data)
|
||||
|
||||
ret = []
|
||||
if wv.key_id:
|
||||
ret.append('Key IDs (%d):' % len(wv.key_id))
|
||||
ret.extend([' ' + _create_uuid(x) for x in wv.key_id])
|
||||
|
||||
if wv.HasField('provider'):
|
||||
ret.append('Provider: ' + wv.provider)
|
||||
if wv.HasField('content_id'):
|
||||
ret.append('Content ID: ' + base64.b16encode(wv.content_id))
|
||||
if wv.HasField('policy'):
|
||||
ret.append('Policy: ' + wv.policy)
|
||||
if wv.HasField('crypto_period_index'):
|
||||
ret.append('Crypto Period Index: %d' % wv.crypto_period_index)
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def _parse_playready_data(data):
|
||||
"""Parses PlayReady PSSH data from the given binary string."""
|
||||
reader = BinaryReader(data, little_endian=True)
|
||||
size = reader.read_int(4)
|
||||
if size != len(data):
|
||||
raise Exception('Length incorrect')
|
||||
|
||||
ret = []
|
||||
count = reader.read_int(2)
|
||||
while count > 0:
|
||||
count -= 1
|
||||
record_type = reader.read_int(2)
|
||||
record_len = reader.read_int(2)
|
||||
record_data = reader.read_bytes(record_len)
|
||||
|
||||
ret.append('Record (size %d):' % record_len)
|
||||
if record_type == 1:
|
||||
xml = record_data.decode('utf-16 LE')
|
||||
ret.extend([
|
||||
' Record Type: Rights Management Header (1)',
|
||||
' Record XML:',
|
||||
' ' + xml
|
||||
])
|
||||
elif record_type == 3:
|
||||
ret.extend([
|
||||
' Record Type: License Store (1)',
|
||||
' License Data:',
|
||||
' ' + base64.b64encode(record_data)
|
||||
])
|
||||
else:
|
||||
raise Exception('Invalid record type %d' % record_type)
|
||||
|
||||
if reader.has_data():
|
||||
raise Exception('Extra data after records')
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def _parse_boxes(data):
|
||||
"""Parses one or more PSSH boxes for the given binary data."""
|
||||
reader = BinaryReader(data, little_endian=False)
|
||||
boxes = []
|
||||
while reader.has_data():
|
||||
start = reader.position
|
||||
size = reader.read_int(4)
|
||||
|
||||
box_type = reader.read_bytes(4)
|
||||
if box_type != b'pssh':
|
||||
raise Exception(
|
||||
'Invalid box type 0x%s, not \'pssh\'' % box_type.encode('hex'))
|
||||
|
||||
version_and_flags = reader.read_int(4)
|
||||
version = version_and_flags >> 24
|
||||
if version > 1:
|
||||
raise Exception('Invalid PSSH version %d' % version)
|
||||
|
||||
system_id = reader.read_bytes(16)
|
||||
|
||||
key_ids = []
|
||||
if version == 1:
|
||||
count = reader.read_int(4)
|
||||
while count > 0:
|
||||
key = reader.read_bytes(16)
|
||||
key_ids.append(key)
|
||||
count -= 1
|
||||
|
||||
pssh_data_size = reader.read_int(4)
|
||||
pssh_data = reader.read_bytes(pssh_data_size)
|
||||
|
||||
if start + size != reader.position:
|
||||
raise Exception('Box size does not match size of data')
|
||||
|
||||
pssh = Pssh(version, system_id, key_ids, pssh_data)
|
||||
boxes.append(pssh)
|
||||
return boxes
|
||||
|
||||
|
||||
def _create_argument_parser():
|
||||
"""Creates an argument parser."""
|
||||
def hex_16_bytes(string):
|
||||
if not string or len(string) != 32:
|
||||
raise argparse.ArgumentTypeError(
|
||||
'Must be a 32-character hex string, %d given' % len(string))
|
||||
return base64.b16decode(string)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
usage='[--base64 | --hex | --human] options [-- options [-- ...]',
|
||||
epilog="""\
|
||||
This utility can be used to generate one or more PSSH boxes. This is done by
|
||||
passing a system ID and either --key-id or --pssh-data. Multiple boxes can be
|
||||
generated by separating boxes with --. Using --key-id will generate v1 pssh
|
||||
boxes, if none are given, it will generate v0.
|
||||
|
||||
You can also import PSSH boxes using --from-base64 and --from-hex. These must
|
||||
be valid PSSH boxes, but can be multiple concatenated together. These arguments
|
||||
can appear anywhere in the string. If it appears 'inside' another definition,
|
||||
it will appear before the generated one.
|
||||
|
||||
An alternative to --pssh-data is to generate Widevine PSSH data. This is only
|
||||
valid with --widevine-system-id. Passing --content-id will make it generate
|
||||
Widevine PSSH data instead. You can optionally add --provider. It will
|
||||
generate a v0 PSSH box for compatibility reasons.""")
|
||||
|
||||
formats = parser.add_mutually_exclusive_group()
|
||||
formats.add_argument('--base64',
|
||||
dest='format',
|
||||
action='store_const',
|
||||
const='base64',
|
||||
help='Output base64 encoded')
|
||||
formats.add_argument('--hex',
|
||||
dest='format',
|
||||
action='store_const',
|
||||
const='hex',
|
||||
help='Output hexadecimal encoded')
|
||||
formats.add_argument('--human',
|
||||
dest='format',
|
||||
action='store_const',
|
||||
const='human',
|
||||
help='Output a human readable string')
|
||||
|
||||
inputs = parser.add_mutually_exclusive_group()
|
||||
inputs.add_argument('--from-base64',
|
||||
metavar='<base64-string>',
|
||||
dest='input',
|
||||
type=base64.b64decode,
|
||||
help='Parse the given base64 encoded PSSH box')
|
||||
inputs.add_argument('--from-hex',
|
||||
metavar='<hex-string>',
|
||||
dest='input',
|
||||
type=base64.b16decode,
|
||||
help='Parse the given hexadecimal encoded PSSH box')
|
||||
|
||||
system_ids = parser.add_mutually_exclusive_group()
|
||||
system_ids.add_argument('--system-id',
|
||||
metavar='<hex-string>',
|
||||
dest='system_id',
|
||||
type=hex_16_bytes,
|
||||
help='Sets the system ID')
|
||||
system_ids.add_argument('--common-system-id',
|
||||
dest='system_id',
|
||||
action='store_const',
|
||||
const=COMMON_SYSTEM_ID,
|
||||
help='Use the Common system ID')
|
||||
system_ids.add_argument('--widevine-system-id',
|
||||
dest='system_id',
|
||||
action='store_const',
|
||||
const=WIDEVINE_SYSTEM_ID,
|
||||
help='Use the Widevine system ID')
|
||||
|
||||
extra = parser.add_argument_group()
|
||||
extra.add_argument('--key-id',
|
||||
metavar='<hex-string>',
|
||||
action='append',
|
||||
type=hex_16_bytes,
|
||||
help='Adds a key ID (can appear multiple times)')
|
||||
extra.add_argument('--pssh-data',
|
||||
metavar='<base64-string>',
|
||||
type=base64.b64decode,
|
||||
help='Sets the extra data')
|
||||
extra.add_argument('--content-id',
|
||||
metavar='<hex-string>',
|
||||
type=base64.b16decode,
|
||||
help='Sets the content ID of the Widevine PSSH data')
|
||||
extra.add_argument('--provider',
|
||||
metavar='<string>',
|
||||
help='Sets the provider of the Widevine PSSH data')
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(all_args):
|
||||
boxes = []
|
||||
output_format = None
|
||||
parser = _create_argument_parser()
|
||||
arg_groups = _split_list_on(all_args, '--')
|
||||
for args in arg_groups:
|
||||
ns = parser.parse_args(args)
|
||||
|
||||
if ns.format:
|
||||
if output_format:
|
||||
raise Exception('Can only specify one of: --base64, --hex, --human')
|
||||
else:
|
||||
output_format = ns.format
|
||||
|
||||
if ns.input:
|
||||
boxes.extend(_parse_boxes(ns.input))
|
||||
|
||||
pssh_data = ns.pssh_data
|
||||
if pssh_data and ns.content_id:
|
||||
raise Exception('Cannot specify both --pssh-data and --content-id')
|
||||
if ns.content_id:
|
||||
if ns.system_id != WIDEVINE_SYSTEM_ID:
|
||||
raise Exception('--content-id only valid with Widevine system ID')
|
||||
pssh_data = _generate_widevine_data(ns.key_id, ns.content_id, ns.provider)
|
||||
|
||||
# Ignore if we have no data.
|
||||
if not pssh_data and not ns.key_id and not ns.system_id:
|
||||
continue
|
||||
if not ns.system_id:
|
||||
raise Exception('System ID is required')
|
||||
|
||||
version = 1 if ns.key_id and not ns.content_id else 0
|
||||
boxes.append(Pssh(version, ns.system_id, ns.key_id, pssh_data))
|
||||
|
||||
if output_format == 'human' or not output_format:
|
||||
for box in boxes:
|
||||
print box.human_string()
|
||||
else:
|
||||
box_data = ''.join([x.binary_string() for x in boxes])
|
||||
if output_format == 'hex':
|
||||
print base64.b16encode(box_data)
|
||||
else:
|
||||
print base64.b64encode(box_data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv[1:])
|
Loading…
Reference in New Issue