937 lines
33 KiB
C++
937 lines
33 KiB
C++
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
#include "base/threading/sequenced_worker_pool.h"
|
|
|
|
#include <algorithm>
|
|
|
|
#include "base/bind.h"
|
|
#include "base/compiler_specific.h"
|
|
#include "base/memory/ref_counted.h"
|
|
#include "base/memory/scoped_ptr.h"
|
|
#include "base/message_loop/message_loop.h"
|
|
#include "base/message_loop/message_loop_proxy.h"
|
|
#include "base/synchronization/condition_variable.h"
|
|
#include "base/synchronization/lock.h"
|
|
#include "base/test/sequenced_task_runner_test_template.h"
|
|
#include "base/test/sequenced_worker_pool_owner.h"
|
|
#include "base/test/task_runner_test_template.h"
|
|
#include "base/test/test_timeouts.h"
|
|
#include "base/threading/platform_thread.h"
|
|
#include "base/time/time.h"
|
|
#include "base/tracked_objects.h"
|
|
#include "testing/gtest/include/gtest/gtest.h"
|
|
|
|
namespace base {
|
|
|
|
// IMPORTANT NOTE:
|
|
//
|
|
// Many of these tests have failure modes where they'll hang forever. These
|
|
// tests should not be flaky, and hanging indicates a type of failure. Do not
|
|
// mark as flaky if they're hanging, it's likely an actual bug.
|
|
|
|
namespace {
|
|
|
|
// Thread count passed to the SequencedWorkerPoolOwner instances that the
// fixture and tests in this file create; several tests post exactly this
// many blocking tasks to occupy every worker.
const size_t kNumWorkerThreads = 3;
|
|
|
|
// Allows a number of threads to all be blocked on the same event, and
|
|
// provides a way to unblock a certain number of them.
|
|
class ThreadBlocker {
|
|
public:
|
|
ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
|
|
|
|
void Block() {
|
|
{
|
|
base::AutoLock lock(lock_);
|
|
while (unblock_counter_ == 0)
|
|
cond_var_.Wait();
|
|
unblock_counter_--;
|
|
}
|
|
cond_var_.Signal();
|
|
}
|
|
|
|
void Unblock(size_t count) {
|
|
{
|
|
base::AutoLock lock(lock_);
|
|
DCHECK(unblock_counter_ == 0);
|
|
unblock_counter_ = count;
|
|
}
|
|
cond_var_.Signal();
|
|
}
|
|
|
|
private:
|
|
base::Lock lock_;
|
|
base::ConditionVariable cond_var_;
|
|
|
|
size_t unblock_counter_;
|
|
};
|
|
|
|
// Bookkeeping shared between a test body and the tasks it posts. Tasks
// report when they start and when they finish; the test thread blocks on
// those reports.
class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
 public:
  TestTracker() : lock_(), cond_var_(&lock_), started_events_(0) {}

  // Every task below ends by appending |id| to the completion sequence, so
  // that the caller can inspect the order in which tasks finished.

  // Finishes right away.
  void FastTask(int id) { SignalWorkerDone(id); }

  // Sleeps for one second, then finishes.
  void SlowTask(int id) {
    base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
    SignalWorkerDone(id);
  }

  // Announces that it has started, then parks on |blocker| until released.
  void BlockTask(int id, ThreadBlocker* blocker) {
    {
      base::AutoLock auto_lock(lock_);
      ++started_events_;
    }
    // Wake whoever is sitting in WaitUntilTasksBlocked().
    cond_var_.Signal();

    blocker->Block();
    SignalWorkerDone(id);
  }

  // Tries to post one FastTask (id 100) to |pool| per shutdown behavior.
  // The CONTINUE_ON_SHUTDOWN and SKIP_ON_SHUTDOWN posts are expected to
  // return |expected_return_value|; the result of the BLOCK_SHUTDOWN post is
  // deliberately left unchecked. Finishes by recording |id| as completed.
  void PostAdditionalTasks(int id,
                           SequencedWorkerPool* pool,
                           bool expected_return_value) {
    Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);

    EXPECT_EQ(expected_return_value,
              pool->PostWorkerTaskWithShutdownBehavior(
                  FROM_HERE, fast_task,
                  SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
    EXPECT_EQ(expected_return_value,
              pool->PostWorkerTaskWithShutdownBehavior(
                  FROM_HERE, fast_task,
                  SequencedWorkerPool::SKIP_ON_SHUTDOWN));
    pool->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE, fast_task, SequencedWorkerPool::BLOCK_SHUTDOWN);

    SignalWorkerDone(id);
  }

  // Blocks until at least |count| BlockTask() invocations have started.
  void WaitUntilTasksBlocked(size_t count) {
    {
      base::AutoLock auto_lock(lock_);
      while (started_events_ < count)
        cond_var_.Wait();
    }
    cond_var_.Signal();
  }

  // Blocks until the completion sequence holds at least |num_tasks| entries
  // and returns a snapshot of it.
  std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
    std::vector<int> snapshot;
    {
      base::AutoLock auto_lock(lock_);
      while (complete_sequence_.size() < num_tasks)
        cond_var_.Wait();
      snapshot = complete_sequence_;
    }
    cond_var_.Signal();
    return snapshot;
  }

  // Number of tasks recorded as completed so far.
  size_t GetTasksCompletedCount() {
    base::AutoLock auto_lock(lock_);
    return complete_sequence_.size();
  }

  // Forgets all recorded completions and resets the started-task counter.
  void ClearCompleteSequence() {
    base::AutoLock auto_lock(lock_);
    complete_sequence_.clear();
    started_events_ = 0;
  }

 private:
  friend class base::RefCountedThreadSafe<TestTracker>;
  ~TestTracker() {}

  // Appends |id| to the completion sequence and wakes a waiter.
  void SignalWorkerDone(int id) {
    {
      base::AutoLock auto_lock(lock_);
      complete_sequence_.push_back(id);
    }
    cond_var_.Signal();
  }

  // Guards |complete_sequence_| and |started_events_|.
  base::Lock lock_;

  base::ConditionVariable cond_var_;

  // IDs of finished tasks, in completion order. Accessed under |lock_|.
  std::vector<int> complete_sequence_;

  // How many BlockTask() invocations have started. Accessed under |lock_|.
  size_t started_events_;
};
|
|
|
|
class SequencedWorkerPoolTest : public testing::Test {
|
|
public:
|
|
SequencedWorkerPoolTest()
|
|
: tracker_(new TestTracker) {
|
|
ResetPool();
|
|
}
|
|
|
|
virtual ~SequencedWorkerPoolTest() {}
|
|
|
|
virtual void SetUp() OVERRIDE {}
|
|
|
|
virtual void TearDown() OVERRIDE {
|
|
pool()->Shutdown();
|
|
}
|
|
|
|
const scoped_refptr<SequencedWorkerPool>& pool() {
|
|
return pool_owner_->pool();
|
|
}
|
|
TestTracker* tracker() { return tracker_.get(); }
|
|
|
|
// Destroys the SequencedWorkerPool instance, blocking until it is fully shut
|
|
// down, and creates a new instance.
|
|
void ResetPool() {
|
|
pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
|
|
}
|
|
|
|
void SetWillWaitForShutdownCallback(const Closure& callback) {
|
|
pool_owner_->SetWillWaitForShutdownCallback(callback);
|
|
}
|
|
|
|
// Ensures that the given number of worker threads is created by adding
|
|
// tasks and waiting until they complete. Worker thread creation is
|
|
// serialized, can happen on background threads asynchronously, and doesn't
|
|
// happen any more at shutdown. This means that if a test posts a bunch of
|
|
// tasks and calls shutdown, fewer workers will be created than the test may
|
|
// expect.
|
|
//
|
|
// This function ensures that this condition can't happen so tests can make
|
|
// assumptions about the number of workers active. See the comment in
|
|
// PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
|
|
// details.
|
|
//
|
|
// It will post tasks to the queue with id -1. It also assumes this is the
|
|
// first thing called in a test since it will clear the complete_sequence_.
|
|
void EnsureAllWorkersCreated() {
|
|
// Create a bunch of threads, all waiting. This will cause that may
|
|
// workers to be created.
|
|
ThreadBlocker blocker;
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), -1, &blocker));
|
|
}
|
|
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
|
|
|
// Now wake them up and wait until they're done.
|
|
blocker.Unblock(kNumWorkerThreads);
|
|
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
|
|
|
|
// Clean up the task IDs we added.
|
|
tracker()->ClearCompleteSequence();
|
|
}
|
|
|
|
int has_work_call_count() const {
|
|
return pool_owner_->has_work_call_count();
|
|
}
|
|
|
|
private:
|
|
MessageLoop message_loop_;
|
|
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
|
|
const scoped_refptr<TestTracker> tracker_;
|
|
};
|
|
|
|
// Checks that the given number of entries are in the tasks to complete of
|
|
// the given tracker, and then signals the given event the given number of
|
|
// times. This is used to wakt up blocked background threads before blocking
|
|
// on shutdown.
|
|
void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
|
|
size_t expected_tasks_to_complete,
|
|
ThreadBlocker* blocker,
|
|
size_t threads_to_awake) {
|
|
EXPECT_EQ(
|
|
expected_tasks_to_complete,
|
|
tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
|
|
|
|
blocker->Unblock(threads_to_awake);
|
|
}
|
|
|
|
// Ref-counted sentinel whose destructor flips a shared boolean to true,
// letting a test observe that the last reference to it went away.
class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
 public:
  explicit DeletionHelper(
      const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
      : deleted_flag_(deleted_flag) {}

 private:
  friend class base::RefCountedThreadSafe<DeletionHelper>;

  // Records the destruction in the shared flag.
  virtual ~DeletionHelper() {
    deleted_flag_->data = true;
  }

  const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;

  DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
};
|
|
|
|
// Task body whose only purpose is to have |pool| and |helper| bound into a
// closure. Running it is a test failure.
void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
                       const scoped_refptr<DeletionHelper>& helper) {
  ADD_FAILURE() << "Should never run";
}
|
|
|
|
// Tests that delayed tasks are deleted upon shutdown of the pool.
|
|
TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
|
|
// Post something to verify the pool is started up.
|
|
EXPECT_TRUE(pool()->PostTask(
|
|
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
|
|
|
|
scoped_refptr<base::RefCountedData<bool> > deleted_flag(
|
|
new base::RefCountedData<bool>(false));
|
|
|
|
base::Time posted_at(base::Time::Now());
|
|
// Post something that shouldn't run.
|
|
EXPECT_TRUE(pool()->PostDelayedTask(
|
|
FROM_HERE,
|
|
base::Bind(&HoldPoolReference,
|
|
pool(),
|
|
make_scoped_refptr(new DeletionHelper(deleted_flag))),
|
|
TestTimeouts::action_timeout()));
|
|
|
|
std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
|
|
ASSERT_EQ(1u, completion_sequence.size());
|
|
ASSERT_EQ(1, completion_sequence[0]);
|
|
|
|
pool()->Shutdown();
|
|
// Shutdown is asynchronous, so use ResetPool() to block until the pool is
|
|
// fully destroyed (and thus shut down).
|
|
ResetPool();
|
|
|
|
// Verify that we didn't block until the task was due.
|
|
ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
|
|
|
|
// Verify that the deferred task has not only not run, but has also been
|
|
// destroyed.
|
|
ASSERT_TRUE(deleted_flag->data);
|
|
}
|
|
|
|
// Tests that same-named tokens have the same ID.
|
|
TEST_F(SequencedWorkerPoolTest, NamedTokens) {
|
|
const std::string name1("hello");
|
|
SequencedWorkerPool::SequenceToken token1 =
|
|
pool()->GetNamedSequenceToken(name1);
|
|
|
|
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
|
|
|
|
const std::string name3("goodbye");
|
|
SequencedWorkerPool::SequenceToken token3 =
|
|
pool()->GetNamedSequenceToken(name3);
|
|
|
|
// All 3 tokens should be different.
|
|
EXPECT_FALSE(token1.Equals(token2));
|
|
EXPECT_FALSE(token1.Equals(token3));
|
|
EXPECT_FALSE(token2.Equals(token3));
|
|
|
|
// Requesting the same name again should give the same value.
|
|
SequencedWorkerPool::SequenceToken token1again =
|
|
pool()->GetNamedSequenceToken(name1);
|
|
EXPECT_TRUE(token1.Equals(token1again));
|
|
|
|
SequencedWorkerPool::SequenceToken token3again =
|
|
pool()->GetNamedSequenceToken(name3);
|
|
EXPECT_TRUE(token3.Equals(token3again));
|
|
}
|
|
|
|
// Tests that posting a bunch of tasks (many more than the number of worker
|
|
// threads) runs them all.
|
|
TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::SlowTask, tracker(), 0));
|
|
|
|
const size_t kNumTasks = 20;
|
|
for (size_t i = 1; i < kNumTasks; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), i));
|
|
}
|
|
|
|
std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
|
|
EXPECT_EQ(kNumTasks, result.size());
|
|
}
|
|
|
|
// Tests that posting a bunch of tasks (many more than the number of
|
|
// worker threads) to two pools simultaneously runs them all twice.
|
|
// This test is meant to shake out any concurrency issues between
|
|
// pools (like histograms).
|
|
TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
|
|
SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
|
|
SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
|
|
|
|
base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
|
|
pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
|
|
pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
|
|
|
|
const size_t kNumTasks = 20;
|
|
for (size_t i = 1; i < kNumTasks; i++) {
|
|
base::Closure fast_task =
|
|
base::Bind(&TestTracker::FastTask, tracker(), i);
|
|
pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
|
|
pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
|
|
}
|
|
|
|
std::vector<int> result =
|
|
tracker()->WaitUntilTasksComplete(2*kNumTasks);
|
|
EXPECT_EQ(2 * kNumTasks, result.size());
|
|
|
|
pool2.pool()->Shutdown();
|
|
pool1.pool()->Shutdown();
|
|
}
|
|
|
|
// Test that tasks with the same sequence token are executed in order but don't
|
|
// affect other tasks.
|
|
TEST_F(SequencedWorkerPoolTest, Sequence) {
|
|
// Fill all the worker threads except one.
|
|
const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
|
|
ThreadBlocker background_blocker;
|
|
for (size_t i = 0; i < kNumBackgroundTasks; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), i, &background_blocker));
|
|
}
|
|
tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
|
|
|
|
// Create two tasks with the same sequence token, one that will block on the
|
|
// event, and one which will just complete quickly when it's run. Since there
|
|
// is one worker thread free, the first task will start and then block, and
|
|
// the second task should be waiting.
|
|
ThreadBlocker blocker;
|
|
SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
|
|
pool()->PostSequencedWorkerTask(
|
|
token1, FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
|
|
pool()->PostSequencedWorkerTask(
|
|
token1, FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 101));
|
|
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
|
|
|
// Create another two tasks as above with a different token. These will be
|
|
// blocked since there are no slots to run.
|
|
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
|
|
pool()->PostSequencedWorkerTask(
|
|
token2, FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 200));
|
|
pool()->PostSequencedWorkerTask(
|
|
token2, FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 201));
|
|
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
|
|
|
// Let one background task complete. This should then let both tasks of
|
|
// token2 run to completion in order. The second task of token1 should still
|
|
// be blocked.
|
|
background_blocker.Unblock(1);
|
|
std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
|
|
ASSERT_EQ(3u, result.size());
|
|
EXPECT_EQ(200, result[1]);
|
|
EXPECT_EQ(201, result[2]);
|
|
|
|
// Finish the rest of the background tasks. This should leave some workers
|
|
// free with the second token1 task still blocked on the first.
|
|
background_blocker.Unblock(kNumBackgroundTasks - 1);
|
|
EXPECT_EQ(kNumBackgroundTasks + 2,
|
|
tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
|
|
|
|
// Allow the first task of token1 to complete. This should run the second.
|
|
blocker.Unblock(1);
|
|
result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
|
|
ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
|
|
EXPECT_EQ(100, result[result.size() - 2]);
|
|
EXPECT_EQ(101, result[result.size() - 1]);
|
|
}
|
|
|
|
// Tests that any tasks posted after Shutdown are ignored.
|
|
// Disabled for flakiness. See http://crbug.com/166451.
|
|
TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
|
|
// Start tasks to take all the threads and block them.
|
|
EnsureAllWorkersCreated();
|
|
ThreadBlocker blocker;
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), i, &blocker));
|
|
}
|
|
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
|
|
|
SetWillWaitForShutdownCallback(
|
|
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
|
|
scoped_refptr<TestTracker>(tracker()), 0,
|
|
&blocker, kNumWorkerThreads));
|
|
|
|
// Shutdown the worker pool. This should discard all non-blocking tasks.
|
|
const int kMaxNewBlockingTasksAfterShutdown = 100;
|
|
pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
|
|
|
|
int old_has_work_call_count = has_work_call_count();
|
|
|
|
std::vector<int> result =
|
|
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
|
|
|
|
// The kNumWorkerThread items should have completed, in no particular order.
|
|
ASSERT_EQ(kNumWorkerThreads, result.size());
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
|
|
result.end());
|
|
}
|
|
|
|
// No further tasks, regardless of shutdown mode, should be allowed.
|
|
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 100),
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
|
|
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 101),
|
|
SequencedWorkerPool::SKIP_ON_SHUTDOWN));
|
|
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 102),
|
|
SequencedWorkerPool::BLOCK_SHUTDOWN));
|
|
|
|
ASSERT_EQ(old_has_work_call_count, has_work_call_count());
|
|
}
|
|
|
|
// Tests that, after Shutdown(n), up to <n> new blocking tasks are still
// accepted provided they are posted by a running task.
TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
  EnsureAllWorkersCreated();
  ThreadBlocker blocker;

  // Park one task on every worker thread.
  const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
  for (int task_id = 0; task_id < kNumBlockTasks; ++task_id) {
    EXPECT_TRUE(pool()->PostWorkerTask(
        FROM_HERE,
        base::Bind(&TestTracker::BlockTask, tracker(), task_id, &blocker)));
  }
  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);

  // Behind those, queue shutdown-blocking tasks that will themselves try to
  // post more work once they run. PostAdditionalTasks attempts to post 3 new
  // FastTasks, one for each shutdown_behavior.
  const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
  for (int task_id = 0; task_id < kNumQueuedTasks; ++task_id) {
    EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
        FROM_HERE,
        base::Bind(&TestTracker::PostAdditionalTasks,
                   tracker(), task_id, pool(), false),
        SequencedWorkerPool::BLOCK_SHUTDOWN));
  }

  // Arrange for the floodgates to open from within Shutdown().
  SetWillWaitForShutdownCallback(
      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
                 scoped_refptr<TestTracker>(tracker()),
                 0, &blocker, kNumBlockTasks));

  // Let half of the additional blocking tasks through.
  const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
  pool()->Shutdown(kNumNewBlockingTasksToAllow);

  // Wait for the expected number of tasks to have run.
  const size_t expected_completions = static_cast<size_t>(
      kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow);
  tracker()->WaitUntilTasksComplete(expected_completions);

  // Clean up the task IDs we added and go home.
  tracker()->ClearCompleteSequence();
}
|
|
|
|
// Tests that unrun tasks are discarded properly according to their shutdown
|
|
// mode.
|
|
TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
|
|
// Start tasks to take all the threads and block them.
|
|
EnsureAllWorkersCreated();
|
|
ThreadBlocker blocker;
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), i, &blocker));
|
|
}
|
|
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
|
|
|
// Create some tasks with different shutdown modes.
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 100),
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 101),
|
|
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 102),
|
|
SequencedWorkerPool::BLOCK_SHUTDOWN);
|
|
|
|
// Shutdown the worker pool. This should discard all non-blocking tasks.
|
|
SetWillWaitForShutdownCallback(
|
|
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
|
|
scoped_refptr<TestTracker>(tracker()), 0,
|
|
&blocker, kNumWorkerThreads));
|
|
pool()->Shutdown();
|
|
|
|
std::vector<int> result =
|
|
tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
|
|
|
|
// The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
|
|
// one, in no particular order.
|
|
ASSERT_EQ(kNumWorkerThreads + 1, result.size());
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
|
|
result.end());
|
|
}
|
|
EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
|
|
}
|
|
|
|
// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
|
|
TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
|
|
scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
|
|
scoped_refptr<SequencedTaskRunner> sequenced_runner(
|
|
pool()->GetSequencedTaskRunnerWithShutdownBehavior(
|
|
pool()->GetSequenceToken(),
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
|
|
EnsureAllWorkersCreated();
|
|
ThreadBlocker blocker;
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), 0, &blocker),
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
|
|
runner->PostTask(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), 1, &blocker));
|
|
sequenced_runner->PostTask(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), 2, &blocker));
|
|
|
|
tracker()->WaitUntilTasksBlocked(3);
|
|
|
|
// This should not block. If this test hangs, it means it failed.
|
|
pool()->Shutdown();
|
|
|
|
// The task should not have completed yet.
|
|
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
|
|
|
// Posting more tasks should fail.
|
|
EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
|
|
SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
|
|
EXPECT_FALSE(runner->PostTask(
|
|
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
|
|
EXPECT_FALSE(sequenced_runner->PostTask(
|
|
FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
|
|
|
|
// Continue the background thread and make sure the tasks can complete.
|
|
blocker.Unblock(3);
|
|
std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
|
|
EXPECT_EQ(3u, result.size());
|
|
}
|
|
|
|
// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
|
|
// until they stop, but tasks not yet started do not.
|
|
TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
|
|
// Start tasks to take all the threads and block them.
|
|
EnsureAllWorkersCreated();
|
|
ThreadBlocker blocker;
|
|
|
|
// Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
|
|
// return until these tasks have completed.
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
|
|
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
|
|
}
|
|
tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
|
|
|
// Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
|
|
// executed once Shutdown() has been called.
|
|
pool()->PostWorkerTaskWithShutdownBehavior(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::BlockTask,
|
|
tracker(), 0, &blocker),
|
|
SequencedWorkerPool::SKIP_ON_SHUTDOWN);
|
|
|
|
// This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
|
|
// been started block shutdown.
|
|
SetWillWaitForShutdownCallback(
|
|
base::Bind(&EnsureTasksToCompleteCountAndUnblock,
|
|
scoped_refptr<TestTracker>(tracker()), 0,
|
|
&blocker, kNumWorkerThreads));
|
|
|
|
// No tasks should have completed yet.
|
|
EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
|
|
|
// This should not block. If this test hangs, it means it failed.
|
|
pool()->Shutdown();
|
|
|
|
// Shutdown should not return until all of the tasks have completed.
|
|
std::vector<int> result =
|
|
tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
|
|
|
|
// Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
|
|
// allowed to complete. No additional non-blocking tasks should have been
|
|
// started.
|
|
ASSERT_EQ(kNumWorkerThreads, result.size());
|
|
for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
|
EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
|
|
result.end());
|
|
}
|
|
}
|
|
|
|
// Ensure all worker threads are created, and then trigger a spurious
|
|
// work signal. This shouldn't cause any other work signals to be
|
|
// triggered. This is a regression test for http://crbug.com/117469.
|
|
TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
|
|
EnsureAllWorkersCreated();
|
|
int old_has_work_call_count = has_work_call_count();
|
|
pool()->SignalHasWorkForTesting();
|
|
// This is inherently racy, but can only produce false positives.
|
|
base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
|
|
EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
|
|
}
|
|
|
|
// Task body for the IsRunningOnCurrentThread test. Expects |pool| to report
// that the current thread runs its tasks and is running
// |test_positive_token| but not |test_negative_token|, and expects
// |unused_pool| to answer false to every query.
void IsRunningOnCurrentThreadTask(
    SequencedWorkerPool::SequenceToken test_positive_token,
    SequencedWorkerPool::SequenceToken test_negative_token,
    SequencedWorkerPool* pool,
    SequencedWorkerPool* unused_pool) {
  // The pool that is running this task.
  EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
  EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));

  // The other pool.
  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
  EXPECT_FALSE(
      unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
}
|
|
|
|
// Verify correctness of the IsRunningSequenceOnCurrentThread method.
|
|
TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
|
|
SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
|
|
SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
|
|
SequencedWorkerPool::SequenceToken unsequenced_token;
|
|
|
|
scoped_refptr<SequencedWorkerPool> unused_pool =
|
|
new SequencedWorkerPool(2, "unused_pool");
|
|
|
|
EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
|
|
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
|
|
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
|
|
EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
|
|
EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
|
|
EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
|
|
EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
|
|
EXPECT_FALSE(
|
|
unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
|
|
|
|
pool()->PostSequencedWorkerTask(
|
|
token1, FROM_HERE,
|
|
base::Bind(&IsRunningOnCurrentThreadTask,
|
|
token1, token2, pool(), unused_pool));
|
|
pool()->PostSequencedWorkerTask(
|
|
token2, FROM_HERE,
|
|
base::Bind(&IsRunningOnCurrentThreadTask,
|
|
token2, unsequenced_token, pool(), unused_pool));
|
|
pool()->PostWorkerTask(
|
|
FROM_HERE,
|
|
base::Bind(&IsRunningOnCurrentThreadTask,
|
|
unsequenced_token, token1, pool(), unused_pool));
|
|
pool()->Shutdown();
|
|
unused_pool->Shutdown();
|
|
}
|
|
|
|
// Verify that FlushForTesting works as intended.
|
|
TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
|
|
// Should be fine to call on a new instance.
|
|
pool()->FlushForTesting();
|
|
|
|
// Queue up a bunch of work, including a long delayed task and
|
|
// a task that produces additional tasks as an artifact.
|
|
pool()->PostDelayedWorkerTask(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 0),
|
|
TimeDelta::FromMinutes(5));
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::SlowTask, tracker(), 0));
|
|
const size_t kNumFastTasks = 20;
|
|
for (size_t i = 0; i < kNumFastTasks; i++) {
|
|
pool()->PostWorkerTask(FROM_HERE,
|
|
base::Bind(&TestTracker::FastTask, tracker(), 0));
|
|
}
|
|
pool()->PostWorkerTask(
|
|
FROM_HERE,
|
|
base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
|
|
true));
|
|
|
|
// We expect all except the delayed task to have been run. We verify all
|
|
// closures have been deleted by looking at the refcount of the
|
|
// tracker.
|
|
EXPECT_FALSE(tracker()->HasOneRef());
|
|
pool()->FlushForTesting();
|
|
EXPECT_TRUE(tracker()->HasOneRef());
|
|
EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
|
|
|
|
// Should be fine to call on an idle instance with all threads created, and
|
|
// spamming the method shouldn't deadlock or confuse the class.
|
|
pool()->FlushForTesting();
|
|
pool()->FlushForTesting();
|
|
|
|
// Should be fine to call after shutdown too.
|
|
pool()->Shutdown();
|
|
pool()->FlushForTesting();
|
|
}
|
|
|
|
// Creates a pool and a CONTINUE_ON_SHUTDOWN sequenced task runner for it,
// posts nothing, and shuts the pool down.
TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
  MessageLoop loop;
  scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
  scoped_refptr<SequencedTaskRunner> task_runner =
      pool->GetSequencedTaskRunnerWithShutdownBehavior(
          pool->GetSequenceToken(),
          base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);

  // Upon test exit, should shut down without hanging.
  pool->Shutdown();
}
|
|
|
|
class SequencedWorkerPoolTaskRunnerTestDelegate {
|
|
public:
|
|
SequencedWorkerPoolTaskRunnerTestDelegate() {}
|
|
|
|
~SequencedWorkerPoolTaskRunnerTestDelegate() {}
|
|
|
|
void StartTaskRunner() {
|
|
pool_owner_.reset(
|
|
new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
|
|
}
|
|
|
|
scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
|
|
return pool_owner_->pool();
|
|
}
|
|
|
|
void StopTaskRunner() {
|
|
// Make sure all tasks are run before shutting down. Delayed tasks are
|
|
// not run, they're simply deleted.
|
|
pool_owner_->pool()->FlushForTesting();
|
|
pool_owner_->pool()->Shutdown();
|
|
// Don't reset |pool_owner_| here, as the test may still hold a
|
|
// reference to the pool.
|
|
}
|
|
|
|
bool TaskRunnerHandlesNonZeroDelays() const {
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
MessageLoop message_loop_;
|
|
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
|
|
};
|
|
|
|
INSTANTIATE_TYPED_TEST_CASE_P(
|
|
SequencedWorkerPool, TaskRunnerTest,
|
|
SequencedWorkerPoolTaskRunnerTestDelegate);
|
|
|
|
class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
|
|
public:
|
|
SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
|
|
|
|
~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
|
|
}
|
|
|
|
void StartTaskRunner() {
|
|
pool_owner_.reset(
|
|
new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
|
|
task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
|
|
SequencedWorkerPool::BLOCK_SHUTDOWN);
|
|
}
|
|
|
|
scoped_refptr<TaskRunner> GetTaskRunner() {
|
|
return task_runner_;
|
|
}
|
|
|
|
void StopTaskRunner() {
|
|
// Make sure all tasks are run before shutting down. Delayed tasks are
|
|
// not run, they're simply deleted.
|
|
pool_owner_->pool()->FlushForTesting();
|
|
pool_owner_->pool()->Shutdown();
|
|
// Don't reset |pool_owner_| here, as the test may still hold a
|
|
// reference to the pool.
|
|
}
|
|
|
|
bool TaskRunnerHandlesNonZeroDelays() const {
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
MessageLoop message_loop_;
|
|
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
|
|
scoped_refptr<TaskRunner> task_runner_;
|
|
};
|
|
|
|
INSTANTIATE_TYPED_TEST_CASE_P(
|
|
SequencedWorkerPoolTaskRunner, TaskRunnerTest,
|
|
SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
|
|
|
|
class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
|
|
public:
|
|
SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
|
|
|
|
~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
|
|
}
|
|
|
|
void StartTaskRunner() {
|
|
pool_owner_.reset(new SequencedWorkerPoolOwner(
|
|
10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
|
|
task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
|
|
pool_owner_->pool()->GetSequenceToken());
|
|
}
|
|
|
|
scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
|
|
return task_runner_;
|
|
}
|
|
|
|
void StopTaskRunner() {
|
|
// Make sure all tasks are run before shutting down. Delayed tasks are
|
|
// not run, they're simply deleted.
|
|
pool_owner_->pool()->FlushForTesting();
|
|
pool_owner_->pool()->Shutdown();
|
|
// Don't reset |pool_owner_| here, as the test may still hold a
|
|
// reference to the pool.
|
|
}
|
|
|
|
bool TaskRunnerHandlesNonZeroDelays() const {
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
MessageLoop message_loop_;
|
|
scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
|
|
scoped_refptr<SequencedTaskRunner> task_runner_;
|
|
};
|
|
|
|
INSTANTIATE_TYPED_TEST_CASE_P(
|
|
SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
|
|
SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
|
|
|
|
INSTANTIATE_TYPED_TEST_CASE_P(
|
|
SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
|
|
SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
|
|
|
|
} // namespace
|
|
|
|
} // namespace base
|