Mixe for Privacy and Anonymity in the Internet
CAQueue.cpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2000, The JAP-Team
3 All rights reserved.
4 Redistribution and use in source and binary forms, with or without modification,
5 are permitted provided that the following conditions are met:
6 
7  - Redistributions of source code must retain the above copyright notice,
8  this list of conditions and the following disclaimer.
9 
10  - Redistributions in binary form must reproduce the above copyright notice,
11  this list of conditions and the following disclaimer in the documentation and/or
12  other materials provided with the distribution.
13 
14  - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors
15  may be used to endorse or promote products derived from this software without specific
16  prior written permission.
17 
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
20 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
22 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
24 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
27 */
28 #include "StdAfx.h"
29 #if !defined ONLY_LOCAL_PROXY || defined INCLUDE_MIDDLE_MIX
30 #include "CAQueue.hpp"
31 #include "CAMsg.hpp"
32 #include "CAUtil.hpp"
33 #include "CAThread.hpp"
34 
37  {
38  clean();
39  delete m_pcsQueue;
40  m_pcsQueue = NULL;
41  delete m_pconvarSize;
42  m_pconvarSize = NULL;
43  }
44 
48  {
49  m_pcsQueue->lock();
50  while(m_Queue!=NULL)
51  {
52  delete[] m_Queue->pBuff;
53  m_Queue->pBuff = NULL;
56  delete m_lastElem;
57  }
58 /* while(m_pHeap!=NULL)
59  {
60  delete[] m_pHeap->pBuff;
61  m_lastElem=m_pHeap;
62  m_pHeap=m_pHeap->next;
63  delete m_lastElem;
64  }*/
65  m_nQueueSize=0;
66  m_lastElem=NULL;
67  m_pcsQueue->unlock();
68  return E_SUCCESS;
69  }
76 SINT32 CAQueue::add(const void* buff,UINT32 size)
77  {
78  if(size==0)
79  return E_SUCCESS;
80  if(buff==NULL)
81  return E_UNKNOWN;
82  if (m_bClosed)
83  return E_UNKNOWN;
84  QUEUE* newEntry = new QUEUE;
85  newEntry->pBuff=new UINT8[size];
86  newEntry->next=NULL;
87  newEntry->size=size;
88  newEntry->index=0;
89  memcpy(newEntry->pBuff,buff,size);
90  m_pcsQueue->lock();
91  //if(m_pHeap==NULL)
92  // incHeap();
93 #ifdef _DEBUG
94 // if(m_nExpectedElementSize>0&&size>(m_nExpectedElementSize<<1))
95  /*if(size>1500)
96  {
97  CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: request for add %u bytes in a queue with expected element size of %u bytes !\n",size,m_nExpectedElementSize);
98  }*/
99 #endif
100  if(m_Queue==NULL)
101  {
102  /*m_Queue=m_pHeap;
103  m_pHeap=m_pHeap->next;
104  if(size>m_nExpectedElementSize)
105  {
106  delete[] m_Queue->pBuff;
107  m_Queue->pBuff=new UINT8[size];
108  }*/
109  m_Queue=newEntry;
111  }
112  else
113  {
114 /* m_lastElem->next=m_pHeap;
115  m_lastElem=m_pHeap;
116  m_pHeap=m_pHeap->next;
117  if(size>m_nExpectedElementSize)
118  {
119  delete[] m_lastElem->pBuff;
120  m_lastElem->pBuff=new UINT8[size];
121  }*/
122  m_lastElem->next=newEntry;
123  m_lastElem=newEntry;
124  }
125  m_nQueueSize+=size;
126 #ifdef QUEUE_SIZE_LOG
127  if(m_nLogSize!=0 && m_nQueueSize>m_nLogSize)
128  {
129  CAMsg::printMsg(LOG_DEBUG,"CAQueue::add() WARNING: queue size is now %u bytes which is above the expected maximum size of %u\n !\n",m_nQueueSize,m_nLogSize);
130  }
131 #endif
132  m_pcsQueue->unlock();
133  m_pconvarSize->lock();
136  return E_SUCCESS;
137  }
138 
149  {
150  if(pbuff==NULL||psize==NULL)
151  return E_UNKNOWN;
152  if(*psize==0)
153  return E_SUCCESS;
154  if(m_Queue==NULL)
155  {
156  *psize=0;
157  if ((m_bClosed))
158  {
159  return E_CLOSED;
160  }
161  return E_SUCCESS;
162  }
163  m_pcsQueue->lock();
164  UINT32 space=*psize;
165  *psize=0;
166  while(space>=m_Queue->size)
167  {
168  memcpy(pbuff,m_Queue->pBuff+m_Queue->index,m_Queue->size);
169  *psize+=m_Queue->size;
170  pbuff+=m_Queue->size;
171  space-=m_Queue->size;
173  QUEUE* tmp=(QUEUE*)m_Queue;
175  //tmp->next=m_pHeap;
176  //m_pHeap=tmp;
177  delete[] tmp->pBuff;
178  tmp->pBuff = NULL;
179  delete tmp;
180  if(m_Queue==NULL)
181  {
182  m_pcsQueue->unlock();
183  return E_SUCCESS;
184  }
185  }
186  if(space>0)
187  {
188  memcpy(pbuff,m_Queue->pBuff+m_Queue->index,space);
189  *psize+=space;
190  m_Queue->size-=space;
191  m_Queue->index+=space;
192  m_nQueueSize-=space;
193  //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
194  }
195  m_pcsQueue->unlock();
196  return E_SUCCESS;
197  }
198 
210  {
211  m_pconvarSize->lock();
212  while (m_Queue == NULL&&!m_bClosed)
213  {
214  m_pconvarSize->wait();
215  }
216  SINT32 ret=get(pbuff,psize);
218  return ret;
219  }
220 
232 SINT32 CAQueue::getOrWait(UINT8* pbuff,UINT32* psize,UINT32 msTimeout)
233  {
234  m_pconvarSize->lock();
235  SINT32 ret;
236  while(m_Queue==NULL)
237  {
238  ret=m_pconvarSize->wait(msTimeout);
239  if(ret==E_TIMEDOUT)
240  {
242  return E_TIMEDOUT;
243  }
244  }
245  ret=get(pbuff,psize);
247  return ret;
248  }
249 
259  {
260  if(pbuff==NULL||psize==NULL)
261  return E_UNKNOWN;
262  if(*psize==0)
263  return E_SUCCESS;
264  m_pcsQueue->lock();
265  UINT32 space=*psize;
266  *psize=0;
267  if(m_Queue==NULL)
268  {
269  SINT32 ret=E_SUCCESS;
270  if(m_bClosed)
271  ret=E_CLOSED;
272  m_pcsQueue->unlock();
273  return ret;
274  }
275  QUEUE* tmpQueue=(QUEUE*)m_Queue;
276  while(space>=tmpQueue->size)
277  {
278  memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,tmpQueue->size);
279  *psize+=tmpQueue->size;
280  pbuff+=tmpQueue->size;
281  space-=tmpQueue->size;
282  tmpQueue=tmpQueue->next;
283  if(tmpQueue==NULL)
284  {
285  m_pcsQueue->unlock();
286  return E_SUCCESS;
287  }
288  }
289  memcpy(pbuff,tmpQueue->pBuff+tmpQueue->index,space);
290  *psize+=space;
291  m_pcsQueue->unlock();
292  return E_SUCCESS;
293  }
294 
295 
303  {
304  if(m_Queue==NULL||psize==NULL)
305  return E_UNKNOWN;
306  if(*psize==0)
307  return E_SUCCESS;
308  m_pcsQueue->lock();
309  UINT32 space=*psize;
310  *psize=0;
311  while(space>=m_Queue->size)
312  {
313  *psize+=m_Queue->size;
314  space-=m_Queue->size;
316  QUEUE* tmp=(QUEUE*)m_Queue;
318 // tmp->next=m_pHeap;
319 // m_pHeap=tmp;
320  delete[] tmp->pBuff;
321  tmp->pBuff = NULL;
322  delete tmp;
323  if(m_Queue==NULL)
324  {
325  m_pcsQueue->unlock();
326  return E_SUCCESS;
327  }
328  }
329  if(space>0)
330  {
331  *psize+=space;
332  m_Queue->size-=space;
333  m_nQueueSize-=space;
334  m_Queue->index+=space;
335  //memmove(m_Queue->pBuff,m_Queue->pBuff+space,m_Queue->size);
336  }
337  m_pcsQueue->unlock();
338  return E_SUCCESS;
339  }
340 
342  {
346  };
347 
348 /*
349 THREAD_RETURN producer(void* param)
350  {
351  struct __queue_test* pTest=(struct __queue_test *)param;
352  UINT32 count=0;
353  UINT32 aktSize;
354  while(pTest->len>10)
355  {
356  aktSize=rand();
357  aktSize%=0xFFFF;
358  aktSize%=pTest->len;
359  if(pTest->pQueue->add(pTest->buff+count,aktSize)!=E_SUCCESS)
360  THREAD_RETURN_ERROR;
361  count+=aktSize;
362  pTest->len-=aktSize;
363  msSleep(rand()%100);
364  }
365  if(pTest->pQueue->add(pTest->buff+count,pTest->len)!=E_SUCCESS)
366  THREAD_RETURN_ERROR;
367  THREAD_RETURN_SUCCESS;
368  }
369 
370 THREAD_RETURN consumer(void* param)
371  {
372  struct __queue_test* pTest=(struct __queue_test *)param;
373  UINT32 count=0;
374  UINT32 aktSize;
375  do
376  {
377  aktSize=rand();
378  aktSize%=0xFFFF;
379  if(pTest->pQueue->getOrWait(pTest->buff+count,&aktSize)!=E_SUCCESS)
380  THREAD_RETURN_ERROR;
381  count+=aktSize;
382  pTest->len-=aktSize;
383  }while(pTest->len>10);
384  THREAD_RETURN_SUCCESS;
385  }
386 
387 */
389  {
390  struct __queue_test* pTest = (struct __queue_test *)param;
391  UINT8 buff[992];
392  UINT8 b = 0;
393  UINT32 burst=1;
394  while (pTest->len>10)
395  {
396  buff[0] = b;
397  b++;
398  if (pTest->pQueue->add(buff, 992) != E_SUCCESS)
400  burst--;
401  if (burst == 0)
402  {
403  burst = rand() % 10+1;
404  msSleep(rand() % 100);
405 
406  }
407  }
409  }
410 
412  {
413  struct __queue_test* pTest = (struct __queue_test *)param;
414  UINT32 aktSize=992;
415  UINT8 buff[992];
416  UINT8 b = 0;
417  UINT32 burst=1;
418  do
419  {
420  aktSize = 992;
421  if (pTest->pQueue->getOrWait(buff, &aktSize) != E_SUCCESS)
423  if (buff[0]!=b)
425  b++;
426  burst--;
427  if (burst == 0)
428  {
429  burst = rand() % 10 + 1;
430  msSleep(rand() % 100);
431 
432  }
433  } while (pTest->len>10);
435  }
436 
438  {
439  CAQueue* pQueue=new CAQueue(1000);
440  #define TEST_SIZE 1000000
441  UINT8* source=new UINT8[TEST_SIZE];
442  UINT8* target=new UINT8[TEST_SIZE];
443  getRandom(source,TEST_SIZE);
444  UINT32 count=0;
445  UINT32 aktSize;
446 
447  srand((unsigned)time( NULL ));
448 
449  //Single Thread.....
450  //adding
451 
452  while(TEST_SIZE-count>10000)
453  {
454  aktSize=rand();
455  aktSize%=0xFFFF;
456  aktSize%=(TEST_SIZE-count);
457  if(pQueue->add(source+count,aktSize)!=E_SUCCESS)
458  return E_UNKNOWN;
459  count+=aktSize;
460  if(pQueue->getSize()!=count)
461  return E_UNKNOWN;
462  }
463  if(pQueue->add(source+count,TEST_SIZE-count)!=E_SUCCESS)
464  return E_UNKNOWN;
465  if(pQueue->getSize()!=TEST_SIZE)
466  return E_UNKNOWN;
467 
468  //getting
469  count=0;
470  while(!pQueue->isEmpty())
471  {
472  aktSize=rand();
473  aktSize%=0xFFFF;
474  if(pQueue->get(target+count,&aktSize)!=E_SUCCESS)
475  return E_UNKNOWN;
476  count+=aktSize;
477  }
478  if(count!=TEST_SIZE)
479  return E_UNKNOWN;
480  if(memcmp(source,target,TEST_SIZE)!=0)
481  return E_UNKNOWN;
482 
483  //Multiple Threads....
484  CAThread* pthreadProducer=new CAThread((UINT8*)"Queue Producer Thread");
485  CAThread* pthreadConsumer=new CAThread((UINT8*)"Queue Consumer Thread");
486  pthreadProducer->setMainLoop(producer);
487  pthreadConsumer->setMainLoop(consumer);
488  struct __queue_test t1,t2;
489  t1.buff=source;
490  t2.buff=target;
491  t2.len=t1.len=TEST_SIZE;
492  t2.pQueue=t1.pQueue=pQueue;
493  pthreadProducer->start(&t1);
494  pthreadConsumer->start(&t2);
495  pthreadProducer->join();
496  pthreadConsumer->join();
497  delete pthreadProducer;
498  pthreadProducer = NULL;
499  delete pthreadConsumer;
500  pthreadConsumer = NULL;
501  delete pQueue;
502  pQueue = NULL;
503  if(memcmp(source,target,TEST_SIZE)!=0)
504  return E_UNKNOWN;
505 
506  delete []source;
507  source = NULL;
508  delete []target;
509  target = NULL;
510  return E_SUCCESS;
511  }
512 #endif //ONLY_LOCAL_PROXY
THREAD_RETURN consumer(void *param)
Definition: CAQueue.cpp:411
#define TEST_SIZE
THREAD_RETURN producer(void *param)
Definition: CAQueue.cpp:388
struct _t_queue QUEUE
Definition: CAQueue.hpp:42
SINT32 getRandom(UINT32 *val)
Gets 32 random bits.
Definition: CAUtil.cpp:346
SINT32 msSleep(UINT32 ms)
Sleeps ms milliseconds.
Definition: CAUtil.cpp:406
#define THREAD_RETURN_ERROR
Definition: StdAfx.h:541
#define THREAD_RETURN
Definition: StdAfx.h:540
#define THREAD_RETURN_SUCCESS
Definition: StdAfx.h:542
signed int SINT32
Definition: basetypedefs.h:132
unsigned char UINT8
Definition: basetypedefs.h:135
unsigned int UINT32
Definition: basetypedefs.h:131
SINT32 signal()
Signals this object.
SINT32 wait()
Waits for a signal or for a timeout.
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
SINT32 unlock()
Definition: CAMutex.hpp:52
SINT32 lock()
Definition: CAMutex.hpp:41
This is a simple FIFO-Queue.
Definition: CAQueue.hpp:50
CAConditionVariable * m_pconvarSize
Definition: CAQueue.hpp:159
SINT32 add(const void *buff, UINT32 size)
Adds data to the Queue.
Definition: CAQueue.cpp:76
static SINT32 test()
Method to test the Queue.
Definition: CAQueue.cpp:437
CAMutex * m_pcsQueue
Definition: CAQueue.hpp:158
SINT32 clean()
Removes any stored data from the Queue.
Definition: CAQueue.cpp:47
volatile UINT32 m_nQueueSize
Definition: CAQueue.hpp:154
volatile bool m_bClosed
Definition: CAQueue.hpp:155
SINT32 getOrWait(UINT8 *pbuff, UINT32 *psize)
Gets data from the Queue or waits until some data is available, if the Queue is empty.
Definition: CAQueue.cpp:209
SINT32 get(UINT8 *pbuff, UINT32 *psize)
Gets up to psize number of bytes from the Queue.
Definition: CAQueue.cpp:148
SINT32 peek(UINT8 *pbuff, UINT32 *psize)
Peeks data from the Queue.
Definition: CAQueue.cpp:258
CAQueue(UINT32 expectedElementSize=0)
Give the size of the amount of data what you will add in one step.
Definition: CAQueue.hpp:57
SINT32 remove(UINT32 *psize)
Removes data from the Queue.
Definition: CAQueue.cpp:302
UINT32 getSize()
Returns the size of stored data in byte.
Definition: CAQueue.hpp:101
~CAQueue()
Deletes this Queue and all stored data.
Definition: CAQueue.cpp:36
volatile QUEUE * m_lastElem
Definition: CAQueue.hpp:153
volatile QUEUE * m_Queue
Definition: CAQueue.hpp:152
bool isEmpty()
Returns true, if the Queue is empty.
Definition: CAQueue.hpp:125
SINT32 start(void *param, bool bDaemon=false, bool bSilent=false)
Starts the execution of the main function of this thread.
Definition: CAThread.cpp:115
SINT32 setMainLoop(THREAD_MAIN_TYP fnc)
Sets the main function which will be executed within this thread.
Definition: CAThread.hpp:148
SINT32 join()
Waits for the main function to finish execution.
Definition: CAThread.cpp:187
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_CLOSED
Definition: errorcodes.hpp:5
#define E_UNKNOWN
Definition: errorcodes.hpp:3
#define E_TIMEDOUT
Definition: errorcodes.hpp:10
SINT32 len
Definition: CAQueue.cpp:345
CAQueue * pQueue
Definition: CAQueue.cpp:343
UINT8 * buff
Definition: CAQueue.cpp:344
UINT8 * pBuff
Definition: CAQueue.hpp:36
UINT32 size
Definition: CAQueue.hpp:38
UINT32 index
Definition: CAQueue.hpp:39
struct _t_queue * next
Definition: CAQueue.hpp:37