Mixe for Privacy and Anonymity in the Internet
CAThreadPool.cpp
Go to the documentation of this file.
1 /********************************************************
2  * A Thread pool class inspired by:
3  * "Using POSIX Threads: Programming with Pthreads"
4  * by Brad Nichols, Dick Buttlar, Jackie Farrell
5  * O'Reilly & Associates, Inc.
6  */
7 #include "StdAfx.h"
8 #if !defined ONLY_LOCAL_PROXY || defined INCLUDE_FIRST_MIX
9 #include "CAThreadPool.hpp"
10 #include "CAMsg.hpp"
// Forward declaration of a thread entry point that takes and returns void*.
// NOTE(review): no definition of, or call to, tpool_thread is visible in this
// code listing; the worker body further down reads its parameter as `arg`.
// Confirm whether this declaration is still needed.
11 void *tpool_thread(void *);
12 
// Constructs the thread pool.
//
// Stores the configuration, resets the work queue to empty, allocates the
// array of worker-thread pointers and the queue mutex, and then creates and
// starts num_worker_threads CAThread objects, handing each one this pool
// object as its start parameter.
//
// Parameters:
//   num_worker_threads       - number of CAThread workers to create and start
//   max_queue_size           - stored in m_MaxQueueSize (queue capacity)
//   b_do_not_block_when_full - stored in m_bDoNotBlockWhenFull
//
// NOTE(review): this listing has gaps (its embedded numbering skips 30-32 and
// 41). The creation of m_pcondEmpty / m_pcondNotEmpty / m_pcondNotFull, which
// destroy() later deletes, and the call that assigns the workers' main
// function are not visible here - presumably they sit in those gaps; confirm
// against the original file.
13 CAThreadPool::CAThreadPool( UINT32 num_worker_threads,
14  UINT32 max_queue_size,
15  bool b_do_not_block_when_full)
16  {
17  UINT i;
18 
19  /* initialize fields */
20  m_NumThreads = num_worker_threads;
21  m_MaxQueueSize = max_queue_size;
22  m_bDoNotBlockWhenFull = b_do_not_block_when_full;
23  m_parThreads=new CAThread*[num_worker_threads];
24  m_CurQueueSize = 0;
25  m_pQueueHead = NULL;
26  m_pQueueTail = NULL;
27  m_bQueueClosed = false;
28  m_bShutdown = false;
29  m_pmutexQueue=new CAMutex();
33 
 // Scratch buffer for the per-thread name; it is overwritten on every loop
 // iteration below.
34  char thread_str[24];
35 
36  /* create threads */
37  for (i = 0; i != num_worker_threads; i++)
38  {
 // The size limit passed here is 16 although the buffer holds 24 bytes.
 // "Pool Thread " (12 chars) plus the 3-wide number plus the terminator
 // fills exactly 16, so an index of 1000 or more would be cut off.
 // NOTE(review): %3d is a signed conversion but i is a UINT (unsigned int).
39  snprintf(thread_str, 16, "Pool Thread %3d", i);
 // NOTE(review): thread_str is reused for the next worker, so CAThread
 // presumably copies the name it is given - confirm.
40  m_parThreads[i]=new CAThread((UINT8*)thread_str);
 // The pool itself is the parameter handed to the thread on start.
42  m_parThreads[i]->start(this);
43  }
44  }
45 
// Body of the request-enqueue routine (the log message below names it
// CAThreadPool::addRequest()).
//
// Visible behaviour: allocates a tpool_work_t, fills in the routine and its
// argument, appends the item to the tail of the singly linked work queue,
// increments m_CurQueueSize and returns E_SUCCESS. Two earlier blocks return
// E_SPACE and E_UNKNOWN respectively.
//
// NOTE(review): the function header and several statements are missing from
// this listing (its embedded numbering has gaps). In particular the conditions
// guarding the two early returns, the statement that actually waits inside the
// "pool is full" loop, and any mutex lock/unlock or condition-variable
// signalling are not visible here. `routine` and `args` are presumably the
// function's parameters. Confirm all of this against the original file.
52 {
54  tpool_work_t *workp;
55 
56  // no space and this caller doesn't want to wait
 // NOTE(review): the condition for this block is not visible in the listing.
58  {
60  return E_SPACE;
61  }
62 
 // Loop while the queue is at capacity and the pool is neither shut down nor
 // closed for new work. Only the log call is visible in the loop body.
63  while((m_CurQueueSize == m_MaxQueueSize) &&
64  (!(m_bShutdown || m_bQueueClosed)) )
65  {
66  CAMsg::printMsg(LOG_INFO,"CAThreadPool::addRequest() -the Thread pool is full...waiting!\n");
68  }
69 
70  // the pool is in the process of being destroyed
 // NOTE(review): the condition for this block is not visible in the listing.
72  {
74  return E_UNKNOWN;
75  }
76 
77  // allocate work structure
78  workp = new tpool_work_t;
79  workp->routine = routine;
80  workp->arg = args;
81  workp->next = NULL;
82 
 // Empty queue: the new item becomes both head and tail.
83  if (m_CurQueueSize == 0)
84  {
85  m_pQueueTail = m_pQueueHead = workp;
87  }
88  else
89  {
 // Non-empty queue: link the item behind the current tail.
90  m_pQueueTail->next = workp;
91  m_pQueueTail = workp;
92  }
93 
94  m_CurQueueSize++;
96  return E_SUCCESS;
97 }
98 
// Shuts the pool down and releases everything it owns.
//
// Marks the queue as closed, optionally waits until the queue is empty, sets
// the shutdown flag, joins and deletes every worker thread, deletes any work
// items still in the queue, and finally deletes the mutex and the three
// condition variables.
//
// Parameters:
//   bWaitForFinish - when true, loop until m_CurQueueSize reaches 0 before
//                    setting the shutdown flag.
// Returns: E_SUCCESS on every path visible here.
//
// NOTE(review): several statements are missing from this listing (its embedded
// numbering has gaps): the condition of the "shutdown already in progress"
// check, the body of the drain loop, the unlock matching the lock() below, and
// the calls that wake the workers. Confirm against the original file.
99 SINT32 CAThreadPool::destroy(bool bWaitForFinish)
100  {
101  tpool_work_t *cur_nodep;
102  // Is a shutdown already in progress?
 // NOTE(review): the condition for this early return is not visible here.
104  {
105  return E_SUCCESS;
106  }
107 
108  m_pmutexQueue->lock();
109  m_bQueueClosed = true;
110  // If the finish flag is set, wait for workers to
111  // drain queue
112  if (bWaitForFinish)
113  {
114  while (m_CurQueueSize != 0)
115  {
 // NOTE(review): the wait statement is not visible; workers signal
 // m_pcondEmpty when the queue becomes empty, so presumably that is
 // what is waited on here - confirm.
117  }
118  }
119 
120  m_bShutdown = true;
122 
123  // Wake up any workers so they recheck shutdown flag
 // NOTE(review): the wake-up calls themselves are not visible in the listing.
126 
127  // Wait for workers to exit
128  for(UINT32 i=0; i < m_NumThreads; i++)
129  {
130  m_parThreads[i]->join();
131  delete m_parThreads[i];
132  m_parThreads[i] = NULL;
133  }
134  // Now free pool structures
135  delete[] m_parThreads;
136  m_parThreads = NULL;
 // Work items still queued at this point are deleted without being run;
 // only the list node is freed, not whatever its arg points to.
137  while(m_pQueueHead != NULL)
138  {
139  cur_nodep = m_pQueueHead->next;
140  delete m_pQueueHead;
141  m_pQueueHead = cur_nodep;
142  }
143  delete m_pmutexQueue;
144  m_pmutexQueue = NULL;
145  delete m_pcondEmpty;
146  m_pcondEmpty = NULL;
147  delete m_pcondNotEmpty;
148  m_pcondNotEmpty = NULL;
149  delete m_pcondNotFull;
150  m_pcondNotFull = NULL;
151 
152  return E_SUCCESS;
153  }
154 
// Body of the worker-thread main function. It casts its argument to the
// owning CAThreadPool and then loops: wait for work, dequeue one item under
// the queue mutex, run it outside the mutex, delete the item.
//
// NOTE(review): the function header is missing from this listing; the
// reference section of the page lists worker_thread_main_loop(void *arg),
// which is presumably this function. The statement that leaves the loop on
// shutdown is also not visible. Confirm against the original file.
156 {
157  CAThreadPool* pPool = (CAThreadPool*)arg;
158  tpool_work_t *my_workp;
159 
160  for(;;)
161  {
162  // Check queue for work
163  pPool->m_pmutexQueue->lock();
 // Wait on the not-empty condition until there is work or the pool is
 // shutting down; the predicate is re-checked after every wake-up.
164  while ((pPool->m_CurQueueSize == 0) && (!pPool->m_bShutdown))
165  {
166  pPool->m_pcondNotEmpty->wait(pPool->m_pmutexQueue);
167  }
168  //sSleep(5);
169  // Has a shutdown started while i was sleeping?
 // The shutdown flag is tested before dequeuing, so items still in the
 // queue at shutdown are not picked up by this worker.
170  if (pPool->m_bShutdown)
171  {
172  pPool->m_pmutexQueue->unlock();
 // NOTE(review): the statement that ends the thread here is not visible
 // in the listing; without one, execution would fall through to the
 // dequeue code below with the mutex released - confirm.
174  }
175 
176  // Get to work, dequeue the next item
177  my_workp = pPool->m_pQueueHead;
178  pPool->m_CurQueueSize--;
179  if (pPool->m_CurQueueSize == 0)
180  pPool->m_pQueueHead = pPool->m_pQueueTail = NULL;
181  else
182  pPool->m_pQueueHead = my_workp->next;
183 
184  // Handle waiting add_work threads
 // Broadcast only when the queue has just dropped from full to one below
 // capacity, and only if the pool is configured to block when full.
185  if ((!pPool->m_bDoNotBlockWhenFull) &&
186  (pPool->m_CurQueueSize == (pPool->m_MaxQueueSize - 1)))
187  pPool->m_pcondNotFull->broadcast();
188  // Handle waiting destroyer threads
189  if (pPool->m_CurQueueSize == 0)
190  pPool->m_pcondEmpty->signal();
191  pPool->m_pmutexQueue->unlock();
192 
193  // Do this work item
 // The routine runs after the queue mutex has been released, so other
 // threads can use the queue while it executes.
194  (*(my_workp->routine))(my_workp->arg);
195  delete my_workp;
196  my_workp = NULL;
197  }
198 }
199 #endif //ONLY_LOCAL_PROXY
THREAD_RETURN(* THREAD_MAIN_TYP)(void *)
Defines the type of the main function of the thread.
Definition: CAThread.hpp:67
THREAD_RETURN worker_thread_main_loop(void *arg)
void * tpool_thread(void *)
struct tpool_work tpool_work_t
#define THREAD_RETURN
Definition: StdAfx.h:540
#define THREAD_RETURN_SUCCESS
Definition: StdAfx.h:542
unsigned int UINT
Definition: basetypedefs.h:155
signed int SINT32
Definition: basetypedefs.h:132
unsigned char UINT8
Definition: basetypedefs.h:135
unsigned int UINT32
Definition: basetypedefs.h:131
SINT32 signal()
Signals this object.
SINT32 broadcast()
Signals this object.
SINT32 wait()
Waits for a signal or for a timeout.
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
SINT32 unlock()
Definition: CAMutex.hpp:52
SINT32 lock()
Definition: CAMutex.hpp:41
SINT32 start(void *param, bool bDaemon=false, bool bSilent=false)
Starts the execution of the main function of this thread.
Definition: CAThread.cpp:115
SINT32 setMainLoop(THREAD_MAIN_TYP fnc)
Sets the main function which will be executed within this thread.
Definition: CAThread.hpp:148
SINT32 join()
Waits for the main function to finish execution.
Definition: CAThread.cpp:187
This class bla bla.
CAThread ** m_parThreads
volatile bool m_bShutdown
CAConditionVariable * m_pcondNotFull
CAMutex * m_pmutexQueue
friend THREAD_RETURN worker_thread_main_loop(void *args)
SINT32 destroy(bool bWaitForFinish)
tpool_work_t * m_pQueueTail
SINT32 addRequest(THREAD_MAIN_TYP, void *args)
Adds a new request (task) to this threadpool.
volatile bool m_bQueueClosed
UINT32 m_NumThreads
CAConditionVariable * m_pcondNotEmpty
volatile UINT32 m_CurQueueSize
CAThreadPool(UINT32 num_worker_threads, UINT32 max_queue_size, bool b_do_not_block_when_full)
bool m_bDoNotBlockWhenFull
UINT32 m_MaxQueueSize
tpool_work_t * m_pQueueHead
CAConditionVariable * m_pcondEmpty
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_SPACE
Definition: errorcodes.hpp:7
#define E_UNKNOWN
Definition: errorcodes.hpp:3
THREAD_MAIN_TYP routine
struct tpool_work * next