Adding tests utilizing the sample packager binary.

Change-Id: Id868c3cb00f418912329cafeb48c784795a13cd3
This commit is contained in:
Kyle Alexander 2014-07-16 13:47:57 -07:00 committed by Gerrit Code Review
parent 72b6988516
commit 78aa230552
3 changed files with 236 additions and 0 deletions

41
app/test/packager_app.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/python
#
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Test wrapper for the sample packager binary."""
import os
import subprocess
import test_env
class PackagerApp(object):
"""Main integration class for testing the packager binary."""
def __init__(self, build_type='Debug'):
self.build_dir = os.path.join(test_env.SRC_DIR, 'out', build_type)
self.binary = os.path.join(self.build_dir, 'packager')
def BuildSrc(self, clean=True):
if clean:
assert 0 == subprocess.call(['ninja', '-C', self.build_dir, '-t', 'clean'])
return subprocess.call(['ninja', '-C', self.build_dir])
def DumpStreamInfo(self, stream):
input_str = 'input=%s' % stream
cmd = [self.binary, input_str, '--dump_stream_info']
return subprocess.check_output(cmd)
def Package(self, streams, flags=None):
if flags is None:
flags = []
cmd = [self.binary]
cmd.extend(streams)
cmd.extend(flags)
assert 0 == subprocess.call(cmd)

142
app/test/packager_test.py Executable file
View File

@ -0,0 +1,142 @@
#!/usr/bin/python
#
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Tests utilizing the sample packager binary."""
import os
import shutil
import tempfile
import unittest
import packager_app
import test_env
class PackagerAppTest(unittest.TestCase):
def setUp(self):
self.packager = packager_app.PackagerApp()
self.input = os.path.join(
test_env.SRC_DIR, 'media', 'test', 'data', 'bear-1280x720.mp4')
self.tmpdir = tempfile.mkdtemp()
fd, self.output = tempfile.mkstemp(dir=self.tmpdir)
os.close(fd)
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testBuildingCode(self):
self.assertEqual(0, self.packager.BuildSrc())
def testDumpStreamInfo(self):
stream_info = self.packager.DumpStreamInfo(self.input)
expected_stream_info = ('Found 2 stream(s).\n'
'Stream [0] type: Audio\n'
' codec_string: mp4a.40.2\n'
' time_scale: 44100\n'
' duration: 121856 (2.8 seconds)\n'
' language: und\n'
' is_encrypted: false\n'
' codec: AAC\n'
' sample_bits: 16\n'
' num_channels: 2\n'
' sampling_frequency: 44100\n\n'
'Stream [1] type: Video\n'
' codec_string: avc1.64001f\n'
' time_scale: 30000\n'
' duration: 82082 (2.7 seconds)\n'
' language: und\n'
' is_encrypted: false\n'
' codec: H264\n'
' width: 1280\n'
' height: 720\n'
' nalu_length_size: 4')
self.assertIn(expected_stream_info, stream_info)
def testMuxFirstStream(self):
stream = 'input=%s,stream=0,output=%s' % (self.input, self.output)
streams = [stream]
self.packager.Package(streams)
self._AssertStreamInfo(self.output, 'type: Audio')
def testMuxAudioStream(self):
stream = 'input=%s,stream=%s,output=%s' % (self.input, 'audio', self.output)
streams = [stream]
self.packager.Package(streams)
self._AssertStreamInfo(self.output, 'type: Audio')
def testMuxMultiSegments(self):
template = '%s$Number$.m4s' % self.output
stream = ('input=%s,stream=%s,init_segment=%s,segment_template=%s' %
(self.input, 'video', self.output, template))
streams = [stream]
flags = ['--nosingle_segment']
self.packager.Package(streams, flags)
self._AssertStreamInfo(self.output, 'type: Video')
def testEncryptingVideoStream(self):
stream = 'input=%s,stream=%s,output=%s' % (self.input, 'video', self.output)
streams = [stream]
flags = ['--enable_fixed_key_encryption',
'--key_id=31323334353637383930313233343536',
'--key=31',
'--pssh=33']
self.packager.Package(streams, flags)
self._AssertStreamInfo(self.output, 'is_encrypted: true')
@unittest.skipUnless(test_env.has_aes_flags,
'Requires AES and network credentials.')
def testWidevineEncryptionWithAes(self):
stream = 'input=%s,stream=%s,output=%s' % (self.input, 'video', self.output)
streams = [stream]
flags = ['--enable_widevine_encryption',
'--key_server_url=' + test_env.options.key_server_url,
'--content_id=' + test_env.options.content_id,
'--signer=' + test_env.options.signer,
'--aes_signing_key=' + test_env.options.aes_signing_key,
'--aes_signing_iv=' + test_env.options.aes_signing_iv]
self.packager.Package(streams, flags)
self._AssertStreamInfo(self.output, 'is_encrypted: true')
@unittest.skipUnless(test_env.has_aes_flags,
'Requires AES and network credentials.')
def testKeyRotationWithAes(self):
stream = 'input=%s,stream=%s,output=%s' % (self.input, 'video', self.output)
streams = [stream]
flags = ['--enable_widevine_encryption',
'--key_server_url=' + test_env.options.key_server_url,
'--content_id=' + test_env.options.content_id,
'--signer=' + test_env.options.signer,
'--aes_signing_key=' + test_env.options.aes_signing_key,
'--aes_signing_iv=' + test_env.options.aes_signing_iv,
'--crypto_period_duration=1']
self.packager.Package(streams, flags)
self._AssertStreamInfo(self.output, 'is_encrypted: true')
@unittest.skipUnless(test_env.has_rsa_flags,
'Requires RSA and network credentials.')
def testWidevineEncryptionWithRsa(self):
stream = 'input=%s,stream=%s,output=%s' % (self.input, 'video', self.output)
streams = [stream]
flags = ['--enable_widevine_encryption',
'--key_server_url=' + test_env.options.key_server_url,
'--content_id=' + test_env.options.content_id,
'--signer=' + test_env.options.signer,
'--rsa_signing_key_path=' + test_env.options.rsa_signing_key_path]
self.packager.Package(streams, flags)
self._AssertStreamInfo(self.output, 'is_encrypted: true')
def _AssertStreamInfo(self, stream, info):
stream_info = self.packager.DumpStreamInfo(stream)
self.assertIn('Found 1 stream(s).', stream_info)
self.assertIn(info, stream_info)
if __name__ == '__main__':
unittest.main()

53
app/test/test_env.py Normal file
View File

@ -0,0 +1,53 @@
#!/usr/bin/python
#
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Packager testing global objects and attributes.
Please do not refer to this module directly. Please set attributes
either by updating the default values below, or by passing the requested
flags through the command line interface.
"""
import argparse
import os
import sys
# Define static global objects and attributes.
SRC_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../..')
# Parse arguments and calculate dynamic global objects and attributes.
parser = argparse.ArgumentParser()
common = parser.add_argument_group(
'encryption flags',
'These flags are required to enable AES and/or RSA encryption tests.')
common.add_argument('--key_server_url')
common.add_argument('--content_id')
common.add_argument('--signer')
aes = parser.add_argument_group(
'aes flags',
'These flags are required to enable AES signed encryption tests.')
aes.add_argument('--aes_signing_key')
aes.add_argument('--aes_signing_iv')
rsa = parser.add_argument_group(
'rsa flags',
'These flags are required to enable RSA signed encryption tests.')
rsa.add_argument('--rsa_signing_key_path')
options, args = parser.parse_known_args()
sys.argv[1:] = args
has_aes_flags = False
if (options.key_server_url and options.content_id and options.signer and
options.aes_signing_key and options.aes_signing_iv):
has_aes_flags = True
has_rsa_flags = False
if (options.key_server_url and options.content_id and options.signer and
options.rsa_signing_key_path):
has_rsa_flags = True