#include "LinearMath/btMinMax.h" #include "LinearMath/btAlignedObjectArray.h" #include "LinearMath/btThreads.h" #include "LinearMath/btQuickprof.h" #include #include #if BT_THREADSAFE #include "btThreadSupportInterface.h" #if defined( _WIN32 ) #define WIN32_LEAN_AND_MEAN #include #endif typedef unsigned long long btU64; static const int kCacheLineSize = 64; void btSpinPause() { #if defined( _WIN32 ) YieldProcessor(); #endif } struct WorkerThreadStatus { enum Type { kInvalid, kWaitingForWork, kWorking, kSleeping, }; }; ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives { static const int kMaxThreadCount = BT_MAX_THREAD_COUNT; // directives for all worker threads packed into a single cacheline char m_threadDirs[kMaxThreadCount]; public: enum Type { kInvalid, kGoToSleep, // go to sleep kStayAwakeButIdle, // wait for not checking job queue kScanForJobs, // actively scan job queue for jobs }; WorkerThreadDirectives() { for ( int i = 0; i < kMaxThreadCount; ++i ) { m_threadDirs[ i ] = 0; } } Type getDirective(int threadId) { btAssert(threadId < kMaxThreadCount); return static_cast(m_threadDirs[threadId]); } void setDirectiveByRange(int threadBegin, int threadEnd, Type dir) { btAssert( threadBegin < threadEnd ); btAssert( threadEnd <= kMaxThreadCount ); char dirChar = static_cast(dir); for ( int i = threadBegin; i < threadEnd; ++i ) { m_threadDirs[ i ] = dirChar; } } }; class JobQueue; ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage { int m_threadId; WorkerThreadStatus::Type m_status; int m_numJobsFinished; btSpinMutex m_mutex; btScalar m_sumResult; WorkerThreadDirectives * m_directive; JobQueue* m_queue; btClock* m_clock; unsigned int m_cooldownTime; }; struct IJob { virtual void executeJob(int threadId) = 0; }; class ParallelForJob : public IJob { const btIParallelForBody* m_body; int m_begin; int m_end; public: ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body ) { m_body = &body; m_begin = iBegin; m_end = iEnd; } virtual void executeJob(int threadId) BT_OVERRIDE { BT_PROFILE( "executeJob" ); // call the functor body to do the work m_body->forLoop( m_begin, m_end ); } }; class ParallelSumJob : public IJob { const btIParallelSumBody* m_body; ThreadLocalStorage* m_threadLocalStoreArray; int m_begin; int m_end; public: ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls ) { m_body = &body; m_threadLocalStoreArray = tls; m_begin = iBegin; m_end = iEnd; } virtual void executeJob( int threadId ) BT_OVERRIDE { BT_PROFILE( "executeJob" ); // call the functor body to do the work btScalar val = m_body->sumLoop( m_begin, m_end ); #if BT_PARALLEL_SUM_DETERMINISTISM // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision) const float TRUNC_SCALE = float(1<<19); val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits #endif m_threadLocalStoreArray[threadId].m_sumResult += val; } }; ATTRIBUTE_ALIGNED64(class) JobQueue { btThreadSupportInterface* m_threadSupport; btCriticalSection* m_queueLock; btSpinMutex m_mutex; btAlignedObjectArray m_jobQueue; char* m_jobMem; int m_jobMemSize; bool m_queueIsEmpty; int m_tailIndex; int m_headIndex; int m_allocSize; bool m_useSpinMutex; btAlignedObjectArray m_neighborContexts; char m_cachePadding[kCacheLineSize]; // prevent false sharing void freeJobMem() { if ( m_jobMem ) { // free old btAlignedFree(m_jobMem); m_jobMem = NULL; } } void resizeJobMem(int newSize) { if (newSize > m_jobMemSize) { freeJobMem(); m_jobMem = static_cast(btAlignedAlloc(newSize, kCacheLineSize)); m_jobMemSize = newSize; } } public: JobQueue() { m_jobMem = NULL; m_jobMemSize = 0; m_threadSupport = NULL; m_queueLock = NULL; m_headIndex = 0; m_tailIndex = 0; m_useSpinMutex = false; } ~JobQueue() { exit(); } void exit() { freeJobMem(); if (m_queueLock && m_threadSupport) { m_threadSupport->deleteCriticalSection(m_queueLock); m_queueLock = NULL; m_threadSupport = 0; } } void init(btThreadSupportInterface* threadSup, btAlignedObjectArray* contextArray) { m_threadSupport = threadSup; if (threadSup) { m_queueLock = m_threadSupport->createCriticalSection(); } setupJobStealing(contextArray, contextArray->size()); } void setupJobStealing(btAlignedObjectArray* contextArray, int numActiveContexts) { btAlignedObjectArray& contexts = *contextArray; int selfIndex = 0; for (int i = 0; i < contexts.size(); ++i) { if ( this == &contexts[ i ] ) { selfIndex = i; break; } } int numNeighbors = btMin(2, contexts.size() - 1); int neighborOffsets[ ] = {-1, 1, -2, 2, -3, 3}; int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]); m_neighborContexts.reserve( numNeighbors ); m_neighborContexts.resizeNoInitialize(0); for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++) { int neighborIndex = selfIndex + neighborOffsets[i]; if ( neighborIndex >= 0 && neighborIndex < numActiveContexts) { m_neighborContexts.push_back( &contexts[ neighborIndex ] ); } } } bool isQueueEmpty() const {return m_queueIsEmpty;} void lockQueue() { if ( m_useSpinMutex ) { m_mutex.lock(); } else { m_queueLock->lock(); } } void unlockQueue() { if ( m_useSpinMutex ) { m_mutex.unlock(); } else { m_queueLock->unlock(); } } void clearQueue(int jobCount, int jobSize) { lockQueue(); m_headIndex = 0; m_tailIndex = 0; m_allocSize = 0; m_queueIsEmpty = true; int jobBufSize = jobSize * jobCount; // make sure we have enough memory allocated to store jobs if ( jobBufSize > m_jobMemSize ) { resizeJobMem( jobBufSize ); } // make sure job queue is big enough if ( jobCount > m_jobQueue.capacity() ) { m_jobQueue.reserve( jobCount ); } unlockQueue(); m_jobQueue.resizeNoInitialize( 0 ); } void* allocJobMem(int jobSize) { btAssert(m_jobMemSize >= (m_allocSize + jobSize)); void* jobMem = &m_jobMem[m_allocSize]; m_allocSize += jobSize; return jobMem; } void submitJob( IJob* job ) { btAssert( reinterpret_cast( job ) >= &m_jobMem[ 0 ] && reinterpret_cast( job ) < &m_jobMem[ 0 ] + m_allocSize ); m_jobQueue.push_back( job ); lockQueue(); m_tailIndex++; m_queueIsEmpty = false; unlockQueue(); } IJob* consumeJobFromOwnQueue() { if ( m_queueIsEmpty ) { // lock free path. even if this is taken erroneously it isn't harmful return NULL; } IJob* job = NULL; lockQueue(); if ( !m_queueIsEmpty ) { job = m_jobQueue[ m_headIndex++ ]; btAssert( reinterpret_cast( job ) >= &m_jobMem[ 0 ] && reinterpret_cast( job ) < &m_jobMem[ 0 ] + m_allocSize ); if ( m_headIndex == m_tailIndex ) { m_queueIsEmpty = true; } } unlockQueue(); return job; } IJob* consumeJob() { if (IJob* job = consumeJobFromOwnQueue()) { return job; } // own queue is empty, try to steal from neighbor for (int i = 0; i < m_neighborContexts.size(); ++i) { JobQueue* otherContext = m_neighborContexts[ i ]; if ( IJob* job = otherContext->consumeJobFromOwnQueue() ) { return job; } } return NULL; } }; static void WorkerThreadFunc( void* userPtr ) { BT_PROFILE( "WorkerThreadFunc" ); ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr; JobQueue* jobQueue = localStorage->m_queue; bool shouldSleep = false; int threadId = localStorage->m_threadId; while (! shouldSleep) { // do work localStorage->m_mutex.lock(); while ( IJob* job = jobQueue->consumeJob() ) { localStorage->m_status = WorkerThreadStatus::kWorking; job->executeJob( threadId ); localStorage->m_numJobsFinished++; } localStorage->m_status = WorkerThreadStatus::kWaitingForWork; localStorage->m_mutex.unlock(); btU64 clockStart = localStorage->m_clock->getTimeMicroseconds(); // while queue is empty, while (jobQueue->isQueueEmpty()) { // todo: spin wait a bit to avoid hammering the empty queue btSpinPause(); if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep ) { shouldSleep = true; break; } // if jobs are incoming, if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs ) { clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock } else { for ( int i = 0; i < 50; ++i ) { btSpinPause(); btSpinPause(); btSpinPause(); btSpinPause(); if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty()) { break; } } // if no jobs incoming and queue has been empty for the cooldown time, sleep btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart; if (timeElapsed > localStorage->m_cooldownTime) { shouldSleep = true; break; } } } } { BT_PROFILE("sleep"); // go sleep localStorage->m_mutex.lock(); localStorage->m_status = WorkerThreadStatus::kSleeping; localStorage->m_mutex.unlock(); } } class btTaskSchedulerDefault : public btITaskScheduler { btThreadSupportInterface* m_threadSupport; WorkerThreadDirectives* m_workerDirective; btAlignedObjectArray m_jobQueues; btAlignedObjectArray m_perThreadJobQueues; btAlignedObjectArray m_threadLocalStorage; btSpinMutex m_antiNestingLock; // prevent nested parallel-for btClock m_clock; int m_numThreads; int m_numWorkerThreads; int m_numActiveJobQueues; int m_maxNumThreads; int m_numJobs; static const int kFirstWorkerThreadId = 1; public: btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport") { m_threadSupport = NULL; m_workerDirective = NULL; } virtual ~btTaskSchedulerDefault() { waitForWorkersToSleep(); for ( int i = 0; i < m_jobQueues.size(); ++i ) { m_jobQueues[i].exit(); } if (m_threadSupport) { delete m_threadSupport; m_threadSupport = NULL; } if (m_workerDirective) { btAlignedFree(m_workerDirective); m_workerDirective = NULL; } } void init() { btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc ); m_threadSupport = btThreadSupportInterface::create( constructionInfo ); m_workerDirective = static_cast(btAlignedAlloc(sizeof(*m_workerDirective), 64)); m_numWorkerThreads = m_threadSupport->getNumWorkerThreads(); m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1; m_numThreads = m_maxNumThreads; // ideal to have one job queue for each physical processor (except for the main thread which needs no queue) int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio(); int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue); m_jobQueues.resize(numJobQueues); m_numActiveJobQueues = numJobQueues; for ( int i = 0; i < m_jobQueues.size(); ++i ) { m_jobQueues[i].init( m_threadSupport, &m_jobQueues ); } m_perThreadJobQueues.resize(m_numThreads); for ( int i = 0; i < m_numThreads; i++ ) { JobQueue* jq = NULL; // only worker threads get a job queue if (i > 0) { if (numThreadsPerQueue == 1) { // one queue per worker thread jq = &m_jobQueues[ i - kFirstWorkerThreadId ]; } else { // 2 threads share each queue jq = &m_jobQueues[ i / numThreadsPerQueue ]; } } m_perThreadJobQueues[i] = jq; } m_threadLocalStorage.resize(m_numThreads); for ( int i = 0; i < m_numThreads; i++ ) { ThreadLocalStorage& storage = m_threadLocalStorage[i]; storage.m_threadId = i; storage.m_directive = m_workerDirective; storage.m_status = WorkerThreadStatus::kSleeping; storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do storage.m_clock = &m_clock; storage.m_queue = m_perThreadJobQueues[i]; } setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); // no work for them yet setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() ); } void setWorkerDirectives(WorkerThreadDirectives::Type dir) { m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir); } virtual int getMaxNumThreads() const BT_OVERRIDE { return m_maxNumThreads; } virtual int getNumThreads() const BT_OVERRIDE { return m_numThreads; } virtual void setNumThreads( int numThreads ) BT_OVERRIDE { m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 ); m_numWorkerThreads = m_numThreads - 1; m_numActiveJobQueues = 0; // if there is at least 1 worker, if ( m_numWorkerThreads > 0 ) { // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ]; int iLastActiveContext = lastActiveContext - &m_jobQueues[0]; m_numActiveJobQueues = iLastActiveContext + 1; for ( int i = 0; i < m_jobQueues.size(); ++i ) { m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues ); } } m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep); } void waitJobs() { BT_PROFILE( "waitJobs" ); // have the main thread work until the job queues are empty int numMainThreadJobsFinished = 0; for ( int i = 0; i < m_numActiveJobQueues; ++i ) { while ( IJob* job = m_jobQueues[i].consumeJob() ) { job->executeJob( 0 ); numMainThreadJobsFinished++; } } // done with jobs for now, tell workers to rest (but not sleep) setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle ); btU64 clockStart = m_clock.getTimeMicroseconds(); // wait for workers to finish any jobs in progress while ( true ) { int numWorkerJobsFinished = 0; for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread ) { ThreadLocalStorage* storage = &m_threadLocalStorage[iThread]; storage->m_mutex.lock(); numWorkerJobsFinished += storage->m_numJobsFinished; storage->m_mutex.unlock(); } if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs) { break; } btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart; btAssert(timeElapsed < 1000); if (timeElapsed > 100000) { break; } btSpinPause(); } } void wakeWorkers(int numWorkersToWake) { BT_PROFILE( "wakeWorkers" ); btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs ); int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads); int numActiveWorkers = 0; for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker ) { // note this count of active workers is not necessarily totally reliable, because a worker thread could be // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare. ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ]; if (storage.m_status != WorkerThreadStatus::kSleeping) { numActiveWorkers++; } } for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker ) { ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ]; if (storage.m_status == WorkerThreadStatus::kSleeping) { m_threadSupport->runTask( iWorker, &storage ); numActiveWorkers++; } } } void waitForWorkersToSleep() { BT_PROFILE( "waitForWorkersToSleep" ); setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); m_threadSupport->waitForAllTasks(); for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ ) { ThreadLocalStorage& storage = m_threadLocalStorage[i]; btAssert( storage.m_status == WorkerThreadStatus::kSleeping ); } } virtual void sleepWorkerThreadsHint() BT_OVERRIDE { BT_PROFILE( "sleepWorkerThreadsHint" ); // hint the task scheduler that we may not be using these threads for a little while setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); } void prepareWorkerThreads() { for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i ) { ThreadLocalStorage& storage = m_threadLocalStorage[i]; storage.m_mutex.lock(); storage.m_numJobsFinished = 0; storage.m_mutex.unlock(); } setWorkerDirectives( WorkerThreadDirectives::kScanForJobs ); } virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE { BT_PROFILE( "parallelFor_ThreadSupport" ); btAssert( iEnd >= iBegin ); btAssert( grainSize >= 1 ); int iterationCount = iEnd - iBegin; if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() ) { typedef ParallelForJob JobType; int jobCount = ( iterationCount + grainSize - 1 ) / grainSize; m_numJobs = jobCount; btAssert( jobCount >= 2 ); // need more than one job for multithreading int jobSize = sizeof( JobType ); for (int i = 0; i < m_numActiveJobQueues; ++i) { m_jobQueues[i].clearQueue( jobCount, jobSize ); } // prepare worker threads for incoming work prepareWorkerThreads(); // submit all of the jobs int iJob = 0; int iThread = kFirstWorkerThreadId; // first worker thread for ( int i = iBegin; i < iEnd; i += grainSize ) { btAssert( iJob < jobCount ); int iE = btMin( i + grainSize, iEnd ); JobQueue* jq = m_perThreadJobQueues[ iThread ]; btAssert(jq); btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); void* jobMem = jq->allocJobMem(jobSize); JobType* job = new ( jobMem ) ParallelForJob( i, iE, body ); // placement new jq->submitJob( job ); iJob++; iThread++; if ( iThread >= m_numThreads ) { iThread = kFirstWorkerThreadId; // first worker thread } } wakeWorkers( jobCount - 1 ); // put the main thread to work on emptying the job queue and then wait for all workers to finish waitJobs(); m_antiNestingLock.unlock(); } else { BT_PROFILE( "parallelFor_mainThread" ); // just run on main thread body.forLoop( iBegin, iEnd ); } } virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE { BT_PROFILE( "parallelSum_ThreadSupport" ); btAssert( iEnd >= iBegin ); btAssert( grainSize >= 1 ); int iterationCount = iEnd - iBegin; if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() ) { typedef ParallelSumJob JobType; int jobCount = ( iterationCount + grainSize - 1 ) / grainSize; m_numJobs = jobCount; btAssert( jobCount >= 2 ); // need more than one job for multithreading int jobSize = sizeof( JobType ); for (int i = 0; i < m_numActiveJobQueues; ++i) { m_jobQueues[i].clearQueue( jobCount, jobSize ); } // initialize summation for ( int iThread = 0; iThread < m_numThreads; ++iThread ) { m_threadLocalStorage[iThread].m_sumResult = btScalar(0); } // prepare worker threads for incoming work prepareWorkerThreads(); // submit all of the jobs int iJob = 0; int iThread = kFirstWorkerThreadId; // first worker thread for ( int i = iBegin; i < iEnd; i += grainSize ) { btAssert( iJob < jobCount ); int iE = btMin( i + grainSize, iEnd ); JobQueue* jq = m_perThreadJobQueues[ iThread ]; btAssert(jq); btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues); void* jobMem = jq->allocJobMem(jobSize); JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] ); // placement new jq->submitJob( job ); iJob++; iThread++; if ( iThread >= m_numThreads ) { iThread = kFirstWorkerThreadId; // first worker thread } } wakeWorkers( jobCount - 1 ); // put the main thread to work on emptying the job queue and then wait for all workers to finish waitJobs(); // add up all the thread sums btScalar sum = btScalar(0); for ( int iThread = 0; iThread < m_numThreads; ++iThread ) { sum += m_threadLocalStorage[ iThread ].m_sumResult; } m_antiNestingLock.unlock(); return sum; } else { BT_PROFILE( "parallelSum_mainThread" ); // just run on main thread return body.sumLoop( iBegin, iEnd ); } } }; btITaskScheduler* btCreateDefaultTaskScheduler() { btTaskSchedulerDefault* ts = new btTaskSchedulerDefault(); ts->init(); return ts; } #else // #if BT_THREADSAFE btITaskScheduler* btCreateDefaultTaskScheduler() { return NULL; } #endif // #else // #if BT_THREADSAFE