Mixe for Privacy and Anonymity in the Internet
CAFirstMixA.cpp
Go to the documentation of this file.
1 /*
2 Copyright (c) 2000, The JAP-Team
3 All rights reserved.
4 Redistribution and use in source and binary forms, with or without modification,
5 are permitted provided that the following conditions are met:
6 
7  - Redistributions of source code must retain the above copyright notice,
8  this list of conditions and the following disclaimer.
9 
10  - Redistributions in binary form must reproduce the above copyright notice,
11  this list of conditions and the following disclaimer in the documentation and/or
12  other materials provided with the distribution.
13 
14  - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors
15  may be used to endorse or promote products derived from this software without specific
16  prior written permission.
17 
18 
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
20 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
21 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
22 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
24 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
27 */
28 #include "../StdAfx.h"
29 #if !defined ONLY_LOCAL_PROXY || defined INCLUDE_FIRST_MIX
30 #include "CAFirstMixA.hpp"
31 #include "../CALibProxytest.hpp"
32 #include "../CAThread.hpp"
33 #include "../CASingleSocketGroup.hpp"
34 #include "../CAInfoService.hpp"
35 #include "../CAPool.hpp"
36 #include "../CACmdLnOptions.hpp"
37 #include "../CAAccountingInstance.hpp"
38 #include "../CAStatusManager.hpp"
39 #ifdef HAVE_EPOLL
40  #include "../CASocketGroupEpoll.hpp"
41 #endif
42 #include "../CASymChannelCipherFactory.hpp"
43 
// Shutdown routine of the first mix (the signature line is not part of this listing).
// Flags the mix as shutting down and clears m_bLoop. When PAYMENT is defined it
// additionally deletes the accept-users thread and closes every client connection
// that popTimeoutEntry(true) still hands out. On exit m_bRestart is set and the
// shutting-down flag is cleared again.
// NOTE(review): the embedded listing numbers skip 58, 61 and 66, so statements are
// missing from this view (presumably the thread join and the infoservice stop) --
// confirm against the original file.
45 {
46  m_bIsShuttingDown = true;
47  m_bLoop=false;
48 #ifdef PAYMENT
49  UINT32 connectionsClosed = 0; // counted only for the log message below
50  fmHashTableEntry* timeoutHashEntry;
51 
52 
53  /* make sure no reconnect is possible when shutting down */
54  if(m_pthreadAcceptUsers!=NULL)
55  {
56  CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n");
57  m_bRestart=true;
59  delete m_pthreadAcceptUsers;
60  }
62 
63  if(m_pInfoService != NULL)
64  {
65  CAMsg::printMsg(LOG_DEBUG,"Shutting down infoservice.\n");
67  }
68  if(m_pChannelList!=NULL) // may happen if mixes did not yet connect to each other
69  {
 // drain the timeout list: every entry returned is closed via closeConnection()
70  while ((timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL)
71  {
72  CAMsg::printMsg(LOG_DEBUG,"Shutting down, closing client connection.\n");
73  connectionsClosed++;
74  closeConnection(timeoutHashEntry);
75  }
76  CAMsg::printMsg(LOG_DEBUG,"Closed %i client connections.\n", connectionsClosed);
77  }
78 #endif
 // the main loop runs while(!m_bRestart), so this makes it terminate
79  m_bRestart = true;
80  m_bIsShuttingDown = false;
81 }
82 
83 #ifndef MULTI_THREADED_PACKET_PROCESSING
84 
// Tears down one user connection (per the BEGIN_STACK tag below this is
// CAFirstMixA::closeConnection; the signature line is not part of this listing).
//
// pHashEntry: the connection to close. NULL is rejected with E_UNKNOWN.
// Returns E_SUCCESS after the connection has been removed.
//
// Steps: remove the peer IP from m_pIPList, remove the socket from the read and
// write socket groups, queue a CHANNEL_CLOSE packet towards the next mix for every
// channel of this socket, delete the per-channel ciphers, the send queue and the
// connection cipher, decrement the user count, remove the socket from the channel
// list and finally delete the socket itself.
86 {
87  if (pHashEntry == NULL)
88  {
89  return E_UNKNOWN;
90  }
91 
92  INIT_STACK;
93  BEGIN_STACK("CAFirstMixA::closeConnection");
94 
95 
96  fmChannelListEntry* pEntry;
 // scratch queue entry reused for every CHANNEL_CLOSE packet; deleted at the end
97  tQueueEntry* pQueueEntry = new tQueueEntry;
98  MIXPACKET* pMixPacket=&pQueueEntry->packet;
99 
100  #ifdef LOG_TRAFFIC_PER_USER
101  UINT64 current_time;
102  getcurrentTimeMillis(current_time);
103  CAMsg::printMsg(LOG_DEBUG,"Removing Connection wiht ID: %Lu -- login time [ms] %Lu -- logout time [ms] %Lu -- Traffic was: IN: %u -- OUT: %u\n",pHashEntry->id,pHashEntry->timeCreated,current_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
104  #endif
105  m_pIPList->removeIP(pHashEntry->peerIP);
106 
107  m_psocketgroupUsersRead->remove(*(pHashEntry->pMuxSocket));
108  m_psocketgroupUsersWrite->remove(*(pHashEntry->pMuxSocket));
109  pEntry = m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
110 
 // tell the next mix about every channel that is still open on this connection
111  while(pEntry!=NULL)
112  {
113  getRandom(pMixPacket->data,DATA_SIZE);
114  pMixPacket->flags=CHANNEL_CLOSE;
115  pMixPacket->channel=pEntry->channelOut;
116  #ifdef LOG_PACKET_TIMES
117  setZero64(pQueueEntry->timestamp_proccessing_start);
118  #endif
 // presumably add() copies the bytes, since the same entry is reused for the
 // next channel and deleted below -- confirm against CAQueue
119  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
120  delete pEntry->pCipher;
121  pEntry->pCipher = NULL;
122  pEntry=m_pChannelList->getNextChannel(pEntry);
123 #ifdef CH_LOG_STUDY
124  nrOfChOpMutex->lock();
125  currentOpenedChannels--;
126  nrOfChOpMutex->unlock();
127 #endif //CH_LOG_STUDY
128  }
129  ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
130  delete pHashEntry->pQueueSend;
131  pHashEntry->pQueueSend = NULL;
132  delete pHashEntry->pSymCipher;
133  pHashEntry->pSymCipher = NULL;
134 
135  #ifdef COUNTRY_STATS
136  decUsers(pHashEntry);
137  #else
138  decUsers();
139  #endif
140 
141  CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
142  // Save the socket - its pointer will be deleted in this method!!! Crazy mad programming...
143  m_pChannelList->remove(pHashEntry->pMuxSocket);
 // the saved copy is used because the hash entry must not be relied on after remove()
144  delete pMuxSocket;
145  pMuxSocket = NULL;
146  //pHashEntry->pMuxSocket = NULL; // not needed now, but maybe in the future...
147 
148  delete pQueueEntry;
149  pQueueEntry = NULL;
150 
151  FINISH_STACK("CAFirstMixA::closeConnection");
152 
153  return E_SUCCESS;
154 }
155 
// Length argument handed to crypt1() and setKeys() when a new channel is opened:
// twice KEY_SIZE.
// NOTE(review): the expansion is not parenthesized, so it is only safe while the
// macro is used as a complete argument or operand, as it is in this file.
156 #define FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS 2*KEY_SIZE
157 
// Main message loop of the first mix (the log text below names it
// CAFirstMixA::loop(); the signature line is not part of this listing).
// Runs until m_bRestart is set. Each pass does three things:
//  1. reads one mix packet from every user socket that has data and, depending on
//     channel number and flags, hands it to the control channel dispatcher, echoes
//     a dummy packet back to the user, or forwards a close/data/open packet into
//     m_pQueueSendToMix;
//  2. takes up to m_nUser+1 entries out of m_pQueueReadFromMix and appends them,
//     after crypt2(), to the send queue of the user that owns the channel;
//  3. calls sendToUsers() and, unless FAST_PROCESSING is defined, sleeps 1 ms when
//     that call reports no activity.
// After the loop it calls clean(), frees the scratch buffers and returns E_UNKNOWN
// unconditionally.
// NOTE(review): the embedded listing numbers have gaps (e.g. 192, 216-217, 228,
// 237, 245, 442-443, 527, 597, 617, 624, 701, 721, 743-744), so statements and at
// least one enclosing condition are missing from this view.
159  {
160 #ifndef NEW_MIX_TYPE
161 #ifdef DELAY_USERS
162  m_pChannelList->setDelayParameters( CALibProxytest::getOptions()->getDelayChannelUnlimitTraffic(),
163  CALibProxytest::getOptions()->getDelayChannelBucketGrow(),
164  CALibProxytest::getOptions()->getDelayChannelBucketGrowIntervall());
165 #endif
166 
167  // CASingleSocketGroup osocketgroupMixOut;
168  SINT32 countRead=0;
169  //#ifdef LOG_PACKET_TIMES
170  // tPoolEntry* pPoolEntry=new tPoolEntry;
171  // MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
172  //#else
 // single scratch entry: every received or dequeued packet is processed in place here
173  tQueueEntry* pQueueEntry = new tQueueEntry;
174  MIXPACKET* pMixPacket=&pQueueEntry->packet;
175  //#endif
176  m_nUser=0;
177  SINT32 ret;
178  //osocketgroupMixOut.add(*m_pMuxOut);
179 
 // NOTE(review): tmpBuff is only allocated here and freed after the loop in the visible code
180  UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
181  CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
182  bool bAktiv;
 // receives the decrypted key material of channel-open packets
183  UINT8 rsaBuff[RSA_SIZE];
184 
185 #ifdef LOG_TRAFFIC_PER_USER
186  UINT64 current_time;
187  UINT32 diff_time;
188  CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
189  CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
190  CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
191 #endif
193 #ifdef _DEBUG
194  CAThread* pLogThread=new CAThread((UINT8*)"CAFirstMixA - LogLoop");
195  pLogThread->setMainLoop(fm_loopLog);
196  pLogThread->start(this);
197 #endif
198 
199 #ifdef LOG_CRIME
200  CAIPAddrWithNetmask* surveillanceIPs = CALibProxytest::getOptions()->getCrimeSurveillanceIPs();
201  UINT32 nrOfSurveillanceIPs = CALibProxytest::getOptions()->getNrOfCrimeSurveillanceIPs();
202 #endif
203 // CAThread threadReadFromUsers;
204 // threadReadFromUsers.setMainLoop(loopReadFromUsers);
205 // threadReadFromUsers.start(this);
206 
207  while(!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
208  {
209 
210  bAktiv=false;
211 
212 //LOOP_START:
213 #ifdef PAYMENT
214  // while checking if there are connections to close: synch with login threads
 // NOTE(review): the matching unlock is not visible in this listing -- confirm it exists
215  m_pmutexLogin->lock();
218 #endif
219 //First Step
220 //Checking for new connections
221 // Now in a separate Thread....
222 
223 // Second Step
224 // Checking for data from users
225 // Now in a separate Thread (see loopReadFromUsers())
226 //Only process user data, if queue to next mix is not too long!!
227 
229  {
230  countRead=m_psocketgroupUsersRead->select(/*false,*/0); // how many JAP<->mix connections have received data from their corresponding JAP
231  if(countRead>0)
232  bAktiv=true;
233 #ifdef HAVE_EPOLL
234  //if we have epoll we do not need to search the whole list
235  //of connected JAPs to find the ones who have sent data
236  //as epoll will return ONLY these connections.
238  while(pHashEntry!=NULL)
239  {
240  CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
241 #else
242  //if we do not have epoll we have to go to the whole
243  //list of open connections to find the ones which
244  //actually have sent some data
246  while(pHashEntry!=NULL&&countRead>0) // iterate through all connections as long as there is at least one active left
247  {
248  CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
249  if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket)) // if this one seems to have data
250  {
251 #endif
252 /*#ifdef DELAY_USERS
253  * Don't delay upstream
254  if( m_pChannelList->hasDelayBuckets(pHashEntry->delayBucketID) )
255  {
256 #endif*/
257  countRead--;
258  ret=pMuxSocket->receive(pMixPacket,0);
259 
260  #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
261  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
262  set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
263  #endif
264  #ifdef DATA_RETENTION_LOG
265  pQueueEntry->dataRetentionLogEntry.t_in=htonl(time(NULL));
266  #endif
267  if(ret<0&&ret!=E_AGAIN/*||pHashEntry->accessUntil<time()*/)
268  {
269  // remove dead connections
270  closeConnection(pHashEntry);
271  }
272  else if(ret==MIXPACKET_SIZE) // we've read enough data for a whole mix packet. nice!
273  {
274 #ifdef PAYMENT
275  if (pHashEntry->bRecoverTimeout)
276  {
277  // renew the timeout only if recovery is allowed
278  m_pChannelList->pushTimeoutEntry(pHashEntry);
279  }
280 #endif
281  #ifdef LOG_TRAFFIC_PER_USER
282  pHashEntry->trafficIn++;
283  #endif
284  #ifdef COUNTRY_STATS
285  m_PacketsPerCountryIN[pHashEntry->countryID].inc();
286  #endif
287  //New control channel code...!
 // channel numbers 1..255 are handed to the per-connection control channel dispatcher
288  if(pMixPacket->channel>0&&pMixPacket->channel<256)
289  {
290  if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
291  {
292  goto NEXT_USER;
293  }
294  else
295  {
296  CAMsg::printMsg(LOG_DEBUG, "Control channel packet is invalid and could not be processed!\n");
297  closeConnection(pHashEntry);
298  goto NEXT_USER;
299  }
300  }
301 #ifdef ANON_DEBUG_MODE
302  bool bIsDebugPacket=false;
303  if (pMixPacket->flags&CHANNEL_DEBUG)
304  {
305  bIsDebugPacket=true;
306  UINT8 base64Payload[DATA_SIZE << 1];
307  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
 // the debug flag is stripped here and re-added before the packet is forwarded
308  pMixPacket->flags &= ~CHANNEL_DEBUG;
309  CAMsg::printMsg(LOG_DEBUG, "AN.ON packet debug: %s\n", base64Payload);
310  }
311 
312 #endif
313 #ifdef PAYMENT
 // any result other than E_SUCCESS means the packet must not be forwarded
314  if(accountTrafficUpstream(pHashEntry) != E_SUCCESS) goto NEXT_USER;
315 #endif
316  if(pMixPacket->flags==CHANNEL_DUMMY) // just a dummy to keep the connection alive in e.g. NAT gateways
317  {
318  CAMsg::printMsg(LOG_DEBUG,"received dummy traffic\n");
 // the dummy is answered with random data on the user's own send queue
319  getRandom(pMixPacket->data,DATA_SIZE);
320  #ifdef LOG_PACKET_TIMES
321  setZero64(pQueueEntry->timestamp_proccessing_start);
322  #endif
323  #ifdef LOG_TRAFFIC_PER_USER
324  pHashEntry->trafficOut++;
325  #endif
326  #ifdef COUNTRY_STATS
327  m_PacketsPerCountryOUT[pHashEntry->countryID].inc();
328  #endif
329  pHashEntry->pQueueSend->add(pQueueEntry,sizeof(tQueueEntry));
330  #ifdef HAVE_EPOLL
331  //m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry);
332  #else
333  m_psocketgroupUsersWrite->add(*pMuxSocket);
334  #endif
335  }
336  else if(pMixPacket->flags==CHANNEL_CLOSE) // closing one mix-channel (not the JAP<->mix connection!)
337  {
338  fmChannelListEntry* pEntry;
339  pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
340  if(pEntry!=NULL)
341  {
342  pMixPacket->channel=pEntry->channelOut;
343  getRandom(pMixPacket->data,DATA_SIZE);
344  #ifdef LOG_PACKET_TIMES
345  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
346  #endif
347  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
348  /* Don't delay upstream
349  #ifdef DELAY_USERS
350  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
351  #endif*/
352  #ifdef LOG_CHANNEL
353  //pEntry->packetsInFromUser++;
354  getcurrentTimeMicros(current_time);
355  diff_time=diff64(current_time,pEntry->timeCreated);
356  CAMsg::printMsg(LOG_DEBUG,"1:%u,%Lu,%Lu,%u,%u,%u\n",
357  pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
358  diff_time);
359  #endif
360  delete pEntry->pCipher; // forget the symmetric key of this connection
361  pEntry->pCipher = NULL;
362  m_pChannelList->removeChannel(pMuxSocket,pEntry->channelIn);
363 
364  #ifdef CH_LOG_STUDY
365  nrOfChOpMutex->lock();
366  currentOpenedChannels--;
367  nrOfChOpMutex->unlock();
368  #endif //CH_LOG_STUDY
369  }
370  #ifdef _DEBUG
371  else
372  {
373 // CAMsg::printMsg(LOG_DEBUG,"Invalid ID to close from Browser!\n");
374  }
375  #endif
376  }
377  else // finally! a normal mix packet
378  {
379  CASymChannelCipher* pCipher=NULL;
380  fmChannelListEntry* pEntry;
381  pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
382  if (pEntry != NULL&&pMixPacket->flags == CHANNEL_DATA)
383  {
384  pMixPacket->channel = pEntry->channelOut;
385  pCipher = pEntry->pCipher;
386  pCipher->crypt1(pMixPacket->data, pMixPacket->data, DATA_SIZE);
387  // queue the packet for sending to the next mix.
388 #ifdef LOG_PACKET_TIMES
389  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
390 #endif
391 
392  //check if this IP must be logged due to crime detection
393 #ifdef LOG_CRIME
394  crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
395 #endif
396 #ifdef ANON_DEBUG_MODE
397  if(bIsDebugPacket)
398  {
399  pMixPacket->flags|=CHANNEL_DEBUG;
400  }
401  #endif
402  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
403  /* Don't delay upstream
404  #ifdef DELAY_USERS
405  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
406  #endif*/
407  incMixedPackets();
408  #ifdef LOG_CHANNEL
409  pEntry->packetsInFromUser++;
410  #endif
411  }
412  else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN) // open a new mix channel
413  { // stefan: shouldn't this come before the handling of CHANNEL_DATA? or does OPEN => !DATA hold?
414  //it holds that: open -> data
415 
416  //log symcrypto
417  //UINT8* tmpstr=bytes2hex(pMixPacket->data, DATA_SIZE);
418  //CAMsg::printMsg(LOG_DEBUG, "Plain Packet recevied form user: %s\n", tmpstr);
419  //delete tmpstr;
420  //tmpstr = NULL;
421  //end log symcrpyto
422 
423 
 // decrypt the leading key bytes of the open packet with the connection cipher into rsaBuff
424  pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
425  #ifdef REPLAY_DETECTION
426  // replace time(NULL) with the real timestamp ()
427  // packet-timestamp*REPLAY_BASE + m_u64ReferenceTime
428  if(m_pReplayDB->insert(rsaBuff,time(NULL))!=E_SUCCESS)
429  {
430  CAMsg::printMsg(LOG_INFO,"Replay: Duplicate packet ignored.\n");
 // NOTE(review): continue bypasses the NEXT_USER label, so the code there that
 // moves on to the next connection does not run on this path -- confirm intended
431  continue;
432  }
433  #endif
434 
435 
436 
437  pCipher= CASymChannelCipherFactory::createCipher(CALibProxytest::getOptions()->getSymChannelCipherAlgorithm());
438  pCipher->setKeys(rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
 // rsaBuff is reused: its first 16 bytes are overwritten with 0xFF and passed to setIV2()
439  for(int i=0;i<16;i++)
440  rsaBuff[i]=0xFF;
441  pCipher->setIV2(rsaBuff);
444  #if defined (LOG_CHANNEL) ||defined(DATA_RETENTION_LOG)
445  HCHANNEL tmpC=pMixPacket->channel;
446  #endif
447 #if defined LOG_CRIME || defined _DEBUG
448  HCHANNEL inChannel = pMixPacket->channel;
449 #endif
 // addChannel() receives &pMixPacket->channel as its last argument; the debug log
 // below prints that field as the out channel afterwards
450  if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
451  { //todo move up ?
452  delete pCipher;
453  pCipher = NULL;
454  }
455  else
456  {
457  #ifdef CH_LOG_STUDY
458  nrOfChOpMutex->lock();
459  if(pHashEntry->channelOpenedLastIntervalTS !=
460  lastLogTime)
461  {
462  pHashEntry->channelOpenedLastIntervalTS =
463  lastLogTime;
464  nrOfOpenedChannels++;
465  }
466  currentOpenedChannels++;
467  nrOfChOpMutex->unlock();
468  #endif //CH_LOG_STUDY
469 
470  #ifdef LOG_PACKET_TIMES
471  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
472  #endif
473  #ifdef LOG_CHANNEL
474  fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
475  pTmpEntry->packetsInFromUser++;
476  set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
477  #endif
478  #ifdef DATA_RETENTION_LOG
479  pQueueEntry->dataRetentionLogEntry.entity.first.channelid=htonl(pMixPacket->channel);
480  fmChannelListEntry* pTmpEntry1=m_pChannelList->get(pMuxSocket,tmpC);
481  memcpy(pQueueEntry->dataRetentionLogEntry.entity.first.ip_in,pTmpEntry1->pHead->peerIP,4);
482  pQueueEntry->dataRetentionLogEntry.entity.first.port_in=(UINT16)pTmpEntry1->pHead->peerPort;
483  pQueueEntry->dataRetentionLogEntry.entity.first.port_in=htons(pQueueEntry->dataRetentionLogEntry.entity.first.port_in);
484  #endif
485 
486  //check if this IP must be logged due to crime detection
487  #ifdef LOG_CRIME
488 
489  pEntry=m_pChannelList->get(pMuxSocket, inChannel);
490  if(pEntry != NULL)
491  {
492  crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
493  }
494  #endif
495 #ifdef ANON_DEBUG_MODE
496  if (bIsDebugPacket)
497  {
498  pMixPacket->flags |= CHANNEL_DEBUG;
 // NOTE(review): inChannel is declared only under LOG_CRIME or _DEBUG, and the
 // result of get() is dereferenced without a NULL check -- confirm
499  pEntry = m_pChannelList->get(pMuxSocket, inChannel);
500  pEntry->bDebug = true;
501  }
502 #endif
503  //log symcrypto
504  //tmpstr=bytes2hex(pMixPacket->data, DATA_SIZE);
505  //CAMsg::printMsg(LOG_DEBUG, "Plain Packet sent to next mix: %s\n", tmpstr);
506  //delete tmpstr;
507  //end log symcrpyto
508 
509  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
510  /* Don't delay upstream
511  #ifdef DELAY_USERS
512  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
513  #endif*/
514  incMixedPackets();
515  #ifdef _DEBUG
516  CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u for in channel: %u\n",pMixPacket->channel,inChannel);
517  #endif
518  }
519  }
520  }
521  }
522 /*#ifdef DELAY_USERS
523  }
524 #endif*/
525  #ifdef HAVE_EPOLL
526 NEXT_USER:
528  #else
529  }//if is signaled
530 NEXT_USER:
531  pHashEntry=m_pChannelList->getNext();
532  #endif
533  }
534  if(countRead>0)
535  {
536  CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::loop() - read from user --> countRead >0 after processing all connections!\n");
537  }
538  }
539 //Third step
540 //Sending to next mix
541 
542 // Now in a separate Thread (see loopSendToMix())
543 
544 //Step 4
545 //Step 4a Receiving from mix to queue now in a separate thread
546 //Step 4b Processing MixPackets received from Mix
547 //todo check for error!!!
 // countRead is reused as a budget: at most m_nUser+1 downstream packets per pass
548  countRead=m_nUser+1;
549  while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
550  {
551  bAktiv=true;
552  countRead--;
553  ret=sizeof(tQueueEntry);
554  m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
555 
556  #ifdef LOG_PACKET_TIMES
557  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
558  #endif
559 #ifdef ANON_DEBUG_MODE
560  if (pMixPacket->flags&CHANNEL_DEBUG)
561  {
562  UINT8 base64Payload[DATA_SIZE << 1];
563  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
564  CAMsg::printMsg(LOG_DEBUG, "Dequeued Downstream AN.ON packet from previous Mix debug: %s\n", base64Payload);
565  pMixPacket->flags &= ~CHANNEL_DEBUG;
566  }
567 
568 #endif
569 
570  if(pMixPacket->flags==CHANNEL_CLOSE) //close event
571  {
572  #if defined(_DEBUG) && !defined(__MIX_TEST)
573 // CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
574  #endif
575  fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
576  if(pEntry!=NULL)
577  {
578  /* a hack to solve the SSL problem:
579  * set channel of downstream packet to in channel after they are dequeued
580  * from pEntry->pQueueSend so we can retrieve the channel entry to decrement
581  * the per channel count of enqueued downstream bytes.
582  */
583  #ifndef SSL_HACK
584  pMixPacket->channel=pEntry->channelIn;
585  #endif
586 
587  pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
588  //getRandom(pMixPacket->data,DATA_SIZE);
589  #ifdef LOG_PACKET_TIMES
590  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
591  #endif
592  pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
593  #ifdef LOG_TRAFFIC_PER_USER
594  pEntry->pHead->trafficOut++;
595  #endif
596  #ifdef COUNTRY_STATS
598  #endif
599  #ifdef SSL_HACK
600  /* a hack to solve the SSL problem:
601  * per channel count of enqueued downstream bytes
602  */
603  pEntry->downStreamBytes += sizeof(tQueueEntry);
604  #endif
605  #ifdef LOG_CHANNEL
606  pEntry->packetsOutToUser++;
607  getcurrentTimeMicros(current_time);
608  diff_time=diff64(current_time,pEntry->timeCreated);
609  CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%Lu,%u,%u,%u\n",
610  pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
611  diff_time);
612  #endif
613 
614  #ifdef HAVE_EPOLL
615  //m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
616  #else
618  #endif
619 
620 
621  #ifndef SSL_HACK
622  delete pEntry->pCipher; // forget the symmetric key of this connection
623  pEntry->pCipher = NULL;
625  #ifdef CH_LOG_STUDY
626  nrOfChOpMutex->lock();
627  currentOpenedChannels--;
628  nrOfChOpMutex->unlock();
629  #endif
630  /* a hack to solve the SSL problem:
631  * remove channel after the close packet is enqueued
632  * from pEntry->pQueueSend
633  */
634  #endif
635  }
636  else
637  {
638  #ifdef DEBUG
639  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: close channel -> client but channel does not exist.\n");
640  #endif
641  }
642 
643  }
644  else
645  {//flag !=close
646  #if defined(_DEBUG) && !defined(__MIX_TEST)
647 // CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
648  #endif
649  fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
650 
651  if(pEntry!=NULL)
652  {
653  #ifdef LOG_CRIME
654  if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
655  {
656  //UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
657  int log=LOG_ENCRYPTED;
658  if(!CALibProxytest::getOptions()->isEncryptedLogEnabled())
659  log=LOG_CRIT;
660  CAMsg::printMsg(log,"Detecting crime activity - next mix channel: %u -- "
661  "Incoming (User) IP:Port is: %u.%u.%u.%u:%u \n", pMixPacket->channel,
662  pEntry->pHead->peerIP[0],
663  pEntry->pHead->peerIP[1],
664  pEntry->pHead->peerIP[2],
665  pEntry->pHead->peerIP[3],
666  pEntry->pHead->peerPort);
 // the signalling packet is only logged; it is not queued for the user
667  continue;
668  }
669  #endif
670 
671  /* a hack to solve the SSL problem:
672  * same as CHANNEL_CLOSE packets
673  */
674  #ifndef SSL_HACK
675  pMixPacket->channel=pEntry->channelIn;
676  #endif
677 
678  pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
679 
680  #ifdef LOG_PACKET_TIMES
681  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
682  #endif
683 #ifdef ANON_DEBUG_MODE
684  if (pEntry->bDebug)
685  {
686  pMixPacket->flags |= CHANNEL_DEBUG;
687  UINT8 base64Payload[DATA_SIZE << 1];
688  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
689  CAMsg::printMsg(LOG_DEBUG, "Send Dowwstream AN.ON packet debug: %s\n", base64Payload);
690  }
691 
692 #endif
693  pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
694  /*CAMsg::printMsg(
695  LOG_INFO,"adding data packet to queue: %x, queue size: %u bytes\n",
696  pEntry->pHead->pQueueSend, pEntry->pHead->pQueueSend->getSize());*/
697  #ifdef LOG_TRAFFIC_PER_USER
698  pEntry->pHead->trafficOut++;
699  #endif
700  #ifdef COUNTRY_STATS
702  #endif
703  #ifdef LOG_CHANNEL
704  pEntry->packetsOutToUser++;
705  #endif
706  #ifdef SSL_HACK
707  /* a hack to solve the SSL problem:
708  * per channel count of downstream packets in bytes
709  */
710  pEntry->downStreamBytes += sizeof(tQueueEntry);
711  #endif
712 
713  #ifdef HAVE_EPOLL
714  /*int epret = m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
715  if(epret == E_UNKNOWN)
716  {
717  epret=errno;
718  CAMsg::printMsg(LOG_INFO,"epoll_add returns: %s (return value: %d) \n", strerror(epret), epret);
719  }*/
720  #else
722  #endif
723 
724  incMixedPackets();
725  }
726  else
727  {
728  #ifdef _DEBUG
729  if(pMixPacket->flags!=CHANNEL_DUMMY)
730  {
731 /* CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
732  "Channel-Id %u not valid!\n",pMixPacket->channel
733  );*/
734  #ifdef LOG_CHANNEL
735  CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
736  #endif
737  }
738  #endif
739  }
740  }
741  }
742 
 // NOTE(review): this assignment overwrites the bAktiv values set earlier in the
 // pass, so the sleep below depends only on the result of sendToUsers()
745  bAktiv = sendToUsers();
746 #ifndef FAST_PROCESSING
747  if(!bAktiv)
748  msSleep(1);
749 #endif
750  }
751 //ERR:
752  CAMsg::printMsg(LOG_CRIT,"Seems that we are restarting now!!\n");
753  m_bRunLog=false;
754  clean();
755  delete pQueueEntry;
756  pQueueEntry = NULL;
757  delete []tmpBuff;
758  tmpBuff = NULL;
759 #ifdef _DEBUG
760  pLogThread->join();
761  delete pLogThread;
762  pLogThread = NULL;
763 #endif
764  CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
765 #endif//NEW_MIX_TYPE
766  return E_UNKNOWN;
767  }
768 
769 /* last part of the main loop:
770  * returns true if at least one signaled user had a control or data packet
771  * queued for sending during this call, or false otherwise.
772  */
// (The main loop calls this as sendToUsers(); the signature line is not part of
// this listing.)
// For every writable user socket one packet is taken from the control message
// queue, or else from the data queue, and written with a single send() call. A
// packet that was only partly written is resumed on a later call; progress is
// tracked in uAlreadySendPacketSize, where -1 means "no packet in flight".
// A send error other than E_AGAIN closes the connection.
// NOTE(review): the embedded listing numbers skip 794, 860, 878 and 912; in
// particular the declaration of len used by send() below is not visible.
774 {
775 #ifndef HAVE_EPOLL
776  SINT32 countRead =
777 #endif
778  m_psocketgroupUsersWrite->select(/*true,*/0);
779  tQueueEntry *packetToSend = NULL;
780  SINT32 packetSize = sizeof(tQueueEntry);
781  CAQueue *controlMessageUserQueue = NULL;
782  CAQueue *dataMessageUserQueue = NULL;
783 
784  CAQueue *processedQueue = NULL; /* one of the above queues that should be used for processing*/
785  UINT32 extractSize = 0;
786  bool bAktiv = false;
787  UINT32 iSocketErrors = 0;
788 
789 /* Cyclic polling: gets all open sockets that will not block when invoking send()
790  * but will only send at most one packet. After that control is returned to loop()
791  */
792 #ifdef HAVE_EPOLL
793  fmHashTableEntry* pfmHashEntry=
795 
796  while(pfmHashEntry != NULL)
797  {
798 
799 #else
800  fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst();
801  while( (countRead > 0) && (pfmHashEntry != NULL) )
802  {
803  if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
804  {
805  countRead--;
806 #endif
807  /* loop turn init */
808  extractSize = 0;
809  processedQueue = NULL;
 // the packet being sent lives inside the hash entry so it survives between calls
810  packetToSend = &(pfmHashEntry->oQueueEntry);
811  controlMessageUserQueue = pfmHashEntry->pControlMessageQueue;
812  dataMessageUserQueue = pfmHashEntry->pQueueSend;
813 
814  //Control messages have a higher priority.
815  if(controlMessageUserQueue->getSize() > 0)
816  {
817  processedQueue = controlMessageUserQueue;
 // bCountPacket is read by accountTrafficDownstream(), which passes its negation on
818  pfmHashEntry->bCountPacket = false;
819  }
820  else if( (dataMessageUserQueue->getSize() > 0)
821 #ifdef DELAY_USERS
822  && m_pChannelList->hasDelayBuckets(pfmHashEntry->delayBucketID)
823 #endif
824  )
825  {
826  processedQueue = dataMessageUserQueue;
827  pfmHashEntry->bCountPacket = true;
828  }
829 
830  if(processedQueue != NULL)
831  {
832  extractSize = packetSize;
833  bAktiv=true;
834 
 // a new packet is dequeued only when no partially sent packet is pending
835  if(pfmHashEntry->uAlreadySendPacketSize == -1)
836  {
837  processedQueue->get((UINT8*) packetToSend, &extractSize);
838 
839  /* Hack for SSL BUG */
840 #ifdef SSL_HACK
841  finishPacket(pfmHashEntry);
842 #endif //SSL_HACK
843 #ifdef ANON_DEBUG_MODE
844  if (packetToSend->packet.flags&CHANNEL_DEBUG)
845  {
846  UINT8 base64Payload[DATA_SIZE << 1];
847  EVP_EncodeBlock(base64Payload, packetToSend->packet.data, DATA_SIZE);//base64 encoding (without newline!)
848  CAMsg::printMsg(LOG_DEBUG, "Send Downstream AN.ON packet to user debug: %s\n", base64Payload);
849  packetToSend->packet.flags &= ~CHANNEL_DEBUG;
850  }
851 
852 #endif
853  pfmHashEntry->pMuxSocket->prepareForSend(&(packetToSend->packet));
854  pfmHashEntry->uAlreadySendPacketSize = 0;
855  }
856  }
857 
858  if( (extractSize > 0) || (pfmHashEntry->uAlreadySendPacketSize > 0) )
859  {
 // continue behind the bytes that earlier calls already wrote
861  UINT8* packetToSendOffset = ((UINT8*)&(packetToSend->packet)) + pfmHashEntry->uAlreadySendPacketSize;
862  CASocket* clientSocket = pfmHashEntry->pMuxSocket->getCASocket();
863 
864  SINT32 ret = clientSocket->send(packetToSendOffset, len);
865 
866  if(ret > 0)
867  {
868 #ifdef PAYMENT
 // NOTE(review): accounting is assigned below but not read in the visible code
869  SINT32 accounting = E_SUCCESS;
870 #endif
871  pfmHashEntry->uAlreadySendPacketSize += ret;
872 
 // the whole mix packet has been written: finish bookkeeping and reset the marker
873  if(pfmHashEntry->uAlreadySendPacketSize == MIXPACKET_SIZE)
874  {
875  #ifdef DELAY_USERS
876  if(processedQueue != controlMessageUserQueue)
877  {
879  }
880  #endif
881 
882  #ifdef LOG_PACKET_TIMES
883  if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
884  {
885  getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
886  m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
887  }
888  #endif
889  pfmHashEntry->uAlreadySendPacketSize=-1;
890 #ifdef PAYMENT
891  /* count this packet for accounting */
892  accounting = accountTrafficDownstream(pfmHashEntry);
893 #endif
894  }
895 
896  }
897  else if(ret<0&&ret!=E_AGAIN)
898  {
899  iSocketErrors++;
900  // if (iSocketErrors == 1) // show debug message only at the first error; otherwise, the log may get huge
901  {
902  SOCKET sock=clientSocket->getSocket();
903  CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::sendtoUser() - send error %d on socket %d. Reason: %s (%i)\n", ret, sock, GET_NET_ERROR_STR(GET_NET_ERROR), GET_NET_ERROR);
904  }
905  // kick the user out - this only happens in extreme situations...
906  closeConnection(pfmHashEntry);
907  }
908  //TODO error handling
909  }
910 
911 #ifdef HAVE_EPOLL
913  }
914 #else
915  }//if is socket signaled
916  pfmHashEntry=m_pChannelList->getNext();
917  }
918 #endif
 // a summary is logged only when more than one send error occurred in this call
919  if (iSocketErrors > 1)
920  {
921  CAMsg::printMsg(LOG_ERR, "CAFirstMixA::sendtoUser() - %d send errors on a socket occured!\n", iSocketErrors);
922  }
923 
924  return bAktiv;
925 }
926 
927 #ifdef PAYMENT
// Accounting hook for a packet received from a user (the main loop calls this as
// accountTrafficUpstream(pHashEntry); the signature line is not part of this listing).
// Passes the connection to CAAccountingInstance::handleJapPacket() and reacts to
// the result in one of three ways: renew the connection timeout, stop timeout
// recovery and reject the packet, or close the connection.
// Returns E_SUCCESS when the packet may be forwarded and E_UNKNOWN when it must be
// dropped; the main loop skips the packet on anything other than E_SUCCESS.
// NOTE(review): the three branch conditions (listing lines 934, 940, 950) are
// missing from this view, so which handleResult value selects which branch cannot
// be stated here.
929 {
930  SINT32 ret = E_SUCCESS;
931 
932  SINT32 handleResult = CAAccountingInstance::handleJapPacket(pHashEntry, false, false);
933 
935  {
936  // renew the timeout
937  //pHashEntry->bRecoverTimeout = true;
938  m_pChannelList->pushTimeoutEntry(pHashEntry);
939  }
941  {
942  // do not forward this packet
 // the main loop renews the timeout of received packets only while this flag is true
943  pHashEntry->bRecoverTimeout = false;
945  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: 1. setting bRecover timout to false for entry %x!\n", pHashEntry);
946  //m_pChannelList->pushTimeoutEntry(pHashEntry);
947  //don't let any upstream data messages pass for this user.
948  ret = E_UNKNOWN;
949  }
951  {
952  // kickout this user - he deserves it...
953  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: kickout upstream!\n");
954  closeConnection(pHashEntry);
955  ret = E_UNKNOWN;
956  }
957  //please remember that these values also may be returned even though they do not require
958  //any further processing
959  /*else if ( (ret == CAAccountingInstance::HANDLE_PACKET_CONNECTION_UNCHECKED) &&
960  (ret == CAAccountingInstance::HANDLE_PACKET_HOLD_CONNECTION) )
961  {
962 
963  }*/
964  return ret;
965 }
966 #endif
967 
968 #ifdef PAYMENT
// Accounting hook for a packet that was completely sent to a user (called from the
// send path as accountTrafficDownstream(pfmHashEntry); the signature line is not
// part of this listing).
// Passes the connection to CAAccountingInstance::handleJapPacket(); the second
// argument is the negation of bCountPacket, which the send path sets to false for
// control messages and true for data packets. Depending on the result the timeout
// is renewed, nothing is done, or the connection is closed.
// Always returns E_SUCCESS in the visible code.
// NOTE(review): the branch conditions (listing lines 973, 979, 986) and lines 983
// and 991 are missing from this view.
970 {
971  // count packet for payment
972  SINT32 ret = CAAccountingInstance::handleJapPacket(pfmHashEntry, !(pfmHashEntry->bCountPacket), true);
974  {
975  // renew the timeout
976  //pfmHashEntry->bRecoverTimeout = true;
977  m_pChannelList->pushTimeoutEntry(pfmHashEntry);
978  }
980  {
981  // when all control messages are sent the users connection will be closed
982  //pfmHashEntry->bRecoverTimeout = false;
984  //m_pChannelList->pushTimeoutEntry(pfmHashEntry);
985  }
987  {
988  // close users connection immediately
 // NOTE(review): the format string has no conversion specifier for the extra ret argument
989  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: Closing JAP connection due to illegal payment status!\n", ret);
990  closeConnection(pfmHashEntry);
992  }
993  //please remember that these values also may be returned even though they do not require
994  //any further processing
995  /*else if ( (ret == CAAccountingInstance::HANDLE_PACKET_CONNECTION_UNCHECKED) &&
996  (ret == CAAccountingInstance::HANDLE_PACKET_HOLD_CONNECTION) )
997  {
998 
999  }*/
1000  return E_SUCCESS;
1001 }
1002 #endif
1003 
1005 {
1006  if(pfmHashEntry == NULL)
1007  return;
1009  tQueueEntry* pQueueEntry=new tQueueEntry;
1010  MIXPACKET *notifyPacket = &(pQueueEntry->packet);
1011  memset(notifyPacket, 0, MIXPACKET_SIZE);
1012 
1013  notifyPacket->flags = flags;
1014  while(pEntry != NULL)
1015  {
1016  if(pEntry->bIsSuspended)
1017  {
1018  notifyPacket->channel = pEntry->channelOut;
1019  getRandom(notifyPacket->data,DATA_SIZE);
1020 #ifdef _DEBUG
1021  CAMsg::printMsg(LOG_INFO,"Sent flags %u for channel: %u\n", flags, notifyPacket->channel);
1022 #endif
1023  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
1024  pEntry->bIsSuspended = false;
1025  }
1026  pEntry=m_pChannelList->getNextChannel(pEntry);
1027  }
1028  pfmHashEntry->cSuspend=0;
1029  delete pQueueEntry;
1030 }
1031 
//@todo: not a reliable solution. We still have to find the bug that causes SSL connections to be reset
//while large downloads are performed by the same user (it only occurs in cascades with more than two mixes).
1034 #ifdef SSL_HACK
1035 
1037 {
1038  tQueueEntry *packetToSend = &(pfmHashEntry->oQueueEntry);
1039  fmChannelList* cListEntry=m_pChannelList->get(packetToSend->packet.channel);
1040  if(cListEntry != NULL)
1041  {
1042  packetToSend->packet.channel = cListEntry->channelIn;
1043  cListEntry->downStreamBytes -= sizeof(tQueueEntry);
1044 #ifdef DEBUG
1045  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: channels of current packet, in: %u, out: %u, count: %u, flags: 0x%x\n",
1046  cListEntry->channelIn, cListEntry->channelOut, cListEntry->downStreamBytes,
1047  packetToSend->packet.flags);
1048 #endif
1049  if(packetToSend->packet.flags == CHANNEL_CLOSE)
1050  {
1051  delete cListEntry->pCipher;
1052  cListEntry->pCipher = NULL;
1053  m_pChannelList->removeChannel(pfmHashEntry->pMuxSocket, cListEntry->channelIn);
1054 #ifdef CH_LOG_STUDY
1055  nrOfChOpMutex->lock();
1056  currentOpenedChannels--;
1057  nrOfChOpMutex->unlock();
1058 #endif //CH_LOG_STUDY
1059  }
1060  }
1061 }
1062 
1063 #endif //SSL_HACK
1064 
1065 #ifdef PAYMENT
1067 {
1068  // check the timeout for all connections
1069  fmHashTableEntry* timeoutHashEntry;
1070  fmHashTableEntry* firstIteratorEntry = NULL;
1071  bool currentEntryKickoutForced = false;
1072  /* this check also includes forced kickouts which have not bRecoverTimeout set. */
1073  while ( (timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL )
1074  {
1075  currentEntryKickoutForced = m_pChannelList->isKickoutForced(timeoutHashEntry);
1076  if(firstIteratorEntry == timeoutHashEntry)
1077  {
1078  m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
1079  break;
1080  }
1081 
1082  if (!currentEntryKickoutForced)
1083  {
1084  //CAMsg::printMsg(LOG_ERR, "%p\n, ", timeoutHashEntry);
1085  if(m_pChannelList->isTimedOut(timeoutHashEntry) )
1086  {
1087  CAMsg::printMsg(LOG_DEBUG,"Client connection closed due to timeout.\n");
1088  closeConnection(timeoutHashEntry);
1089  continue;
1090  }
1091  }
1092  else
1093  {
1094  //A user to be kicked out: empty his downstream data queue.
1095  timeoutHashEntry->pQueueSend->clean();
1096 
1097  if( (timeoutHashEntry->pControlMessageQueue->getSize() == 0) ||
1098  (timeoutHashEntry->kickoutSendRetries <= 0) )
1099  {
1100  CAMsg::printMsg(LOG_WARNING, "Kickout immediately owner %x!\n", timeoutHashEntry);
1101  UINT32 authFlags = CAAccountingInstance::getAuthFlags(timeoutHashEntry);
1102  if (authFlags > 0)
1103  {
1104  CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout! Payment auth flags: %u\n", authFlags);
1105  }
1106  else
1107  {
1108  CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout!\n");
1109  }
1110  //CAAccountingInstance::setPrepaidBytesToZero(timeoutHashEntry->pAccountingInfo);
1111  closeConnection(timeoutHashEntry);
1112  continue;
1113  }
1114  else
1115  {
1116  //Note this counter initialized by calling CAFirstMixChannelList::add
1117  //and accessed by this thread, both do never run concurrently.
1118  //So we can avoid locking.
1119  timeoutHashEntry->kickoutSendRetries--;
1120  CAMsg::printMsg(LOG_INFO, "Size of control message queue for user to be kicked out: %u bytes, retries %d.\n",
1121  timeoutHashEntry->pControlMessageQueue->getSize(), timeoutHashEntry->kickoutSendRetries);
1122  }
1123  // Let the client obtain all his remaining control message packets
1124  //(which in most cases contain the error message with the kickout reason.
1125  CAMsg::printMsg(LOG_WARNING,"A kickout is supposed to happen. Let the user get his %u control message bytes before...\n",
1126  timeoutHashEntry->pControlMessageQueue->getSize());
1127  }
1128  if(firstIteratorEntry == NULL)
1129  {
1130  firstIteratorEntry = timeoutHashEntry;
1131  }
1132  m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
1133  }
1134 }
1135 #endif
1136 
1137 #ifdef LOG_CRIME
1138 void CAFirstMixA::crimeSurveillance(CAIPAddrWithNetmask* surveillanceIPs, UINT32 nrOfSurveillanceIPs,UINT8 *peerIP,SINT32 peerPort, MIXPACKET *pMixPacket)
1139 {
1140  if( (nrOfSurveillanceIPs > 0) && (surveillanceIPs != NULL) )
1141  {
1142  for(UINT32 i = 0; i < nrOfSurveillanceIPs; i++)
1143  {
1144  if(surveillanceIPs[i].equals(peerIP))
1145  {
1146  CAMsg::printMsg(LOG_CRIT,"Crime detection: User surveillance, IP %u.%u.%u.%u Port %i with next mix channel %u\n",peerIP[0], peerIP[1], peerIP[2], peerIP[3],peerPort,pMixPacket->channel);
1147  pMixPacket->flags |= CHANNEL_SIG_CRIME;
1148  return;
1149  }
1150  }
1151  }
1152 }
1153 #endif
1154 
1155 
1156 
1157 
1158 
1159 
1160 
1161 #else //MULTIPLE THREADS
1162 
1163 
1164 
1165 
1166 
1167 #ifdef HAVE_EPOLL
1168 SINT32 CAFirstMixA::closeConnection(fmHashTableEntry* pHashEntry, CASocketGroupEpoll* psocketgroupUsersRead,CASocketGroupEpoll* psocketgroupUsersWrite, CAFirstMixChannelList* pChannelList)
1169 #else
1170 SINT32 CAFirstMixA::closeConnection(fmHashTableEntry* pHashEntry, CASocketGroup* psocketgroupUsersRead,CASocketGroup* psocketgroupUsersWrite, CAFirstMixChannelList* pChannelList)
1171 #endif
1172 {
1173  if (pHashEntry == NULL)
1174  {
1175  return E_UNKNOWN;
1176  }
1177 
1178  INIT_STACK;
1179  BEGIN_STACK("CAFirstMixA::closeConnection");
1180 
1181 
1182  fmChannelListEntry* pEntry;
1183  tQueueEntry* pQueueEntry = new tQueueEntry;
1184  MIXPACKET* pMixPacket=&pQueueEntry->packet;
1185 
1186  #ifdef LOG_TRAFFIC_PER_USER
1187  UINT64 current_time;
1188  getcurrentTimeMillis(current_time);
1189  CAMsg::printMsg(LOG_DEBUG,"Removing Connection wiht ID: %Lu -- login time [ms] %Lu -- logout time [ms] %Lu -- Traffic was: IN: %u -- OUT: %u\n",pHashEntry->id,pHashEntry->timeCreated,current_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
1190  #endif
1191  m_pIPList->removeIP(pHashEntry->peerIP);
1192 
1193  psocketgroupUsersRead->remove(*(pHashEntry->pMuxSocket));
1194  psocketgroupUsersWrite->remove(*(pHashEntry->pMuxSocket));
1195  pEntry = pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
1196 
1197  while(pEntry!=NULL)
1198  {
1199  getRandom(pMixPacket->data,DATA_SIZE);
1200  pMixPacket->flags=CHANNEL_CLOSE;
1201  m_pChannelToQueueList->removeChannel(pEntry->channelOut);
1202  pMixPacket->channel=pEntry->channelOut;
1203  #ifdef LOG_PACKET_TIMES
1204  setZero64(pQueueEntry->timestamp_proccessing_start);
1205  #endif
1206  m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
1207  delete pEntry->pCipher;
1208  pEntry->pCipher = NULL;
1209  pEntry=pChannelList->getNextChannel(pEntry);
1210 #ifdef CH_LOG_STUDY
1211  nrOfChOpMutex->lock();
1212  currentOpenedChannels--;
1213  nrOfChOpMutex->unlock();
1214 #endif //CH_LOG_STUDY
1215  }
1216  ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
1217  delete pHashEntry->pQueueSend;
1218  pHashEntry->pQueueSend = NULL;
1219  delete pHashEntry->pSymCipher;
1220  pHashEntry->pSymCipher = NULL;
1221 
1222  #ifdef COUNTRY_STATS
1223  decUsers(pHashEntry);
1224  #else
1225  decUsers();
1226  #endif
1227 
1228  CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
1229  // Save the socket - its pointer will be deleted in this method!!! Crazy mad programming...
1230  pChannelList->remove(pHashEntry->pMuxSocket);
1231  delete pMuxSocket;
1232  pMuxSocket = NULL;
1233  //pHashEntry->pMuxSocket = NULL; // not needed now, but maybe in the future...
1234 
1235  delete pQueueEntry;
1236  pQueueEntry = NULL;
1237 
1238  FINISH_STACK("CAFirstMixA::closeConnection");
1239 
1240  return E_SUCCESS;
1241 }
1242 
// Length (twice KEY_SIZE) that is passed to crypt1() and setKeys() when a new
// channel is opened. The expansion is parenthesised so that the macro keeps its
// value when it is used inside a larger expression (e.g. next to '/' or '%').
#define FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS (2*KEY_SIZE)
1244 
1245 
1246 struct fm_packet_proccessing_loop_args_t
1247  {
1248  CAFirstMixA* pFirstMix;
1249  CAQueue* pIncomingPacketQueue;
1250  CAFirstMixChannelList* pChannelList;
1251 #ifdef HAVE_EPOLL
1252  CASocketGroupEpoll* psocketgroupUsersRead;
1253  CASocketGroupEpoll* psocketgroupUsersWrite;
1254 #else
1255  CASocketGroup* psocketgroupUsersRead;
1256  CASocketGroup* psocketgroupUsersWrite;
1257 #endif
1258  };
1259 typedef struct fm_packet_proccessing_loop_args_t tPacketProcessingLoopArgs;
1260 
1261 
1263  {
1264 #ifndef NEW_MIX_TYPE
1265  m_pthreadsPacketProccessingLoop = new CAThreadPool(m_numThreads, m_numThreads, false);
1266 
1267  for (UINT32 i = 0; i < m_numThreads; i++)
1268  {
1269 #ifdef DELAY_USERS
1270  m_arpChannelList[i]->setDelayParameters(CALibProxytest::getOptions()->getDelayChannelUnlimitTraffic(),
1271  CALibProxytest::getOptions()->getDelayChannelBucketGrow(),
1272  CALibProxytest::getOptions()->getDelayChannelBucketGrowIntervall());
1273 #endif
1274  tPacketProcessingLoopArgs* params = new tPacketProcessingLoopArgs;
1275  params->pFirstMix = this;
1276  params->pChannelList = m_arpChannelList[i];
1277  params->psocketgroupUsersRead = m_arpsocketgroupUsersRead[i];
1278  params->psocketgroupUsersWrite = m_arpsocketgroupUsersWrite[i];
1279  params->pIncomingPacketQueue = new CAQueue();
1280  m_pthreadsPacketProccessingLoop->addRequest(fm_loopPacketProcessing, params);
1281  }
1282  // CASingleSocketGroup osocketgroupMixOut;
1283  //#ifdef LOG_PACKET_TIMES
1284  // tPoolEntry* pPoolEntry=new tPoolEntry;
1285  // MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
1286  //#else
1287  tQueueEntry* pQueueEntry = new tQueueEntry;
1288  MIXPACKET* pMixPacket=&pQueueEntry->packet;
1289  //#endif
1290  m_nUser=0;
1291  SINT32 ret;
1292  //osocketgroupMixOut.add(*m_pMuxOut);
1293 
1294  UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
1295  CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
1296  bool bAktiv;
1297 
1298 #ifdef LOG_TRAFFIC_PER_USER
1299  UINT64 current_time;
1300  UINT32 diff_time;
1301  CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
1302  CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
1303  CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
1304 #endif
1306 #ifdef _DEBUG
1307 // CAThread* pLogThread=new CAThread((UINT8*)"CAFirstMixA - LogLoop");
1308 // pLogThread->setMainLoop(fm_loopLog);
1309 // pLogThread->start(this);
1310 #endif
1311 
1312 #ifdef LOG_CRIME
1313  CAIPAddrWithNetmask* surveillanceIPs = CALibProxytest::getOptions()->getCrimeSurveillanceIPs();
1314  UINT32 nrOfSurveillanceIPs = CALibProxytest::getOptions()->getNrOfCrimeSurveillanceIPs();
1315 #endif
1316 // CAThread threadReadFromUsers;
1317 // threadReadFromUsers.setMainLoop(loopReadFromUsers);
1318 // threadReadFromUsers.start(this);
1319 
1320  while(!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
1321  {
1322 //Step 4
1323 //Step 4a Receiving from mix to queue now in a separate thread
1324 //Step 4b Processing MixPackets received from Mix
1325 //todo check for error!!!
1326  while(m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
1327  {
1328  bAktiv=true;
1329  ret=sizeof(tQueueEntry);
1330  m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
1331  CAQueue* pQueue = m_pChannelToQueueList->get(pMixPacket->channel);
1332  if(pQueue!=NULL)
1333  pQueue->add(pQueueEntry, ret);
1334  #ifdef LOG_PACKET_TIMES
1335  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
1336  #endif
1337 #ifdef ANON_DEBUG_MODE
1338  if (pMixPacket->flags&CHANNEL_DEBUG)
1339  {
1340  UINT8 base64Payload[DATA_SIZE << 1];
1341  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
1342  CAMsg::printMsg(LOG_DEBUG, "Dequeued Downstream AN.ON packet from previous Mix debug: %s\n", base64Payload);
1343  pMixPacket->flags &= ~CHANNEL_DEBUG;
1344  }
1345 #endif
1346  }
1347 
1348 #ifndef FAST_PROCESSING
1349  if(!bAktiv)
1350  msSleep(1);
1351 #endif
1352  }
1353 //ERR:
1354  CAMsg::printMsg(LOG_CRIT,"Seems that we are restarting now!!\n");
1355  m_bRunLog=false;
1356  clean();
1357  delete pQueueEntry;
1358  pQueueEntry = NULL;
1359  delete []tmpBuff;
1360  tmpBuff = NULL;
1361 #ifdef _DEBUG
1362 // pLogThread->join();
1363 // delete pLogThread;
1364 // pLogThread = NULL;
1365 #endif
1366  CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
1367 #endif//NEW_MIX_TYPE
1368  return E_UNKNOWN;
1369  }
1370 
1371 
1373  {
1374 #ifndef NEW_MIX_TYPE
1375  CAFirstMixA* pMix = static_cast<tPacketProcessingLoopArgs*>(params)->pFirstMix;
1376 #ifdef HAVE_EPOLL
1377  CASocketGroupEpoll* psocketgroupUsersRead = static_cast<tPacketProcessingLoopArgs*>(params)->psocketgroupUsersRead;
1378  CASocketGroupEpoll* psocketgroupUsersWrite= static_cast<tPacketProcessingLoopArgs*>(params)->psocketgroupUsersWrite;
1379 #else
1380  CASocketGroup* psocketgroupUsersRead = static_cast<tPacketProcessingLoopArgs*>(params)->psocketgroupUsersRead;
1381  CASocketGroup* psocketgroupUsersWrite= static_cast<tPacketProcessingLoopArgs*>(params)->psocketgroupUsersWrite;
1382 #endif
1383  CAQueue* pQueueReadFromMix = static_cast<tPacketProcessingLoopArgs*>(params)->pIncomingPacketQueue;
1384  CAFirstMixChannelList* pChannelList = static_cast<tPacketProcessingLoopArgs*>(params)->pChannelList;
1385 
1386 
1387 
1388 
1389 #ifdef DELAY_USERS
1393 #endif
1394 
1395  // CASingleSocketGroup osocketgroupMixOut;
1396  SINT32 countRead=0;
1397  //#ifdef LOG_PACKET_TIMES
1398  // tPoolEntry* pPoolEntry=new tPoolEntry;
1399  // MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
1400  //#else
1401  tQueueEntry* pQueueEntry = new tQueueEntry;
1402  MIXPACKET* pMixPacket=&pQueueEntry->packet;
1403  //#endif
1404  SINT32 ret;
1405  //osocketgroupMixOut.add(*m_pMuxOut);
1406 
1407  UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
1408  CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
1409  bool bAktiv;
1410  UINT8 rsaBuff[RSA_SIZE];
1411 
1412 #ifdef LOG_TRAFFIC_PER_USER
1413  UINT64 current_time;
1414  UINT32 diff_time;
1415  CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
1416  CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
1417  CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
1418 #endif
1419 
1420 #ifdef LOG_CRIME
1421  CAIPAddrWithNetmask* surveillanceIPs = CALibProxytest::getOptions()->getCrimeSurveillanceIPs();
1422  UINT32 nrOfSurveillanceIPs = CALibProxytest::getOptions()->getNrOfCrimeSurveillanceIPs();
1423 #endif
1424 // CAThread threadReadFromUsers;
1425 // threadReadFromUsers.setMainLoop(loopReadFromUsers);
1426 // threadReadFromUsers.start(this);
1427 
1428  while(!pMix->m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
1429  {
1430 
1431  bAktiv=false;
1432 
1433 //LOOP_START:
1434 #ifdef PAYMENT
1435  // while checking if there are connections to close: synch with login threads
1436  m_pmutexLogin->lock();
1438  m_pmutexLogin->unlock();
1439 #endif
1440 //First Step
1441 //Checking for new connections
1442 // Now in a separate Thread....
1443 
1444 // Second Step
1445 // Checking for data from users
1446 // Now in a separate Thread (see loopReadFromUsers())
1447 //Only proccess user data, if queue to next mix is not to long!!
1448 
1449  if(pMix->m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE)
1450  {
1451  countRead=psocketgroupUsersRead->select(/*false,*/0); // how many JAP<->mix connections have received data from their coresponding JAP
1452  if(countRead>0)
1453  bAktiv=true;
1454 #ifdef HAVE_EPOLL
1455  //if we have epoll we do not need to search the whole list
1456  //of connected JAPs to find the ones who have sent data
1457  //as epoll will return ONLY these connections.
1458  fmHashTableEntry* pHashEntry=(fmHashTableEntry*)psocketgroupUsersRead->getFirstSignaledSocketData();
1459  while(pHashEntry!=NULL)
1460  {
1461  CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
1462 #else
1463  //if we do not have epoll we have to go to the whole
1464  //list of open connections to find the ones which
1465  //actually have sent some data
1466  fmHashTableEntry* pHashEntry=pChannelList->getFirst();
1467  while(pHashEntry!=NULL&&countRead>0) // iterate through all connections as long as there is at least one active left
1468  {
1469  CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
1470  if(psocketgroupUsersRead->isSignaled(*pMuxSocket)) // if this one seems to have data
1471  {
1472 #endif
1473 /*#ifdef DELAY_USERS
1474  * Don't delay upstream
1475  if( m_pChannelList->hasDelayBuckets(pHashEntry->delayBucketID) )
1476  {
1477 #endif*/
1478  countRead--;
1479  ret=pMuxSocket->receive(pMixPacket,0);
1480 
1481  #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
1482  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
1483  set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
1484  #endif
1485  #ifdef DATA_RETENTION_LOG
1486  pQueueEntry->dataRetentionLogEntry.t_in=htonl(time(NULL));
1487  #endif
1488  if(ret<0&&ret!=E_AGAIN/*||pHashEntry->accessUntil<time()*/)
1489  {
1490  // remove dead connections
1491  pMix->closeConnection(pHashEntry,psocketgroupUsersRead,psocketgroupUsersRead,pChannelList);
1492  }
1493  else if(ret==MIXPACKET_SIZE) // we've read enough data for a whole mix packet. nice!
1494  {
1495 #ifdef PAYMENT
1496  if (pHashEntry->bRecoverTimeout)
1497  {
1498  // renew the timeout only if recovery is allowed
1499  m_pChannelList->pushTimeoutEntry(pHashEntry);
1500  }
1501 #endif
1502  #ifdef LOG_TRAFFIC_PER_USER
1503  pHashEntry->trafficIn++;
1504  #endif
1505  #ifdef COUNTRY_STATS
1506  m_PacketsPerCountryIN[pHashEntry->countryID].inc();
1507  #endif
1508  //New control channel code...!
1509  if(pMixPacket->channel>0&&pMixPacket->channel<256)
1510  {
1511  if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
1512  {
1513  goto NEXT_USER;
1514  }
1515  else
1516  {
1517  CAMsg::printMsg(LOG_DEBUG, "Control channel packet is invalid and could not be processed!\n");
1518  pMix->closeConnection(pHashEntry,psocketgroupUsersRead,psocketgroupUsersRead,pChannelList);
1519  goto NEXT_USER;
1520  }
1521  }
1522 #ifdef ANON_DEBUG_MODE
1523  bool bIsDebugPacket=false;
1524  if (pMixPacket->flags&CHANNEL_DEBUG)
1525  {
1526  bIsDebugPacket=true;
1527  UINT8 base64Payload[DATA_SIZE << 1];
1528  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
1529  pMixPacket->flags &= ~CHANNEL_DEBUG;
1530  CAMsg::printMsg(LOG_DEBUG, "AN.ON packet debug: %s\n", base64Payload);
1531  }
1532 
1533 #endif
1534 #ifdef PAYMENT
1535  if(accountTrafficUpstream(pHashEntry) != E_SUCCESS) goto NEXT_USER;
1536 #endif
1537  if(pMixPacket->flags==CHANNEL_DUMMY) // just a dummy to keep the connection alife in e.g. NAT gateways
1538  {
1539  CAMsg::printMsg(LOG_DEBUG,"received dummy traffic\n");
1540  getRandom(pMixPacket->data,DATA_SIZE);
1541  #ifdef LOG_PACKET_TIMES
1542  setZero64(pQueueEntry->timestamp_proccessing_start);
1543  #endif
1544  #ifdef LOG_TRAFFIC_PER_USER
1545  pHashEntry->trafficOut++;
1546  #endif
1547  #ifdef COUNTRY_STATS
1548  m_PacketsPerCountryOUT[pHashEntry->countryID].inc();
1549  #endif
1550  pHashEntry->pQueueSend->add(pQueueEntry,sizeof(tQueueEntry));
1551  #ifdef HAVE_EPOLL
1552  //m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry);
1553  #else
1554  psocketgroupUsersWrite->add(*pMuxSocket);
1555  #endif
1556  }
1557  else if(pMixPacket->flags==CHANNEL_CLOSE) // closing one mix-channel (not the JAP<->mix connection!)
1558  {
1559 
1560  fmChannelListEntry* pEntry;
1561  pEntry=pChannelList->get(pMuxSocket,pMixPacket->channel);
1562  if(pEntry!=NULL)
1563  {
1564  pMix->m_pChannelToQueueList->removeChannel(pEntry->channelOut);
1565  pMixPacket->channel=pEntry->channelOut;
1566  getRandom(pMixPacket->data,DATA_SIZE);
1567  #ifdef LOG_PACKET_TIMES
1568  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
1569  #endif
1570  pMix->m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
1571  /* Don't delay upstream
1572  #ifdef DELAY_USERS
1573  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
1574  #endif*/
1575  #ifdef LOG_CHANNEL
1576  //pEntry->packetsInFromUser++;
1577  getcurrentTimeMicros(current_time);
1578  diff_time=diff64(current_time,pEntry->timeCreated);
1579  CAMsg::printMsg(LOG_DEBUG,"1:%u,%Lu,%Lu,%u,%u,%u\n",
1580  pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
1581  diff_time);
1582  #endif
1583  delete pEntry->pCipher; // forget the symetric key of this connection
1584  pEntry->pCipher = NULL;
1585  pChannelList->removeChannel(pMuxSocket,pEntry->channelIn);
1586 
1587  #ifdef CH_LOG_STUDY
1588  nrOfChOpMutex->lock();
1589  currentOpenedChannels--;
1590  nrOfChOpMutex->unlock();
1591  #endif //CH_LOG_STUDY
1592  }
1593  #ifdef _DEBUG
1594  else
1595  {
1596 // CAMsg::printMsg(LOG_DEBUG,"Invalid ID to close from Browser!\n");
1597  }
1598  #endif
1599  }
1600  else // finally! a normal mix packet
1601  {
1602  CASymChannelCipher* pCipher=NULL;
1603  fmChannelListEntry* pEntry;
1604  pEntry=pChannelList->get(pMuxSocket,pMixPacket->channel);
1605  if (pEntry != NULL&&pMixPacket->flags == CHANNEL_DATA)
1606  {
1607  pMixPacket->channel = pEntry->channelOut;
1608  pCipher = pEntry->pCipher;
1609  pCipher->crypt1(pMixPacket->data, pMixPacket->data, DATA_SIZE);
1610  // queue the packet for sending to the next mix.
1611 #ifdef LOG_PACKET_TIMES
1612  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
1613 #endif
1614 
1615  //check if this IP must be logged due to crime detection
1616 #ifdef LOG_CRIME
1617  crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
1618 #endif
1619 #ifdef ANON_DEBUG_MODE
1620  if(bIsDebugPacket)
1621  {
1622  pMixPacket->flags|=CHANNEL_DEBUG;
1623  }
1624  #endif
1625  pMix->m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
1626  /* Don't delay upstream
1627  #ifdef DELAY_USERS
1628  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
1629  #endif*/
1630  pMix->incMixedPackets();
1631  #ifdef LOG_CHANNEL
1632  pEntry->packetsInFromUser++;
1633  #endif
1634  }
1635  else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN) // open a new mix channel
1636  { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ?
1637  //es gilt: open -> data
1638  pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
1639  #ifdef REPLAY_DETECTION
1640  // replace time(NULL) with the real timestamp ()
1641  // packet-timestamp*REPLAY_BASE + m_u64ReferenceTime
1642  if(m_pReplayDB->insert(rsaBuff,time(NULL))!=E_SUCCESS)
1643  {
1644  CAMsg::printMsg(LOG_INFO,"Replay: Duplicate packet ignored.\n");
1645  continue;
1646  }
1647  #endif
1648  pCipher= CASymChannelCipherFactory::createCipher(CALibProxytest::getOptions()->getSymChannelCipherAlgorithm());
1649  pCipher->setKeys(rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
1650  for(int i=0;i<16;i++)
1651  rsaBuff[i]=0xFF;
1652  pCipher->setIV2(rsaBuff);
1655  #if defined (LOG_CHANNEL) ||defined(DATA_RETENTION_LOG)
1656  HCHANNEL tmpC=pMixPacket->channel;
1657  #endif
1658 
1659  HCHANNEL inChannel = pMixPacket->channel;
1660  if(pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
1661  { //todo move up ?
1662  delete pCipher;
1663  pCipher = NULL;
1664  }
1665  else
1666  {
1667  pMix->m_pChannelToQueueList->add(pMixPacket->channel, pQueueReadFromMix);
1668  #ifdef CH_LOG_STUDY
1669  nrOfChOpMutex->lock();
1670  if(pHashEntry->channelOpenedLastIntervalTS !=
1671  lastLogTime)
1672  {
1673  pHashEntry->channelOpenedLastIntervalTS =
1674  lastLogTime;
1675  nrOfOpenedChannels++;
1676  }
1677  currentOpenedChannels++;
1678  nrOfChOpMutex->unlock();
1679  #endif //CH_LOG_STUDY
1680 
1681  #ifdef LOG_PACKET_TIMES
1682  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
1683  #endif
1684  #ifdef LOG_CHANNEL
1685  fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
1686  pTmpEntry->packetsInFromUser++;
1687  set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
1688  #endif
1689  #ifdef DATA_RETENTION_LOG
1690  pQueueEntry->dataRetentionLogEntry.entity.first.channelid=htonl(pMixPacket->channel);
1691  fmChannelListEntry* pTmpEntry1=m_pChannelList->get(pMuxSocket,tmpC);
1692  memcpy(pQueueEntry->dataRetentionLogEntry.entity.first.ip_in,pTmpEntry1->pHead->peerIP,4);
1693  pQueueEntry->dataRetentionLogEntry.entity.first.port_in=(UINT16)pTmpEntry1->pHead->peerPort;
1694  pQueueEntry->dataRetentionLogEntry.entity.first.port_in=htons(pQueueEntry->dataRetentionLogEntry.entity.first.port_in);
1695  #endif
1696 
1697  //check if this IP must be logged due to crime detection
1698  #ifdef LOG_CRIME
1699 
1700  pEntry=m_pChannelList->get(pMuxSocket, inChannel);
1701  if(pEntry != NULL)
1702  {
1703  crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
1704  }
1705  #endif
1706 #ifdef ANON_DEBUG_MODE
1707  if (bIsDebugPacket)
1708  {
1709  pMixPacket->flags |= CHANNEL_DEBUG;
1710  pEntry = m_pChannelList->get(pMuxSocket, inChannel);
1711  pEntry->bDebug = true;
1712  }
1713 #endif
1714  pMix->m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
1715  /* Don't delay upstream
1716  #ifdef DELAY_USERS
1717  m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
1718  #endif*/
1719  pMix->incMixedPackets();
1720  #ifdef _DEBUG
1721  CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u for in channel: %u\n",pMixPacket->channel,inChannel);
1722  #endif
1723  }
1724  }
1725  }
1726  }
1727 /*#ifdef DELAY_USERS
1728  }
1729 #endif*/
1730  #ifdef HAVE_EPOLL
1731 NEXT_USER:
1732  pHashEntry=(fmHashTableEntry*)psocketgroupUsersRead->getNextSignaledSocketData();
1733  #else
1734  }//if is signaled
1735 NEXT_USER:
1736  pHashEntry=pChannelList->getNext();
1737  #endif
1738  }
1739  if(countRead>0)
1740  {
1741  CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::loop() - read from user --> countRead >0 after processing all connections!\n");
1742  }
1743  }
1744 //Third step
1745 //Sending to next mix
1746 
1747 // Now in a separate Thread (see loopSendToMix())
1748 
1749 //Step 4
1750 //Step 4a Receiving from mix to queue now in a separate thread
1751 //Step 4b Processing MixPackets received from Mix
1752 //todo check for error!!!
1753  countRead=pMix->m_nUser+1;
1754  while(countRead>0&&pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
1755  {
1756  bAktiv=true;
1757  countRead--;
1758  ret=sizeof(tQueueEntry);
1759  pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
1760 
1761  #ifdef LOG_PACKET_TIMES
1762  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
1763  #endif
1764 #ifdef ANON_DEBUG_MODE
1765  if (pMixPacket->flags&CHANNEL_DEBUG)
1766  {
1767  UINT8 base64Payload[DATA_SIZE << 1];
1768  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
1769  CAMsg::printMsg(LOG_DEBUG, "Dequeued Downstream AN.ON packet from previous Mix debug: %s\n", base64Payload);
1770  pMixPacket->flags &= ~CHANNEL_DEBUG;
1771  }
1772 
1773 #endif
1774 
1775  if(pMixPacket->flags==CHANNEL_CLOSE) //close event
1776  {
1777  #if defined(_DEBUG) && !defined(__MIX_TEST)
1778 // CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
1779  #endif
1780  pMix->m_pChannelToQueueList->removeChannel(pMixPacket->channel);
1781  fmChannelList* pEntry=pChannelList->get(pMixPacket->channel);
1782  if(pEntry!=NULL)
1783  {
1784  /* a hack to solve the SSL problem:
1785  * set channel of downstream packet to in channel after they are dequeued
1786  * from pEntry->pQueueSend so we can retrieve the channel entry to decrement
1787  * the per channel count of enqueued downstream bytes.
1788  */
1789  #ifndef SSL_HACK
1790  pMixPacket->channel=pEntry->channelIn;
1791  #endif
1792 
1793  pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
1794  //getRandom(pMixPacket->data,DATA_SIZE);
1795  #ifdef LOG_PACKET_TIMES
1796  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
1797  #endif
1798  pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
1799  #ifdef LOG_TRAFFIC_PER_USER
1800  pEntry->pHead->trafficOut++;
1801  #endif
1802  #ifdef COUNTRY_STATS
1804  #endif
1805  #ifdef SSL_HACK
1806  /* a hack to solve the SSL problem:
1807  * per channel count of enqueued downstream bytes
1808  */
1809  pEntry->downStreamBytes += sizeof(tQueueEntry);
1810  #endif
1811  #ifdef LOG_CHANNEL
1812  pEntry->packetsOutToUser++;
1813  getcurrentTimeMicros(current_time);
1814  diff_time=diff64(current_time,pEntry->timeCreated);
1815  CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%Lu,%u,%u,%u\n",
1816  pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
1817  diff_time);
1818  #endif
1819 
1820  #ifdef HAVE_EPOLL
1821  //m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
1822  #else
1823  psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
1824  #endif
1825 
1826 
1827  #ifndef SSL_HACK
1828  delete pEntry->pCipher; // forget the symetric key of this connection
1829  pEntry->pCipher = NULL;
1830  pChannelList->removeChannel(pEntry->pHead->pMuxSocket, pEntry->channelIn);
1831  #ifdef CH_LOG_STUDY
1832  nrOfChOpMutex->lock();
1833  currentOpenedChannels--;
1834  nrOfChOpMutex->unlock();
1835  #endif
1836  /* a hack to solve the SSL problem:
1837  * remove channel after the close packet is enqueued
1838  * from pEntry->pQueueSend
1839  */
1840  #endif
1841  }
1842  else
1843  {
1844  #ifdef DEBUG
1845  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: close channel -> client but channel does not exist.\n");
1846  #endif
1847  }
1848 
1849  }
1850  else
1851  {//flag !=close
1852  #if defined(_DEBUG) && !defined(__MIX_TEST)
1853 // CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
1854  #endif
1855  fmChannelList* pEntry=pChannelList->get(pMixPacket->channel);
1856 
1857  if(pEntry!=NULL)
1858  {
1859  #ifdef LOG_CRIME
1860  if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
1861  {
1862  //UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
1863  int log=LOG_ENCRYPTED;
1864  if(!CALibProxytest::getOptions()->isEncryptedLogEnabled())
1865  log=LOG_CRIT;
1866  CAMsg::printMsg(log,"Detecting crime activity - next mix channel: %u -- "
1867  "Incoming (User) IP:Port is: %u.%u.%u.%u:%u \n", pMixPacket->channel,
1868  pEntry->pHead->peerIP[0],
1869  pEntry->pHead->peerIP[1],
1870  pEntry->pHead->peerIP[2],
1871  pEntry->pHead->peerIP[3],
1872  pEntry->pHead->peerPort);
1873  continue;
1874  }
1875  #endif
1876 
1877  /* a hack to solve the SSL problem:
1878  * same as CHANNEL_CLOSE packets
1879  */
1880  #ifndef SSL_HACK
1881  pMixPacket->channel=pEntry->channelIn;
1882  #endif
1883 
1884  pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
1885 
1886  #ifdef LOG_PACKET_TIMES
1887  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
1888  #endif
1889 #ifdef ANON_DEBUG_MODE
1890  if (pEntry->bDebug)
1891  {
1892  pMixPacket->flags |= CHANNEL_DEBUG;
1893  UINT8 base64Payload[DATA_SIZE << 1];
1894  EVP_EncodeBlock(base64Payload, pMixPacket->data, DATA_SIZE);//base64 encoding (without newline!)
1895  CAMsg::printMsg(LOG_DEBUG, "Send Dowwstream AN.ON packet debug: %s\n", base64Payload);
1896  }
1897 
1898 #endif
1899  pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
1900  /*CAMsg::printMsg(
1901  LOG_INFO,"adding data packet to queue: %x, queue size: %u bytes\n",
1902  pEntry->pHead->pQueueSend, pEntry->pHead->pQueueSend->getSize());*/
1903  #ifdef LOG_TRAFFIC_PER_USER
1904  pEntry->pHead->trafficOut++;
1905  #endif
1906  #ifdef COUNTRY_STATS
1908  #endif
1909  #ifdef LOG_CHANNEL
1910  pEntry->packetsOutToUser++;
1911  #endif
1912  #ifdef SSL_HACK
1913  /* a hack to solve the SSL problem:
1914  * per channel count of downstream packets in bytes
1915  */
1916  pEntry->downStreamBytes += sizeof(tQueueEntry);
1917  #endif
1918 
1919  #ifdef HAVE_EPOLL
1920  /*int epret = m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
1921  if(epret == E_UNKNOWN)
1922  {
1923  epret=errno;
1924  CAMsg::printMsg(LOG_INFO,"epoll_add returns: %s (return value: %d) \n", strerror(epret), epret);
1925  }*/
1926  #else
1927  psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
1928  #endif
1929 
1930  pMix->incMixedPackets();
1931  }
1932  else
1933  {
1934  #ifdef _DEBUG
1935  if(pMixPacket->flags!=CHANNEL_DUMMY)
1936  {
1937 /* CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
1938  "Channel-Id %u not valid!\n",pMixPacket->channel
1939  );*/
1940  #ifdef LOG_CHANNEL
1941  CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
1942  #endif
1943  }
1944  #endif
1945  }
1946  }
1947  }
1948 
1951  bAktiv =pMix->sendToUsers(psocketgroupUsersWrite,psocketgroupUsersRead,pChannelList);
1952 #ifndef FAST_PROCESSING
1953  if(!bAktiv)
1954  msSleep(1);
1955 #endif
1956  }
1957 //ERR:
1958  CAMsg::printMsg(LOG_CRIT,"Seems that we are restarting now!!\n");
1959  delete pQueueEntry;
1960  pQueueEntry = NULL;
1961  delete []tmpBuff;
1962  tmpBuff = NULL;
1963  CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
1964 #endif//NEW_MIX_TYPE
1966  }
1967 
1968 /* last part of the main loop:
1969  * return true if the loop when at least one packet was sent
1970  * or false otherwise.
1971  */
1972 #ifdef HAVE_EPOLL
1973 bool CAFirstMixA::sendToUsers(CASocketGroupEpoll* psocketgroupUsersWrite,CASocketGroupEpoll* psocketgroupUsersRead,CAFirstMixChannelList* pChannelList)
1974 #else
1975 bool CAFirstMixA::sendToUsers(CASocketGroup* psocketgroupUsersWrite,CASocketGroup* psocketgroupUsersRead,CAFirstMixChannelList* pChannelList)
1976 #endif
1977 {
1978  SINT32 countRead = psocketgroupUsersWrite->select(/*true,*/0);
1979  tQueueEntry *packetToSend = NULL;
1980  SINT32 packetSize = sizeof(tQueueEntry);
1981  CAQueue *controlMessageUserQueue = NULL;
1982  CAQueue *dataMessageUserQueue = NULL;
1983 
1984  CAQueue *processedQueue = NULL; /* one the above queues that should be used for processing*/
1985  UINT32 extractSize = 0;
1986  bool bAktiv = false;
1987  UINT32 iSocketErrors = 0;
1988 
1989 /* Cyclic polling: gets all open sockets that will not block when invoking send()
1990  * but will only send at most one packet. After that control is returned to loop()
1991  */
1992 #ifdef HAVE_EPOLL
1993  fmHashTableEntry* pfmHashEntry=
1994  (fmHashTableEntry*) psocketgroupUsersWrite->getFirstSignaledSocketData();
1995 
1996  while(pfmHashEntry != NULL)
1997  {
1998 
1999 #else
2000  fmHashTableEntry* pfmHashEntry=pChannelList->getFirst();
2001  while( (countRead > 0) && (pfmHashEntry != NULL) )
2002  {
2003  if(psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
2004  {
2005  countRead--;
2006 #endif
2007  /* loop turn init */
2008  extractSize = 0;
2009  processedQueue = NULL;
2010  packetToSend = &(pfmHashEntry->oQueueEntry);
2011  controlMessageUserQueue = pfmHashEntry->pControlMessageQueue;
2012  dataMessageUserQueue = pfmHashEntry->pQueueSend;
2013 
2014  //Control messages have a higher priority.
2015  if(controlMessageUserQueue->getSize() > 0)
2016  {
2017  processedQueue = controlMessageUserQueue;
2018  pfmHashEntry->bCountPacket = false;
2019  }
2020  else if( (dataMessageUserQueue->getSize() > 0)
2021 #ifdef DELAY_USERS
2022  && pChannelList->hasDelayBuckets(pfmHashEntry->delayBucketID)
2023 #endif
2024  )
2025  {
2026  processedQueue = dataMessageUserQueue;
2027  pfmHashEntry->bCountPacket = true;
2028  }
2029 
2030  if(processedQueue != NULL)
2031  {
2032  extractSize = packetSize;
2033  bAktiv=true;
2034 
2035  if(pfmHashEntry->uAlreadySendPacketSize == -1)
2036  {
2037  processedQueue->get((UINT8*) packetToSend, &extractSize);
2038 
2039  /* Hack for SSL BUG */
2040 #ifdef SSL_HACK
2041  finishPacket(pfmHashEntry);
2042 #endif //SSL_HACK
2043 #ifdef ANON_DEBUG_MODE
2044  if (packetToSend->packet.flags&CHANNEL_DEBUG)
2045  {
2046  UINT8 base64Payload[DATA_SIZE << 1];
2047  EVP_EncodeBlock(base64Payload, packetToSend->packet.data, DATA_SIZE);//base64 encoding (without newline!)
2048  CAMsg::printMsg(LOG_DEBUG, "Send Downstream AN.ON packet to user debug: %s\n", base64Payload);
2049  packetToSend->packet.flags &= ~CHANNEL_DEBUG;
2050  }
2051 
2052 #endif
2053  pfmHashEntry->pMuxSocket->prepareForSend(&(packetToSend->packet));
2054  pfmHashEntry->uAlreadySendPacketSize = 0;
2055  }
2056  }
2057 
2058  if( (extractSize > 0) || (pfmHashEntry->uAlreadySendPacketSize > 0) )
2059  {
2060  SINT32 len = MIXPACKET_SIZE - pfmHashEntry->uAlreadySendPacketSize;
2061  UINT8* packetToSendOffset = ((UINT8*)&(packetToSend->packet)) + pfmHashEntry->uAlreadySendPacketSize;
2062  CASocket* clientSocket = pfmHashEntry->pMuxSocket->getCASocket();
2063 
2064  SINT32 ret = clientSocket->send(packetToSendOffset, len);
2065 
2066  if(ret > 0)
2067  {
2068 #ifdef PAYMENT
2069  SINT32 accounting = E_SUCCESS;
2070 #endif
2071  pfmHashEntry->uAlreadySendPacketSize += ret;
2072 
2073  if(pfmHashEntry->uAlreadySendPacketSize == MIXPACKET_SIZE)
2074  {
2075  #ifdef DELAY_USERS
2076  if(processedQueue != controlMessageUserQueue)
2077  {
2078  pChannelList->decDelayBuckets(pfmHashEntry->delayBucketID);
2079  }
2080  #endif
2081 
2082  #ifdef LOG_PACKET_TIMES
2083  if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
2084  {
2085  getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
2086  m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
2087  }
2088  #endif
2089  pfmHashEntry->uAlreadySendPacketSize=-1;
2090 #ifdef PAYMENT
2091  /* count this packet for accounting */
2092  accounting = accountTrafficDownstream(pfmHashEntry);
2093 #endif
2094  }
2095 
2096  }
2097  else if(ret<0&&ret!=E_AGAIN)
2098  {
2099  iSocketErrors++;
2100  // if (iSocketErrors == 1) // show debug message only at the first error; otherwise, the log may get huge
2101  {
2102  SOCKET sock=clientSocket->getSocket();
2103  CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::sendtoUser() - send error %d on socket %d. Reason: %s (%i)\n", ret, sock, GET_NET_ERROR_STR(GET_NET_ERROR), GET_NET_ERROR);
2104  }
2105  // kick the user out - these only happens in extreme situations...
2106  closeConnection(pfmHashEntry,psocketgroupUsersRead,psocketgroupUsersWrite,pChannelList);
2107  }
2108  //TODO error handling
2109  }
2110 
2111 #ifdef HAVE_EPOLL
2112  pfmHashEntry=(fmHashTableEntry*)psocketgroupUsersWrite->getNextSignaledSocketData();
2113  }
2114 #else
2115  }//if is socket signaled
2116  pfmHashEntry=pChannelList->getNext();
2117  }
2118 #endif
2119  if (iSocketErrors > 1)
2120  {
2121  CAMsg::printMsg(LOG_ERR, "CAFirstMixA::sendtoUser() - %d send errors on a socket occured!\n", iSocketErrors);
2122  }
2123 
2124  return bAktiv;
2125 }
2126 #endif //MULTIPLE THREADS
2127 
2128 
2129 #endif //ONLY_LOCAL_PROXY
#define RSA_SIZE
#define FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS
#define KICKOUT_FORCED
#define LOG_ENCRYPTED
Definition: CAMsg.hpp:46
#define INIT_STACK
Definition: CAThread.hpp:48
#define BEGIN_STACK(methodName)
Definition: CAThread.hpp:49
#define FINISH_STACK(methodName)
Definition: CAThread.hpp:50
SINT32 getcurrentTimeMillis(UINT64 &u64Time)
Gets the current system time in milliseconds.
Definition: CAUtil.cpp:252
SINT32 getcurrentTimeMicros(UINT64 &u64Time)
Gets the current system time in microseconds.
Definition: CAUtil.cpp:280
bool equals(const XMLCh *const e1, const char *const e2)
Definition: CAUtil.cpp:645
SINT32 getRandom(UINT32 *val)
Gets 32 random bits.
Definition: CAUtil.cpp:346
SINT32 msSleep(UINT32 ms)
Sleeps ms milliseconds.
Definition: CAUtil.cpp:406
UINT32 diff64(const UINT64 &bigop, const UINT64 &smallop)
Definition: CAUtil.hpp:398
void setZero64(UINT64 &op1)
Definition: CAUtil.hpp:355
bool isZero64(UINT64 &op1)
Definition: CAUtil.hpp:464
void set64(UINT64 &op1, UINT32 op2)
Definition: CAUtil.hpp:321
#define GET_NET_ERROR
Definition: StdAfx.h:469
#define GET_NET_ERROR_STR(x)
Definition: StdAfx.h:471
#define THREAD_RETURN
Definition: StdAfx.h:540
#define THREAD_RETURN_SUCCESS
Definition: StdAfx.h:542
#define ERR_INTERN_SOCKET_CLOSED
Definition: StdAfx.h:477
#define MAX_NEXT_MIX_QUEUE_SIZE
Definition: StdAfx.h:229
#define ASSERT(cond, msg)
Definition: StdAfx.h:546
#define SOCKET
Definition: StdAfx.h:460
unsigned short UINT16
Definition: basetypedefs.h:133
signed int SINT32
Definition: basetypedefs.h:132
unsigned char UINT8
Definition: basetypedefs.h:135
unsigned int UINT32
Definition: basetypedefs.h:131
static const SINT32 HANDLE_PACKET_CONNECTION_OK
static const SINT32 HANDLE_PACKET_CLOSE_CONNECTION
static SINT32 handleJapPacket(fmHashTableEntry *pHashEntry, bool a_bControlMessage, bool a_bMessageToJAP)
This should be called by the FirstMix for every incoming Jap packet.
static UINT32 getAuthFlags(fmHashTableEntry *pHashEntry)
static const SINT32 HANDLE_PACKET_PREPARE_FOR_CLOSING_CONNECTION
UINT32 getDelayChannelBucketGrow()
UINT32 getDelayChannelBucketGrowIntervall()
UINT32 getDelayChannelUnlimitTraffic()
bool proccessMixPacket(const MIXPACKET *pPacket)
SINT32 insert(UINT8 key[16], UINT64 timestamp)
Inserts this key in the replay DB.
Definition: CADatabase.cpp:93
void checkUserConnections()
virtual void shutDown()
Definition: CAFirstMixA.cpp:44
void finishPacket(fmHashTableEntry *pfmHashEntry)
friend THREAD_RETURN fm_loopPacketProcessing(void *params)
SINT32 loop()
SINT32 closeConnection(fmHashTableEntry *pHashEntry)
Definition: CAFirstMixA.cpp:85
bool sendToUsers()
SINT32 accountTrafficDownstream(fmHashTableEntry *pfmHashEntry)
SINT32 accountTrafficUpstream(fmHashTableEntry *pHashEntry)
void notifyAllUserChannels(fmHashTableEntry *pfmHashEntry, UINT16 flags)
Data structure that stores all information about the currently open Mix channels.
fmChannelListEntry * getFirstChannelForSocket(CAMuxSocket *pMuxSocket)
Gets the first channel for a given connection.
void setDelayParameters(UINT32 unlimitTraffic, UINT32 bucketGrow, UINT32 intervall)
fmHashTableEntry * getFirst()
Gets the first connection of all connections in the list.
bool isKickoutForced(fmHashTableEntry *pHashTableEntry)
SINT32 addChannel(CAMuxSocket *pMuxSocket, HCHANNEL channelIn, CASymChannelCipher *pCipher, HCHANNEL *channelOut)
Adds a new channel for a given connection to the channel list.
fmChannelListEntry * get(CAMuxSocket *pMuxSocket, HCHANNEL channelIn)
Returns the information for a given Input-Channel-ID.
fmHashTableEntry * getNext()
Gets the next entry in the connections-list.
SINT32 removeChannel(CAMuxSocket *pMuxSocket, HCHANNEL channelIn)
Removes a single channel from the list.
bool isTimedOut(fmHashTableEntry *pHashTableEntry)
void decDelayBuckets(UINT32 delayBucketID)
fmChannelListEntry * getNextChannel(fmChannelListEntry *pEntry)
Gets the next channel for a given connection.
SINT32 remove(CAMuxSocket *pMuxSocket)
Removes all channels that belong to the given connection, and the connection itself, from the list.
void setKickoutForced(fmHashTableEntry *pHashTableEntry, bool kickoutForced)
fmHashTableEntry * popTimeoutEntry()
SINT32 pushTimeoutEntry(fmHashTableEntry *pHashTableEntry, bool kickoutForced=!KICKOUT_FORCED)
adds the entry to the timeout queue with mutex
bool hasDelayBuckets(UINT32 delayBucketID)
SINT32 incMixedPackets()
Definition: CAFirstMix.hpp:427
volatile bool m_bRestart
Definition: CAFirstMix.hpp:456
CAThread * m_pthreadAcceptUsers
Definition: CAFirstMix.hpp:504
CAQueue * m_pQueueSendToMix
Definition: CAFirstMix.hpp:448
volatile bool m_bRunLog
Definition: CAFirstMix.hpp:552
CAIPList * m_pIPList
Definition: CAFirstMix.hpp:446
friend THREAD_RETURN fm_loopLog(void *)
bool m_bIsShuttingDown
Definition: CAFirstMix.hpp:550
CASocketGroupEpoll * m_psocketgroupUsersWrite
Definition: CAFirstMix.hpp:467
volatile UINT32 m_nUser
Definition: CAFirstMix.hpp:454
CAMutex * m_pmutexLogin
Definition: CAFirstMix.hpp:555
CASocketGroupEpoll * m_psocketgroupUsersRead
Definition: CAFirstMix.hpp:466
tUINT32withLock * m_PacketsPerCountryOUT
Definition: CAFirstMix.hpp:537
SINT32 decUsers(LP_fmHashTableEntry pHashEntry)
Definition: CAFirstMix.hpp:415
CAFirstMixChannelList * m_pChannelList
Definition: CAFirstMix.hpp:464
tUINT32withLock * m_PacketsPerCountryIN
Definition: CAFirstMix.hpp:536
SINT32 clean()
CAQueue * m_pQueueReadFromMix
Definition: CAFirstMix.hpp:449
SINT32 removeIP(const UINT8 ip[4])
Removes the IP-Address from the list.
Definition: CAIPList.cpp:189
static CACmdLnOptions * getOptions()
SINT32 lock()
Locks the lockable object by threadsafe incrementing a reference counter.
Definition: CALockAble.hpp:55
volatile bool m_bLoop
Definition: CAMix.hpp:135
CAInfoService * m_pInfoService
Definition: CAMix.hpp:184
CADatabase * m_pReplayDB
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
SINT32 unlock()
Definition: CAMutex.hpp:52
SINT32 lock()
Definition: CAMutex.hpp:41
SINT32 receive(MIXPACKET *pPacket)
Receives a whole MixPacket.
CASocket * getCASocket()
Definition: CAMuxSocket.hpp:84
SINT32 prepareForSend(MIXPACKET *inoutPacket)
This is a simple FIFO-Queue.
Definition: CAQueue.hpp:50
SINT32 add(const void *buff, UINT32 size)
Adds data to the Queue.
Definition: CAQueue.cpp:76
SINT32 clean()
Removes any stored data from the Queue.
Definition: CAQueue.cpp:47
SINT32 get(UINT8 *pbuff, UINT32 *psize)
Gets up to psize number of bytes from the Queue.
Definition: CAQueue.cpp:148
UINT32 getSize()
Returns the size of stored data in byte.
Definition: CAQueue.hpp:101
SINT32 remove(CASocket &s)
bool isSignaled(CASocket &s)
SINT32 add(CASocket &s)
Adds the socket s to the socket group.
bool isSignaled(CASocket &s)
SINT32 remove(CASocket &s)
virtual SINT32 send(const UINT8 *buff, UINT32 len)
Sends some data over the network.
Definition: CASocket.cpp:400
static CASymChannelCipher * createCipher(SYMCHANNELCIPHER_ALGORITHM alg)
virtual SINT32 setKeys(const UINT8 *key, UINT32 keysize)=0
Sets the keys for crypt1() and crypt2() either to the same key (if keysize==KEY_SIZE) or to different...
virtual SINT32 setIV2(const UINT8 *p_iv)=0
Sets iv2 to p_iv.
virtual SINT32 crypt2(const UINT8 *in, UINT8 *out, UINT32 len)=0
virtual SINT32 crypt1(const UINT8 *in, UINT8 *out, UINT32 len)=0
SINT32 start(void *param, bool bDaemon=false, bool bSilent=false)
Starts the execution of the main function of this thread.
Definition: CAThread.cpp:115
SINT32 setMainLoop(THREAD_MAIN_TYP fnc)
Sets the main function which will be executed within this thread.
Definition: CAThread.hpp:148
SINT32 join()
Waits for the main function to finish execution.
Definition: CAThread.cpp:187
This class bla bla.
#define DELAY_USERS
Definition: doxygen.h:6
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_AGAIN
Definition: errorcodes.hpp:9
#define E_UNKNOWN
Definition: errorcodes.hpp:3
CAMix * pMix
Definition: proxytest.cpp:75
HCHANNEL channel
Definition: typedefs.hpp:117
UINT8 data[DATA_SIZE]
Definition: typedefs.hpp:121
UINT16 flags
Definition: typedefs.hpp:118
CASymChannelCipher * pCipher
volatile UINT32 delayBucketID
CASymChannelCipher * pSymCipher
CAControlChannelDispatcher * pControlChannelDispatcher
Definition: typedefs.hpp:169
MIXPACKET packet
Definition: typedefs.hpp:170
#define CHANNEL_DATA
Definition: typedefs.hpp:42
#define MIXPACKET_SIZE
Definition: typedefs.hpp:40
#define CHANNEL_DEBUG
Definition: typedefs.hpp:51
UINT16 flags
Definition: typedefs.hpp:1
#define CHANNEL_DUMMY
Definition: typedefs.hpp:50
UINT32 HCHANNEL
Definition: typedefs.hpp:34
#define CHANNEL_CLOSE
Definition: typedefs.hpp:47
struct t_queue_entry tQueueEntry
Definition: typedefs.hpp:188
UINT16 len
Definition: typedefs.hpp:0
#define CHANNEL_SIG_CRIME
Definition: typedefs.hpp:58
#define CHANNEL_OPEN
Definition: typedefs.hpp:43
#define DATA_SIZE
Definition: typedefs.hpp:69