// Copyright 2009-2020 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#include "taskschedulerinternal.h"
#include "../math/math.h"
#include "../sys/sysinfo.h"
#include <algorithm>

namespace embree
{
  RTC_NAMESPACE_BEGIN

  static MutexSys g_mutex;
  size_t TaskScheduler::g_numThreads = 0;
  __thread TaskScheduler* TaskScheduler::g_instance = nullptr;
  std::vector<Ref<TaskScheduler>> g_instance_vector;
  __thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr;
  TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;

  template<typename Predicate, typename Body>
  __forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)
  {
    while (true)
    {
      /*! some rounds that yield */
      for (size_t i=0; i<32; i++)
      {
        /*! some spinning rounds */
        const size_t threadCount = thread.threadCount();
        for (size_t j=0; j<1024; j+=threadCount)
        {
          if (!pred()) return;
          if (thread.scheduler->steal_from_other_threads(thread)) {
            i=j=0;
            body();
          }
        }
        yield();
      }
    }
  }

  /*! run this task */
  void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible
  {
    /* try to run if not already stolen */
    if (try_switch_state(INITIALIZED,DONE))
    {
      Task* prevTask = thread.task;
      thread.task = this;
      try {
        if (thread.scheduler->cancellingException == nullptr)
          closure->execute();
      } catch (...) {
        if (thread.scheduler->cancellingException == nullptr)
          thread.scheduler->cancellingException = std::current_exception();
      }
      thread.task = prevTask;
      add_dependencies(-1);
    }

    /* steal until all dependencies have completed */
    steal_loop(thread,
               [&] () { return dependencies>0; },
               [&] () { while (thread.tasks.execute_local_internal(thread,this)); });

    /* now signal our parent task that we are finished */
    if (parent)
      parent->add_dependencies(-1);
  }

  /*! run this task */
  dll_export void TaskScheduler::Task::run (Thread& thread) {
    run_internal(thread);
  }

  bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)
  {
    /* stop if we run out of local tasks or reach the waiting task */
    if (right == 0 || &tasks[right-1] == parent)
      return false;

    /* execute task */
    size_t oldRight = right;
    tasks[right-1].run_internal(thread);
    if (right != oldRight) {
      THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");
    }

    /* pop task and closure from stack */
    right--;
    if (tasks[right].stackPtr != size_t(-1))
      stackPtr = tasks[right].stackPtr;

    /* also move left pointer */
    if (left >= right) left.store(right.load());

    return right != 0;
  }

  dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {
    return execute_local_internal(thread,parent);
  }

  bool TaskScheduler::TaskQueue::steal(Thread& thread)
  {
    /* reserve the leftmost task of the victim queue */
    size_t l = left;
    size_t r = right;
    if (l < r)
    {
      l = left++;
      if (l >= r)
        return false;
    }
    else
      return false;

    if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))
      return false;

    thread.tasks.right++;
    return true;
  }

  /* we steal from the left */
  size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()
  {
    if (left >= right) return 0;
    return tasks[left].N;
  }

  void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)
  {
    TaskScheduler::ThreadPool* pool = pair->first;
    size_t threadIndex = pair->second;
    delete pair;
    pool->thread_loop(threadIndex);
  }

  TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)
    : numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}

  dll_export void TaskScheduler::ThreadPool::startThreads()
  {
    if (running) return;
    setNumThreads(numThreads,true);
  }

  void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)
  {
    Lock<MutexSys> lock(g_mutex);
    assert(newNumThreads);
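    /* clamp the request: never run more worker threads than the machine
       exposes as logical cores */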
    newNumThreads = min(newNumThreads, (size_t) getNumberOfLogicalThreads());

    numThreads = newNumThreads;
    if (!startThreads && !running) return;
    running = true;
    size_t numThreadsActive = numThreadsRunning;

    mutex.lock();
    numThreadsRunning = newNumThreads;
    mutex.unlock();
    condition.notify_all();

    /* start new threads */
    for (size_t t=numThreadsActive; t<numThreadsRunning; t++)
    {
      if (t == 0) continue;
      auto pair = new std::pair<ThreadPool*,size_t>(this,t);
      threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));
    }

    /* stop some threads if we reduce the number of threads */
    for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {
      if (t == 0) continue;
      embree::join(threads.back());
      threads.pop_back();
    }
  }

  TaskScheduler::ThreadPool::~ThreadPool()
  {
    /* leave all taskschedulers */
    mutex.lock();
    numThreadsRunning = 0;
    mutex.unlock();
    condition.notify_all();

    /* wait for threads to terminate */
    for (size_t i=0; i<threads.size(); i++)
      embree::join(threads[i]);
  }

  dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)
  {
    mutex.lock();
    schedulers.push_back(scheduler);
    mutex.unlock();
    condition.notify_all();
  }

  dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)
  {
    Lock<MutexSys> lock(mutex);
    for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {
      if (scheduler == *it) {
        schedulers.erase(it);
        return;
      }
    }
  }

  void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)
  {
    while (globalThreadIndex < numThreadsRunning)
    {
      Ref<TaskScheduler> scheduler = NULL;
      ssize_t threadIndex = -1;
      {
        Lock<MutexSys> lock(mutex);
        condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });
        if (globalThreadIndex >= numThreadsRunning) break;
        scheduler = schedulers.front();
        threadIndex = scheduler->allocThreadIndex();
      }
      scheduler->thread_loop(threadIndex);
    }
  }

  TaskScheduler::TaskScheduler()
    : threadCounter(0), anyTasksRunning(0), hasRootTask(false)
  {
    threadLocal.resize(2*getNumberOfLogicalThreads()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.
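    /* mark all thread-local slots as free; steal_from_other_threads() skips
       slots that still hold nullptr */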
    for (size_t i=0; i<threadLocal.size(); i++)
      threadLocal[i].store(nullptr);
  }

  TaskScheduler::~TaskScheduler() {
    assert(threadCounter == 0);
  }

  dll_export size_t TaskScheduler::threadID()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else        return 0;
  }

  dll_export size_t TaskScheduler::threadIndex()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else        return 0;
  }

  dll_export size_t TaskScheduler::threadCount() {
    return threadPool->size();
  }

  dll_export TaskScheduler* TaskScheduler::instance()
  {
    if (g_instance == NULL) {
      Lock<MutexSys> lock(g_mutex);
      g_instance = new TaskScheduler;
      g_instance_vector.push_back(g_instance);
    }
    return g_instance;
  }

  void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)
  {
    if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);
    threadPool->setNumThreads(numThreads,start_threads);
  }

  void TaskScheduler::destroy() {
    delete threadPool; threadPool = nullptr;
  }

  dll_export ssize_t TaskScheduler::allocThreadIndex()
  {
    size_t threadIndex = threadCounter++;
    assert(threadIndex < threadLocal.size());
    return threadIndex;
  }

  void TaskScheduler::join()
  {
    mutex.lock();
    size_t threadIndex = allocThreadIndex();
    condition.wait(mutex, [&] () { return hasRootTask.load(); });
    mutex.unlock();
    std::exception_ptr except = thread_loop(threadIndex);
    if (except != nullptr) std::rethrow_exception(except);
  }

  void TaskScheduler::reset() {
    hasRootTask = false;
  }

  void TaskScheduler::wait_for_threads(size_t threadCount)
  {
    while (threadCounter < threadCount-1)
      pause_cpu();
  }

  dll_export TaskScheduler::Thread* TaskScheduler::thread() {
    return thread_local_thread;
  }

  dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)
  {
    Thread* old = thread_local_thread;
    thread_local_thread = thread;
    return old;
  }

  dll_export bool TaskScheduler::wait()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread == nullptr) return true;
    while (thread->tasks.execute_local_internal(*thread,thread->task)) {};
    return thread->scheduler->cancellingException == nullptr;
  }

  std::exception_ptr TaskScheduler::thread_loop(size_t threadIndex)
  {
    /* allocate thread structure */
    std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
    Thread& thread = *mthread;
    threadLocal[threadIndex].store(&thread);
    Thread* oldThread = swapThread(&thread);

    /* main thread loop */
    while (anyTasksRunning)
    {
      steal_loop(thread,
                 [&] () { return anyTasksRunning > 0; },
                 [&] () {
                   anyTasksRunning++;
                   while (thread.tasks.execute_local_internal(thread,nullptr));
                   anyTasksRunning--;
                 });
    }
    threadLocal[threadIndex].store(nullptr);
    swapThread(oldThread);

    /* remember exception to throw */
    std::exception_ptr except = nullptr;
    if (cancellingException != nullptr) except = cancellingException;

    /* wait for all threads to terminate */
    threadCounter--;
#if defined(__WIN32__)
    size_t loopIndex = 1;
#endif
#define LOOP_YIELD_THRESHOLD (4096)
    while (threadCounter > 0) {
#if defined(__WIN32__)
      if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)
        yield();
      else
        _mm_pause();
      loopIndex++;
#else
      yield();
#endif
    }
    return except;
  }

  bool TaskScheduler::steal_from_other_threads(Thread& thread)
  {
    const size_t threadIndex = thread.threadIndex;
    const size_t threadCount = this->threadCounter;

    for (size_t i=1; i<threadCount; i++)
    {
      pause_cpu(32);
      size_t otherThreadIndex = threadIndex+i;
      if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;

      Thread* othread = threadLocal[otherThreadIndex].load();
      if (!othread)
        continue;

      if (othread->tasks.steal(thread))
        return true;
    }

    return false;
  }

  dll_export void TaskScheduler::startThreads() {
    threadPool->startThreads();
  }

  dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->add(scheduler);
  }

  dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->remove(scheduler);
  }
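  /* Illustrative lifecycle of the API defined above (a sketch, not shipped
     code; the spawn/wait entry points are the templates declared in
     taskschedulerinternal.h):

       TaskScheduler::create(8,false,true);       // create pool, start 8 workers
       Ref<TaskScheduler> scheduler = new TaskScheduler;
       TaskScheduler::addScheduler(scheduler);    // workers begin serving it
       // ... spawn tasks via the header's spawn()/spawn_root() templates ...
       TaskScheduler::removeScheduler(scheduler);
       TaskScheduler::destroy();                  // joins and deletes the pool
  */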
  RTC_NAMESPACE_END
}