#!/usr/bin/env python # Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import cStringIO import logging import os import sys import textwrap import unittest BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_PATH) from lib.bucket import Bucket from lib.ordered_dict import OrderedDict from lib.policy import Policy from lib.symbol import SymbolMappingCache from lib.symbol import FUNCTION_SYMBOLS, SOURCEFILE_SYMBOLS, TYPEINFO_SYMBOLS import subcommands class SymbolMappingCacheTest(unittest.TestCase): class MockBucketSet(object): def __init__(self, addresses): self._addresses = addresses def iter_addresses(self, symbol_type): # pylint: disable=W0613 for address in self._addresses: yield address class MockSymbolFinder(object): def __init__(self, mapping): self._mapping = mapping def find(self, address_list): result = OrderedDict() for address in address_list: result[address] = self._mapping[address] return result _TEST_FUNCTION_CACHE = textwrap.dedent("""\ 1 0x0000000000000001 7fc33eebcaa4 __gnu_cxx::new_allocator::allocate 7fc33ef69242 void DispatchToMethod """) _EXPECTED_TEST_FUNCTION_CACHE = textwrap.dedent("""\ 1 0x0000000000000001 7fc33eebcaa4 __gnu_cxx::new_allocator::allocate 7fc33ef69242 void DispatchToMethod 2 0x0000000000000002 7fc33ef7bc3e std::map::operator[] 7fc34411f9d5 WTF::RefCounted::operator new """) _TEST_FUNCTION_ADDRESS_LIST1 = [ 0x1, 0x7fc33eebcaa4, 0x7fc33ef69242] _TEST_FUNCTION_ADDRESS_LIST2 = [ 0x1, 0x2, 0x7fc33eebcaa4, 0x7fc33ef69242, 0x7fc33ef7bc3e, 0x7fc34411f9d5] _TEST_FUNCTION_DICT = { 0x1: '0x0000000000000001', 0x2: '0x0000000000000002', 0x7fc33eebcaa4: '__gnu_cxx::new_allocator::allocate', 0x7fc33ef69242: 'void DispatchToMethod', 0x7fc33ef7bc3e: 'std::map::operator[]', 0x7fc34411f9d5: 'WTF::RefCounted::operator new', } def test_update(self): symbol_mapping_cache = SymbolMappingCache() cache_f = cStringIO.StringIO() cache_f.write(self._TEST_FUNCTION_CACHE) # No update from self._TEST_FUNCTION_CACHE symbol_mapping_cache.update( FUNCTION_SYMBOLS, self.MockBucketSet(self._TEST_FUNCTION_ADDRESS_LIST1), self.MockSymbolFinder(self._TEST_FUNCTION_DICT), cache_f) for address in self._TEST_FUNCTION_ADDRESS_LIST1: self.assertEqual(self._TEST_FUNCTION_DICT[address], symbol_mapping_cache.lookup(FUNCTION_SYMBOLS, address)) self.assertEqual(self._TEST_FUNCTION_CACHE, cache_f.getvalue()) # Update to self._TEST_FUNCTION_ADDRESS_LIST2 symbol_mapping_cache.update( FUNCTION_SYMBOLS, self.MockBucketSet(self._TEST_FUNCTION_ADDRESS_LIST2), self.MockSymbolFinder(self._TEST_FUNCTION_DICT), cache_f) for address in self._TEST_FUNCTION_ADDRESS_LIST2: self.assertEqual(self._TEST_FUNCTION_DICT[address], symbol_mapping_cache.lookup(FUNCTION_SYMBOLS, address)) self.assertEqual(self._EXPECTED_TEST_FUNCTION_CACHE, cache_f.getvalue()) class PolicyTest(unittest.TestCase): class MockSymbolMappingCache(object): def __init__(self): self._symbol_caches = { FUNCTION_SYMBOLS: {}, SOURCEFILE_SYMBOLS: {}, TYPEINFO_SYMBOLS: {}, } def add(self, symbol_type, address, symbol): self._symbol_caches[symbol_type][address] = symbol def lookup(self, symbol_type, address): symbol = self._symbol_caches[symbol_type].get(address) return symbol if symbol else '0x%016x' % address _TEST_POLICY = textwrap.dedent("""\ { "components": [ "second", "mmap-v8", "malloc-v8", "malloc-WebKit", "mmap-catch-all", "malloc-catch-all" ], "rules": [ { "name": "second", "stacktrace": "optional", "allocator": "optional" }, { "name": "mmap-v8", "stacktrace": ".*v8::.*", "allocator": "mmap" }, { "name": "malloc-v8", "stacktrace": ".*v8::.*", "allocator": "malloc" }, { "name": "malloc-WebKit", "stacktrace": ".*WebKit::.*", "allocator": "malloc" }, { "name": "mmap-catch-all", "stacktrace": ".*", "allocator": "mmap" }, { "name": "malloc-catch-all", "stacktrace": ".*", "allocator": "malloc" } ], "version": "POLICY_DEEP_3" } """) def test_load(self): policy = Policy.parse(cStringIO.StringIO(self._TEST_POLICY), 'json') self.assertTrue(policy) self.assertEqual('POLICY_DEEP_3', policy.version) def test_find(self): policy = Policy.parse(cStringIO.StringIO(self._TEST_POLICY), 'json') self.assertTrue(policy) symbol_mapping_cache = self.MockSymbolMappingCache() symbol_mapping_cache.add(FUNCTION_SYMBOLS, 0x1212, 'v8::create') symbol_mapping_cache.add(FUNCTION_SYMBOLS, 0x1381, 'WebKit::create') bucket1 = Bucket([0x1212, 0x013], 'malloc', 0x29492, '_Z') bucket1.symbolize(symbol_mapping_cache) bucket2 = Bucket([0x18242, 0x1381], 'malloc', 0x9492, '_Z') bucket2.symbolize(symbol_mapping_cache) bucket3 = Bucket([0x18242, 0x181], 'malloc', 0x949, '_Z') bucket3.symbolize(symbol_mapping_cache) self.assertEqual('malloc-v8', policy.find_malloc(bucket1)) self.assertEqual('malloc-WebKit', policy.find_malloc(bucket2)) self.assertEqual('malloc-catch-all', policy.find_malloc(bucket3)) class BucketsCommandTest(unittest.TestCase): def test(self): BUCKETS_PATH = os.path.join(BASE_PATH, 'tests', 'output', 'buckets') with open(BUCKETS_PATH) as output_f: expected = output_f.read() out = cStringIO.StringIO() HEAP_PATH = os.path.join(BASE_PATH, 'tests', 'data', 'heap.01234.0001.heap') subcommand = subcommands.BucketsCommand() returncode = subcommand.do(['buckets', HEAP_PATH], out) self.assertEqual(0, returncode) self.assertEqual(expected, out.getvalue()) class UploadCommandTest(unittest.TestCase): def test(self): MOCK_GSUTIL_PATH = os.path.join(BASE_PATH, 'tests', 'mock_gsutil.py') HEAP_PATH = os.path.join(BASE_PATH, 'tests', 'data', 'heap.01234.0001.heap') subcommand = subcommands.UploadCommand() returncode = subcommand.do([ 'upload', '--gsutil', MOCK_GSUTIL_PATH, HEAP_PATH, 'gs://test-storage/']) self.assertEqual(0, returncode) if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG if '-v' in sys.argv else logging.ERROR, format='%(levelname)5s %(filename)15s(%(lineno)3d): %(message)s') unittest.main()