Mixe for Privacy and Anonymity in the Internet
Public Member Functions | Private Attributes | Friends | List of all members
CAThreadPool Class Reference

A pool of worker threads that execute queued requests (tasks) from a bounded work queue. More...

#include <CAThreadPool.hpp>

Collaboration diagram for CAThreadPool:

Public Member Functions

 CAThreadPool (UINT32 num_worker_threads, UINT32 max_queue_size, bool b_do_not_block_when_full)
 
 ~CAThreadPool ()
 
SINT32 destroy (bool bWaitForFinish)
 
SINT32 addRequest (THREAD_MAIN_TYP, void *args)
 Adds a new request (task) to this threadpool. More...
 
UINT32 countRequests ()
 

Private Attributes

UINT32 m_NumThreads
 
UINT32 m_MaxQueueSize
 
bool m_bDoNotBlockWhenFull
 
CAThread ** m_parThreads
 
volatile UINT32 m_CurQueueSize
 
tpool_work_t * m_pQueueHead
 
tpool_work_t * m_pQueueTail
 
volatile bool m_bQueueClosed
 
volatile bool m_bShutdown
 
CAMutex * m_pmutexQueue
 
CAConditionVariable * m_pcondNotEmpty
 
CAConditionVariable * m_pcondNotFull
 
CAConditionVariable * m_pcondEmpty
 

Friends

THREAD_RETURN worker_thread_main_loop (void *args)
 

Detailed Description

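A pool of worker threads that execute queued requests (tasks). The constructor starts a fixed number of worker threads; addRequest() appends a task to a work queue limited to a maximum size, and the workers dequeue and run the tasks. destroy() closes the queue, optionally waits for it to drain, and joins the worker threads.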

Definition at line 31 of file CAThreadPool.hpp.

Constructor & Destructor Documentation

◆ CAThreadPool()

CAThreadPool::CAThreadPool ( UINT32  num_worker_threads,
UINT32  max_queue_size,
bool  b_do_not_block_when_full 
)

Definition at line 13 of file CAThreadPool.cpp.

16  {
17  UINT i;
18 
19  /* initialize fields */
20  m_NumThreads = num_worker_threads;
21  m_MaxQueueSize = max_queue_size;
22  m_bDoNotBlockWhenFull = b_do_not_block_when_full;
23  m_parThreads=new CAThread*[num_worker_threads];
24  m_CurQueueSize = 0;
25  m_pQueueHead = NULL;
26  m_pQueueTail = NULL;
27  m_bQueueClosed = false;
28  m_bShutdown = false;
29  m_pmutexQueue=new CAMutex();
33 
34  char thread_str[24];
35 
36  /* create threads */
37  for (i = 0; i != num_worker_threads; i++)
38  {
39  snprintf(thread_str, 16, "Pool Thread %3d", i);
40  m_parThreads[i]=new CAThread((UINT8*)thread_str);
42  m_parThreads[i]->start(this);
43  }
44  }
unsigned int UINT
Definition: basetypedefs.h:155
unsigned char UINT8
Definition: basetypedefs.h:135
SINT32 start(void *param, bool bDaemon=false, bool bSilent=false)
Starts the execution of the main function of this thread.
Definition: CAThread.cpp:115
SINT32 setMainLoop(THREAD_MAIN_TYP fnc)
Sets the main function which will be executed within this thread.
Definition: CAThread.hpp:148
CAThread ** m_parThreads
volatile bool m_bShutdown
CAConditionVariable * m_pcondNotFull
CAMutex * m_pmutexQueue
friend THREAD_RETURN worker_thread_main_loop(void *args)
tpool_work_t * m_pQueueTail
volatile bool m_bQueueClosed
UINT32 m_NumThreads
CAConditionVariable * m_pcondNotEmpty
volatile UINT32 m_CurQueueSize
bool m_bDoNotBlockWhenFull
UINT32 m_MaxQueueSize
tpool_work_t * m_pQueueHead
CAConditionVariable * m_pcondEmpty

References m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, CAThread::setMainLoop(), CAThread::start(), and worker_thread_main_loop.

Here is the call graph for this function:

◆ ~CAThreadPool()

CAThreadPool::~CAThreadPool ( )
inline

Definition at line 37 of file CAThreadPool.hpp.

38  {
39  destroy(true);
40  }
SINT32 destroy(bool bWaitForFinish)

References destroy().

Here is the call graph for this function:

Member Function Documentation

◆ addRequest()

SINT32 CAThreadPool::addRequest ( THREAD_MAIN_TYP  routine,
void *  args 
)

Adds a new request (task) to this threadpool.

Return values
E_SPACE if there was no more space in the waiting queue and we do not want to wait for another request to finish
E_SUCCESS if this request was added to the working queue
E_UNKNOWN if the pool is in the process of being destroyed

Definition at line 51 of file CAThreadPool.cpp.

52 {
54  tpool_work_t *workp;
55 
56  // no space and this caller doesn't want to wait
58  {
60  return E_SPACE;
61  }
62 
63  while((m_CurQueueSize == m_MaxQueueSize) &&
64  (!(m_bShutdown || m_bQueueClosed)) )
65  {
66  CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n");
68  }
69 
70  // the pool is in the process of being destroyed
72  {
74  return E_UNKNOWN;
75  }
76 
77  // allocate work structure
78  workp = new tpool_work_t;
79  workp->routine = routine;
80  workp->arg = args;
81  workp->next = NULL;
82 
83  if (m_CurQueueSize == 0)
84  {
85  m_pQueueTail = m_pQueueHead = workp;
87  }
88  else
89  {
90  m_pQueueTail->next = workp;
91  m_pQueueTail = workp;
92  }
93 
94  m_CurQueueSize++;
96  return E_SUCCESS;
97 }
struct tpool_work tpool_work_t
SINT32 broadcast()
Signals this object.
SINT32 wait()
Waits for a signal or for a timeout.
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
SINT32 unlock()
Definition: CAMutex.hpp:52
SINT32 lock()
Definition: CAMutex.hpp:41
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_SPACE
Definition: errorcodes.hpp:7
#define E_UNKNOWN
Definition: errorcodes.hpp:3
THREAD_MAIN_TYP routine
struct tpool_work * next

References tpool_work::arg, CAConditionVariable::broadcast(), E_SPACE, E_SUCCESS, E_UNKNOWN, CAMutex::lock(), m_bDoNotBlockWhenFull, m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_MaxQueueSize, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, m_pQueueTail, tpool_work::next, CAMsg::printMsg(), tpool_work::routine, CAMutex::unlock(), and CAConditionVariable::wait().

Referenced by CALastMixA::loop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ countRequests()

UINT32 CAThreadPool::countRequests ( )
inline

Definition at line 46 of file CAThreadPool.hpp.

47  {
48  return m_CurQueueSize;
49  }

References m_CurQueueSize.

◆ destroy()

SINT32 CAThreadPool::destroy ( bool  bWaitForFinish)

Definition at line 99 of file CAThreadPool.cpp.

100  {
101  tpool_work_t *cur_nodep;
102  // Is a shutdown already in progress?
104  {
105  return E_SUCCESS;
106  }
107 
108  m_pmutexQueue->lock();
109  m_bQueueClosed = true;
110  // If the finish flag is set, wait for workers to
111  // drain queue
112  if (bWaitForFinish)
113  {
114  while (m_CurQueueSize != 0)
115  {
117  }
118  }
119 
120  m_bShutdown = true;
122 
123  // Wake up any workers so they recheck shutdown flag
126 
127  // Wait for workers to exit
128  for(UINT32 i=0; i < m_NumThreads; i++)
129  {
130  m_parThreads[i]->join();
131  delete m_parThreads[i];
132  m_parThreads[i] = NULL;
133  }
134  // Now free pool structures
135  delete[] m_parThreads;
136  m_parThreads = NULL;
137  while(m_pQueueHead != NULL)
138  {
139  cur_nodep = m_pQueueHead->next;
140  delete m_pQueueHead;
141  m_pQueueHead = cur_nodep;
142  }
143  delete m_pmutexQueue;
144  m_pmutexQueue = NULL;
145  delete m_pcondEmpty;
146  m_pcondEmpty = NULL;
147  delete m_pcondNotEmpty;
148  m_pcondNotEmpty = NULL;
149  delete m_pcondNotFull;
150  m_pcondNotFull = NULL;
151 
152  return E_SUCCESS;
153  }
unsigned int UINT32
Definition: basetypedefs.h:131
SINT32 join()
Waits for the main function to finish execution.
Definition: CAThread.cpp:187

References CAConditionVariable::broadcast(), E_SUCCESS, CAThread::join(), CAMutex::lock(), m_bQueueClosed, m_bShutdown, m_CurQueueSize, m_NumThreads, m_parThreads, m_pcondEmpty, m_pcondNotEmpty, m_pcondNotFull, m_pmutexQueue, m_pQueueHead, tpool_work::next, CAMutex::unlock(), and CAConditionVariable::wait().

Referenced by CALastMixA::loop(), CAFirstMixB::loop(), and ~CAThreadPool().

Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Function Documentation

◆ worker_thread_main_loop

THREAD_RETURN worker_thread_main_loop ( void *  args)
friend

Definition at line 155 of file CAThreadPool.cpp.

156 {
157  CAThreadPool* pPool = (CAThreadPool*)arg;
158  tpool_work_t *my_workp;
159 
160  for(;;)
161  {
162  // Check queue for work
163  pPool->m_pmutexQueue->lock();
164  while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown))
165  {
166  pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue);
167  }
168  //sSleep(5);
169  // Has a shutdown started while i was sleeping?
170  if (pPool->m_bShutdown)
171  {
172  pPool->m_pmutexQueue->unlock();
174  }
175 
176  // Get to work, dequeue the next item
177  my_workp = pPool->m_pQueueHead;
178  pPool->m_CurQueueSize--;
179  if (pPool->m_CurQueueSize == 0)
180  pPool->m_pQueueHead = pPool->m_pQueueTail = NULL;
181  else
182  pPool->m_pQueueHead = my_workp->next;
183 
184  // Handle waiting add_work threads
185  if ((!pPool->m_bDoNotBlockWhenFull) &&
186  (pPool->m_CurQueueSize == (pPool->m_MaxQueueSize - 1)))
187  pPool->m_pcondNotFull->broadcast();
188  // Handle waiting destroyer threads
189  if (pPool->m_CurQueueSize == 0)
190  pPool->m_pcondEmpty->signal();
191  pPool->m_pmutexQueue->unlock();
192 
193  // Do this work item
194  (*(my_workp->routine))(my_workp->arg);
195  delete my_workp;
196  my_workp = NULL;
197  }
198 }
#define THREAD_RETURN_SUCCESS
Definition: StdAfx.h:542
SINT32 signal()
Signals this object.
This class bla bla.

Referenced by CAThreadPool().

Member Data Documentation

◆ m_bDoNotBlockWhenFull

bool CAThreadPool::m_bDoNotBlockWhenFull
private

Definition at line 55 of file CAThreadPool.hpp.

Referenced by addRequest(), and CAThreadPool().

◆ m_bQueueClosed

volatile bool CAThreadPool::m_bQueueClosed
private

Definition at line 61 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_bShutdown

volatile bool CAThreadPool::m_bShutdown
private

Definition at line 62 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_CurQueueSize

volatile UINT32 CAThreadPool::m_CurQueueSize
private

Definition at line 58 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), countRequests(), and destroy().

◆ m_MaxQueueSize

UINT32 CAThreadPool::m_MaxQueueSize
private

Definition at line 54 of file CAThreadPool.hpp.

Referenced by addRequest(), and CAThreadPool().

◆ m_NumThreads

UINT32 CAThreadPool::m_NumThreads
private

Definition at line 53 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), and destroy().

◆ m_parThreads

CAThread** CAThreadPool::m_parThreads
private

Definition at line 57 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), and destroy().

◆ m_pcondEmpty

CAConditionVariable* CAThreadPool::m_pcondEmpty
private

Definition at line 67 of file CAThreadPool.hpp.

Referenced by CAThreadPool(), and destroy().

◆ m_pcondNotEmpty

CAConditionVariable* CAThreadPool::m_pcondNotEmpty
private

Definition at line 65 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_pcondNotFull

CAConditionVariable* CAThreadPool::m_pcondNotFull
private

Definition at line 66 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_pmutexQueue

CAMutex* CAThreadPool::m_pmutexQueue
private

Definition at line 64 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_pQueueHead

tpool_work_t* CAThreadPool::m_pQueueHead
private

Definition at line 59 of file CAThreadPool.hpp.

Referenced by addRequest(), CAThreadPool(), and destroy().

◆ m_pQueueTail

tpool_work_t* CAThreadPool::m_pQueueTail
private

Definition at line 60 of file CAThreadPool.hpp.

Referenced by addRequest(), and CAThreadPool().


The documentation for this class was generated from the following files: