Robot Simulator of the Robotics Group for Self-Organization of Control  0.8.0
quickmp.h
/************************************************************************
* QuickMP                                                               *
* http://quickmp.sourceforge.net                                        *
* Copyright (C) 2008                                                    *
* Tyler Streeter (http://www.tylerstreeter.net)                         *
*                                                                       *
* This library is free software; you can redistribute it and/or         *
* modify it under the terms of EITHER:                                  *
* (1) The GNU Lesser General Public License as published by the Free    *
*     Software Foundation; either version 2.1 of the License, or (at    *
*     your option) any later version. The text of the GNU Lesser        *
*     General Public License is included with this library in the       *
*     file license-LGPL.txt.                                            *
* (2) The BSD-style license that is included with this library in       *
*     the file license-BSD.txt.                                         *
*                                                                       *
* This library is distributed in the hope that it will be useful,       *
* but WITHOUT ANY WARRANTY; without even the implied warranty of        *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the files    *
* license-LGPL.txt and license-BSD.txt for more details.                *
************************************************************************/

#ifndef QUICK_MP_H
#define QUICK_MP_H

#include <stdint.h>

// QuickMP (Quick Multi-Processing) is a simple cross-platform C++ API for
// generating parallel for loops in shared-memory programs, similar to
// OpenMP. It provides automatic scalable performance based on the number of
// available processors.
//
// Please visit the project website (http://quickmp.sourceforge.net)
// for usage instructions.

// These macros generate a unique symbol name using the line number. (We
// must go through an extra helper macro because the ## operator pastes its
// operands without expanding them; passing __LINE__ through
// QMP_UNIQUE_SYMBOL_HELPER1 expands it to a number first.)
// The resulting symbols will be unique within a given file. Name collisions
// with other files can be avoided as long as the files don't include one
// another, or, if they do include one another, the symbols should be
// declared within local scopes that aren't seen by the other file.
#define QMP_UNIQUE_SYMBOL_HELPER2(prefix, line) prefix##_uniqueSymbol##line
#define QMP_UNIQUE_SYMBOL_HELPER1(prefix, line) QMP_UNIQUE_SYMBOL_HELPER2(prefix, line)
#define QMP_UNIQUE_SYMBOL(prefix) QMP_UNIQUE_SYMBOL_HELPER1(prefix, __LINE__)

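The effect of the extra helper level can be checked with a small stand-alone file. This sketch copies the three macros above and adds a hypothetical one-step `NAIVE_SYMBOL` for comparison; `STRINGIFY`, `naiveName`, and `expandedName` are illustrative helpers, not part of QuickMP. Because the operands of `##` are not macro-expanded, the one-step version yields the literal identifier `foo_uniqueSymbol__LINE__`, while `QMP_UNIQUE_SYMBOL` yields `foo_uniqueSymbol` followed by the actual line number.

```cpp
#include <cassert>
#include <cstring>

// Copies of the QuickMP macros, so this sketch compiles on its own.
#define QMP_UNIQUE_SYMBOL_HELPER2(prefix, line) prefix##_uniqueSymbol##line
#define QMP_UNIQUE_SYMBOL_HELPER1(prefix, line) QMP_UNIQUE_SYMBOL_HELPER2(prefix, line)
#define QMP_UNIQUE_SYMBOL(prefix) QMP_UNIQUE_SYMBOL_HELPER1(prefix, __LINE__)

// Hypothetical one-step version: ## pastes the __LINE__ token itself.
#define NAIVE_SYMBOL(prefix) prefix##_uniqueSymbol##__LINE__

// Stringify after full expansion (the same two-step trick).
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)

// Declared on different lines, so the generated names differ and there
// is no redefinition error.
int QMP_UNIQUE_SYMBOL(counter) = 1;
int QMP_UNIQUE_SYMBOL(counter) = 2;

const char* naiveName() { return STRINGIFY(NAIVE_SYMBOL(foo)); }
const char* expandedName() { return STRINGIFY(QMP_UNIQUE_SYMBOL(foo)); }
```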
/// Defines the beginning of a parallel for loop. The arguments are the
/// name of the integer index variable (accessible within the loop), the
/// starting value of the index, the number of iterations to perform, and
/// (optionally) the schedule hint. The index counts up from the starting
/// value. The valid schedule hints are: quickmp::SEQUENTIAL (default,
/// better for equal-duration loop iterations; similar to OpenMP "static"
/// schedule with default (equal) chunk size) and quickmp::INTERLEAVED
/// (better for non-equal-duration loop iterations; similar to OpenMP
/// "static" schedule with chunk size 1).
// Design notes: In order to create an arbitrary function containing the
// user's for loop code (to be executed by the threads), we can define a
// class to contain the function. This class can be defined within any
// other function. The entire class method definition must be written
// inline within the class definition (i.e. we can't define the function
// at the end of a macro and let the user code write the {} brackets). This
// requires the use of two separate begin/end macros. The instances of each
// parallel section class should not go out of scope before they finish
// executing. We use a variadic macro here to allow an optional schedule hint
// argument.
#define QMP_PARALLEL_FOR(indexName, loopFirstIndex, ...) \
{ \
    qmp_internal::ParallelTaskManager::instance().setLoopIndices( \
        loopFirstIndex, __VA_ARGS__); \
    static class QMP_UNIQUE_SYMBOL(ParallelTaskSubclass) : \
        public qmp_internal::ParallelTask \
    { \
    public: \
        virtual void run(int QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex), \
            int QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex), \
            const unsigned int parallelForLoopThreadIndexUniqueSymbol, \
            int QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
        { \
            for (int indexName = QMP_UNIQUE_SYMBOL(parallelForLoopFirstIndex); \
                indexName <= QMP_UNIQUE_SYMBOL(parallelForLoopLastIndex); \
                indexName += QMP_UNIQUE_SYMBOL(parallelForLoopIndexIncrement)) \
            {

/// Defines the end of a parallel for loop.
#define QMP_END_PARALLEL_FOR \
            } \
        } \
    }QMP_UNIQUE_SYMBOL(Instance); \
    qmp_internal::ParallelTaskManager::instance().process( \
        &QMP_UNIQUE_SYMBOL(Instance)); \
}
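To see how the begin/end pair fits together, here is a single-threaded sketch of the same pattern: the begin macro opens a local class and its loop, the user writes the body, and the end macro closes the braces, names the static instance, and hands it to a runner. `MY_FOR`, `MY_END_FOR`, `Task`, and `processTask` are hypothetical stand-ins; the real macros additionally split the index range across threads and generate unique names with QMP_UNIQUE_SYMBOL.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for qmp_internal::ParallelTask (last index inclusive).
struct Task
{
    virtual ~Task() {}
    virtual void run(int first, int last) = 0;
};

// Hypothetical stand-in for ParallelTaskManager::process: runs the whole
// range on the calling thread.
inline void processTask(Task* task, int first, int last) { task->run(first, last); }

// Simplified begin macro: opens the class, the method, and the loop.
#define MY_FOR(indexName, firstIndex, numIterations) \
{ \
    const int myFirst = (firstIndex); \
    const int myLast = myFirst + (numIterations) - 1; \
    static class MyTask : public Task \
    { \
    public: \
        virtual void run(int first, int last) \
        { \
            for (int indexName = first; indexName <= last; ++indexName) \
            {

// Simplified end macro: closes the loop, method, and class, then runs it.
#define MY_END_FOR \
            } \
        } \
    } myInstance; \
    processTask(&myInstance, myFirst, myLast); \
}

// Namespace-scope storage: the local class cannot see the enclosing
// function's automatic variables (the problem QMP_SHARE addresses).
static std::vector<int> squares(5);

int fillSquares()
{
    MY_FOR(i, 0, 5)
        squares[i] = i * i;
    MY_END_FOR
    return squares[4];
}
```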

/// Specifies the number of threads to use in subsequent parallel for loops.
/// This is optional; without calling this, the system will use one thread
/// per processor. If used, this must be called outside any parallel for
/// loops. This can be called any number of times. This destroys and
/// re-creates the internal thread pool, which might take time, so use it
/// sparingly.
#define QMP_SET_NUM_THREADS(numThreads) \
    qmp_internal::ParallelTaskManager::instance().setNumThreads(numThreads)

/// Returns the number of threads currently being used. In sequential
/// code sections this returns 1; in parallel for loops this returns the
/// total number of threads allocated for use in parallel for loops.
#define QMP_GET_NUM_THREADS \
    qmp_internal::ParallelTaskManager::instance().getNumThreads

/// Returns the total number of threads allocated for use in all parallel
/// for loops.
#define QMP_GET_MAX_THREADS \
    qmp_internal::ParallelTaskManager::instance().getMaxThreads

/// The zero-based index of the current thread. This is only valid within
/// a parallel for loop code section. Note: unlike most other macros, this
/// is not a function call (i.e. don't use () at the end).
#define QMP_THREAD_NUM parallelForLoopThreadIndexUniqueSymbol

/// Returns the number of processors in the current machine at runtime.
#define QMP_GET_NUM_PROCS \
    qmp_internal::ParallelTaskManager::instance().getNumProcessors

/// Returns true if called within a parallel for loop and false otherwise.
#define QMP_IN_PARALLEL \
    qmp_internal::ParallelTaskManager::instance().inParallel

/// Defines the beginning of a critical section used for synchronization.
/// This is necessary to protect shared variables which are read and written
/// by multiple threads. The given id should be unique for each critical
/// section within a parallel for loop. Keep the ids low: internal critical
/// sections are allocated for every id from 0 up to the largest id used.
/// Be very careful to use matching ids for the begin and end.
#define QMP_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionBegin

/// Defines the end of a critical section used for synchronization.
/// The given id must match the id given at the beginning of the critical
/// section. Keep the ids low to avoid allocating too many internal critical
/// sections. Be very careful to use matching ids for the begin and end.
#define QMP_END_CRITICAL \
    qmp_internal::ParallelTaskManager::instance().criticalSectionEnd
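As a rough model of what QMP_CRITICAL(id) and QMP_END_CRITICAL(id) do, the sketch below keeps one mutex per id, created on first use, and uses id 0 to protect a shared counter. It is written with C++11 `std::mutex` and `std::thread` rather than the Windows or pthread objects QuickMP uses, and `CriticalSections` and `countWithCriticalSection` are hypothetical names. One deliberate difference: the sketch takes the guard lock on every lookup, so no thread reads the vector while another thread may be growing it.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// One mutex per critical-section id, created on first use (ids 0..id).
class CriticalSections
{
public:
    void begin(unsigned int id) { lockFor(id).lock(); }

    // Must follow a begin() with the same id on the same thread.
    void end(unsigned int id) { lockFor(id).unlock(); }

private:
    std::mutex& lockFor(unsigned int id)
    {
        // The guard serializes all access to the vector, including growth.
        std::lock_guard<std::mutex> guard(vectorMutex_);
        while (id >= locks_.size())
        {
            locks_.push_back(std::unique_ptr<std::mutex>(new std::mutex));
        }
        // Each mutex lives on the heap, so the reference stays valid even
        // if the vector reallocates later.
        return *locks_[id];
    }

    std::mutex vectorMutex_;
    std::vector<std::unique_ptr<std::mutex> > locks_;
};

// Four threads each increment a shared counter 1000 times inside
// critical section 0, so no increments are lost.
int countWithCriticalSection()
{
    CriticalSections sections;
    int counter = 0;
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t)
    {
        threads.emplace_back([&] {
            for (int i = 0; i < 1000; ++i)
            {
                sections.begin(0);
                ++counter;
                sections.end(0);
            }
        });
    }
    for (std::thread& th : threads)
    {
        th.join();
    }
    return counter;
}
```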

/// Defines a barrier routine used to synchronize threads. Each thread blocks
/// at the barrier until all threads have reached it. This can be expensive
/// and can often be avoided by splitting one parallel for loop into two.
#define QMP_BARRIER \
    qmp_internal::ParallelTaskManager::instance().barrier
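The counting barrier described here can be sketched with C++11 primitives: the last thread to arrive resets the count and wakes the others. This is an illustration with `std::mutex` and `std::condition_variable`, not QuickMP's own code (which uses events on Windows and a pthread mutex plus condition variable elsewhere); the `Barrier` class and `barrierDemo` are hypothetical. The sketch also keeps a generation number, so a waiting thread is released only once its round has completed, which makes spurious wakeups harmless.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Counting barrier: the last thread to arrive resets the count and wakes
// everyone; the generation number tells waiters that their round is over.
class Barrier
{
public:
    explicit Barrier(unsigned int numThreads) : numThreads_(numThreads) {}

    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long myGeneration = generation_;
        if (++count_ == numThreads_)
        {
            count_ = 0;     // reset first, so the barrier can be reused
            ++generation_;  // release the threads waiting on this round
            cond_.notify_all();
        }
        else
        {
            // The predicate makes spurious wakeups harmless.
            cond_.wait(lock, [&] { return generation_ != myGeneration; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    unsigned int numThreads_;
    unsigned int count_ = 0;
    unsigned long generation_ = 0;
};

// Each thread publishes a value, waits at the barrier, then sums every
// thread's value; all threads must see the complete set (1+2+3+4 = 10).
bool barrierDemo()
{
    const unsigned int numThreads = 4;
    Barrier barrier(numThreads);
    std::vector<int> values(numThreads, 0);
    std::vector<int> sums(numThreads, 0);
    std::vector<std::thread> threads;
    for (unsigned int t = 0; t < numThreads; ++t)
    {
        threads.emplace_back([&, t] {
            values[t] = (int)t + 1;
            barrier.wait();
            int sum = 0;
            for (int v : values) sum += v;
            sums[t] = sum;
        });
    }
    for (std::thread& th : threads) th.join();
    for (int s : sums)
    {
        if (s != 10) return false;
    }
    return true;
}
```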

/// Exposes the given variable to any parallel for loops later in the same
/// scope. The argument is the variable's name. This must be called outside
/// the loop. The variable must remain valid as long as it is being accessed
/// by any loops. Statically-allocated arrays must be given as pointers; for
/// example, int myData[50] requires a pointer int* myDataPtr = myData, then
/// QMP_SHARE(myDataPtr), not QMP_SHARE(myData).
// Design notes: Within the parallel tasks later we can access variables
// outside the class definition only if they're static. So we must make a
// temporary static pointer before the class definition. We must be sure to
// re-assign the static variable's value each time in case it changes, e.g.,
// if it's referring to a class variable, which would be different for each
// class instance. Statically-allocated arrays are problematic because the
// address-of operator returns the same address as the array itself; we
// could make a separate intermediate type-casted pointer variable, but we
// don't want to do that for everything; solution: just have the user pass
// in their own pointer.
#define QMP_SHARE(variableName) static void* variableName##_tempImportCopy = NULL; \
    variableName##_tempImportCopy = (void*)&variableName;

/// This provides access to the given variable within the parallel for loop,
/// which must have been exposed before the beginning of the loop. The
/// arguments are the variable's name and type (in that order). This must
/// be called within the loop. Statically-allocated arrays must be given as
/// pointers; for example, int myData[50] requires a pointer
/// int* myDataPtr = myData exposed via QMP_SHARE(myDataPtr) then accessed
/// via QMP_USE_SHARED(myDataPtr, int*).
// Design notes: Here we make a reference through the temporary static
// pointer, which allows access to the original desired variable. This
// 2-step process also allows us to maintain the same variable name as the
// original. We use a variadic macro, which allows a variable number of
// arguments, to handle types needing commas (e.g., std::map<int, int>)
// which would confuse the macro preprocessor.
#define QMP_USE_SHARED(variableName, ...) __VA_ARGS__& variableName = \
    *((__VA_ARGS__*)variableName##_tempImportCopy);

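The share/use pair can be exercised without any threads. This sketch copies the two macros verbatim and calls a hypothetical local class `Body` once per index, the way a QMP_PARALLEL_FOR body would run; `scaledSum` is an illustrative name. The array is shared through the pointer variable `dataPtr`, as the comments above require, and the macros supply their own semicolons.

```cpp
#include <cassert>
#include <cstddef>

// Verbatim copies of the QuickMP macros.
#define QMP_SHARE(variableName) static void* variableName##_tempImportCopy = NULL; \
    variableName##_tempImportCopy = (void*)&variableName;
#define QMP_USE_SHARED(variableName, ...) __VA_ARGS__& variableName = \
    *((__VA_ARGS__*)variableName##_tempImportCopy);

// Returns scale * (1 + 2 + 3 + 4 + 5), accumulated inside a local class.
int scaledSum(int scale)
{
    int data[5] = {1, 2, 3, 4, 5};
    int* dataPtr = data;  // arrays are shared through a pointer variable
    int total = 0;
    QMP_SHARE(dataPtr)
    QMP_SHARE(scale)
    QMP_SHARE(total)

    // Stand-in for the class generated by QMP_PARALLEL_FOR: its member
    // function cannot use scaledSum's automatic variables directly, only
    // the static *_tempImportCopy pointers.
    struct Body
    {
        static void run(int i)
        {
            QMP_USE_SHARED(dataPtr, int*)
            QMP_USE_SHARED(scale, int)
            QMP_USE_SHARED(total, int)
            total += dataPtr[i] * scale;
        }
    };

    for (int i = 0; i < 5; ++i)
    {
        Body::run(i);
    }
    return total;
}
```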
/// A namespace for symbols that are part of the public API.
namespace quickmp
{
    /// Types of loop scheduling methods.
    enum ScheduleHint
    {
        /// This is the default. It distributes loop iterations among threads
        /// in large, equal chunks, similar to the OpenMP "static" scheduling
        /// type with default (equal) chunk size. This is better for loops
        /// with equal-duration iterations.
        SEQUENTIAL,

        /// Distributes loop iterations among threads in an interleaved
        /// manner, similar to the OpenMP "static" scheduling type with
        /// chunk size 1. This is better for loops with non-equal-duration
        /// iterations.
        INTERLEAVED
    };
}

/// A namespace for internal data structures.
namespace qmp_internal
{
    // Forward declaration.
    struct PlatformThreadObjects;

    /// A base class for parallel task classes which are defined by a set
    /// of macros.
    class ParallelTask
    {
    public:
        virtual ~ParallelTask(){}
        /// The function which is executed by each thread with different
        /// indices. The last index is included.
        virtual void run(int firstIndex, int lastIndex,
            const unsigned int threadIndex, int indexIncrement) = 0;
    };

    /// A singleton class to manage parallel code tasks. This enables
    /// automatic initialization on first use and destruction on exit.
    class ParallelTaskManager
    {
    public:
        /// Provides access to the singleton instance.
        inline static ParallelTaskManager& instance();

        /// Specifies the number of threads to use in subsequent parallel
        /// for loops. If not called explicitly by the user, this will be
        /// called with the default value (zero), which uses one thread per
        /// processor. Can be called multiple times.
        inline void setNumThreads(unsigned int numThreads=0);

        /// Returns the number of threads currently being used. In
        /// sequential code sections this returns 1; in parallel for loops
        /// this returns the total number of threads allocated for use in
        /// parallel for loops.
        inline unsigned int getNumThreads()const;

        /// Returns the total number of threads allocated for use in all
        /// parallel for loops.
        inline unsigned int getMaxThreads()const;

        /// Returns the number of processors in the current machine at runtime.
        inline unsigned int getNumProcessors()const;

        /// Returns true if called within a parallel for loop and false
        /// otherwise.
        inline bool inParallel()const;

        /// Defines the range of the loop index. Assumes the index begins
        /// at the first index and counts up. Internally, this sets the
        /// loop indices to be used by each thread.
        inline void setLoopIndices(int loopFirstIndex,
            unsigned int numIterations, quickmp::ScheduleHint scheduleHint);

        /// Separate version which is used when no schedule hint is supplied.
        inline void setLoopIndices(int loopFirstIndex, unsigned int numIterations);

        /// Unleashes the threads on the new task/loop.
        inline void process(ParallelTask* task);

        /// Called by individual threads to process a subset of the loop
        /// iterations.
        inline void processSubset(unsigned int threadIndex);

        /// Defines the beginning of a critical section used for
        /// synchronization. This is necessary to protect shared variables
        /// which are read and written by multiple threads. The given id
        /// should be unique for each critical section within a parallel for
        /// loop. Keep the ids low to avoid allocating too many internal
        /// critical sections.
        inline void criticalSectionBegin(unsigned int id);

        /// Defines the end of a critical section used for synchronization.
        /// The given id must match the id given at the beginning of the
        /// critical section. Keep the ids low to avoid allocating too many
        /// internal critical sections.
        inline void criticalSectionEnd(unsigned int id);

        /// Defines a barrier routine used to synchronize threads. Each
        /// thread blocks at the barrier until all threads have reached it.
        inline void barrier();

        /// Provides access to the internal platform-specific data, like
        /// thread handles and synchronization objects. This gives the
        /// thread function access to these objects.
        inline PlatformThreadObjects* getPlatformThreadObjects();

        /// Returns true if the main thread has requested the worker threads
        /// to exit.
        inline bool shouldWorkerThreadsExit()const;

    private:
        inline ParallelTaskManager();

        inline ~ParallelTaskManager();

        /// Deallocates everything, closes threads, and returns the system
        /// back to its uninitialized state.
        inline void destroy();

        PlatformThreadObjects* mPlatform;
        bool mInitialized;
        bool mInParallelSection;
        bool mShouldWorkerThreadsExit;
        ParallelTask* mCurrentTask;
        unsigned int mNumThreads;
        unsigned int mBarrierCount;
        int* mTaskFirstIndices;
        int* mTaskLastIndices;
        int mTaskIndexIncrement;
    };
}

//****************************************************************************
// If desired, this header file can be split into .h and .cpp files to speed
// up compilation. Simply move the remainder of this file (except the final
// #endif) into a separate .cpp file, and add #include "quickmp.h" to the top.
//****************************************************************************

#if defined(WIN32) || defined(_WIN32) || defined (__WIN32) || defined(__WIN32__) \
    || defined (_WIN64) || defined(__CYGWIN__) || defined(__MINGW32__)
    #define QMP_USE_WINDOWS_THREADS
    #include <windows.h>
    #include <process.h>
#elif defined(__APPLE__)
    #include <pthread.h>

    // Required to get number of processors on OS X using sysctlbyname.
    #include <sys/sysctl.h>
#elif defined(unix) || defined(__unix) || defined(__unix__)
    #include <pthread.h>

    // Required to get number of processors using get_nprocs_conf.
    #include <sys/sysinfo.h>
#else
    #error This development environment does not support pthreads or windows threads
#endif

#include <cstdlib>
#include <iostream>
#include <vector>

/// Assert macro. Prints the failed condition and terminates the program.
#define QMP_ASSERT(condition)\
{\
    if (!(condition))\
    {\
        std::cout << "[QuickMP] Assertion failed in " << __FUNCTION__ \
            << " (line " << __LINE__ << "): assert(" << #condition << ")" \
            << std::endl;\
        std::exit(1);\
    }\
}

namespace qmp_internal
{
    struct PlatformThreadObjects
    {
        PlatformThreadObjects()
        {
#ifdef QMP_USE_WINDOWS_THREADS
            barrierEventToggle = false;
            barrierEvent1 = NULL;
            barrierEvent2 = NULL;
            threadHandles = NULL;
            threadIDs = NULL;
#else
            threads = NULL;
#endif
        }

#ifdef QMP_USE_WINDOWS_THREADS
        // Windows condition variables are only available in Vista and later,
        // so we must resort to using events for the barrier.

        CRITICAL_SECTION barrierCriticalSection;
        bool barrierEventToggle;
        HANDLE barrierEvent1;
        HANDLE barrierEvent2;
        CRITICAL_SECTION csVectorCriticalSection;
        std::vector<CRITICAL_SECTION*> userCriticalSections;
        HANDLE* threadHandles;
        DWORD* threadIDs;
#else
        pthread_t* threads;
        pthread_mutex_t barrierMutex;
        pthread_cond_t barrierCondition;
        pthread_mutex_t mutexVectorMutex;
        std::vector<pthread_mutex_t*> userMutexes;
#endif
    };

    /// The routine to be executed by the threads. Note: Windows threads
    /// require a __stdcall routine.
#ifdef QMP_USE_WINDOWS_THREADS
    inline unsigned __stdcall threadRoutine(void* threadIndex)
#else
    inline void* threadRoutine(void* threadIndex)
#endif
    {
        // Note: if the program crashes or data gets corrupted during thread
        // execution, one possible cause is that we exceeded the default
        // thread stack size. The stack size can be increased when the
        // threads are created (the stack_size argument of _beginthreadex,
        // or pthread_attr_setstacksize for pthreads).

        // We cast through uintptr_t here because a void* on 64-bit machines
        // is 64 bits long, and gcc won't cast a 64-bit void* directly to a
        // 32-bit unsigned int.
        unsigned int myIndex = (unsigned int)((uintptr_t)threadIndex);

        // Loop until the main thread asks the worker threads to exit, which
        // happens when the thread pool is destroyed (at program exit, or
        // when the number of threads is changed).
        while (true)
        {
            // Between the barriers the main thread and worker threads are
            // working on the loop iterations in parallel. (Compare with
            // ParallelTaskManager::process.)

            ParallelTaskManager::instance().barrier();

            if (ParallelTaskManager::instance().shouldWorkerThreadsExit())
            {
                // Exit the thread.
                break;
            }
            else
            {
                // Work on a subset of the loop.
                ParallelTaskManager::instance().processSubset(myIndex);
            }

            ParallelTaskManager::instance().barrier();
        }

#ifdef QMP_USE_WINDOWS_THREADS
        // _endthreadex is called automatically after start_address finishes.
        // We could call it manually, but that causes C++ destructors pending
        // in the thread not to be called. In this case, though, we are
        // having the main thread call TerminateThread on this thread when
        // the program finishes.

        return 0;
#else
        // pthreads are terminated in several ways: implicitly when their
        // start routine finishes, when they call pthread_exit (which might
        // not let C++ destructors be called), when another thread calls
        // pthread_cancel on them, or when the entire process is terminated.
        // (If main() finishes normally, the pthreads will terminate, but if
        // it finishes with pthread_exit(), the pthreads will continue to
        // run.) Here we are having the main thread call pthread_cancel on
        // this thread when the program finishes.

        return NULL;
#endif
    }
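The `uintptr_t` round trip can be seen in a minimal pthreads program. This sketch passes each worker's index through the `void*` argument as setNumThreads and threadRoutine do, but the `recordIndex` routine, the `recordedIndex` array, and `startAndJoinWorkers` are hypothetical, and it simply joins the workers instead of keeping them in a pool. On older Linux toolchains it may need to be built with `-pthread`.

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <pthread.h>

// Slot i is written only by worker i; slot 0 belongs to the main thread.
static unsigned int recordedIndex[4];

// Same shape as threadRoutine: the thread index arrives as a void*.
void* recordIndex(void* threadIndex)
{
    // Go through uintptr_t so a 64-bit pointer narrows cleanly to 32 bits.
    unsigned int myIndex = (unsigned int)((uintptr_t)threadIndex);
    recordedIndex[myIndex] = myIndex;
    return NULL;
}

// Starts workers 1..3 and joins them; returns false if any creation failed.
bool startAndJoinWorkers()
{
    pthread_t threads[4];
    uintptr_t created = 1;  // index 0 is reserved for the main thread
    while (created < 4 &&
        0 == pthread_create(&threads[created], NULL, recordIndex, (void*)created))
    {
        ++created;
    }
    for (uintptr_t i = 1; i < created; ++i)
    {
        pthread_join(threads[i], NULL);
    }
    return 4 == created;
}
```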

    ParallelTaskManager& ParallelTaskManager::instance()
    {
        static ParallelTaskManager self;
        return self;
    }

    void ParallelTaskManager::setNumThreads(unsigned int numThreads)
    {
        if (mInitialized)
        {
            destroy();
        }

        if (0 == numThreads)
        {
            // By default, create one thread per processor.
            numThreads = getNumProcessors();
        }

        mNumThreads = numThreads;

        // We could either create n worker threads (and block the main thread
        // while the workers are working), or use the main thread plus n-1
        // workers. Here we are doing the latter.

        QMP_ASSERT(numThreads > 0);
        unsigned int numWorkerThreads = numThreads - 1;

        mTaskFirstIndices = new int[numThreads];
        mTaskLastIndices = new int[numThreads];
        for (unsigned int i = 0; i < numThreads; ++i)
        {
            mTaskFirstIndices[i] = 0;
            mTaskLastIndices[i] = 0;
        }
        mTaskIndexIncrement = 0;

        if (numThreads > 1)
        {
#ifdef QMP_USE_WINDOWS_THREADS
            InitializeCriticalSection(&mPlatform->barrierCriticalSection);

            // Create the synchronization events.
            bool manualReset = true;
            bool startSignaled = false;
            mPlatform->barrierEvent1 = CreateEvent(NULL, manualReset,
                startSignaled, NULL);
            mPlatform->barrierEvent2 = CreateEvent(NULL, manualReset,
                startSignaled, NULL);

            InitializeCriticalSection(&mPlatform->csVectorCriticalSection);

            // Note: The Windows C runtime functions _beginthreadex/_endthreadex
            // are preferred over the Windows API CreateThread/ExitThread
            // functions. See this: http://support.microsoft.com/default.aspx/kb/104641
            // Also, _beginthreadex is preferred over _beginthread because it
            // is more flexible (and provides the same options as CreateThread).
            // Also, make sure to link with multithreaded runtime libraries
            // (single-threaded runtime libraries were removed starting
            // in VC 2005).

            // uintptr_t _beginthreadex(
            //     void* security,
            //     unsigned stack_size,
            //     unsigned (*start_address)(void*),
            //     void* arglist,
            //     unsigned initflag,
            //     unsigned* thrdaddr);
            //
            // Arguments:
            //     security: NULL means the returned thread handle cannot be
            //         inherited by child processes
            //     stack_size: 0 means use the same stack size as the main thread
            //     start_address: __stdcall or __clrcall routine, returns exit code
            //     arglist: arguments for start_address (or NULL)
            //     initflag: 0 for running, CREATE_SUSPENDED for suspended
            //     thrdaddr: receives thread ID, can be NULL if not needed
            //
            // Return value:
            //     Handle to the new thread (or 0 on error), used for synchronization

            mPlatform->threadHandles = new HANDLE[numThreads];
            mPlatform->threadIDs = new DWORD[numThreads];
            // The main thread (index 0) handle is not used.
            mPlatform->threadHandles[0] = 0;
            mPlatform->threadIDs[0] = GetCurrentThreadId();
            for (unsigned int threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
            {
                // Widen through uintptr_t to match the cast in threadRoutine
                // and avoid a truncation warning on 64-bit builds.
                mPlatform->threadHandles[threadIndex] =
                    (HANDLE)_beginthreadex(NULL, 0, threadRoutine,
                    (void*)(uintptr_t)threadIndex, 0, (unsigned int*)&mPlatform->
                    threadIDs[threadIndex]);
                QMP_ASSERT(0 != mPlatform->threadHandles[threadIndex]);
            }
#else
            // Create synchronization objects.
            int returnCode = pthread_mutex_init(&mPlatform->barrierMutex, NULL);
            QMP_ASSERT(0 == returnCode);
            returnCode = pthread_cond_init(&mPlatform->barrierCondition, NULL);
            QMP_ASSERT(0 == returnCode);
            returnCode = pthread_mutex_init(&mPlatform->mutexVectorMutex, NULL);
            QMP_ASSERT(0 == returnCode);

            // int pthread_create(pthread_t* thread, const pthread_attr_t* attr,
            //     void *(*start_routine)(void*), void* arg);
            //
            // Arguments:
            //     thread: pthread_t pointer for later access
            //     attr: thread attributes (NULL means use default attributes)
            //     start_routine: pointer to the function to be executed
            //     arg: argument (void*) for start_routine
            //
            // Return value:
            //     Return code (non-zero means an error occurred)

            pthread_attr_t threadAttributes;
            returnCode = pthread_attr_init(&threadAttributes);
            QMP_ASSERT(0 == returnCode);
            returnCode = pthread_attr_setdetachstate(&threadAttributes,
                PTHREAD_CREATE_JOINABLE);
            QMP_ASSERT(0 == returnCode);

            mPlatform->threads = new pthread_t[numThreads];
            mPlatform->threads[0] = pthread_self();
            for (uintptr_t threadIndex = 1; threadIndex <= numWorkerThreads; ++threadIndex)
            {
                returnCode = pthread_create(&mPlatform->threads[threadIndex],
                    &threadAttributes, threadRoutine, (void*)threadIndex);
                QMP_ASSERT(0 == returnCode);
            }

            returnCode = pthread_attr_destroy(&threadAttributes);
            QMP_ASSERT(0 == returnCode);
#endif
        }

        mInitialized = true;
    }

    unsigned int ParallelTaskManager::getNumThreads()const
    {
        if (mInParallelSection)
        {
            return mNumThreads;
        }
        else
        {
            return 1;
        }
    }

    unsigned int ParallelTaskManager::getMaxThreads()const
    {
        return mNumThreads;
    }

    unsigned int ParallelTaskManager::getNumProcessors()const
    {
#ifdef QMP_USE_WINDOWS_THREADS
        SYSTEM_INFO systemInfo;
        GetSystemInfo(&systemInfo);
        return (unsigned int)systemInfo.dwNumberOfProcessors;
#elif defined (__APPLE__)
        int numProcessors = 0;
        size_t size = sizeof(numProcessors);
        int returnCode = sysctlbyname("hw.ncpu", &numProcessors, &size, NULL, 0);
        if (0 != returnCode)
        {
            std::cout << "[QuickMP] WARNING: Cannot determine number of "
                << "processors, defaulting to 1" << std::endl;
            return 1;
        }
        else
        {
            return (unsigned int)numProcessors;
        }
#else
        // Methods for getting the number of processors:

        // POSIX systems: sysconf() queries system info; the constants
        // _SC_NPROCESSORS_CONF and _SC_NPROCESSORS_ONLN are provided on many
        // systems, but they aren't part of the POSIX standard; they're not
        // available on Mac OS X.

        // The GNU C library provides get_nprocs_conf() (number configured)
        // and get_nprocs() (number available) in <sys/sysinfo.h>, but these
        // are not available on Mac OS X.

        // In each of these methods there is a way to get the number of
        // processors configured, which stays constant until the next reboot,
        // and the number of processors online (capable of running
        // processes), which can change during the lifetime of the calling
        // process if the OS decides to disable some processors.

        // We'll just assume we have access to all processors. (When setting
        // the number of threads, we default to this value, but the user
        // still has the option of setting any number of threads.)
        return (unsigned int)get_nprocs_conf();
#endif
    }
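Since C++11 there is also a portable query, `std::thread::hardware_concurrency()`, which could stand in for the per-platform branches above. It is only a hint and may return 0 when the value cannot be determined, so this sketch falls back to 1, as the OS X branch does on failure; `numProcessorsOrOne` is a hypothetical name, not part of QuickMP.

```cpp
#include <cassert>
#include <thread>

// Portable processor-count query with a fallback of 1.
unsigned int numProcessorsOrOne()
{
    // A hint only: typically the number of logical processors, or 0 when
    // it cannot be determined.
    unsigned int n = std::thread::hardware_concurrency();
    return (0 == n) ? 1u : n;
}
```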

    bool ParallelTaskManager::inParallel()const
    {
        return mInParallelSection;
    }

    void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
        unsigned int numIterations, quickmp::ScheduleHint scheduleHint)
    {
        if (!mInitialized)
        {
            setNumThreads();
        }

        if (1 == mNumThreads)
        {
            mTaskFirstIndices[0] = loopFirstIndex;
            mTaskLastIndices[0] = loopFirstIndex + (int)numIterations - 1;
            mTaskIndexIncrement = 1;
            return;
        }

        // We divide up the iterations as equally as possible among the
        // threads. The difference between the heaviest and lightest loads
        // should be no more than one iteration.

        switch(scheduleHint)
        {
            case quickmp::SEQUENTIAL:
            {
                // Similar to the OpenMP "static" scheduling type with default
                // (equal) chunk size. Using large, sequential chunks is
                // good if all iterations require equal durations. This
                // reduces contention between threads for the same memory
                // regions (such as shared cache lines), because loops
                // usually access sequential addresses.
                unsigned int numIterationsPerThread = numIterations / mNumThreads;
                unsigned int numRemainderIterations = numIterations % mNumThreads;
                int currentFirstIndex = loopFirstIndex;
                for (unsigned int i = 0; i < mNumThreads; ++i)
                {
                    mTaskFirstIndices[i] = currentFirstIndex;

                    // Distribute the remainder iterations.
                    unsigned int numIterationsForThisThread = numIterationsPerThread;
                    if (i < numRemainderIterations)
                    {
                        ++numIterationsForThisThread;
                    }

                    // The last index represents the final iteration.
                    mTaskLastIndices[i] = currentFirstIndex +
                        (int)numIterationsForThisThread - 1;
                    currentFirstIndex = mTaskLastIndices[i] + 1;
                }
                mTaskIndexIncrement = 1;
                break;
            }
            case quickmp::INTERLEAVED:
                // Similar to the OpenMP "static" scheduling type with chunk
                // size 1. If the iterations use unequal durations, it is
                // better to interleave the iterations.
                for (unsigned int i = 0; i < mNumThreads; ++i)
                {
                    mTaskFirstIndices[i] = loopFirstIndex + (int)i;
                    mTaskLastIndices[i] = loopFirstIndex + (int)numIterations - 1;
                }
                mTaskIndexIncrement = (int)mNumThreads;
                break;
            default:
                break;
        }
    }

    void ParallelTaskManager::setLoopIndices(int loopFirstIndex,
        unsigned int numIterations)
    {
        setLoopIndices(loopFirstIndex, numIterations, quickmp::SEQUENTIAL);
    }
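The arithmetic in setLoopIndices can be checked in isolation. The sketch below reproduces the two schedules as a free function over a hypothetical `Partition` struct (per-thread first/last indices plus the shared increment); `partitionLoop`, `indicesFor`, and the local `ScheduleHint` copy are illustrative names, not QuickMP API. With 10 iterations on 3 threads, SEQUENTIAL gives thread 0 the one remainder iteration (0..3, 4..6, 7..9), while INTERLEAVED has thread 1 visit 1, 4, 7.

```cpp
#include <cassert>
#include <vector>

// Local copy of the schedule hints, so the sketch compiles on its own.
enum ScheduleHint { SEQUENTIAL, INTERLEAVED };

// Mirrors mTaskFirstIndices/mTaskLastIndices/mTaskIndexIncrement.
// Last indices are inclusive.
struct Partition
{
    std::vector<int> first;
    std::vector<int> last;
    int increment;
};

Partition partitionLoop(int loopFirstIndex, unsigned int numIterations,
    unsigned int numThreads, ScheduleHint hint)
{
    Partition p;
    p.first.resize(numThreads);
    p.last.resize(numThreads);
    if (SEQUENTIAL == hint)
    {
        // Equal chunks; the first (numIterations % numThreads) threads get one extra.
        unsigned int perThread = numIterations / numThreads;
        unsigned int remainder = numIterations % numThreads;
        int current = loopFirstIndex;
        for (unsigned int i = 0; i < numThreads; ++i)
        {
            unsigned int count = perThread + (i < remainder ? 1u : 0u);
            p.first[i] = current;
            p.last[i] = current + (int)count - 1;
            current = p.last[i] + 1;
        }
        p.increment = 1;
    }
    else
    {
        // Thread i starts at offset i and strides by numThreads.
        for (unsigned int i = 0; i < numThreads; ++i)
        {
            p.first[i] = loopFirstIndex + (int)i;
            p.last[i] = loopFirstIndex + (int)numIterations - 1;
        }
        p.increment = (int)numThreads;
    }
    return p;
}

// Lists the indices one thread would visit, using the same loop shape as
// the body generated by QMP_PARALLEL_FOR.
std::vector<int> indicesFor(const Partition& p, unsigned int thread)
{
    std::vector<int> indices;
    for (int i = p.first[thread]; i <= p.last[thread]; i += p.increment)
    {
        indices.push_back(i);
    }
    return indices;
}
```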

    void ParallelTaskManager::process(ParallelTask* task)
    {
        mInParallelSection = true;
        QMP_ASSERT(!mCurrentTask);
        mCurrentTask = task;

        // Between the barriers the main thread and worker threads are
        // working on the loop iterations in parallel. (Compare with the
        // thread routine.)

        barrier();

        // The main thread works on its own subset (thread index 0).
        processSubset(0);

        barrier();

        mCurrentTask = NULL;
        mInParallelSection = false;
    }

    void ParallelTaskManager::processSubset(unsigned int threadIndex)
    {
        mCurrentTask->run(mTaskFirstIndices[threadIndex],
            mTaskLastIndices[threadIndex], threadIndex, mTaskIndexIncrement);
    }
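process() runs part of the loop on the calling thread (index 0) and relies on the second barrier to wait for the workers. A simplified sketch of that division of labor, assuming threads are started per call with `std::thread` instead of kept in a persistent pool; `runOnThreads` and `parallelSum` are hypothetical names. The per-thread partial sums, indexed by thread number, avoid a critical section, much as a loop body could index per-thread storage with QMP_THREAD_NUM.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Runs body(threadIndex) for threadIndex = 0..numThreads-1. The caller acts
// as thread 0 and only numThreads - 1 workers are started. numThreads must
// be at least 1.
void runOnThreads(unsigned int numThreads,
    const std::function<void(unsigned int)>& body)
{
    std::vector<std::thread> workers;
    for (unsigned int t = 1; t < numThreads; ++t)
    {
        workers.emplace_back(body, t);
    }
    body(0);  // the calling thread does its own share of the work
    for (std::thread& w : workers)
    {
        w.join();  // plays the role of the closing barrier
    }
}

// Sums 0..n-1 using SEQUENTIAL-style chunks and one partial sum per thread.
long long parallelSum(int n, unsigned int numThreads)
{
    std::vector<long long> partial(numThreads, 0);
    runOnThreads(numThreads, [&](unsigned int t) {
        int perThread = n / (int)numThreads;
        int remainder = n % (int)numThreads;
        int it = (int)t;
        // Threads before t each took perThread, plus one if below remainder.
        int first = it * perThread + (it < remainder ? it : remainder);
        int count = perThread + (it < remainder ? 1 : 0);
        for (int i = first; i < first + count; ++i)
        {
            partial[t] += i;
        }
    });
    long long total = 0;
    for (long long p : partial) total += p;
    return total;
}
```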

    void ParallelTaskManager::criticalSectionBegin(unsigned int id)
    {
        // Avoid synchronization if there are no worker threads.
        if (mNumThreads < 2)
        {
            return;
        }

        // Dynamically allocate new synchronization objects on first usage
        // up to the given id. Each object is initialized before it is added
        // to the vector, so other threads never see an uninitialized one.

#ifdef QMP_USE_WINDOWS_THREADS
        if (id >= mPlatform->userCriticalSections.size())
        {
            // Protect against extra allocations by other threads.
            EnterCriticalSection(&mPlatform->csVectorCriticalSection);
            while (id >= mPlatform->userCriticalSections.size())
            {
                CRITICAL_SECTION* cs = new CRITICAL_SECTION;
                InitializeCriticalSection(cs);
                mPlatform->userCriticalSections.push_back(cs);
            }
            LeaveCriticalSection(&mPlatform->csVectorCriticalSection);
        }
        EnterCriticalSection(mPlatform->userCriticalSections[id]);
#else
        if (id >= mPlatform->userMutexes.size())
        {
            // Protect against extra allocations by other threads.
            int returnCode = pthread_mutex_lock(&mPlatform->mutexVectorMutex);
            QMP_ASSERT(0 == returnCode);
            while (id >= mPlatform->userMutexes.size())
            {
                pthread_mutex_t* mutex = new pthread_mutex_t;
                returnCode = pthread_mutex_init(mutex, NULL);
                QMP_ASSERT(0 == returnCode);
                mPlatform->userMutexes.push_back(mutex);
            }
            returnCode = pthread_mutex_unlock(&mPlatform->mutexVectorMutex);
            QMP_ASSERT(0 == returnCode);
        }
        int returnCode = pthread_mutex_lock(mPlatform->userMutexes[id]);
        QMP_ASSERT(0 == returnCode);
#endif
    }

    void ParallelTaskManager::criticalSectionEnd(unsigned int id)
    {
        // Avoid synchronization if there are no worker threads.
        if (mNumThreads < 2)
        {
            return;
        }

#ifdef QMP_USE_WINDOWS_THREADS
        if (id >= mPlatform->userCriticalSections.size())
        {
            std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
                << id << ") has no matching 'begin'" << std::endl;
        }
        else
        {
            LeaveCriticalSection(mPlatform->userCriticalSections[id]);
        }
#else
        if (id >= mPlatform->userMutexes.size())
        {
            std::cout << "[QuickMP] WARNING: Critical section 'end' (id="
                << id << ") has no matching 'begin'" << std::endl;
        }
        else
        {
            int returnCode = pthread_mutex_unlock(mPlatform->userMutexes[id]);
            QMP_ASSERT(0 == returnCode);
        }
#endif
    }
840 
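`criticalSectionEnd` can only warn after an unmatched call has already happened. Callers may therefore prefer to pair the two calls with a scope guard, so that the end call also runs on an early return or an exception. The sketch below assumes nothing about QuickMP's API. `ScopedCriticalSection` and `recordGuardedCalls` are hypothetical, and the begin/end callbacks only record their arguments. In real code they might forward to the manager's `criticalSectionBegin` and `criticalSectionEnd`.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical RAII guard: calls begin(id) on construction and end(id) on
// destruction, so every 'begin' has exactly one matching 'end'.
class ScopedCriticalSection {
public:
    using Callback = std::function<void(unsigned int)>;

    ScopedCriticalSection(unsigned int id, Callback begin, Callback end)
        : id_(id), end_(std::move(end)) {
        assert(begin && end_);
        begin(id_);
    }
    ~ScopedCriticalSection() { end_(id_); }

    // Copying would call end() twice for one begin(), so forbid it.
    ScopedCriticalSection(const ScopedCriticalSection&) = delete;
    ScopedCriticalSection& operator=(const ScopedCriticalSection&) = delete;

private:
    unsigned int id_;
    Callback end_;
};

// Usage: record the calls to show that 'end' runs even when an exception escapes.
std::vector<int> recordGuardedCalls() {
    std::vector<int> log;  // +id for begin, -id for end
    auto begin = [&log](unsigned int id) { log.push_back(static_cast<int>(id)); };
    auto end = [&log](unsigned int id) { log.push_back(-static_cast<int>(id)); };
    try {
        ScopedCriticalSection guard(3, begin, end);
        throw std::runtime_error("work failed");
    } catch (const std::runtime_error&) {
        // The guard's destructor has already called end(3).
    }
    return log;
}
```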
841  void ParallelTaskManager::barrier()
842  {
843  // Avoid synchronization if there are no worker threads.
844  if (mNumThreads < 2)
845  {
846  return;
847  }
848 
849  // Lock access to the shared variables.
850 #ifdef QMP_USE_WINDOWS_THREADS
851  EnterCriticalSection(&mPlatform->barrierCriticalSection);
852 #else
853  int returnCode = pthread_mutex_lock(&mPlatform->barrierMutex);
854  QMP_ASSERT(0 == returnCode);
855 #endif
856 
857  ++mBarrierCount;
858  if (mBarrierCount == mNumThreads)
859  {
860  // All threads have reached the barrier. First we must reset
861  // the count. Then we signal all threads to continue.
862  mBarrierCount = 0;
863 
864 #ifdef QMP_USE_WINDOWS_THREADS
865  // Use alternating events for every other barrier to avoid race
866  // conditions; otherwise, a fast thread might reach the next
867  // barrier and reset the event before the others get unblocked.
868  if (mPlatform->barrierEventToggle)
869  {
870  SetEvent(mPlatform->barrierEvent1);
871  }
872  else
873  {
874  SetEvent(mPlatform->barrierEvent2);
875  }
876  mPlatform->barrierEventToggle = !mPlatform->barrierEventToggle;
877  LeaveCriticalSection(&mPlatform->barrierCriticalSection);
878 #else
879  // This must be called while the mutex is locked. We must
880  // unlock the mutex afterwards in order to unblock the waiting
881  // threads.
882  returnCode = pthread_cond_broadcast(&mPlatform->barrierCondition);
883  QMP_ASSERT(0 == returnCode);
884  returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
885  QMP_ASSERT(0 == returnCode);
886 #endif
887  }
888  else
889  {
890  // Wait until all threads have reached the barrier.
891 
892 #ifdef QMP_USE_WINDOWS_THREADS
893  // The first thread here must reset the event.
894  if (1 == mBarrierCount)
895  {
896  if (mPlatform->barrierEventToggle)
897  {
898  ResetEvent(mPlatform->barrierEvent1);
899  }
900  else
901  {
902  ResetEvent(mPlatform->barrierEvent2);
903  }
904  }
905 
906  if (mPlatform->barrierEventToggle)
907  {
908  LeaveCriticalSection(&mPlatform->barrierCriticalSection);
909  WaitForSingleObject(mPlatform->barrierEvent1, INFINITE);
910  }
911  else
912  {
913  LeaveCriticalSection(&mPlatform->barrierCriticalSection);
914  WaitForSingleObject(mPlatform->barrierEvent2, INFINITE);
915  }
916 #else
917  // This must be called while the mutex is locked. It unlocks
918  // the mutex while waiting and locks it again when finished.
919  returnCode = pthread_cond_wait(&mPlatform->barrierCondition,
920  &mPlatform->barrierMutex);
921  QMP_ASSERT(0 == returnCode);
922  returnCode = pthread_mutex_unlock(&mPlatform->barrierMutex);
923  QMP_ASSERT(0 == returnCode);
924 #endif
925  }
926  }
927 
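The POSIX branch of `barrier` calls `pthread_cond_wait` once and then proceeds. POSIX permits spurious wakeups from `pthread_cond_wait`, so condition-variable barriers usually re-check a predicate in a loop. The following sketch swaps in a generation counter for that predicate and uses `std::condition_variable` instead of raw pthreads calls. The counter also covers the concern behind the Windows branch's alternating events: waiters released from one barrier episode are not confused with threads already arriving at the next. `GenerationBarrier` and `barrierPhasesAreOrdered` are hypothetical names; this is an illustration, not QuickMP's implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier for a fixed number of threads.
class GenerationBarrier {
public:
    explicit GenerationBarrier(unsigned int numThreads) : numThreads_(numThreads) {
        assert(numThreads > 0);
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long myGeneration = generation_;
        if (++count_ == numThreads_) {
            // Last arrival: reset the count, start a new generation, wake everyone.
            count_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            // The predicate guards against spurious wakeups and against
            // confusing this barrier episode with the next one.
            cv_.wait(lock, [&] { return generation_ != myGeneration; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const unsigned int numThreads_;
    unsigned int count_ = 0;
    unsigned long generation_ = 0;
};

// Usage: each thread writes its slot in phase 1; after the barrier, every
// thread must see all phase-1 writes. Returns true if every check passes.
bool barrierPhasesAreOrdered(unsigned int numThreads, int rounds) {
    GenerationBarrier barrier(numThreads);
    std::vector<int> slots(numThreads, -1);
    std::vector<int> ok(numThreads, 1);  // int, not bool, so writes don't share bits
    std::vector<std::thread> threads;
    for (unsigned int t = 0; t < numThreads; ++t) {
        threads.emplace_back([&, t] {
            for (int r = 0; r < rounds; ++r) {
                slots[t] = r;          // phase 1: publish this round's value
                barrier.wait();
                for (int v : slots) {  // phase 2: everyone must have published r
                    if (v != r) ok[t] = 0;
                }
                barrier.wait();        // keep phase-2 reads apart from next writes
            }
        });
    }
    for (auto& th : threads) th.join();
    for (int v : ok) {
        if (!v) return false;
    }
    return true;
}
```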
928  PlatformThreadObjects* ParallelTaskManager::getPlatformThreadObjects()
929  {
930  return mPlatform;
931  }
932 
933  bool ParallelTaskManager::shouldWorkerThreadsExit() const
934  {
935  return mShouldWorkerThreadsExit;
936  }
937 
938  ParallelTaskManager::ParallelTaskManager()
939  {
940  mPlatform = new PlatformThreadObjects();
941  mInitialized = false;
942  mInParallelSection = false;
943  mShouldWorkerThreadsExit = false;
944  mCurrentTask = NULL;
945  mNumThreads = 0;
946  mBarrierCount = 0;
947  mTaskFirstIndices = NULL;
948  mTaskLastIndices = NULL;
949  mTaskIndexIncrement = 0;
950  }
951 
952  ParallelTaskManager::~ParallelTaskManager()
953  {
954  // Because the singleton instance is static, this destructor runs
955  // automatically when the program exits.
956 
957  destroy();
958  delete mPlatform;
959  }
960 
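The destructor comment depends on a C++ rule: a function-local static object is constructed the first time control passes through its declaration and is destroyed during normal program termination, after `main` returns or when `std::exit` is called. Static destructors run on the thread that calls `exit`, which is why `destroy` has to cope with being entered from a worker thread. A minimal sketch of such a singleton follows. `Manager` is a hypothetical stand-in for `ParallelTaskManager` that only counts its constructions.

```cpp
#include <cassert>

// Hypothetical singleton whose destructor runs at normal program exit,
// mirroring the cleanup role of ~ParallelTaskManager.
class Manager {
public:
    static Manager& instance() {
        // Constructed on the first call; destroyed automatically after main
        // returns or when std::exit is called. Thread-safe since C++11.
        static Manager self;
        return self;
    }

    static int constructions() { return constructions_; }

    Manager(const Manager&) = delete;
    Manager& operator=(const Manager&) = delete;

private:
    Manager() { ++constructions_; }
    ~Manager() {
        // A real manager would release its threads and locks here.
    }

    static int constructions_;
};

int Manager::constructions_ = 0;
```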
961  void ParallelTaskManager::destroy()
962  {
963  // If ::exit is called within a worker thread, things get a little
964  // messy: we can't be expected to close all the worker threads
965  // from within one of the worker threads because control would never
966  // return to the main thread. We just have to quit without
967  // deallocating everything, which shouldn't be a big problem because
968  // the process is quitting anyway.
969 #ifdef QMP_USE_WINDOWS_THREADS
970  if (mNumThreads > 1 && GetCurrentThreadId() != mPlatform->threadIDs[0])
971 #else
972  if (mNumThreads > 1 && !pthread_equal(pthread_self(), mPlatform->threads[0]))
973 #endif
974  {
975  return;
976  }
977 
978  // Clean up worker threads and synchronization objects.
979  if (mNumThreads > 1)
980  {
981  // Signal the worker threads to exit, and wait until they're
982  // finished. At this point all the worker threads are waiting
983  // at the first barrier.
984  mShouldWorkerThreadsExit = true;
985  barrier();
986 
987 #ifdef QMP_USE_WINDOWS_THREADS
988  // Wait for each thread handle to become signaled, indicating that
989  // the thread has exited. Note: WaitForMultipleObjects would be
990  // ideal here, but it accepts at most MAXIMUM_WAIT_OBJECTS (64)
991  // handles per call.
992  for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
993  {
994  DWORD returnCode = WaitForSingleObject(mPlatform->
995  threadHandles[threadIndex], INFINITE);
996  QMP_ASSERT(WAIT_OBJECT_0 == returnCode);
997  }
998 #else
999  // Call pthread_join on all worker threads, which blocks until the
1000  // thread exits.
1001  for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
1002  {
1003  int returnCode = pthread_join(mPlatform->threads[threadIndex], NULL);
1004  QMP_ASSERT(0 == returnCode);
1005  }
1006 #endif
1007 
1008  // Clean up platform-specific objects, and return everything to its
1009  // original state in case we're resetting the number of threads.
1010 #ifdef QMP_USE_WINDOWS_THREADS
1011  DeleteCriticalSection(&mPlatform->barrierCriticalSection);
1012 
1013  mPlatform->barrierEventToggle = false;
1014 
1015  BOOL returnCode2 = CloseHandle(mPlatform->barrierEvent1);
1016  QMP_ASSERT(0 != returnCode2);
1017  mPlatform->barrierEvent1 = NULL;
1018 
1019  returnCode2 = CloseHandle(mPlatform->barrierEvent2);
1020  QMP_ASSERT(0 != returnCode2);
1021  mPlatform->barrierEvent2 = NULL;
1022 
1023  DeleteCriticalSection(&mPlatform->csVectorCriticalSection);
1024 
1025  // The main thread (index 0) handle is not used.
1026  for (unsigned int threadIndex = 1; threadIndex < mNumThreads; ++threadIndex)
1027  {
1028  BOOL returnCode = CloseHandle(mPlatform->
1029  threadHandles[threadIndex]);
1030  QMP_ASSERT(0 != returnCode);
1031  }
1032  delete [] mPlatform->threadHandles;
1033  mPlatform->threadHandles = NULL;
1034 
1035  delete [] mPlatform->threadIDs;
1036  mPlatform->threadIDs = NULL;
1037 
1038  while (!mPlatform->userCriticalSections.empty())
1039  {
1040  DeleteCriticalSection(mPlatform->userCriticalSections.back());
1041  delete mPlatform->userCriticalSections.back();
1042  mPlatform->userCriticalSections.pop_back();
1043  }
1044 #else
1045  delete[] mPlatform->threads;
1046  mPlatform->threads = NULL;
1047 
1048  int returnCode = pthread_mutex_destroy(&mPlatform->barrierMutex);
1049  QMP_ASSERT(0 == returnCode);
1050 
1051  returnCode = pthread_cond_destroy(&mPlatform->barrierCondition);
1052  QMP_ASSERT(0 == returnCode);
1053 
1054  returnCode = pthread_mutex_destroy(&mPlatform->mutexVectorMutex);
1055  QMP_ASSERT(0 == returnCode);
1056 
1057  while (!mPlatform->userMutexes.empty())
1058  {
1059  returnCode = pthread_mutex_destroy(mPlatform->userMutexes.back());
1060  QMP_ASSERT(0 == returnCode);
1061  delete mPlatform->userMutexes.back();
1062  mPlatform->userMutexes.pop_back();
1063  }
1064 #endif
1065  }
1066 
1067  mInitialized = false;
1068  mInParallelSection = false;
1069  mShouldWorkerThreadsExit = false;
1070  mCurrentTask = NULL;
1071  mNumThreads = 0;
1072  mBarrierCount = 0;
1073 
1074  if (mTaskFirstIndices)
1075  {
1076  delete [] mTaskFirstIndices;
1077  mTaskFirstIndices = NULL;
1078  }
1079  if (mTaskLastIndices)
1080  {
1081  delete [] mTaskLastIndices;
1082  mTaskLastIndices = NULL;
1083  }
1084 
1085  mTaskIndexIncrement = 0;
1086  }
1087 }
1088 
1089 #endif
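`destroy` tears the worker pool down in a fixed order. It proceeds only on the main thread, sets the exit flag, releases the workers parked at their first barrier, joins every worker, and only then frees the shared synchronization objects. The sketch below reproduces that order with `std::thread`; it is an illustration, not QuickMP's implementation. `Pool`, `run`, `workerLoop`, and `sumIndicesTwice` are hypothetical names, and a small generation-counter barrier is included to keep the example self-contained.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical fixed-size pool that mirrors the shutdown order of destroy().
// Thread index 0 is the thread that constructs the pool (the "main" thread).
class Pool {
public:
    explicit Pool(unsigned int numThreads)
        : numThreads_(numThreads), mainId_(std::this_thread::get_id()) {
        assert(numThreads > 0);
        for (unsigned int i = 1; i < numThreads_; ++i) {
            workers_.emplace_back([this, i] { workerLoop(i); });
        }
    }

    ~Pool() { destroy(); }

    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;

    // Runs task(threadIndex) once on every thread, including the main thread.
    void run(const std::function<void(unsigned int)>& task) {
        task_ = task;
        barrier();  // release the parked workers
        task_(0);
        barrier();  // wait until every thread has finished the task
    }

    // Does nothing and returns false when called from a worker thread.
    bool destroy() {
        if (std::this_thread::get_id() != mainId_) return false;
        if (workers_.empty()) return true;
        shouldExit_ = true;                        // 1. raise the exit flag
        barrier();                                 // 2. release parked workers
        for (std::thread& t : workers_) t.join();  // 3. wait for each to return
        workers_.clear();                          // 4. shared state is now unused
        return true;
    }

private:
    void workerLoop(unsigned int index) {
        for (;;) {
            barrier();  // park until the main thread posts a task or exit
            if (shouldExit_) return;
            task_(index);
            barrier();  // report completion
        }
    }

    // Generation-counter barrier for numThreads_ participants.
    void barrier() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long generation = generation_;
        if (++count_ == numThreads_) {
            count_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [&] { return generation_ != generation; });
        }
    }

    const unsigned int numThreads_;
    const std::thread::id mainId_;
    std::vector<std::thread> workers_;
    std::function<void(unsigned int)> task_;
    std::atomic<bool> shouldExit_{false};
    std::mutex mutex_;
    std::condition_variable cv_;
    unsigned int count_ = 0;
    unsigned long generation_ = 0;
};

// Usage: each run adds every thread index to sum, so two runs on
// four threads give 2 * (0 + 1 + 2 + 3) = 12.
unsigned int sumIndicesTwice(unsigned int numThreads) {
    std::atomic<unsigned int> sum{0};
    Pool pool(numThreads);
    for (int round = 0; round < 2; ++round) {
        pool.run([&sum](unsigned int index) { sum += index; });
    }
    pool.destroy();  // explicit shutdown; the destructor would also do it
    return sum.load();
}
```

Joining each `std::thread` in a loop plays the role of the per-thread `WaitForSingleObject` and `pthread_join` calls above. Calling `destroy()` from the destructor gives the same cleanup at scope exit that the static singleton gets at program exit.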