00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef QUICK_MP_H
00024 #define QUICK_MP_H
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
/// Produces an identifier of the form <stem>_uniqueSymbol<line>, where <line>
/// is the line number at the point of expansion. Two helper layers are used
/// so that __LINE__ is replaced by its numeric value before the ## operator
/// pastes the pieces together; uses on the same source line yield the same
/// identifier.
#define QMP_UNIQUE_SYMBOL_HELPER2(stem, lineNumber) \
    stem##_uniqueSymbol##lineNumber
#define QMP_UNIQUE_SYMBOL_HELPER1(stem, lineNumber) \
    QMP_UNIQUE_SYMBOL_HELPER2(stem, lineNumber)
#define QMP_UNIQUE_SYMBOL(stem) \
    QMP_UNIQUE_SYMBOL_HELPER1(stem, __LINE__)
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
/// Opens a parallel for loop; must be closed with QMP_END_PARALLEL_FOR.
///
/// indexName      - name of the int loop variable visible in the loop body.
/// loopFirstIndex - first index of the loop.
/// ...            - forwarded, after loopFirstIndex, to
///                  ParallelTaskManager::setLoopIndices (the iteration count
///                  and, optionally, a quickmp::ScheduleHint).
///
/// The expansion opens a scope, stores the per-thread index ranges in the
/// task manager, and begins a static instance of a function-local class
/// derived from qmp_internal::ParallelTask. The user's loop body becomes the
/// body of a for loop inside that class's run() method, iterating from the
/// first to the last index (inclusive) in steps of the index increment.
/// The thread-index parameter keeps a fixed name because QMP_THREAD_NUM
/// refers to it. The final line deliberately has no continuation backslash:
/// it leaves the for-loop block open for the user's code.
#define QMP_PARALLEL_FOR(indexName, loopFirstIndex, ...) \
{ \
    qmp_internal::ParallelTaskManager::instance().setLoopIndices( \
        loopFirstIndex, __VA_ARGS__); \
    static class QMP_UNIQUE_SYMBOL(ParallelTaskSubclass) : \
        public qmp_internal::ParallelTask \
    { \
    public: \
        virtual void run( \
            int QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex), \
            int QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex), \
            const unsigned int parallelForLoopThreadIndexUniqueSymbol, \
            int QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
        { \
            for (int indexName = \
                    QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex); \
                indexName <= QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex); \
                indexName += \
                    QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
            {
00080
00081
/// Closes a loop opened with QMP_PARALLEL_FOR.
///
/// The expansion closes, in order, the for-loop block, the run() method and
/// the local task class, names the static instance of that class, hands its
/// address to ParallelTaskManager::process(), and finally closes the scope
/// opened by QMP_PARALLEL_FOR. Both uses of QMP_UNIQUE_SYMBOL(Instance)
/// expand on the same source line, so they name the same object.
#define QMP_END_PARALLEL_FOR \
            } \
        } \
    } QMP_UNIQUE_SYMBOL(Instance); \
    qmp_internal::ParallelTaskManager::instance().process( \
        &QMP_UNIQUE_SYMBOL(Instance)); \
}
00089
00090
00091
00092
00093
00094
00095
/// Sets the number of threads by forwarding to
/// ParallelTaskManager::setNumThreads(). Passing 0 selects one thread per
/// processor reported by getNumProcessors().
#define QMP_SET_NUM_THREADS(numThreads) \
    qmp_internal::ParallelTaskManager::instance().setNumThreads(numThreads)

/// Expands to the getNumThreads member (no call parentheses); use as
/// QMP_GET_NUM_THREADS(). That function returns the thread count while a
/// parallel section is running and 1 otherwise.
#define QMP_GET_NUM_THREADS \
    qmp_internal::ParallelTaskManager::instance().getNumThreads

/// Expands to the getMaxThreads member; use as QMP_GET_MAX_THREADS(). That
/// function returns the configured thread count whether or not a parallel
/// section is running.
#define QMP_GET_MAX_THREADS \
    qmp_internal::ParallelTaskManager::instance().getMaxThreads

/// Index of the calling thread inside a QMP_PARALLEL_FOR body. This is the
/// name of the thread-index parameter of the run() method generated by
/// QMP_PARALLEL_FOR, so it is only meaningful inside such a loop body.
#define QMP_THREAD_NUM parallelForLoopThreadIndexUniqueSymbol

/// Expands to the getNumProcessors member; use as QMP_GET_NUM_PROCS().
#define QMP_GET_NUM_PROCS \
    qmp_internal::ParallelTaskManager::instance().getNumProcessors

/// Expands to the inParallel member; use as QMP_IN_PARALLEL(). That function
/// reports whether a parallel section is currently being processed.
#define QMP_IN_PARALLEL \
    qmp_internal::ParallelTaskManager::instance().inParallel
00122
00123
00124
00125
00126
00127
00128
/// Expands to the criticalSectionBegin member; use as QMP_CRITICAL(id) with
/// an unsigned integer id. Each id selects its own lock, created on first
/// use. Must be paired with QMP_END_CRITICAL(id) using the same id.
#define QMP_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionBegin

/// Expands to the criticalSectionEnd member; use as QMP_END_CRITICAL(id) to
/// release the lock taken by the matching QMP_CRITICAL(id).
#define QMP_END_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionEnd

/// Expands to the barrier member; use as QMP_BARRIER(). The call returns
/// once every thread of the pool has reached the barrier.
#define QMP_BARRIER \
    qmp_internal::ParallelTaskManager::instance().barrier
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
/// Publishes the address of a variable so that a following parallel loop
/// body can reach it through QMP_USE_SHARED.
///
/// Declares a static void* named <variableName>_tempImportCopy and then
/// assigns it the variable's address. The assignment is a separate statement
/// (not the static's initializer), so it is repeated every time control
/// passes through the macro. The expansion already ends with a semicolon.
#define QMP_SHARE(variableName) \
    static void* variableName##_tempImportCopy = NULL; \
    variableName##_tempImportCopy = (void*)&variableName;
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
/// Re-creates, inside a parallel loop body, a reference to a variable that
/// was published with QMP_SHARE.
///
/// variableName - the same name that was given to QMP_SHARE.
/// ...          - the variable's type. It is taken as variadic arguments so
///                that types containing commas (e.g. std::pair<int, int>)
///                can be written without extra parentheses.
///
/// Declares a reference named variableName bound to the object whose address
/// is stored in <variableName>_tempImportCopy. The expansion already ends
/// with a semicolon.
#define QMP_USE_SHARED(variableName, ...) \
    __VA_ARGS__& variableName = \
        *((__VA_ARGS__*)variableName##_tempImportCopy);
00178
00179
/// Public namespace for user-visible QuickMP types.
namespace quickmp
{
/// Tells the task manager how loop iterations are distributed over threads.
enum ScheduleHint
{
    /// Each thread receives one contiguous range of iterations; any
    /// remainder is spread over the lowest-numbered threads. This is the
    /// hint used when none is given.
    SEQUENTIAL = 0,

    /// Thread i starts at the i-th iteration and advances by the number of
    /// threads, so neighbouring iterations go to different threads.
    INTERLEAVED = 1
};
}
00198
00199
00200 namespace qmp_internal
00201 {
00202
00203 struct PlatformThreadObjects;
00204
00205
00206
/// Abstract unit of work executed by the task manager. QMP_PARALLEL_FOR
/// generates a subclass whose run() contains the user's loop body.
class ParallelTask
{
public:
    /// Virtual so that subclasses can be destroyed through a base pointer.
    virtual ~ParallelTask() {}

    /// Runs this thread's share of the loop.
    ///
    /// firstIndex     - first loop index assigned to the calling thread.
    /// lastIndex      - last loop index (inclusive) assigned to the thread.
    /// threadIndex    - index of the calling thread within the pool.
    /// indexIncrement - step between consecutive loop indices.
    virtual void run(int firstIndex, int lastIndex,
        const unsigned int threadIndex, int indexIncrement) = 0;
};
00216
00217
00218
00219 class ParallelTaskManager
00220 {
00221 public:
00222
00223 inline static ParallelTaskManager& instance();
00224
00225
00226
00227
00228
00229 inline void setNumThreads(unsigned int numThreads=0);
00230
00231
00232
00233
00234
00235 inline unsigned int getNumThreads()const;
00236
00237
00238
00239 inline unsigned int getMaxThreads()const;
00240
00241
00242 inline unsigned int getNumProcessors()const;
00243
00244
00245
00246 inline bool inParallel()const;
00247
00248
00249
00250
00251 inline void setLoopIndices(int loopFirstIndex,
00252 unsigned int numIterations, quickmp::ScheduleHint scheduleHint);
00253
00254
00255 inline void setLoopIndices(int loopFirstIndex, unsigned int numIterations);
00256
00257
00258 inline void process(ParallelTask* task);
00259
00260
00261
00262 inline void processSubset(unsigned int threadIndex);
00263
00264
00265
00266
00267
00268
00269
00270 inline void criticalSectionBegin(unsigned int id);
00271
00272
00273
00274
00275
00276 inline void criticalSectionEnd(unsigned int id);
00277
00278
00279
00280 inline void barrier();
00281
00282
00283
00284
00285 inline PlatformThreadObjects* getPlatformThreadObjects();
00286
00287
00288
00289 inline bool shouldWorkerThreadsExit()const;
00290
00291 private:
00292 inline ParallelTaskManager();
00293
00294 inline ~ParallelTaskManager();
00295
00296
00297
00298 inline void destroy();
00299
00300 PlatformThreadObjects* mPlatform;
00301 bool mInitialized;
00302 bool mInParallelSection;
00303 bool mShouldWorkerThreadsExit;
00304 ParallelTask* mCurrentTask;
00305 unsigned int mNumThreads;
00306 unsigned int mBarrierCount;
00307 int* mTaskFirstIndices;
00308 int* mTaskLastIndices;
00309 int mTaskIndexIncrement;
00310 };
00311 }
00312
00313
00314
00315
00316
00317
00318
00319 #if defined(WIN32) || defined(_WIN32) || defined (__WIN32) || defined(__WIN32__) \
00320 || defined (_WIN64) || defined(__CYGWIN__) || defined(__MINGW32__)
00321 #define QMP_USE_WINDOWS_THREADS
00322 #include <windows.h>
00323 #include <process.h>
00324 #elif defined(__APPLE__)
00325 #include <pthread.h>
00326
00327
00328 #include <sys/sysctl.h>
00329 #elif defined(unix) || defined(__unix) || defined(__unix__)
00330 #include <pthread.h>
00331
00332
00333 #include <sys/sysinfo.h>
00334 #else
00335 #error This development environment does not support pthreads or windows threads
00336 #endif
00337
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <vector>
00340
00341
/// Always-on assertion. If the condition is false, prints the enclosing
/// function name, the line number and the text of the condition to
/// std::cout and terminates the process with exit code 1.
///
/// The expansion is a plain brace block rather than a do/while(0), because
/// the macro is invoked both with and without a trailing semicolon.
#define QMP_ASSERT(condition) \
{ \
    if (!(condition)) \
    { \
        std::cout << "[QuickMP] Assertion failed in " << __FUNCTION__ \
            << "(line " << __LINE__ << "): assert(" << #condition << ")" \
            << std::endl; \
        ::exit(1); \
    } \
}
00352
00353 namespace qmp_internal
00354 {
/// Bundle of the platform-specific thread handles and synchronization
/// objects used by ParallelTaskManager. The constructor only clears the
/// pointer/handle/flag members; the locks, events and condition variable are
/// initialized later by ParallelTaskManager::setNumThreads().
struct PlatformThreadObjects
{
    PlatformThreadObjects()
#ifdef QMP_USE_WINDOWS_THREADS
        : barrierEventToggle(false),
        barrierEvent1(NULL),
        barrierEvent2(NULL),
        threadHandles(NULL),
        threadIDs(NULL)
#else
        : threads(NULL)
#endif
    {
    }

#ifdef QMP_USE_WINDOWS_THREADS
    CRITICAL_SECTION barrierCriticalSection;   ///< Guards the barrier state.
    bool barrierEventToggle;                   ///< Selects the active event.
    HANDLE barrierEvent1;                      ///< First barrier event.
    HANDLE barrierEvent2;                      ///< Second barrier event.
    CRITICAL_SECTION csVectorCriticalSection;  ///< Guards the vector below.
    std::vector<CRITICAL_SECTION*> userCriticalSections; ///< One per id.
    HANDLE* threadHandles;                     ///< Handles, one per thread.
    DWORD* threadIDs;                          ///< Thread ids, one per thread.
#else
    pthread_t* threads;                        ///< Thread ids, one per thread.
    pthread_mutex_t barrierMutex;              ///< Guards the barrier state.
    pthread_cond_t barrierCondition;           ///< Signals barrier release.
    pthread_mutex_t mutexVectorMutex;          ///< Guards the vector below.
    std::vector<pthread_mutex_t*> userMutexes; ///< One per critical-section id.
#endif
};
00390
00391
00392
00393 #ifdef QMP_USE_WINDOWS_THREADS
00394 inline unsigned __stdcall threadRoutine(void* threadIndex)
00395 #else
00396 inline void* threadRoutine(void* threadIndex)
00397 #endif
00398 {
00399
00400
00401
00402
00403
00404
00405
00406 unsigned int myIndex = (unsigned int)((unsigned long int)threadIndex);
00407
00408
00409
00410 while (true)
00411 {
00412
00413
00414
00415
00416 ParallelTaskManager::instance().barrier();
00417
00418 if (ParallelTaskManager::instance().shouldWorkerThreadsExit())
00419 {
00420
00421 break;
00422 }
00423 else
00424 {
00425
00426 ParallelTaskManager::instance().processSubset(myIndex);
00427 }
00428
00429 ParallelTaskManager::instance().barrier();
00430 }
00431
00432 #ifdef QMP_USE_WINDOWS_THREADS
00433
00434
00435
00436
00437
00438
00439 return 0;
00440 #else
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450 return NULL;
00451 #endif
00452 }
00453
00454 ParallelTaskManager& ParallelTaskManager::instance()
00455 {
00456 static ParallelTaskManager self;
00457 return self;
00458 }
00459
00460 void ParallelTaskManager::setNumThreads(unsigned int numThreads)
00461 {
00462 if (mInitialized)
00463 {
00464 destroy();
00465 }
00466
00467 if (0 == numThreads)
00468 {
00469
00470 numThreads = getNumProcessors();
00471 }
00472
00473 mNumThreads = numThreads;
00474
00475
00476
00477
00478
00479 QMP_ASSERT(numThreads > 0);
00480 unsigned int numWorkerThreads = numThreads - 1;
00481
00482 mTaskFirstIndices = new int[numThreads];
00483 mTaskLastIndices = new int[numThreads];
00484 for (unsigned int i = 0; i < numThreads; ++i)
00485 {
00486 mTaskFirstIndices[i] = 0;
00487 mTaskLastIndices[i] = 0;
00488 }
00489 mTaskIndexIncrement = 0;
00490
00491 if (numThreads > 1)
00492 {
00493 #ifdef QMP_USE_WINDOWS_THREADS
00494 InitializeCriticalSection(&mPlatform->barrierCriticalSection);
00495
00496
00497 bool manualReset = true;
00498 bool startSignaled = false;
00499 mPlatform->barrierEvent1 = CreateEvent(NULL, manualReset,
00500 startSignaled, NULL);
00501 mPlatform->barrierEvent2 = CreateEvent(NULL, manualReset,
00502 startSignaled, NULL);
00503
00504 InitializeCriticalSection(&mPlatform->csVectorCriticalSection);
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535 mPlatform->threadHandles = new HANDLE[numThreads];
00536 mPlatform->threadIDs = new DWORD[numThreads];
00537
00538 mPlatform->threadHandles[0] = 0;
00539 mPlatform->threadIDs[0] = GetCurrentThreadId();
00540 for (unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
00541 {
00542 mPlatform->threadHandles[threadIndex] =
00543 (HANDLE)_beginthreadex(NULL, 0, threadRoutine,
00544 (void*)threadIndex, 0, (unsigned int*)&mPlatform->
00545 threadIDs[threadIndex]);
00546 QMP_ASSERT(0 != mPlatform->threadHandles[threadIndex])
00547 }
00548 #else
00549
00550 int returnCode = pthread_mutex_init(&mPlatform->barrierMutex, NULL);
00551 QMP_ASSERT(0 == returnCode);
00552 returnCode = pthread_cond_init(&mPlatform->barrierCondition, NULL);
00553 QMP_ASSERT(0 == returnCode);
00554 returnCode = pthread_mutex_init(&mPlatform->mutexVectorMutex, NULL);
00555 QMP_ASSERT(0 == returnCode);
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569 pthread_attr_t threadAttributes;
00570 returnCode = pthread_attr_init(&threadAttributes);
00571 QMP_ASSERT(0 == returnCode);
00572 returnCode = pthread_attr_setdetachstate(&threadAttributes,
00573 PTHREAD_CREATE_JOINABLE);
00574 QMP_ASSERT(0 == returnCode);
00575
00576 mPlatform->threads = new pthread_t[numThreads];
00577 mPlatform->threads[0] = pthread_self();
00578 for (unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
00579 {
00580 returnCode = pthread_create(&mPlatform->threads[threadIndex],
00581 &threadAttributes, threadRoutine, (void*)threadIndex);
00582 QMP_ASSERT(0 == returnCode);
00583 }
00584
00585 returnCode = pthread_attr_destroy(&threadAttributes);
00586 QMP_ASSERT(0 == returnCode);
00587 #endif
00588 }
00589
00590 mInitialized = true;
00591 }
00592
00593 unsigned int ParallelTaskManager::getNumThreads()const
00594 {
00595 if (mInParallelSection)
00596 {
00597 return mNumThreads;
00598 }
00599 else
00600 {
00601 return 1;
00602 }
00603 }
00604
00605 unsigned int ParallelTaskManager::getMaxThreads()const
00606 {
00607 return mNumThreads;
00608 }
00609
00610 unsigned int ParallelTaskManager::getNumProcessors()const
00611 {
00612 #ifdef QMP_USE_WINDOWS_THREADS
00613 SYSTEM_INFO systemInfo;
00614 GetSystemInfo(&systemInfo);
00615 return (unsigned int)systemInfo.dwNumberOfProcessors;
00616 #elif defined (__APPLE__)
00617 int numProcessors = 0;
00618 size_t size = sizeof(numProcessors);
00619 int returnCode = sysctlbyname("hw.ncpu", &numProcessors, &size, NULL, 0);
00620 if (0 != returnCode)
00621 {
00622 std::cout << "[QuickMP] WARNING: Cannot determine number of "
00623 << "processors, defaulting to 1" << std::endl;
00624 return 1;
00625 }
00626 else
00627 {
00628 return (unsigned int)numProcessors;
00629 }
00630 #else
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651 return (unsigned int)get_nprocs_conf();
00652 #endif
00653 }
00654
00655 bool ParallelTaskManager::inParallel()const
00656 {
00657 return mInParallelSection;
00658 }
00659
00660 void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
00661 unsigned int numIterations, quickmp::ScheduleHint scheduleHint)
00662 {
00663 if (!mInitialized)
00664 {
00665 setNumThreads();
00666 }
00667
00668 if (1 == mNumThreads)
00669 {
00670 mTaskFirstIndices[0] = loopFirstIndex;
00671 mTaskLastIndices[0] = loopFirstIndex + (int)numIterations - 1;
00672 mTaskIndexIncrement = 1;
00673 return;
00674 }
00675
00676
00677
00678
00679
00680 switch(scheduleHint)
00681 {
00682 case quickmp::SEQUENTIAL:
00683 {
00684
00685
00686
00687
00688
00689 unsigned int numIterationsPerThread = numIterations / mNumThreads;
00690 unsigned int numRemainderIterations = numIterations % mNumThreads;
00691 int currentFirstIndex = loopFirstIndex;
00692 for (unsigned int i = 0; i < mNumThreads; ++i)
00693 {
00694 mTaskFirstIndices[i] = currentFirstIndex;
00695
00696
00697 unsigned int numIterationsForThisThread = numIterationsPerThread;
00698 if (i < numRemainderIterations)
00699 {
00700 ++numIterationsForThisThread;
00701 }
00702
00703
00704 mTaskLastIndices[i] = currentFirstIndex +
00705 (int)numIterationsForThisThread - 1;
00706 currentFirstIndex = mTaskLastIndices[i] + 1;
00707 }
00708 mTaskIndexIncrement = 1;
00709 break;
00710 }
00711 case quickmp::INTERLEAVED:
00712
00713
00714
00715 for (unsigned int i = 0; i < mNumThreads; ++i)
00716 {
00717 mTaskFirstIndices[i] = loopFirstIndex + i;
00718 mTaskLastIndices[i] = loopFirstIndex + numIterations - 1;
00719 }
00720 mTaskIndexIncrement = mNumThreads;
00721 break;
00722 default:
00723 break;
00724 }
00725 }
00726
00727 void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
00728 unsigned int numIterations)
00729 {
00730 setLoopIndices(loopFirstIndex, numIterations, quickmp::SEQUENTIAL);
00731 }
00732
00733 void ParallelTaskManager::process(ParallelTask* task)
00734 {
00735 mInParallelSection = true;
00736 QMP_ASSERT(!mCurrentTask);
00737 mCurrentTask = task;
00738
00739
00740
00741
00742
00743 barrier();
00744
00745
00746 processSubset(0);
00747
00748 barrier();
00749
00750 mCurrentTask = NULL;
00751 mInParallelSection = false;
00752 }
00753
00754 void ParallelTaskManager::processSubset(unsigned int threadIndex)
00755 {
00756 mCurrentTask->run(mTaskFirstIndices[threadIndex],
00757 mTaskLastIndices[threadIndex], threadIndex, mTaskIndexIncrement);
00758 }
00759
00760 void ParallelTaskManager::criticalSectionBegin(unsigned int id)
00761 {
00762
00763 if (mNumThreads < 2)
00764 {
00765 return;
00766 }
00767
00768
00769
00770
00771 #ifdef QMP_USE_WINDOWS_THREADS
00772 if (id >= mPlatform->userCriticalSections.size())
00773 {
00774
00775 EnterCriticalSection(&mPlatform->csVectorCriticalSection);
00776 while (id >= mPlatform->userCriticalSections.size())
00777 {
00778 CRITICAL_SECTION* cs = new CRITICAL_SECTION;
00779 mPlatform->userCriticalSections.push_back(cs);
00780 InitializeCriticalSection(cs);
00781 }
00782 LeaveCriticalSection(&mPlatform->csVectorCriticalSection);
00783 }
00784 EnterCriticalSection(mPlatform->userCriticalSections[id]);
00785 #else
00786 if (id >= mPlatform->userMutexes.size())
00787 {
00788
00789 int returnCode = pthread_mutex_lock(&mPlatform->mutexVectorMutex);
00790 QMP_ASSERT(0 == returnCode);
00791 while (id >= mPlatform->userMutexes.size())
00792 {
00793 pthread_mutex_t* mutex = new pthread_mutex_t;
00794 mPlatform->userMutexes.push_back(mutex);
00795 returnCode = pthread_mutex_init(mutex, NULL);
00796 QMP_ASSERT(0 == returnCode);
00797
00798 }
00799 returnCode = pthread_mutex_unlock(&mPlatform->mutexVectorMutex);
00800 QMP_ASSERT(0 == returnCode);
00801 }
00802 int returnCode = pthread_mutex_lock(mPlatform->userMutexes[id]);
00803 QMP_ASSERT(0 == returnCode);
00804 #endif
00805 }
00806
00807 void ParallelTaskManager::criticalSectionEnd(unsigned int id)
00808 {
00809
00810 if (mNumThreads < 2)
00811 {
00812 return;
00813 }
00814
00815 #ifdef QMP_USE_WINDOWS_THREADS
00816 if (id >= mPlatform->userCriticalSections.size())
00817 {
00818 std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
00819 << id << ") has no matching 'begin'" << std::endl;
00820 }
00821 else
00822 {
00823 LeaveCriticalSection(mPlatform->userCriticalSections[id]);
00824 }
00825 #else
00826 if (id >= mPlatform->userMutexes.size())
00827 {
00828 std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
00829 << id << ") has no matching 'begin'" << std::endl;
00830 }
00831 else
00832 {
00833 int returnCode = pthread_mutex_unlock(mPlatform->userMutexes[id]);
00834 QMP_ASSERT(0 == returnCode);
00835 }
00836 #endif
00837 }
00838
00839 void ParallelTaskManager::barrier()
00840 {
00841
00842 if (mNumThreads < 2)
00843 {
00844 return;
00845 }
00846
00847
00848 #ifdef QMP_USE_WINDOWS_THREADS
00849 EnterCriticalSection(&mPlatform->barrierCriticalSection);
00850 #else
00851 int returnCode = pthread_mutex_lock(&mPlatform->barrierMutex);
00852 QMP_ASSERT(0 == returnCode);
00853 #endif
00854
00855 ++mBarrierCount;
00856 if (mBarrierCount == mNumThreads)
00857 {
00858
00859
00860 mBarrierCount = 0;
00861
00862 #ifdef QMP_USE_WINDOWS_THREADS
00863
00864
00865
00866 if (mPlatform->barrierEventToggle)
00867 {
00868 SetEvent(mPlatform->barrierEvent1);
00869 }
00870 else
00871 {
00872 SetEvent(mPlatform->barrierEvent2);
00873 }
00874 mPlatform->barrierEventToggle = !mPlatform->barrierEventToggle;
00875 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00876 #else
00877
00878
00879
00880 returnCode = pthread_cond_broadcast(&mPlatform->barrierCondition);
00881 QMP_ASSERT(0 == returnCode);
00882 returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
00883 QMP_ASSERT(0 == returnCode);
00884 #endif
00885 }
00886 else
00887 {
00888
00889
00890 #ifdef QMP_USE_WINDOWS_THREADS
00891
00892 if (1 == mBarrierCount)
00893 {
00894 if (mPlatform->barrierEventToggle)
00895 {
00896 ResetEvent(mPlatform->barrierEvent1);
00897 }
00898 else
00899 {
00900 ResetEvent(mPlatform->barrierEvent2);
00901 }
00902 }
00903
00904 if (mPlatform->barrierEventToggle)
00905 {
00906 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00907 WaitForSingleObject(mPlatform->barrierEvent1, INFINITE);
00908 }
00909 else
00910 {
00911 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00912 WaitForSingleObject(mPlatform->barrierEvent2, INFINITE);
00913 }
00914 #else
00915
00916
00917 returnCode = pthread_cond_wait(&mPlatform->barrierCondition,
00918 &mPlatform->barrierMutex);
00919 QMP_ASSERT(0 == returnCode);
00920 returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
00921 QMP_ASSERT(0 == returnCode);
00922 #endif
00923 }
00924 }
00925
00926 PlatformThreadObjects* ParallelTaskManager::getPlatformThreadObjects()
00927 {
00928 return mPlatform;
00929 }
00930
00931 bool ParallelTaskManager::shouldWorkerThreadsExit()const
00932 {
00933 return mShouldWorkerThreadsExit;
00934 }
00935
00936 ParallelTaskManager::ParallelTaskManager()
00937 {
00938 mPlatform = new PlatformThreadObjects();
00939 mInitialized = false;
00940 mInParallelSection = false;
00941 mShouldWorkerThreadsExit = false;
00942 mCurrentTask = NULL;
00943 mNumThreads = 0;
00944 mBarrierCount = 0;
00945 mTaskFirstIndices = NULL;
00946 mTaskLastIndices = NULL;
00947 mTaskIndexIncrement = 0;
00948 }
00949
00950 ParallelTaskManager::~ParallelTaskManager()
00951 {
00952
00953
00954
00955 destroy();
00956 delete mPlatform;
00957 }
00958
00959 void ParallelTaskManager::destroy()
00960 {
00961
00962
00963
00964
00965
00966
00967 #ifdef QMP_USE_WINDOWS_THREADS
00968 if (mNumThreads > 1 && GetCurrentThreadId() != mPlatform->threadIDs[0])
00969 #else
00970 if (mNumThreads > 1 && !pthread_equal(pthread_self(), mPlatform->threads[0]))
00971 #endif
00972 {
00973 return;
00974 }
00975
00976
00977 if (mNumThreads > 1)
00978 {
00979
00980
00981
00982 mShouldWorkerThreadsExit = true;
00983 barrier();
00984
00985 #ifdef QMP_USE_WINDOWS_THREADS
00986
00987
00988
00989
00990 for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
00991 {
00992 DWORD returnCode = WaitForSingleObject(mPlatform->
00993 threadHandles[threadIndex], INFINITE);
00994 QMP_ASSERT(WAIT_OBJECT_0 == returnCode);
00995 }
00996 #else
00997
00998
00999 for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
01000 {
01001 int returnCode = pthread_join(mPlatform->threads[threadIndex], NULL);
01002 QMP_ASSERT(0 == returnCode);
01003 }
01004 #endif
01005
01006
01007
01008 #ifdef QMP_USE_WINDOWS_THREADS
01009 DeleteCriticalSection(&mPlatform->barrierCriticalSection);
01010
01011 mPlatform->barrierEventToggle = false;
01012
01013 BOOL returnCode2 = CloseHandle(mPlatform->barrierEvent1);
01014 QMP_ASSERT(0 != returnCode2);
01015 mPlatform->barrierEvent1 = NULL;
01016
01017 returnCode2 = CloseHandle(mPlatform->barrierEvent2);
01018 QMP_ASSERT(0 != returnCode2);
01019 mPlatform->barrierEvent2 = NULL;
01020
01021 DeleteCriticalSection(&mPlatform->csVectorCriticalSection);
01022
01023
01024 for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
01025 {
01026 int returnCode = CloseHandle(mPlatform->
01027 threadHandles[threadIndex]);
01028 QMP_ASSERT(0 != returnCode);
01029 }
01030 delete [] mPlatform->threadHandles;
01031 mPlatform->threadHandles = NULL;
01032
01033 delete [] mPlatform->threadIDs;
01034 mPlatform->threadIDs = NULL;
01035
01036 while (!mPlatform->userCriticalSections.empty())
01037 {
01038 DeleteCriticalSection(mPlatform->userCriticalSections.back());
01039 delete mPlatform->userCriticalSections.back();
01040 mPlatform->userCriticalSections.pop_back();
01041 }
01042 #else
01043 delete[] mPlatform->threads;
01044 mPlatform->threads = NULL;
01045
01046 int returnCode = pthread_mutex_destroy(&mPlatform->barrierMutex);
01047 QMP_ASSERT(0 == returnCode);
01048
01049 returnCode = pthread_cond_destroy(&mPlatform->barrierCondition);
01050 QMP_ASSERT(0 == returnCode);
01051
01052 returnCode = pthread_mutex_destroy(&mPlatform->mutexVectorMutex);
01053 QMP_ASSERT(0 == returnCode);
01054
01055 while (!mPlatform->userMutexes.empty())
01056 {
01057 int returnCode = pthread_mutex_destroy(mPlatform->userMutexes.back());
01058 QMP_ASSERT(0 == returnCode);
01059 delete mPlatform->userMutexes.back();
01060 mPlatform->userMutexes.pop_back();
01061 }
01062 #endif
01063 }
01064
01065 mInitialized = false;
01066 mInParallelSection = false;
01067 mShouldWorkerThreadsExit = false;
01068 mCurrentTask = NULL;
01069 mNumThreads = 0;
01070 mBarrierCount = 0;
01071
01072 if (mTaskFirstIndices)
01073 {
01074 delete [] mTaskFirstIndices;
01075 mTaskFirstIndices = NULL;
01076 }
01077 if (mTaskLastIndices)
01078 {
01079 delete [] mTaskLastIndices;
01080 mTaskLastIndices = NULL;
01081 }
01082
01083 mTaskIndexIncrement = 0;
01084 }
01085 }
01086
01087 #endif