/// Helper macros that build an identifier of the form
/// `<prefix>_uniqueSymbol<line>`, where `<line>` is the current source line.
///
/// HELPER2 performs the actual token pasting with `##`.
42 #define QMP_UNIQUE_SYMBOL_HELPER2(prefix, line) prefix##_uniqueSymbol##line
/// HELPER1 adds a level of indirection so that `__LINE__` is macro-expanded
/// to its numeric value before HELPER2 pastes it (operands of `##` are not
/// expanded in the macro where the paste happens).
43 #define QMP_UNIQUE_SYMBOL_HELPER1(prefix, line) QMP_UNIQUE_SYMBOL_HELPER2(prefix, line)
/// Entry point: every use on the same source line yields the same identifier;
/// uses on different lines yield different identifiers.
44 #define QMP_UNIQUE_SYMBOL(prefix) QMP_UNIQUE_SYMBOL_HELPER1(prefix, __LINE__)
65 #define QMP_PARALLEL_FOR(indexName, loopFirstIndex, ...) \
67 qmp_internal::ParallelTaskManager::instance().setLoopIndices( \
68 loopFirstIndex, __VA_ARGS__); \
69 static class QMP_UNIQUE_SYMBOL(ParallelTaskSubclass) : \
70 public qmp_internal::ParallelTask \
73 virtual void run(int QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex), \
74 int QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex), \
75 const unsigned int parallelForLoopThreadIndexUniqueSymbol, \
76 int QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
78 for (int indexName = QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex); \
79 indexName <= QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex); \
80 indexName += QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
84 #define QMP_END_PARALLEL_FOR \
87 }QMP_UNIQUE_SYMBOL(Instance); \
88 qmp_internal::ParallelTaskManager::instance().process( \
89 &QMP_UNIQUE_SYMBOL(Instance)); \
/// Forwards `numThreads` to `setNumThreads()` on the singleton
/// `qmp_internal::ParallelTaskManager`. No trailing semicolon is included,
/// so the caller supplies it.
98 #define QMP_SET_NUM_THREADS(numThreads) \
99 qmp_internal::ParallelTaskManager::instance().setNumThreads(numThreads)
/// Expands to the `getNumThreads` member of the singleton task manager.
/// The expansion has no call parentheses, so it must be written as
/// `QMP_GET_NUM_THREADS()`.
104 #define QMP_GET_NUM_THREADS \
105 qmp_internal::ParallelTaskManager::instance().getNumThreads
/// Expands to the `getMaxThreads` member of the singleton task manager.
/// The expansion has no call parentheses, so it must be written as
/// `QMP_GET_MAX_THREADS()`.
109 #define QMP_GET_MAX_THREADS \
110 qmp_internal::ParallelTaskManager::instance().getMaxThreads
/// Expands to `parallelForLoopThreadIndexUniqueSymbol`, the name of the
/// thread-index parameter of the `run()` method that QMP_PARALLEL_FOR
/// generates. It therefore only resolves inside a parallel for loop body.
115 #define QMP_THREAD_NUM parallelForLoopThreadIndexUniqueSymbol
/// Expands to the `getNumProcessors` member of the singleton task manager.
/// The expansion has no call parentheses, so it must be written as
/// `QMP_GET_NUM_PROCS()`.
118 #define QMP_GET_NUM_PROCS \
119 qmp_internal::ParallelTaskManager::instance().getNumProcessors
/// Expands to the `inParallel` member of the singleton task manager.
/// The expansion has no call parentheses, so it must be written as
/// `QMP_IN_PARALLEL()`.
122 #define QMP_IN_PARALLEL \
123 qmp_internal::ParallelTaskManager::instance().inParallel
/// Expands to the `criticalSectionBegin` member of the singleton task
/// manager. The expansion has no call parentheses; the caller appends the
/// argument list, e.g. `QMP_CRITICAL(id)`, and closes the section with
/// `QMP_END_CRITICAL` using the same id.
131 #define QMP_CRITICAL \
132 qmp_internal::ParallelTaskManager::instance().criticalSectionBegin
/// Expands to the `criticalSectionEnd` member of the singleton task manager.
/// The expansion has no call parentheses; the caller appends the argument
/// list, e.g. `QMP_END_CRITICAL(id)`, with the id given to `QMP_CRITICAL`.
138 #define QMP_END_CRITICAL \
139 qmp_internal::ParallelTaskManager::instance().criticalSectionEnd
/// Expands to the `barrier` member of the singleton task manager.
/// The expansion has no call parentheses, so it must be written as
/// `QMP_BARRIER()`.
144 #define QMP_BARRIER \
145 qmp_internal::ParallelTaskManager::instance().barrier
/// Declares a function-local `static void*` named
/// `<variableName>_tempImportCopy` and stores the address of `variableName`
/// in it; QMP_USE_SHARED later reads that pointer back.
/// The assignment is a separate statement, so the address is refreshed every
/// time the macro is executed, not only on first initialization.
/// NOTE(review): the pointer is presumably `static` so the local class
/// generated by QMP_PARALLEL_FOR can reach it; confirm against the full
/// header.
/// The expansion already ends in a semicolon.
163 #define QMP_SHARE(variableName) static void* variableName##_tempImportCopy = NULL; \
164 variableName##_tempImportCopy = (void*)&variableName;
/// Declares a reference named `variableName` whose type is given by the
/// variadic arguments, bound to the object whose address was stored in
/// `<variableName>_tempImportCopy` (the pointer written by QMP_SHARE).
/// The type is taken from `__VA_ARGS__`, so a type containing commas
/// (e.g. a template with several arguments) can be passed unparenthesized.
/// The `void*` is cast back with a C-style cast and is not type-checked, so
/// the type given here must match the shared variable's actual type.
/// The expansion already ends in a semicolon.
178 #define QMP_USE_SHARED(variableName, ...) __VA_ARGS__& variableName = \
179 *((__VA_ARGS__*)variableName##_tempImportCopy);
202 namespace qmp_internal
215 virtual void run(
int firstIndex,
int lastIndex,
216 const unsigned int threadIndex,
int indexIncrement) = 0;
257 inline void setLoopIndices(
int loopFirstIndex,
unsigned int numIterations);
300 inline void destroy();
304 bool mInParallelSection;
305 bool mShouldWorkerThreadsExit;
307 unsigned int mNumThreads;
308 unsigned int mBarrierCount;
309 int* mTaskFirstIndices;
310 int* mTaskLastIndices;
311 int mTaskIndexIncrement;
321 #if defined(WIN32) || defined(_WIN32) || defined (__WIN32) || defined(__WIN32__) \
322 || defined (_WIN64) || defined(__CYGWIN__) || defined(__MINGW32__)
323 #define QMP_USE_WINDOWS_THREADS
326 #elif defined(__APPLE__)
330 #include <sys/sysctl.h>
331 #elif defined(unix) || defined(__unix) || defined(__unix__)
335 #include <sys/sysinfo.h>
337 #error This development environment does not support pthreads or windows threads
344 #define QMP_ASSERT(condition)\
348 std::cout << "[QuickMP] Assertion failed in " << __FUNCTION__ \
349 << "(line " << __LINE__ << "): assert(" << #condition << ")" \
355 namespace qmp_internal
361 #ifdef QMP_USE_WINDOWS_THREADS
362 barrierEventToggle =
false;
363 barrierEvent1 = NULL;
364 barrierEvent2 = NULL;
365 threadHandles = NULL;
372 #ifdef QMP_USE_WINDOWS_THREADS
376 CRITICAL_SECTION barrierCriticalSection;
377 bool barrierEventToggle;
378 HANDLE barrierEvent1;
379 HANDLE barrierEvent2;
380 CRITICAL_SECTION csVectorCriticalSection;
381 std::vector<CRITICAL_SECTION*> userCriticalSections;
382 HANDLE* threadHandles;
395 #ifdef QMP_USE_WINDOWS_THREADS
408 unsigned int myIndex = (
unsigned int)((uintptr_t)threadIndex);
434 #ifdef QMP_USE_WINDOWS_THREADS
475 mNumThreads = numThreads;
482 unsigned int numWorkerThreads = numThreads - 1;
484 mTaskFirstIndices =
new int[numThreads];
485 mTaskLastIndices =
new int[numThreads];
486 for (
unsigned int i = 0; i < numThreads; ++i)
488 mTaskFirstIndices[i] = 0;
489 mTaskLastIndices[i] = 0;
491 mTaskIndexIncrement = 0;
495 #ifdef QMP_USE_WINDOWS_THREADS
496 InitializeCriticalSection(&mPlatform->barrierCriticalSection);
499 bool manualReset =
true;
500 bool startSignaled =
false;
501 mPlatform->barrierEvent1 = CreateEvent(NULL, manualReset,
502 startSignaled, NULL);
503 mPlatform->barrierEvent2 = CreateEvent(NULL, manualReset,
504 startSignaled, NULL);
506 InitializeCriticalSection(&mPlatform->csVectorCriticalSection);
537 mPlatform->threadHandles =
new HANDLE[numThreads];
538 mPlatform->threadIDs =
new DWORD[numThreads];
540 mPlatform->threadHandles[0] = 0;
541 mPlatform->threadIDs[0] = GetCurrentThreadId();
542 for (
unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
544 mPlatform->threadHandles[threadIndex] =
546 (
void*)threadIndex, 0, (
unsigned int*)&mPlatform->
547 threadIDs[threadIndex]);
548 QMP_ASSERT(0 != mPlatform->threadHandles[threadIndex])
552 int returnCode = pthread_mutex_init(&mPlatform->
barrierMutex, NULL);
571 pthread_attr_t threadAttributes;
572 returnCode = pthread_attr_init(&threadAttributes);
574 returnCode = pthread_attr_setdetachstate(&threadAttributes,
575 PTHREAD_CREATE_JOINABLE);
578 mPlatform->
threads =
new pthread_t[numThreads];
579 mPlatform->
threads[0] = pthread_self();
580 for (uintptr_t threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
582 returnCode = pthread_create(&mPlatform->
threads[threadIndex],
587 returnCode = pthread_attr_destroy(&threadAttributes);
597 if (mInParallelSection)
614 #ifdef QMP_USE_WINDOWS_THREADS
615 SYSTEM_INFO systemInfo;
616 GetSystemInfo(&systemInfo);
617 return (
unsigned int)systemInfo.dwNumberOfProcessors;
618 #elif defined (__APPLE__)
619 int numProcessors = 0;
620 size_t size =
sizeof(numProcessors);
621 int returnCode = sysctlbyname(
"hw.ncpu", &numProcessors, &size, NULL, 0);
624 std::cout <<
"[QuickMP] WARNING: Cannot determine number of "
625 <<
"processors, defaulting to 1" << std::endl;
630 return (
unsigned int)numProcessors;
653 return (
unsigned int)get_nprocs_conf();
659 return mInParallelSection;
670 if (1 == mNumThreads)
672 mTaskFirstIndices[0] = loopFirstIndex;
673 mTaskLastIndices[0] = loopFirstIndex + (int)numIterations - 1;
674 mTaskIndexIncrement = 1;
691 unsigned int numIterationsPerThread = numIterations / mNumThreads;
692 unsigned int numRemainderIterations = numIterations % mNumThreads;
693 int currentFirstIndex = loopFirstIndex;
694 for (
unsigned int i = 0; i < mNumThreads; ++i)
696 mTaskFirstIndices[i] = currentFirstIndex;
699 unsigned int numIterationsForThisThread = numIterationsPerThread;
700 if (i < numRemainderIterations)
702 ++numIterationsForThisThread;
706 mTaskLastIndices[i] = currentFirstIndex +
707 (int)numIterationsForThisThread - 1;
708 currentFirstIndex = mTaskLastIndices[i] + 1;
710 mTaskIndexIncrement = 1;
717 for (
unsigned int i = 0; i < mNumThreads; ++i)
719 mTaskFirstIndices[i] = loopFirstIndex + i;
720 mTaskLastIndices[i] = loopFirstIndex + numIterations - 1;
722 mTaskIndexIncrement = mNumThreads;
730 unsigned int numIterations)
737 mInParallelSection =
true;
753 mInParallelSection =
false;
758 mCurrentTask->
run(mTaskFirstIndices[threadIndex],
759 mTaskLastIndices[threadIndex], threadIndex, mTaskIndexIncrement);
773 #ifdef QMP_USE_WINDOWS_THREADS
774 if (
id >= mPlatform->userCriticalSections.size())
777 EnterCriticalSection(&mPlatform->csVectorCriticalSection);
778 while (
id >= mPlatform->userCriticalSections.size())
780 CRITICAL_SECTION* cs =
new CRITICAL_SECTION;
781 mPlatform->userCriticalSections.push_back(cs);
782 InitializeCriticalSection(cs);
784 LeaveCriticalSection(&mPlatform->csVectorCriticalSection);
786 EnterCriticalSection(mPlatform->userCriticalSections[
id]);
795 pthread_mutex_t* mutex =
new pthread_mutex_t;
797 returnCode = pthread_mutex_init(mutex, NULL);
804 int returnCode = pthread_mutex_lock(mPlatform->
userMutexes[
id]);
817 #ifdef QMP_USE_WINDOWS_THREADS
818 if (
id >= mPlatform->userCriticalSections.size())
820 std::cout <<
"[QuickMP] WARNING: Critical section 'end' (id="
821 <<
id <<
") has no matching 'begin'" << std::endl;
825 LeaveCriticalSection(mPlatform->userCriticalSections[
id]);
830 std::cout <<
"[QuickMP] WARNING: Critical section 'end' (id="
831 <<
id <<
") has no matching 'begin'" << std::endl;
835 int returnCode = pthread_mutex_unlock(mPlatform->
userMutexes[
id]);
850 #ifdef QMP_USE_WINDOWS_THREADS
851 EnterCriticalSection(&mPlatform->barrierCriticalSection);
853 int returnCode = pthread_mutex_lock(&mPlatform->
barrierMutex);
858 if (mBarrierCount == mNumThreads)
864 #ifdef QMP_USE_WINDOWS_THREADS
868 if (mPlatform->barrierEventToggle)
870 SetEvent(mPlatform->barrierEvent1);
874 SetEvent(mPlatform->barrierEvent2);
876 mPlatform->barrierEventToggle = !mPlatform->barrierEventToggle;
877 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
884 returnCode = pthread_mutex_unlock(&mPlatform->
barrierMutex);
892 #ifdef QMP_USE_WINDOWS_THREADS
894 if (1 == mBarrierCount)
896 if (mPlatform->barrierEventToggle)
898 ResetEvent(mPlatform->barrierEvent1);
902 ResetEvent(mPlatform->barrierEvent2);
906 if (mPlatform->barrierEventToggle)
908 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
909 WaitForSingleObject(mPlatform->barrierEvent1, INFINITE);
913 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
914 WaitForSingleObject(mPlatform->barrierEvent2, INFINITE);
922 returnCode = pthread_mutex_unlock(&mPlatform->
barrierMutex);
935 return mShouldWorkerThreadsExit;
938 ParallelTaskManager::ParallelTaskManager()
941 mInitialized =
false;
942 mInParallelSection =
false;
943 mShouldWorkerThreadsExit =
false;
947 mTaskFirstIndices = NULL;
948 mTaskLastIndices = NULL;
949 mTaskIndexIncrement = 0;
952 ParallelTaskManager::~ParallelTaskManager()
961 void ParallelTaskManager::destroy()
969 #ifdef QMP_USE_WINDOWS_THREADS
970 if (mNumThreads > 1 && GetCurrentThreadId() != mPlatform->threadIDs[0])
972 if (mNumThreads > 1 && !pthread_equal(pthread_self(), mPlatform->
threads[0]))
984 mShouldWorkerThreadsExit =
true;
987 #ifdef QMP_USE_WINDOWS_THREADS
992 for (
unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
994 DWORD returnCode = WaitForSingleObject(mPlatform->
995 threadHandles[threadIndex], INFINITE);
1001 for (
unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
1003 int returnCode = pthread_join(mPlatform->
threads[threadIndex], NULL);
1010 #ifdef QMP_USE_WINDOWS_THREADS
1011 DeleteCriticalSection(&mPlatform->barrierCriticalSection);
1013 mPlatform->barrierEventToggle =
false;
1015 BOOL returnCode2 = CloseHandle(mPlatform->barrierEvent1);
1017 mPlatform->barrierEvent1 = NULL;
1019 returnCode2 = CloseHandle(mPlatform->barrierEvent2);
1021 mPlatform->barrierEvent2 = NULL;
1023 DeleteCriticalSection(&mPlatform->csVectorCriticalSection);
1026 for (
unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
1028 int returnCode = CloseHandle(mPlatform->
1029 threadHandles[threadIndex]);
1032 delete [] mPlatform->threadHandles;
1033 mPlatform->threadHandles = NULL;
1035 delete [] mPlatform->threadIDs;
1036 mPlatform->threadIDs = NULL;
1038 while (!mPlatform->userCriticalSections.empty())
1040 DeleteCriticalSection(mPlatform->userCriticalSections.back());
1041 delete mPlatform->userCriticalSections.back();
1042 mPlatform->userCriticalSections.pop_back();
1048 int returnCode = pthread_mutex_destroy(&mPlatform->
barrierMutex);
1059 int returnCode = pthread_mutex_destroy(mPlatform->
userMutexes.back());
1067 mInitialized =
false;
1068 mInParallelSection =
false;
1069 mShouldWorkerThreadsExit =
false;
1070 mCurrentTask = NULL;
1074 if (mTaskFirstIndices)
1076 delete [] mTaskFirstIndices;
1077 mTaskFirstIndices = NULL;
1079 if (mTaskLastIndices)
1081 delete [] mTaskLastIndices;
1082 mTaskLastIndices = NULL;
1085 mTaskIndexIncrement = 0;
unsigned int getNumProcessors() const
Returns the number of processors in the current machine at runtime.
Definition: quickmp.h:612
This is the default.
Definition: quickmp.h:191
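Distributes loop iterations among threads in an interleaved manner, similar to the OpenMP "static" schedule with a chunk size of 1: thread i starts at the first loop index plus i and advances by the number of threads.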
Definition: quickmp.h:197
void barrier()
Defines a barrier routine used to synchronize threads.
Definition: quickmp.h:841
bool shouldWorkerThreadsExit() const
Returns true if the main thread has requested the worker threads to exit.
Definition: quickmp.h:933
void processSubset(unsigned int threadIndex)
Called by individual threads to process a subset of the loop iterations.
Definition: quickmp.h:756
void setLoopIndices(int loopFirstIndex, unsigned int numIterations, quickmp::ScheduleHint scheduleHint)
Defines the range of the loop index.
Definition: quickmp.h:662
void criticalSectionEnd(unsigned int id)
Defines the end of a critical section used for synchronization.
Definition: quickmp.h:809
static ParallelTaskManager & instance()
Provides access to the singleton instance.
Definition: quickmp.h:456
unsigned int getMaxThreads() const
Returns the total number of threads allocated for use in all parallel for loops.
Definition: quickmp.h:607
unsigned int getNumThreads() const
Returns the number of threads currently being used.
Definition: quickmp.h:595
virtual ~ParallelTask()
Definition: quickmp.h:212
A base class for parallel task classes which are defined by a set of macros.
Definition: quickmp.h:209
PlatformThreadObjects * getPlatformThreadObjects()
Provides access to the internal platform-specific data, like thread handles and synchronization objects.
Definition: quickmp.h:928
void * threadRoutine(void *threadIndex)
The routine to be executed by the threads.
Definition: quickmp.h:398
bool inParallel() const
Returns true if called within a parallel for loop and false otherwise.
Definition: quickmp.h:657
ScheduleHint
Types of loop scheduling methods.
Definition: quickmp.h:185
void criticalSectionBegin(unsigned int id)
Defines the beginning of a critical section used for synchronization.
Definition: quickmp.h:762
virtual void run(int firstIndex, int lastIndex, const unsigned int threadIndex, int indexIncrement)=0
The function which is executed by each thread with different indices.
void setNumThreads(unsigned int numThreads=0)
Specifies the number of threads to use in subsequent parallel for loops.
Definition: quickmp.h:462
A singleton class to manage parallel code tasks.
Definition: quickmp.h:221
#define QMP_ASSERT(condition)
Assert macro.
Definition: quickmp.h:344
void process(ParallelTask *task)
Unleashes the threads on the new task/loop.
Definition: quickmp.h:735