quickmp.h

Go to the documentation of this file.
00001 /************************************************************************
00002 * QuickMP                                                               *
00003 * http://quickmp.sourceforge.net                                        *
00004 * Copyright (C) 2008                                                    *
00005 * Tyler Streeter (http://www.tylerstreeter.net)                         *
00006 *                                                                       *
00007 * This library is free software; you can redistribute it and/or         *
00008 * modify it under the terms of EITHER:                                  *
00009 *   (1) The GNU Lesser General Public License as published by the Free  *
00010 *       Software Foundation; either version 2.1 of the License, or (at  *
00011 *       your option) any later version. The text of the GNU Lesser      *
00012 *       General Public License is included with this library in the     *
00013 *       file license-LGPL.txt.                                          *
00014 *   (2) The BSD-style license that is included with this library in     *
00015 *       the file license-BSD.txt.                                       *
00016 *                                                                       *
00017 * This library is distributed in the hope that it will be useful,       *
00018 * but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00019 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files    *
00020 * license-LGPL.txt and license-BSD.txt for more details.                *
00021 ************************************************************************/
00022 
00023 #ifndef QUICK_MP_H
00024 #define QUICK_MP_H
00025 
00026 // QuickMP (Quick Multi-Processing) is a simple cross-platform C++ API for
00027 // generating parallel for loops in shared-memory programs, similar to
00028 // OpenMP.  It provides automatic scalable performance based on the number of
00029 // available processors.
00030 //
00031 // Please visit the project website (http://quickmp.sourceforge.net)
00032 // for usage instructions.
00033 
// Builds an identifier that is unique per source line by pasting the current
// line number onto a caller-supplied stem.  Two levels of helper macros are
// required so that __LINE__ is fully expanded to its numeric value before the
// ## operator pastes it.  The generated symbols are only unique within one
// file; clashes across files are avoided as long as the files do not include
// one another, or the symbols live in local scopes the other file cannot see.
#define QMP_UNIQUE_SYMBOL_HELPER2(stem, lineNumber) stem##_uniqueSymbol##lineNumber
#define QMP_UNIQUE_SYMBOL_HELPER1(stem, lineNumber) \
    QMP_UNIQUE_SYMBOL_HELPER2(stem, lineNumber)
#define QMP_UNIQUE_SYMBOL(stem) QMP_UNIQUE_SYMBOL_HELPER1(stem, __LINE__)
00043 
/// Opens a parallel for loop; must be paired with QMP_END_PARALLEL_FOR.
/// Arguments: the name of the int index variable (visible inside the loop
/// body), the first index value, and then the arguments forwarded to
/// ParallelTaskManager::setLoopIndices, i.e. the number of iterations and,
/// optionally, a schedule hint (quickmp::SEQUENTIAL or quickmp::INTERLEAVED).
/// The index counts upward from the first value.
// Design notes: the user's loop body has to end up inside a function that the
// threads can call.  A class may be defined inside any function, so the macro
// opens a local ParallelTask subclass and the head of its run() method; the
// user's code then becomes the body of the for loop inside run().  Because
// the method must be written inline in the class definition, the closing
// braces are supplied by a second macro.  The task object is declared static
// by the closing macro.  The macro is variadic so the schedule hint can be
// omitted.  The thread-index parameter keeps a fixed name because
// QMP_THREAD_NUM expands to it; the other parameters get line-unique names.
#define QMP_PARALLEL_FOR(indexName, loopFirstIndex, ...) \
{ \
    qmp_internal::ParallelTaskManager::instance().setLoopIndices( \
        loopFirstIndex, __VA_ARGS__); \
    static class QMP_UNIQUE_SYMBOL(ParallelTaskSubclass) \
        : public qmp_internal::ParallelTask \
    { \
    public: \
        virtual void run( \
            int QMP_UNIQUE_SYMBOL(qmpLoopBegin), \
            int QMP_UNIQUE_SYMBOL(qmpLoopEnd), \
            const unsigned int parallelForLoopThreadIndexUniqueSymbol, \
            int QMP_UNIQUE_SYMBOL(qmpLoopStride)) \
        { \
            for (int indexName = QMP_UNIQUE_SYMBOL(qmpLoopBegin); \
                indexName <= QMP_UNIQUE_SYMBOL(qmpLoopEnd); \
                indexName += QMP_UNIQUE_SYMBOL(qmpLoopStride)) \
            {
00080 
/// Closes a parallel for loop opened with QMP_PARALLEL_FOR.  It ends the loop
/// body, the run() method and the local task class, declares the (static)
/// task instance, hands that instance to ParallelTaskManager::process, and
/// finally closes the scope opened by QMP_PARALLEL_FOR.
#define QMP_END_PARALLEL_FOR \
            } \
        } \
    } QMP_UNIQUE_SYMBOL(Instance); \
    qmp_internal::ParallelTaskManager::instance().process( \
        &QMP_UNIQUE_SYMBOL(Instance)); \
}
00089 
/// Sets how many threads subsequent parallel for loops will use.  Calling
/// this is optional; by default one thread per processor is used.  It must be
/// called outside of any parallel for loop and may be called repeatedly.
/// Each call tears down and rebuilds the internal thread pool, which can be
/// slow, so call it sparingly.
#define QMP_SET_NUM_THREADS(threadCount) \
    qmp_internal::ParallelTaskManager::instance().setNumThreads(threadCount)
00098 
/// Number of threads currently in use: 1 in sequential code, and the total
/// number of threads allocated for parallel for loops when called inside one.
/// The macro expands to the member function name only, so call it with
/// parentheses: QMP_GET_NUM_THREADS().
#define QMP_GET_NUM_THREADS \
    qmp_internal::ParallelTaskManager::instance().getNumThreads
00104 
/// Total number of threads allocated for use in parallel for loops.  Call it
/// with parentheses: QMP_GET_MAX_THREADS().
#define QMP_GET_MAX_THREADS \
    qmp_internal::ParallelTaskManager::instance().getMaxThreads
00109 
/// Zero-based index of the thread executing the current loop iteration.  It
/// expands to the thread-index parameter of the generated run() method, so it
/// is only valid inside a parallel for loop body.  Unlike most of the other
/// macros this is a plain value, not a function: do not append ().
#define QMP_THREAD_NUM parallelForLoopThreadIndexUniqueSymbol
00114 
/// Number of processors in the machine, queried at runtime.  Call it with
/// parentheses: QMP_GET_NUM_PROCS().
#define QMP_GET_NUM_PROCS \
    qmp_internal::ParallelTaskManager::instance().getNumProcessors
00118 
/// True when called from inside a parallel for loop, false otherwise.  Call
/// it with parentheses: QMP_IN_PARALLEL().
#define QMP_IN_PARALLEL \
    qmp_internal::ParallelTaskManager::instance().inParallel
00122 
/// Marks the beginning of a critical section: QMP_CRITICAL(id).  Critical
/// sections protect shared variables that several threads read and write.
/// Give every critical section within a parallel for loop its own id, keep
/// the ids small so that few internal critical sections are allocated, and
/// make sure the matching QMP_END_CRITICAL uses the same id.
#define QMP_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionBegin
00131 
/// Marks the end of a critical section: QMP_END_CRITICAL(id).  The id must be
/// the one that was passed to the QMP_CRITICAL that opened the section.  Keep
/// the ids small so that few internal critical sections are allocated.
#define QMP_END_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionEnd
00138 
/// Barrier used to synchronize threads: QMP_BARRIER().  Every thread blocks
/// here until all threads have arrived.  Barriers can be expensive; splitting
/// one parallel for loop into two often makes them unnecessary.
#define QMP_BARRIER \
    qmp_internal::ParallelTaskManager::instance().barrier
00144 
/// Makes a variable available to parallel for loops that appear later in the
/// same scope.  The single argument is the variable's name.  Use it outside
/// the loop; the variable must stay valid for as long as any loop accesses
/// it.  Statically-allocated arrays have to be shared through a pointer: for
/// int myData[50], declare int* myDataPtr = myData and write
/// QMP_SHARE(myDataPtr), not QMP_SHARE(myData).
// Design notes: code inside the generated task class can only see variables
// from the enclosing function if they are static, so the macro declares a
// static void* alias named <variable>_tempImportCopy.  The alias is assigned
// on every execution (not just in its initializer) because the address may
// differ between calls, e.g. when the shared variable is a member of
// different class instances.  Arrays are awkward because taking their address
// yields the same value as the array itself; rather than adding an extra
// typed pointer for every shared variable, the user supplies the pointer.
#define QMP_SHARE(sharedVariable) \
    static void* sharedVariable##_tempImportCopy = NULL; \
    sharedVariable##_tempImportCopy = (void*)&sharedVariable;
00163 
/// Gives the loop body access to a variable that was exposed with QMP_SHARE
/// before the loop.  Use it inside the loop.  The first argument is the
/// variable's name and the remaining argument(s) spell its type, e.g.
/// QMP_USE_SHARED(total, int).  Statically-allocated arrays are accessed
/// through the pointer that was shared: for int myData[50] with
/// int* myDataPtr = myData and QMP_SHARE(myDataPtr), write
/// QMP_USE_SHARED(myDataPtr, int*).
// Design notes: this declares a typed reference, with the original variable's
// name, bound to the object the static <variable>_tempImportCopy pointer
// refers to.  The type is taken from the variadic part of the argument list
// so that types containing commas (e.g. std::map<int, int>) survive the
// preprocessor.
#define QMP_USE_SHARED(sharedVariable, ...) \
    __VA_ARGS__& sharedVariable = \
        *((__VA_ARGS__*)sharedVariable##_tempImportCopy);
00178 
/// Public API symbols.
namespace quickmp
{
    /// How loop iterations are distributed among the threads.
    enum ScheduleHint
    {
        /// Default.  Each thread receives one large, equally sized chunk of
        /// consecutive iterations (comparable to the OpenMP "static"
        /// schedule with the default chunk size).  Best when all iterations
        /// take about the same time.
        SEQUENTIAL = 0,

        /// Iterations are dealt out to the threads in an interleaved fashion
        /// (comparable to the OpenMP "static" schedule with chunk size 1).
        /// Best when iteration durations vary.
        INTERLEAVED = 1
    };
}
00198 
00199 /// A namespace for internal data structures.
00200 namespace qmp_internal
00201 {
00202         // Forward declaration.
00203         struct PlatformThreadObjects;
00204 
00205         /// A base class for parallel task classes which are defined by a set
00206         /// of macros.
00207         class ParallelTask
00208         {
00209         public:
00210                 virtual ~ParallelTask(){}
00211                 /// The function which is executed by each thread with different
00212                 /// indices.  The last index is included.
00213                 virtual void run(int firstIndex, int lastIndex,
00214                         const unsigned int threadIndex, int indexIncrement) = 0;
00215         };
00216 
00217         /// A singleton class to manage parallel code tasks.  This enables
00218         /// automatic init on first use and destroy on exit.
00219         class ParallelTaskManager
00220         {
00221         public:
00222                 /// Provides access to the singleton instance.
00223                 inline static ParallelTaskManager& instance();
00224 
00225                 /// Specifies the number of threads to use in subsequent parallel
00226                 /// for loops.  If not called explicitly by the user, this will be
00227                 /// called with the default value (zero), which uses one thread per
00228                 /// processor.  Can be called multiple times.
00229                 inline void setNumThreads(unsigned int numThreads=0);
00230 
00231                 /// Returns the number of threads currently being used.  In
00232                 /// sequential code sections this returns 1; in parallel for loops
00233                 /// this returns the total number of threads allocated for use in
00234                 /// parallel for loops.
00235                 inline unsigned int getNumThreads()const;
00236 
00237                 /// Returns the total number of threads allocated for use in all
00238                 /// parallel for loops.
00239                 inline unsigned int getMaxThreads()const;
00240 
00241                 /// Returns the number of processors in the current machine at runtime.
00242                 inline unsigned int getNumProcessors()const;
00243 
00244                 /// Returns true if called within a parallel for loop and false
00245                 /// otherwise.
00246                 inline bool inParallel()const;
00247 
00248                 /// Defines the range of the loop index.  Assumes the index begins
00249                 /// at the first index and counts up.  Internally, this sets the
00250                 /// loop indices to be used by each thread.
00251                 inline void setLoopIndices(int loopFirstIndex,
00252                         unsigned int numIterations, quickmp::ScheduleHint scheduleHint);
00253 
00254                 /// Separate version which is used when no schedule hint is supplied.
00255                 inline void setLoopIndices(int loopFirstIndex, unsigned int numIterations);
00256 
00257                 /// Unleashes the threads on the new task/loop.
00258                 inline void process(ParallelTask* task);
00259 
00260                 /// Called by individual threads to process a subset of the loop
00261                 /// iterations.
00262                 inline void processSubset(unsigned int threadIndex);
00263 
00264                 /// Defines the beginning of a critical section used for
00265                 /// synchronization.  This is necessary to protect shared variables
00266                 /// which are read and written by multiple threads.  The given id
00267                 /// should be unique for each critical section within a parallel for
00268                 /// loop.  Keep the ids low to avoid allocating too many internal
00269                 /// critical sections.
00270                 inline void criticalSectionBegin(unsigned int id);
00271 
00272                 /// Defines the end of a critical section used for synchronization.
00273                 /// The given id must match the id given at the beginning of the
00274                 /// critical section.  Keep the ids low to avoid allocating too many
00275                 /// internal critical sections.
00276                 inline void criticalSectionEnd(unsigned int id);
00277 
00278                 /// Defines a barrier routine used to synchronize threads.  Each
00279                 /// thread blocks at the barrier until all threads have reached it.
00280                 inline void barrier();
00281 
00282                 /// Provides access to the internal platform-specific data, like
00283                 /// thread handles and synchronization objects.  This gives access to
00284                 /// these things to the thread function.
00285                 inline PlatformThreadObjects* getPlatformThreadObjects();
00286 
00287                 /// Returns true if the main thread has requested the worker threads
00288                 /// to exit.
00289                 inline bool shouldWorkerThreadsExit()const;
00290 
00291         private:
00292                 inline ParallelTaskManager();
00293 
00294                 inline ~ParallelTaskManager();
00295 
00296                 /// Deallocates everything, closes threads, and returns the system
00297                 /// back to its uninitialized state.
00298                 inline void destroy();
00299 
00300                 PlatformThreadObjects* mPlatform;
00301                 bool mInitialized;
00302                 bool mInParallelSection;
00303                 bool mShouldWorkerThreadsExit;
00304                 ParallelTask* mCurrentTask;
00305                 unsigned int mNumThreads;
00306                 unsigned int mBarrierCount;
00307                 int* mTaskFirstIndices;
00308                 int* mTaskLastIndices;
00309                 int mTaskIndexIncrement;
00310         };
00311 }
00312 
00313 //****************************************************************************
00314 // If desired, this header file can be split into .h and .cpp files to speed
00315 // up compilation.  Simply move the remainder of this file (except the final
00316 // #endif) into a separate .cpp file, and add #include "quickmp.h" to the top.
00317 //****************************************************************************
00318 
00319 #if defined(WIN32) || defined(_WIN32) || defined (__WIN32) || defined(__WIN32__) \
00320         || defined (_WIN64) || defined(__CYGWIN__) || defined(__MINGW32__)
00321         #define QMP_USE_WINDOWS_THREADS
00322         #include <windows.h>
00323         #include <process.h>
00324 #elif defined(__APPLE__)
00325         #include <pthread.h>
00326 
00327         // Required to get number of processors on OS X using sysctlbyname.
00328         #include <sys/sysctl.h>
00329 #elif defined(unix) || defined(__unix) || defined(__unix__)
00330         #include <pthread.h>
00331 
00332         // Required to get number of processors using get_nprocs_conf.
00333         #include <sys/sysinfo.h>
00334 #else
00335         #error This development environment does not support pthreads or windows threads
00336 #endif
00337 
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <vector>
00340 
/// Assertion macro used by the implementation.  When the condition is false
/// it prints the enclosing function, the line number and the condition text
/// to std::cout and terminates the process with exit status 1.  The macro
/// expands to a braced block, so it works both with and without a trailing
/// semicolon.
#define QMP_ASSERT(condition) \
{ \
    if (!(condition)) \
    { \
        std::cout << "[QuickMP] Assertion failed in " << __FUNCTION__ \
            << "(line " << __LINE__ << "): assert(" << #condition << ")" \
            << std::endl; \
        std::exit(1); \
    } \
}
00352 
00353 namespace qmp_internal
00354 {
/// Bundle of the platform-specific thread handles and synchronization
/// objects.  The constructor only resets the flag and pointer/handle
/// members; the synchronization objects themselves are initialized elsewhere.
struct PlatformThreadObjects
{
    PlatformThreadObjects()
#ifdef QMP_USE_WINDOWS_THREADS
        : barrierEventToggle(false),
          barrierEvent1(NULL),
          barrierEvent2(NULL),
          threadHandles(NULL),
          threadIDs(NULL)
#else
        : threads(NULL)
#endif
    {
    }

#ifdef QMP_USE_WINDOWS_THREADS
    // Windows condition variables exist only in Vista and later, so the
    // barrier is built from events instead.
    CRITICAL_SECTION barrierCriticalSection;
    bool barrierEventToggle;
    HANDLE barrierEvent1;
    HANDLE barrierEvent2;
    CRITICAL_SECTION csVectorCriticalSection;
    std::vector<CRITICAL_SECTION*> userCriticalSections;
    HANDLE* threadHandles;
    DWORD* threadIDs;
#else
    pthread_t* threads;
    pthread_mutex_t barrierMutex;
    pthread_cond_t barrierCondition;
    pthread_mutex_t mutexVectorMutex;
    std::vector<pthread_mutex_t*> userMutexes;
#endif
};
00390 
00391         /// The routine to be executed by the threads.  Note: Windows threads
00392         /// require a __stdcall routine.
00393 #ifdef QMP_USE_WINDOWS_THREADS
00394         inline unsigned __stdcall threadRoutine(void* threadIndex)
00395 #else
00396         inline void* threadRoutine(void* threadIndex)
00397 #endif
00398         {
00399                 // Note: if the program crashes or data gets corrupted during thread
00400                 // execution, one possible cause is that we exceeded the default
00401                 // thread stack size.  Use thread functions to increase the stack size.
00402 
00403                 // We cast to an unsigned long ints here because a void* on 64-bit
00404                 // machines is 64 bits long, and gcc won't cast a 64-bit void*
00405                 // directly to a 32-bit unsigned int.
00406                 unsigned int myIndex = (unsigned int)((unsigned long int)threadIndex);
00407 
00408                 // Loop until this thread is canceled by the main thread, which only
00409                 // occurs when the program exits.
00410                 while (true)
00411                 {
00412                         // Between the barriers the main thread and worker threads are
00413                         // working on the loop iterations in parallel.  (Compare with
00414                         // ParallelTaskManager::process.)
00415 
00416                         ParallelTaskManager::instance().barrier();
00417 
00418                         if (ParallelTaskManager::instance().shouldWorkerThreadsExit())
00419                         {
00420                                 // Exit the thread.
00421                                 break;
00422                         }
00423                         else
00424                         {
00425                                 // Work on a subset of the loop.
00426                                 ParallelTaskManager::instance().processSubset(myIndex);
00427                         }
00428 
00429                         ParallelTaskManager::instance().barrier();
00430                 }
00431 
00432 #ifdef QMP_USE_WINDOWS_THREADS
00433                 // _endthreadex is called automatically after start_address finishes.
00434                 // We could call it manually, but that causes C++ destructors pending
00435                 // in the thread not to be called.  In this case, though, we are
00436                 // having the main thread call TerminateThread on this thread when
00437                 // the program finishes.
00438 
00439                 return 0;
00440 #else
00441                 // pthreads are terminated in several ways: implicitly when their
00442                 // start routine finishes, when they call pthread_exit (which might
00443                 // not let C++ destructors be called), when another thread calls
00444                 // pthread_cancel on them, or when the entire process is terminated.
00445                 // (If main() finishes normally, the pthreads will terminate, but if
00446                 // it finishes with pthread_exit(), the pthreads will continue to
00447                 // run.)  Here we are having the main thread call pthread_cancel on
00448                 // this thread when the program finishes.
00449 
00450                 return NULL;
00451 #endif
00452         }
00453 
00454         ParallelTaskManager& ParallelTaskManager::instance()
00455         {
00456                 static ParallelTaskManager self;
00457                 return self;
00458         }
00459 
00460         void ParallelTaskManager::setNumThreads(unsigned int numThreads)
00461         {
00462                 if (mInitialized)
00463                 {
00464                         destroy();
00465                 }
00466 
00467                 if (0 == numThreads)
00468                 {
00469                         // By default, create one thread per processor.
00470                         numThreads = getNumProcessors();
00471                 }
00472 
00473                 mNumThreads = numThreads;
00474 
00475                 // We could either create n worker threads (and block the main thread
00476                 // while the workers are working), or use the main thread plus n-1
00477                 // workers.  Here we are doing the latter.
00478 
00479                 QMP_ASSERT(numThreads > 0);
00480                 unsigned int numWorkerThreads = numThreads - 1;
00481 
00482                 mTaskFirstIndices = new int[numThreads];
00483                 mTaskLastIndices = new int[numThreads];
00484                 for (unsigned int i = 0; i < numThreads; ++i)
00485                 {
00486                         mTaskFirstIndices[i] = 0;
00487                         mTaskLastIndices[i] = 0;
00488                 }
00489                 mTaskIndexIncrement = 0;
00490 
00491                 if (numThreads > 1)
00492                 {
00493 #ifdef QMP_USE_WINDOWS_THREADS
00494                         InitializeCriticalSection(&mPlatform->barrierCriticalSection);
00495 
00496                         // Create the synchronization events.
00497                         bool manualReset = true;
00498                         bool startSignaled = false;
00499                         mPlatform->barrierEvent1 = CreateEvent(NULL, manualReset,
00500                                 startSignaled, NULL);
00501                         mPlatform->barrierEvent2 = CreateEvent(NULL, manualReset,
00502                                 startSignaled, NULL);
00503 
00504                         InitializeCriticalSection(&mPlatform->csVectorCriticalSection);
00505 
00506                         // Note: The Windows C runtime functions _beginthreadex/_endthreadex
00507                         // are preferred over the Windows API BeginThread/EndThread functions.
00508                         // See this: http://support.microsoft.com/default.aspx/kb/104641
00509                         // Also, _beginthreadex is preferred over _beginthread because it
00510                         // is more flexible (and provides the same options as BeginThread).
00511                         // Also, make sure to link with multithreaded runtime libraries
00512                         // (single threaded runtime libraries were actually removed starting
00513                         // in VC 2005).
00514 
00515                         // uintptr_t _beginthreadex(
00516                         //      void* security,
00517                         //      unsigned stack_size,
00518                         //      unsigned (*start_address)(void*),
00519                         //      void* arglist,
00520                         //      unsigned initflag,
00521                         //      unsigned* thrdaddr);
00522                         //
00523                         // Arguments:
00524                         // security: NULL means the returned thread handle cannot be
00525                         //           inherited by child processes
00526                         // stack_size: 0 means use the same stack size as the main thread
00527                         // start_address: __stdcall or __clrcall routine, returns exit code
00528                         // arglist: arguments for start_address (or NULL)
00529                         // initflag: 0 for running, CREATE_SUSPENDED for suspended
00530                         // thrdaddr: receives thread ID, can be NULL if not needed
00531                         //
00532                         // Return value:
00533                         // Handle to the new thread (or 0 on error), used for synchronization
00534 
00535                         mPlatform->threadHandles = new HANDLE[numThreads];
00536                         mPlatform->threadIDs = new DWORD[numThreads];
00537                         // The main thread (index 0) handle is not used.
00538                         mPlatform->threadHandles[0] = 0;
00539                         mPlatform->threadIDs[0] = GetCurrentThreadId();
00540                         for (unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
00541                         {
00542                                 mPlatform->threadHandles[threadIndex] =
00543                                         (HANDLE)_beginthreadex(NULL, 0, threadRoutine,
00544                                         (void*)threadIndex, 0, (unsigned int*)&mPlatform->
00545                                         threadIDs[threadIndex]);
00546                                 QMP_ASSERT(0 != mPlatform->threadHandles[threadIndex])
00547                         }
00548 #else
00549                         // Create synchronization objects.
00550                         int returnCode = pthread_mutex_init(&mPlatform->barrierMutex, NULL);
00551                         QMP_ASSERT(0 == returnCode);
00552                         returnCode = pthread_cond_init(&mPlatform->barrierCondition, NULL);
00553                         QMP_ASSERT(0 == returnCode);
00554                         returnCode = pthread_mutex_init(&mPlatform->mutexVectorMutex, NULL);
00555                         QMP_ASSERT(0 == returnCode);
00556 
00557                         // int pthread_create(pthread_t* thread, const pthread_attr_t* attr,
00558                         //      void *(*start_routine)(void*), void* arg);
00559                         //
00560                         // Arguments:
00561                         // thread: pthread_t pointer for later access
00562                         // attr: thread attributes (NULL means use default attributes)
00563                         // start_routine: C-style functor for function to be executed
00564                         // arg: argument (void*) for start_routine
00565                         //
00566                         // Return value:
00567                         // Return code (non-zero means an error occurred)
00568 
00569                         pthread_attr_t threadAttributes;
00570                         returnCode = pthread_attr_init(&threadAttributes);
00571                         QMP_ASSERT(0 == returnCode);
00572                         returnCode = pthread_attr_setdetachstate(&threadAttributes,
00573                                 PTHREAD_CREATE_JOINABLE);
00574                         QMP_ASSERT(0 == returnCode);
00575 
00576                         mPlatform->threads = new pthread_t[numThreads];
00577                         mPlatform->threads[0] = pthread_self();
00578                         for (unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
00579                         {
00580                                 returnCode = pthread_create(&mPlatform->threads[threadIndex],
00581                                         &threadAttributes, threadRoutine, (void*)threadIndex);
00582                                 QMP_ASSERT(0 == returnCode);
00583                         }
00584 
00585                         returnCode = pthread_attr_destroy(&threadAttributes);
00586                         QMP_ASSERT(0 == returnCode);
00587 #endif
00588                 }
00589 
00590                 mInitialized = true;
00591         }
00592 
00593         unsigned int ParallelTaskManager::getNumThreads()const
00594         {
00595                 if (mInParallelSection)
00596                 {
00597                         return mNumThreads;
00598                 }
00599                 else
00600                 {
00601                         return 1;
00602                 }
00603         }
00604 
00605         unsigned int ParallelTaskManager::getMaxThreads()const
00606         {
00607                 return mNumThreads;
00608         }
00609 
00610         unsigned int ParallelTaskManager::getNumProcessors()const
00611         {
00612 #ifdef QMP_USE_WINDOWS_THREADS
00613                 SYSTEM_INFO systemInfo;
00614                 GetSystemInfo(&systemInfo);
00615                 return (unsigned int)systemInfo.dwNumberOfProcessors;
00616 #elif defined (__APPLE__)
00617                 int numProcessors = 0;
00618                 size_t size = sizeof(numProcessors);
00619                 int returnCode = sysctlbyname("hw.ncpu", &numProcessors, &size, NULL, 0);
00620                 if (0 != returnCode)
00621                 {
00622                         std::cout << "[QuickMP] WARNING: Cannot determine number of "
00623                                 << "processors, defaulting to 1" << std::endl;
00624                         return 1;
00625                 }
00626                 else
00627                 {
00628                         return (unsigned int)numProcessors;
00629                 }
00630 #else
00631                 // Methods for getting the number of processors:
00632 
00633                 // POSIX systems: sysconf() queries system info; the constants
00634                 // _SC_NPROCESSORS_CONF and _SC_NPROCESSORS_ONLN are provided on many
00635                 // systems, but they aren't part of the POSIX standard; they're not
00636                 // available on Mac OS X.
00637 
00638                 // The GNU C library provides get_nprocs_conf() (number configured)
00639                 // and get_nprocs() (number available) in <sys/sysinfo.h>, but these
00640                 // are not available on Mac OS X.
00641 
00642                 // In each of these methods there is a way to get the number of
00643                 // processors configured, which stays constant between reboots, and
00644                 // the number of processors online (capable of running processes),
00645                 // which can change during the lifetime of the calling process is
00646                 // the OS decides to disable some processors.
00647 
00648                 // We'll just assume we have access to all processors.  (When setting
00649                 // the number of threads, we default to this value, but the user
00650                 // still has the option of setting any number of threads.)
00651                 return (unsigned int)get_nprocs_conf();
00652 #endif
00653         }
00654 
00655         bool ParallelTaskManager::inParallel()const
00656         {
00657                 return mInParallelSection;
00658         }
00659 
00660         void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
00661                 unsigned int numIterations, quickmp::ScheduleHint scheduleHint)
00662         {
00663                 if (!mInitialized)
00664                 {
00665                         setNumThreads();
00666                 }
00667 
00668                 if (1 == mNumThreads)
00669                 {
00670                         mTaskFirstIndices[0] = loopFirstIndex;
00671                         mTaskLastIndices[0] = loopFirstIndex + (int)numIterations - 1;
00672                         mTaskIndexIncrement = 1;
00673                         return;
00674                 }
00675 
00676                 // We divide up the iterations as equally as possible among the
00677                 // threads.  The difference between the heaviest and lightest loads
00678                 // should be no more than one iteration.
00679 
00680                 switch(scheduleHint)
00681                 {
00682                         case quickmp::SEQUENTIAL:
00683                         {
00684                                 // Similar to the OpenMP "static" scheduling type with default
00685                                 // (equal) chunk size.  Using large, sequential chunks is
00686                                 // good if all iterations require equal durations.  This
00687                                 // reduces thread contention for overlapping memory locations
00688                                 // because loops usually access sequential addresses.
00689                                 unsigned int numIterationsPerThread = numIterations / mNumThreads;
00690                                 unsigned int numRemainderIterations = numIterations % mNumThreads;
00691                                 int currentFirstIndex = loopFirstIndex;
00692                                 for (unsigned int i = 0; i < mNumThreads; ++i)
00693                                 {
00694                                         mTaskFirstIndices[i] = currentFirstIndex;
00695 
00696                                         // Distribute the remainder iterations.
00697                                         unsigned int numIterationsForThisThread = numIterationsPerThread;
00698                                         if (i < numRemainderIterations)
00699                                         {
00700                                                 ++numIterationsForThisThread;
00701                                         }
00702 
00703                                         // The last index represents the final iteration.
00704                                         mTaskLastIndices[i] = currentFirstIndex +
00705                                                 (int)numIterationsForThisThread - 1;
00706                                         currentFirstIndex = mTaskLastIndices[i] + 1;
00707                                 }
00708                                 mTaskIndexIncrement = 1;
00709                                 break;
00710                         }
00711                         case quickmp::INTERLEAVED:
00712                                 // Similar to the OpenMP "static" scheduling type with chunk
00713                                 // size 1.  If the iterations use unequal durations, it is
00714                                 // better to interleave the iterations.
00715                                 for (unsigned int i = 0; i < mNumThreads; ++i)
00716                                 {
00717                                         mTaskFirstIndices[i] = loopFirstIndex + i;
00718                                         mTaskLastIndices[i] = loopFirstIndex + numIterations - 1;
00719                                 }
00720                                 mTaskIndexIncrement = mNumThreads;
00721                                 break;
00722                         default:
00723                                 break;
00724                 }
00725         }
00726 
00727         void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
00728                 unsigned int numIterations)
00729         {
00730                 setLoopIndices(loopFirstIndex, numIterations, quickmp::SEQUENTIAL);
00731         }
00732 
00733         void ParallelTaskManager::process(ParallelTask* task)
00734         {
00735                 mInParallelSection = true;
00736                 QMP_ASSERT(!mCurrentTask);
00737                 mCurrentTask = task;
00738 
00739                 // Between the barriers the main thread and worker threads are
00740                 // working on the loop iterations in parallel.  (Compare with the
00741                 // thread routine.)
00742 
00743                 barrier();
00744 
00745                 // Work on a subset of the loop.
00746                 processSubset(0);
00747 
00748                 barrier();
00749 
00750                 mCurrentTask = NULL;
00751                 mInParallelSection = false;
00752         }
00753 
00754         void ParallelTaskManager::processSubset(unsigned int threadIndex)
00755         {
00756                 mCurrentTask->run(mTaskFirstIndices[threadIndex],
00757                         mTaskLastIndices[threadIndex], threadIndex, mTaskIndexIncrement);
00758         }
00759 
00760         void ParallelTaskManager::criticalSectionBegin(unsigned int id)
00761         {
00762                 // Avoid synchronization if there are no worker threads.
00763                 if (mNumThreads < 2)
00764                 {
00765                         return;
00766                 }
00767 
00768                 // Dynamically allocate new synchronization objects on first usage
00769                 // up to the given id.
00770 
00771 #ifdef QMP_USE_WINDOWS_THREADS
00772                 if (id >= mPlatform->userCriticalSections.size())
00773                 {
00774                         // Protect against extra allocations by other threads.
00775                         EnterCriticalSection(&mPlatform->csVectorCriticalSection);
00776                         while (id >= mPlatform->userCriticalSections.size())
00777                         {
00778                                 CRITICAL_SECTION* cs = new CRITICAL_SECTION;
00779                                 mPlatform->userCriticalSections.push_back(cs);
00780                                 InitializeCriticalSection(cs);
00781                         }
00782                         LeaveCriticalSection(&mPlatform->csVectorCriticalSection);
00783                 }
00784                 EnterCriticalSection(mPlatform->userCriticalSections[id]);
00785 #else
00786                 if (id >= mPlatform->userMutexes.size())
00787                 {
00788                         // Protect against extra allocations by other threads.
00789                         int returnCode = pthread_mutex_lock(&mPlatform->mutexVectorMutex);
00790                         QMP_ASSERT(0 == returnCode);
00791                         while (id >= mPlatform->userMutexes.size())
00792                         {
00793                                 pthread_mutex_t* mutex = new pthread_mutex_t;
00794                                 mPlatform->userMutexes.push_back(mutex);
00795                                 returnCode = pthread_mutex_init(mutex, NULL);
00796                                 QMP_ASSERT(0 == returnCode);
00797 
00798                         }
00799                         returnCode = pthread_mutex_unlock(&mPlatform->mutexVectorMutex);
00800                         QMP_ASSERT(0 == returnCode);
00801                 }
00802                 int returnCode = pthread_mutex_lock(mPlatform->userMutexes[id]);
00803                 QMP_ASSERT(0 == returnCode);
00804 #endif
00805         }
00806 
00807         void ParallelTaskManager::criticalSectionEnd(unsigned int id)
00808         {
00809                 // Avoid synchronization if there are no worker threads.
00810                 if (mNumThreads < 2)
00811                 {
00812                         return;
00813                 }
00814 
00815 #ifdef QMP_USE_WINDOWS_THREADS
00816                 if (id >= mPlatform->userCriticalSections.size())
00817                 {
00818                         std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
00819                                 << id << ") has no matching 'begin'" << std::endl;
00820                 }
00821                 else
00822                 {
00823                         LeaveCriticalSection(mPlatform->userCriticalSections[id]);
00824                 }
00825 #else
00826                 if (id >= mPlatform->userMutexes.size())
00827                 {
00828                         std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
00829                                 << id << ") has no matching 'begin'" << std::endl;
00830                 }
00831                 else
00832                 {
00833                         int returnCode = pthread_mutex_unlock(mPlatform->userMutexes[id]);
00834                         QMP_ASSERT(0 == returnCode);
00835                 }
00836 #endif
00837         }
00838 
00839         void ParallelTaskManager::barrier()
00840         {
00841                 // Avoid synchronization if there are no worker threads.
00842                 if (mNumThreads < 2)
00843                 {
00844                         return;
00845                 }
00846 
00847                 // Lock access to the shared variables.
00848 #ifdef QMP_USE_WINDOWS_THREADS
00849                 EnterCriticalSection(&mPlatform->barrierCriticalSection);
00850 #else
00851                 int returnCode = pthread_mutex_lock(&mPlatform->barrierMutex);
00852                 QMP_ASSERT(0 == returnCode);
00853 #endif
00854 
00855                 ++mBarrierCount;
00856                 if (mBarrierCount == mNumThreads)
00857                 {
00858                         // All threads have reached the barrier.  First we must reset
00859                         // the count.  Then we signal all threads to continue.
00860                         mBarrierCount = 0;
00861 
00862 #ifdef QMP_USE_WINDOWS_THREADS
00863                         // Use alternating events for every other barrier to avoid race
00864                         // conditions; otherwise, a fast thread might reach the next
00865                         // barrier and reset the event before the others get unblocked.
00866                         if (mPlatform->barrierEventToggle)
00867                         {
00868                                 SetEvent(mPlatform->barrierEvent1);
00869                         }
00870                         else
00871                         {
00872                                 SetEvent(mPlatform->barrierEvent2);
00873                         }
00874                         mPlatform->barrierEventToggle = !mPlatform->barrierEventToggle;
00875                         LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00876 #else
00877                         // This must be called while the mutex is locked.  We must
00878                         // unlock the mutex afterwards in order to unblock the waiting
00879                         // threads.
00880                         returnCode = pthread_cond_broadcast(&mPlatform->barrierCondition);
00881                         QMP_ASSERT(0 == returnCode);
00882                         returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
00883                         QMP_ASSERT(0 == returnCode);
00884 #endif
00885                 }
00886                 else
00887                 {
00888                         // Wait until all threads have reached the barrier.
00889 
00890 #ifdef QMP_USE_WINDOWS_THREADS
00891                         // The first thread here must reset the event.
00892                         if (1 == mBarrierCount)
00893                         {
00894                                 if (mPlatform->barrierEventToggle)
00895                                 {
00896                                         ResetEvent(mPlatform->barrierEvent1);
00897                                 }
00898                                 else
00899                                 {
00900                                         ResetEvent(mPlatform->barrierEvent2);
00901                                 }
00902                         }
00903 
00904                         if (mPlatform->barrierEventToggle)
00905                         {
00906                                 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00907                                 WaitForSingleObject(mPlatform->barrierEvent1, INFINITE);
00908                         }
00909                         else
00910                         {
00911                                 LeaveCriticalSection(&mPlatform->barrierCriticalSection);
00912                                 WaitForSingleObject(mPlatform->barrierEvent2, INFINITE);
00913                         }
00914 #else
00915                         // This must be called while the mutex is locked.  It unlocks
00916                         // the mutex while waiting and locks it again when finished.
00917                         returnCode = pthread_cond_wait(&mPlatform->barrierCondition,
00918                                 &mPlatform->barrierMutex);
00919                         QMP_ASSERT(0 == returnCode);
00920                         returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
00921                         QMP_ASSERT(0 == returnCode);
00922 #endif
00923                 }
00924         }
00925 
00926         PlatformThreadObjects* ParallelTaskManager::getPlatformThreadObjects()
00927         {
00928                 return mPlatform;
00929         }
00930 
00931         bool ParallelTaskManager::shouldWorkerThreadsExit()const
00932         {
00933                 return mShouldWorkerThreadsExit;
00934         }
00935 
00936         ParallelTaskManager::ParallelTaskManager()
00937         {
00938                 mPlatform = new PlatformThreadObjects();
00939                 mInitialized = false;
00940                 mInParallelSection = false;
00941                 mShouldWorkerThreadsExit = false;
00942                 mCurrentTask = NULL;
00943                 mNumThreads = 0;
00944                 mBarrierCount = 0;
00945                 mTaskFirstIndices = NULL;
00946                 mTaskLastIndices = NULL;
00947                 mTaskIndexIncrement = 0;
00948         }
00949 
00950         ParallelTaskManager::~ParallelTaskManager()
00951         {
00952                 // This is called when the program exits because the singleton
00953                 // instance is static.
00954 
00955                 destroy();
00956                 delete mPlatform;
00957         }
00958 
00959         void ParallelTaskManager::destroy()
00960         {
00961                 // If ::exit is called within a worker thread, things get a little
00962                 // messy: we can't be expected to close all the worker threads
00963                 // from within one of the worker threads because control would never
00964                 // return to the main thread.  We just have to quit without
00965                 // deallocating everything, which shouldn't be a big problem because
00966                 // the process is quitting anyway.
00967 #ifdef QMP_USE_WINDOWS_THREADS
00968                 if (mNumThreads > 1 && GetCurrentThreadId() != mPlatform->threadIDs[0])
00969 #else
00970                 if (mNumThreads > 1 && !pthread_equal(pthread_self(), mPlatform->threads[0]))
00971 #endif
00972                 {
00973                         return;
00974                 }
00975 
00976                 // Clean up worker threads and synchronization objects.
00977                 if (mNumThreads > 1)
00978                 {
00979                         // Signal the worker threads to exit, and wait until they're
00980                         // finished.  At this point all the worker threads are waiting
00981                         // at the first barrier.
00982                         mShouldWorkerThreadsExit = true;
00983                         barrier();
00984 
00985 #ifdef QMP_USE_WINDOWS_THREADS
00986                         // Wait for all thread handles to become signaled, indicating that
00987                         // the threads have exited.  Note: WaitForMultipleObjects would be
00988                         // ideal here, but it only supports up to MAXIMUM_WAIT_OBJECTS
00989                         // threads.
00990                         for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
00991                         {
00992                                 DWORD returnCode = WaitForSingleObject(mPlatform->
00993                                         threadHandles[threadIndex], INFINITE);
00994                                 QMP_ASSERT(WAIT_OBJECT_0 == returnCode);
00995                         }
00996 #else
00997                         // Call pthread_join on all worker threads, which blocks until the
00998                         // thread exits.
00999                         for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
01000                         {
01001                                 int returnCode = pthread_join(mPlatform->threads[threadIndex], NULL);
01002                                 QMP_ASSERT(0 == returnCode);
01003                         }
01004 #endif
01005 
01006                         // Clean up platform-specific objects, and return everything to its
01007                         // original state in case we're resetting the number of threads.
01008 #ifdef QMP_USE_WINDOWS_THREADS
01009                         DeleteCriticalSection(&mPlatform->barrierCriticalSection);
01010 
01011                         mPlatform->barrierEventToggle = false;
01012 
01013                         BOOL returnCode2 = CloseHandle(mPlatform->barrierEvent1);
01014                         QMP_ASSERT(0 != returnCode2);
01015                         mPlatform->barrierEvent1 = NULL;
01016 
01017                         returnCode2 = CloseHandle(mPlatform->barrierEvent2);
01018                         QMP_ASSERT(0 != returnCode2);
01019                         mPlatform->barrierEvent2 = NULL;
01020 
01021                         DeleteCriticalSection(&mPlatform->csVectorCriticalSection);
01022 
01023                         // The main thread (index 0) handle is not used.
01024                         for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
01025                         {
01026                                 int returnCode = CloseHandle(mPlatform->
01027                                         threadHandles[threadIndex]);
01028                                 QMP_ASSERT(0 != returnCode);
01029                         }
01030                         delete [] mPlatform->threadHandles;
01031                         mPlatform->threadHandles = NULL;
01032 
01033                         delete [] mPlatform->threadIDs;
01034                         mPlatform->threadIDs = NULL;
01035 
01036                         while (!mPlatform->userCriticalSections.empty())
01037                         {
01038                                 DeleteCriticalSection(mPlatform->userCriticalSections.back());
01039                                 delete mPlatform->userCriticalSections.back();
01040                                 mPlatform->userCriticalSections.pop_back();
01041                         }
01042 #else
01043                         delete[] mPlatform->threads;
01044                         mPlatform->threads = NULL;
01045 
01046                         int returnCode = pthread_mutex_destroy(&mPlatform->barrierMutex);
01047                         QMP_ASSERT(0 == returnCode);
01048 
01049                         returnCode = pthread_cond_destroy(&mPlatform->barrierCondition);
01050                         QMP_ASSERT(0 == returnCode);
01051 
01052                         returnCode = pthread_mutex_destroy(&mPlatform->mutexVectorMutex);
01053                         QMP_ASSERT(0 == returnCode);
01054 
01055                         while (!mPlatform->userMutexes.empty())
01056                         {
01057                                 int returnCode = pthread_mutex_destroy(mPlatform->userMutexes.back());
01058                                 QMP_ASSERT(0 == returnCode);
01059                                 delete mPlatform->userMutexes.back();
01060                                 mPlatform->userMutexes.pop_back();
01061                         }
01062 #endif
01063                 }
01064 
01065                 mInitialized = false;
01066                 mInParallelSection = false;
01067                 mShouldWorkerThreadsExit = false;
01068                 mCurrentTask = NULL;
01069                 mNumThreads = 0;
01070                 mBarrierCount = 0;
01071 
01072                 if (mTaskFirstIndices)
01073                 {
01074                         delete [] mTaskFirstIndices;
01075                         mTaskFirstIndices = NULL;
01076                 }
01077                 if (mTaskLastIndices)
01078                 {
01079                         delete [] mTaskLastIndices;
01080                         mTaskLastIndices = NULL;
01081                 }
01082 
01083                 mTaskIndexIncrement = 0;
01084         }
01085 }
01086 
01087 #endif

Generated on Fri Oct 30 16:29:01 2009 for Robot Simulator of the Robotics Group for Self-Organization of Control by  doxygen 1.4.7