# Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import os import re import time import urlparse from telemetry.core import util def _UrlPathJoin(*args): """Joins each path in |args| for insertion into a URL path. This is distinct from os.path.join in that: 1. Forward slashes are always used. 2. Paths beginning with '/' are not treated as absolute. For example: _UrlPathJoin('a', 'b') => 'a/b' _UrlPathJoin('a/', 'b') => 'a/b' _UrlPathJoin('a', '/b') => 'a/b' _UrlPathJoin('a/', '/b') => 'a/b' """ if not args: return '' if len(args) == 1: return str(args[0]) else: args = [str(arg).replace('\\', '/') for arg in args] work = [args[0]] for arg in args[1:]: if not arg: continue if arg.startswith('/'): work.append(arg[1:]) else: work.append(arg) joined = reduce(os.path.join, work) return joined.replace('\\', '/') class Page(object): def __init__(self, url, page_set, attributes=None, base_dir=None): parsed_url = urlparse.urlparse(url) if not parsed_url.scheme: abspath = os.path.abspath(os.path.join(base_dir, parsed_url.path)) if os.path.exists(abspath): url = 'file://%s' % os.path.abspath(os.path.join(base_dir, url)) else: raise Exception('URLs must be fully qualified: %s' % url) self.url = url self.page_set = page_set self.base_dir = base_dir # These attributes can be set dynamically by the page. self.credentials = None self.disabled = False self.script_to_evaluate_on_commit = None if attributes: for k, v in attributes.iteritems(): setattr(self, k, v) def __getattr__(self, name): if self.page_set and hasattr(self.page_set, name): return getattr(self.page_set, name) raise AttributeError() @property def is_file(self): parsed_url = urlparse.urlparse(self.url) return parsed_url.scheme == 'file' @property def is_local(self): parsed_url = urlparse.urlparse(self.url) return parsed_url.scheme == 'file' or parsed_url.scheme == 'chrome' @property def serving_dirs_and_file(self): parsed_url = urlparse.urlparse(self.url) path = _UrlPathJoin(self.base_dir, parsed_url.netloc, parsed_url.path) if hasattr(self.page_set, 'serving_dirs'): url_base_dir = os.path.commonprefix(self.page_set.serving_dirs) base_path = _UrlPathJoin(self.base_dir, url_base_dir) return ([_UrlPathJoin(self.base_dir, d) for d in self.page_set.serving_dirs], path.replace(base_path, '')) return os.path.split(path) # A version of this page's URL that's safe to use as a filename. @property def url_as_file_safe_name(self): # Just replace all special characters in the url with underscore. return re.sub('[^a-zA-Z0-9]', '_', self.display_url) @property def display_url(self): if not self.is_local: return self.url url_paths = ['/'.join(p.url.strip('/').split('/')[:-1]) for p in self.page_set if p.is_file] common_prefix = os.path.commonprefix(url_paths) return self.url[len(common_prefix):].strip('/') @property def archive_path(self): return self.page_set.WprFilePathForPage(self) def __str__(self): return self.url def WaitToLoad(self, tab, timeout, poll_interval=0.1): Page.WaitForPageToLoad(self, tab, timeout, poll_interval) # TODO(dtu): Remove this method when no page sets use a click interaction # with a wait condition. crbug.com/168431 @staticmethod def WaitForPageToLoad(obj, tab, timeout, poll_interval=0.1): """Waits for various wait conditions present in obj.""" if hasattr(obj, 'wait_seconds'): time.sleep(obj.wait_seconds) if hasattr(obj, 'wait_for_element_with_text'): callback_code = 'function(element) { return element != null; }' util.WaitFor( lambda: util.FindElementAndPerformAction( tab, obj.wait_for_element_with_text, callback_code), timeout, poll_interval) if hasattr(obj, 'wait_for_element_with_selector'): util.WaitFor(lambda: tab.EvaluateJavaScript( 'document.querySelector(\'' + obj.wait_for_element_with_selector + '\') != null'), timeout, poll_interval) if hasattr(obj, 'post_navigate_javascript_to_execute'): tab.EvaluateJavaScript(obj.post_navigate_javascript_to_execute) if hasattr(obj, 'wait_for_javascript_expression'): util.WaitFor( lambda: tab.EvaluateJavaScript(obj.wait_for_javascript_expression), timeout, poll_interval)