Mixe for Privacy and Anonymity in the Internet
Protected Member Functions | List of all members
CAFirstMixB Class Reference

#include <CAFirstMixB.hpp>

Inheritance diagram for CAFirstMixB:
Collaboration diagram for CAFirstMixB:

Protected Member Functions

SINT32 loop ()
 
- Protected Member Functions inherited from CAFirstMix
bool isShuttingDown ()
 
SINT32 init ()
 
SINT32 clean ()
 
virtual SINT32 initOnce ()
 
virtual SINT32 processKeyExchange ()
 
SINT32 initMixParameters (DOMElement *elemMixes)
 Initialises the MixParameters info for each mix from the <Mixes> element received from the second mix. More...
 
SINT32 incUsers (LP_fmHashTableEntry pHashEntry)
 
SINT32 decUsers (LP_fmHashTableEntry pHashEntry)
 
SINT32 incMixedPackets ()
 
SINT32 doUserLogin (CAMuxSocket *pNewUSer, UINT8 perrIP[4])
 
SINT32 reconfigure ()
 
SINT32 deleteCountryStats ()
 
- Protected Member Functions inherited from CAMix
SINT32 checkCompatibility (DOMNode *a_parent, const char *a_mixPosition)
 
SINT32 appendCompatibilityInfo (DOMNode *a_parent)
 
SINT32 addMixInfo (DOMNode *a_element, bool a_bForceFirstNode)
 
virtual SINT32 initMixCascadeInfo (DOMElement *elemMixes)
 This will initialize the XML Cascade Info struct XMLFirstMixToInfoService that is sent to the InfoService in CAInfoService::sendCascadeHelo() More...
 
SINT32 signXML (DOMNode *a_element)
 

Additional Inherited Members

- Public Types inherited from CAMix
enum  tMixType { FIRST_MIX , MIDDLE_MIX , LAST_MIX , JAP }
 
- Public Member Functions inherited from CAFirstMix
 CAFirstMix ()
 
virtual ~CAFirstMix ()
 
tMixType getType () const
 
bool forceKickout (fmHashTableEntry *pHashTableEntry, const XERCES_CPP_NAMESPACE::DOMDocument *pErrDoc=NULL)
 
CAMutex * getLoginMutex ()
 
SINT32 connectToNextMix (CASocketAddr *a_pAddrNext)
 
SINT32 getMixedPackets (UINT64 &ppackets)
 
UINT32 getNrOfUsers ()
 
SINT32 getLevel (SINT32 *puser, SINT32 *prisk, SINT32 *ptraffic)
 
TermsAndConditions * getTermsAndConditions (const UINT8 *opSki)
 
DOMNode * getTermsAndConditionsTemplate (UINT8 *templateRefID)
 
SINT32 getMixCount ()
 
tMixParameters * getMixParameters ()
 Returns the ordered list of the mix parameters from the first mix to the last mix. More...
 
SINT32 setMixParameters (const tMixParameters &params)
 Sets the parameters for the mix specified in the params.m_strMixID field. More...
 
SINT32 handleKeyInfoExtensions (DOMElement *root)
 
SINT32 handleTermsAndConditionsExtension (DOMElement *extensionRoot)
 
- Public Member Functions inherited from CAMixWithReplayDB
 CAMixWithReplayDB ()
 
CADatabase * getReplayDB () const
 
- Public Member Functions inherited from CAMix
 CAMix ()
 
virtual ~CAMix ()
 
SINT32 start ()
 
virtual void shutDown ()
 
virtual bool isShutDown ()
 
SINT32 getMixCascadeInfo (XERCES_CPP_NAMESPACE::DOMDocument *&docMixCascadeInfo)
 Returns the Mix-Cascade info which should be sent to the InfoService. More...
 
bool acceptsReconfiguration ()
 
CAControlChannelDispatcher * getDownstreamControlChannelDispatcher () const
 
CAControlChannelDispatcher * getUpstreamControlChannelDispatcher () const
 
UINT32 getLastConnectionTime ()
 
bool isConnected ()
 
- Public Attributes inherited from CAFirstMix
UINT64 m_u64LastTimestampReceived
 
- Public Attributes inherited from CAMixWithReplayDB
UINT64 m_u64ReferenceTime
 
- Static Public Attributes inherited from CAMix
static const UINT32 TIMEOUT_MIX_CONNECTION_ESTABLISHEMENT = 60000
 
- Protected Attributes inherited from CAFirstMix
CAIPList * m_pIPList
 
CATempIPBlockList * m_pIPBlockList
 
CAQueue * m_pQueueSendToMix
 
CAQueue * m_pQueueReadFromMix
 
volatile UINT32 m_nUser
 
UINT32 m_nSocketsIn
 
volatile bool m_bRestart
 
CASocket ** m_arrSocketsIn
 
UINT32 m_u32MixCount
 
tMixParameters * m_arMixParameters
 
CAFirstMixChannelList * m_pChannelList
 
CASocketGroupEpoll * m_psocketgroupUsersRead
 
CASocketGroupEpoll * m_psocketgroupUsersWrite
 
CAMuxSocket * m_pMuxOut
 
UINT8 * m_xmlKeyInfoBuff
 
UINT16 m_xmlKeyInfoSize
 
XERCES_CPP_NAMESPACE::DOMDocument * m_docMixCascadeInfo
 
UINT64 m_nMixedPackets
 
CAASymCipher * m_pRSA
 
CAMutex * m_pmutexUser
 
CAMutex * m_pmutexMixedPackets
 
CAMutex * m_pmutexLoginThreads
 
CAThread * m_pthreadAcceptUsers
 
CAThreadPool * m_pthreadsLogin
 
CAThread * m_pthreadSendToMix
 
CAThread * m_pthreadReadFromMix
 
UINT32 m_nrOfTermsAndConditionsDefs
 
TermsAndConditions ** m_tnCDefs
 
UINT32 m_nrOfTermsAndConditionsTemplates
 
DOMNode ** m_tcTemplates
 
XERCES_CPP_NAMESPACE::DOMDocument * m_templatesOwner
 
const XMLCh * TNC_REQUEST
 
const XMLCh * TNC_CONFIRM
 
const XMLCh * TNC_INTERRUPT
 
tUINT32withLock * m_PacketsPerCountryIN
 
tUINT32withLock * m_PacketsPerCountryOUT
 
bool m_bIsShuttingDown
 
volatile bool m_bRunLog
 
CAMutex * m_pmutexLogin
 
- Protected Attributes inherited from CAMixWithReplayDB
CADatabase * m_pReplayDB
 
CAReplayCtrlChannelMsgProc * m_pReplayMsgProc
 
- Protected Attributes inherited from CAMix
volatile bool m_bLoop
 
bool m_bReconfiguring
 
volatile bool m_bShutDown
 
CAMultiSignature * m_pMultiSignature
 
CAInfoService * m_pInfoService
 
UINT32 m_u32KeepAliveRecvInterval
 
UINT32 m_u32KeepAliveSendInterval
 
bool m_acceptReconfiguration
 
volatile bool m_bConnected
 
volatile UINT32 m_lLastConnectionTime
 
XERCES_CPP_NAMESPACE::DOMDocument * m_docMixCascadeInfo
 
CAControlChannelDispatcher * m_pMuxOutControlChannelDispatcher
 
CAControlChannelDispatcher * m_pMuxInControlChannelDispatcher
 

Detailed Description

Definition at line 34 of file CAFirstMixB.hpp.

Member Function Documentation

◆ loop()

SINT32 CAFirstMixB::loop ( )
protected virtual

Implements CAFirstMix.

Definition at line 41 of file CAFirstMixB.cpp.

42 {
43 #ifdef NEW_MIX_TYPE
44  /* should only be compiled if new NEW_MIX_TYPE is defined */
45 #ifdef DELAY_USERS
46  m_pChannelList->setDelayParameters(pglobalOptions->getDelayChannelUnlimitTraffic(),
47  pglobalOptions->getDelayChannelBucketGrow(),
48  pglobalOptions->getDelayChannelBucketGrowIntervall());
49 #endif
50 
51  // CASingleSocketGroup osocketgroupMixOut;
52  SINT32 countRead;
53  //#ifdef LOG_PACKET_TIMES
54  // tPoolEntry* pPoolEntry=new tPoolEntry;
55  // MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
56  //#else
57  tQueueEntry* pQueueEntry = new tQueueEntry;
58  MIXPACKET* pMixPacket = &pQueueEntry->packet;
59  //#endif
60  m_nUser = 0;
61  SINT32 ret;
62  //osocketgroupMixOut.add(*m_pMuxOut);
63 
64  UINT8* tmpBuff = new UINT8[sizeof(tQueueEntry)];
65  CAMsg::printMsg(LOG_DEBUG, "Starting Message Loop... \n");
66  bool bAktiv;
67  UINT8 rsaBuff[RSA_SIZE];
68 #ifdef LOG_TRAFFIC_PER_USER
69  UINT64 current_time;
70  UINT32 diff_time;
71  CAMsg::printMsg(LOG_DEBUG, "Channel log formats:\n");
72  CAMsg::printMsg(LOG_DEBUG, "1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
73  CAMsg::printMsg(LOG_DEBUG, "2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
74 #endif
75  // CAThread threadReadFromUsers;
76  // threadReadFromUsers.setMainLoop(loopReadFromUsers);
77  // threadReadFromUsers.start(this);
78 
79  while (!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
80  {
81  bAktiv = false;
82  //LOOP_START:
83 
84  //First Step
85  //Checking for new connections
86  // Now in a separat Thread....
87 
88  // Second Step
89  // Checking for data from users
90  // Now in a separate Thread (see loopReadFromUsers())
91  //Only proccess user data, if queue to next mix is not to long!!
92 #define MAX_NEXT_MIX_QUEUE_SIZE 10000000 //10 MByte
94  {
95  countRead = m_psocketgroupUsersRead->select(/*false,*/0); // how many JAP<->mix connections have received data from their coresponding JAP
96  if (countRead > 0)
97  bAktiv = true;
98 #ifdef HAVE_EPOLL
99  //if we have epool we do not need to search the whole list
100  //of connected JAPs to find the ones who have sent data
101  //as epool will return ONLY these connections.
103  while (pHashEntry != NULL)
104  {
105  CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
106 #else
107  //if we do not have epoll we have to go to the whole
108  //list of open connections to find the ones which
109  //actually have sent some data
110  fmHashTableEntry* pHashEntry = m_pChannelList->getFirst();
111  while (pHashEntry != NULL&&countRead > 0) // iterate through all connections as long as there is at least one active left
112  {
113  CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
114  if (m_psocketgroupUsersRead->isSignaled(*pMuxSocket)) // if this one seems to have data
115  {
116  countRead--;
117 #endif
118  ret = pMuxSocket->receive(pMixPacket, 0);
119 #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
120  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
121  set64(pQueueEntry->timestamp_proccessing_start_OP, pQueueEntry->timestamp_proccessing_start);
122 #endif
123  if (ret == SOCKET_ERROR/*||pHashEntry->accessUntil<time()*/)
124  {
125  // remove dead connections
126 #ifdef LOG_TRAFFIC_PER_USER
127  getcurrentTimeMillis(current_time);
128  diff_time = diff64(current_time, pHashEntry->timeCreated);
129  m_pIPList->removeIP(pHashEntry->peerIP, diff_time, pHashEntry->trafficIn, pHashEntry->trafficOut);
130 #else
131  m_pIPList->removeIP(pHashEntry->peerIP);
132 #endif
133  m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
134  m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
135  ASSERT(pHashEntry->pQueueSend != NULL, "Send queue is NULL");
136  delete pHashEntry->pQueueSend;
137  pHashEntry->pQueueSend = NULL;
138  delete pHashEntry->pSymCipher;
139  pHashEntry->pSymCipher = NULL;
140 #ifdef COUNTRY_STATS
141  decUsers(pHashEntry);
142 #else
143  decUsers();
144 #endif
145  /* remove the client part (MuxSocket) but keep the
146  * outgoing channels until a CHANNEL-CLOSE from the
147  * last mix is received
148  */
149  m_pChannelList->removeClientPart(pMuxSocket);
150  delete pMuxSocket;
151  pMuxSocket = NULL;
152  }
153  else if (ret == MIXPACKET_SIZE) // we've read enough data for a whole mix packet. nice!
154  {
155 #ifdef LOG_TRAFFIC_PER_USER
156  pHashEntry->trafficIn++;
157 #endif
158 #ifdef COUNTRY_STATS
159  m_PacketsPerCountryIN[pHashEntry->countryID].inc();
160 #endif
161  //New control channel code...!
162  SINT32 ret = 0;
163  if (pMixPacket->channel > 0 && pMixPacket->channel < 256)
164  {
165  if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
166  {
167  goto NEXT_USER;
168  }
169  else
170  {
171  CAMsg::printMsg(LOG_DEBUG, "Packet is invalid and could not be processed!\n");
172  ret = 3;
173  }
174  }
175 #ifdef PAYMENT
176  // payment code added by Bastian Voigt
177  if (ret == 0)
178  {
179  ret = CAAccountingInstance::handleJapPacket(pHashEntry);
180  }
181  if (ret == 2)
182  {
183  goto NEXT_USER;
184  }
185 #endif
186  if (ret == 3)
187  {
188  // this jap is evil! terminate connection and add IP to blacklist
189  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixB: Detected evil Jap.. closing connection! Removing IP..\n", ret);
190  fmChannelListEntry* pEntry;
191  pEntry = m_pChannelList->getFirstChannelForSocket(pMuxSocket);
192  while (pEntry != NULL)
193  {
194  getRandom(pMixPacket->data, DATA_SIZE);
195  pMixPacket->flags = CHANNEL_CLOSE;
196  pMixPacket->channel = pEntry->channelOut;
197 #ifdef LOG_PACKET_TIMES
198  setZero64(pQueueEntry->timestamp_proccessing_start);
199 #endif
200  m_pQueueSendToMix->add(pMixPacket, sizeof(tQueueEntry));
201  delete pEntry->pCipher;
202  pEntry->pCipher = NULL;
203  pEntry = m_pChannelList->getNextChannel(pEntry);
204  }
205  m_pIPList->removeIP(pHashEntry->peerIP);
206  m_psocketgroupUsersRead->remove(pMuxSocket->getSocket());
207  m_psocketgroupUsersWrite->remove(pMuxSocket->getSocket());
208  delete pHashEntry->pQueueSend;
209  pHashEntry->pQueueSend = NULL;
210  delete pHashEntry->pSymCipher;
211  pHashEntry->pSymCipher = NULL;
212  m_pChannelList->remove(pMuxSocket);
213  delete pMuxSocket;
214  pMuxSocket = NULL;
215  decUsers();
216  goto NEXT_USER;
217  }
218 
219  if (pMixPacket->flags == CHANNEL_DUMMY) // just a dummy to keep the connection alife in e.g. NAT gateways
220  {
221  getRandom(pMixPacket->data, DATA_SIZE);
222 #ifdef LOG_PACKET_TIMES
223  setZero64(pQueueEntry->timestamp_proccessing_start);
224 #endif
225  pHashEntry->pQueueSend->add(pMixPacket, sizeof(tQueueEntry));
226 #ifdef HAVE_EPOLL
227  m_psocketgroupUsersWrite->add(*pMuxSocket, pHashEntry);
228 #else
229  m_psocketgroupUsersWrite->add(*pMuxSocket);
230 #endif
231  }
232  else // finally! a normal mix packet
233  {
234  CASymCipher* pCipher = NULL;
235  fmChannelListEntry* pEntry;
236  pEntry = m_pChannelList->get(pMuxSocket, pMixPacket->channel);
237  if (pEntry != NULL&&pMixPacket->flags == CHANNEL_DATA)
238  {
239  pMixPacket->channel = pEntry->channelOut;
240  pCipher = pEntry->pCipher;
241  pCipher->crypt1(pMixPacket->data, pMixPacket->data, DATA_SIZE);
242  // queue the packet for sending to the next mix.
243 #ifdef LOG_PACKET_TIMES
244  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
245 #endif
246  m_pQueueSendToMix->add(pMixPacket, sizeof(tQueueEntry));
247  incMixedPackets();
248 #ifdef LOG_CHANNEL
249  pEntry->packetsInFromUser++;
250 #endif
251  }
252  else if (pEntry == NULL&&pMixPacket->flags == CHANNEL_OPEN) // open a new mix channel
253  { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ?
254  //es gilt: open -> data
255  pHashEntry->pSymCipher->crypt1(pMixPacket->data, rsaBuff, KEY_SIZE);
256  pCipher = new CASymCipher();
257  pCipher->setKey(rsaBuff);
258  for (int i = 0; i < 16; i++)
259  rsaBuff[i] = 0xFF;
260  pCipher->setIV2(rsaBuff);
261  pCipher->crypt1(pMixPacket->data + KEY_SIZE, pMixPacket->data, DATA_SIZE - KEY_SIZE);
262  getRandom(pMixPacket->data + DATA_SIZE - KEY_SIZE, KEY_SIZE);
263 #ifdef LOG_CHANNEL
264  HCHANNEL tmpC = pMixPacket->channel;
265 #endif
266  if (m_pChannelList->addChannel(pMuxSocket, pMixPacket->channel, pCipher, &pMixPacket->channel) != E_SUCCESS)
267  { //todo move up ?
268  delete pCipher;
269  pCipher = NULL;
270  }
271  else
272  {
273 #ifdef LOG_PACKET_TIMES
274  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
275 #endif
276 #ifdef LOG_CHANNEL
277  fmChannelListEntry* pTmpEntry = m_pChannelList->get(pMuxSocket, tmpC);
278  pTmpEntry->packetsInFromUser++;
279  set64(pTmpEntry->timeCreated, pQueueEntry->timestamp_proccessing_start);
280 #endif
281  m_pQueueSendToMix->add(pMixPacket, sizeof(tQueueEntry));
282  incMixedPackets();
283 #ifdef _DEBUG
284  // CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel);
285 #endif
286  }
287  }
288  }
289  }
290 #ifdef HAVE_EPOLL
291  NEXT_USER :
293 #else
294  }//if is signaled
295  NEXT_USER:
296  pHashEntry = m_pChannelList->getNext();
297 #endif
298  }
299  }
300  //Third step
301  //Sending to next mix
302 
303  // Now in a separate Thread (see loopSendToMix())
304 
305  //Step 4
306  //Stepa 4a Receiving form Mix to Queue now in separat Thread
307  //Step 4b Proccesing MixPackets received from Mix
308  //todo check for error!!!
309  countRead = m_nUser + 1;
310  while (countRead > 0 && m_pQueueReadFromMix->getSize() >= sizeof(tQueueEntry))
311  {
312  bAktiv = true;
313  countRead--;
314  ret = sizeof(tQueueEntry);
315  m_pQueueReadFromMix->get((UINT8*)pQueueEntry, (UINT32*)&ret);
316 #ifdef LOG_PACKET_TIMES
317  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
318 #endif
319  if (pMixPacket->flags == CHANNEL_CLOSE) //close event
320  {
321 #if defined(_DEBUG) && !defined(__MIX_TEST)
322  // CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
323 #endif
324  fmChannelList* pEntry = m_pChannelList->get(pMixPacket->channel);
325  if (pEntry != NULL)
326  {
327  if (pEntry->pHead != NULL) {
328  pMixPacket->channel = pEntry->channelIn;
329  getRandom(pMixPacket->data, DATA_SIZE);
330 #ifdef LOG_PACKET_TIMES
331  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
332 #endif
333  pEntry->pHead->pQueueSend->add(pMixPacket, sizeof(tQueueEntry));
334 #ifdef LOG_TRAFFIC_PER_USER
335  pEntry->pHead->trafficOut++;
336 #endif
337 #ifdef COUNTRY_STATS
339 #endif
340 #ifdef LOG_CHANNEL
341  //pEntry->packetsOutToUser++;
342  getcurrentTimeMicros(current_time);
343  diff_time = diff64(current_time, pEntry->timeCreated);
344  CAMsg::printMsg(LOG_DEBUG, "2:%u,%Lu,%u,%u,%u\n",
345  pEntry->channelIn, pEntry->pHead->id, pEntry->packetsInFromUser, pEntry->packetsOutToUser,
346  diff_time);
347 #endif
348 
349 #ifdef HAVE_EPOLL
350  m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket, pEntry->pHead);
351 #else
353 #endif
354  delete pEntry->pCipher;
355  pEntry->pCipher = NULL;
357  }
358  else {
359  /* the client has already closed the connection but we
360  * have waited for the CHANNEL-CLOSE packet from the
361  * last mix -> now we have it
362  */
363 #ifdef LOG_PACKET_TIMES
364  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
365 #endif
366  delete pEntry->pCipher;
367  pEntry->pCipher = NULL;
368  m_pChannelList->removeVacantOutChannel(pEntry);
369  pEntry = NULL;
370  }
371  }
372  }
373  else
374  {//flag !=close
375 #if defined(_DEBUG) && !defined(__MIX_TEST)
376 // CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
377 #endif
378  fmChannelList* pEntry = m_pChannelList->get(pMixPacket->channel);
379  if (pEntry != NULL) {
380  if (pEntry->pHead != NULL) {
381 #ifdef LOG_CRIME
382  if ((pMixPacket->flags&CHANNEL_SIG_CRIME) == CHANNEL_SIG_CRIME)
383  {
384  UINT32 id = (pMixPacket->flags >> 8) & 0x000000FF;
385  int log = LOG_ENCRYPTED;
386  if (!pglobalOptions->isEncryptedLogEnabled())
387  log = LOG_CRIT;
388  CAMsg::printMsg(log, "Detecting crime activity - ID: %u -- In-IP is: %u.%u.%u.%u \n", id, pEntry->pHead->peerIP[0], pEntry->pHead->peerIP[1], pEntry->pHead->peerIP[2], pEntry->pHead->peerIP[3]);
389  continue;
390  }
391 #endif
392  pMixPacket->channel = pEntry->channelIn;
393  pEntry->pCipher->crypt2(pMixPacket->data, pMixPacket->data, DATA_SIZE);
394 
395 #ifdef LOG_PACKET_TIMES
396  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
397 #endif
398  pEntry->pHead->pQueueSend->add(pMixPacket, sizeof(tQueueEntry));
399 #ifdef LOG_TRAFFIC_PER_USER
400  pEntry->pHead->trafficOut++;
401 #endif
402 #ifdef COUNTRY_STATS
404 #endif
405 #ifdef LOG_CGANNEL
406  pEntry->packetsOutToUser++;
407 #endif
408 #ifdef HAVE_EPOLL
409  m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket, pEntry->pHead);
410 #else
412 #endif
413  incMixedPackets();
414  }
415  else {
416  /* connection to client is already closed -> we wait for
417  * CLOSE-CHANNEL from last mix (but this is no CLOSE-
418  * CHANNEL packet -> do only some compatibility things and
419  * ignore the packet)
420  */
421 #ifdef LOG_CRIME
422  /* we don't have the user-information any more (but also
423  * the user cannot receive any data); nevertheless write
424  * a log-message to show what happened
425  */
426  if ((pMixPacket->flags&CHANNEL_SIG_CRIME) == CHANNEL_SIG_CRIME) {
427  UINT32 id = (pMixPacket->flags >> 8) & 0x000000FF;
428  int log = LOG_ENCRYPTED;
429  if (!pglobalOptions->isEncryptedLogEnabled()) {
430  log = LOG_CRIT;
431  }
432  CAMsg::printMsg(log, "Detecting crime activity - ID: %u -- In-IP is: not available (user has already closed connection)\n", id);
433  continue;
434  }
435 #endif
436 #ifdef LOG_PACKET_TIMES
437  getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
438 #endif
439  }
440  }
441  else
442  {
443 #ifdef _DEBUG
444  if (pMixPacket->flags != CHANNEL_DUMMY)
445  {
446  /* CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
447  "Channel-Id %u not valid!\n",pMixPacket->channel
448  );*/
449 #ifdef LOG_CHANNEL
450  CAMsg::printMsg(LOG_INFO, "Packet late arrive for channel: %u\n", pMixPacket->channel);
451 #endif
452  }
453 #endif
454  }
455  }
456  }
457 
458  //Step 5
459  //Writing to users...
460  countRead = m_psocketgroupUsersWrite->select(/*true,*/0);
461 #ifdef HAVE_EPOLL
463  while (pfmHashEntry != NULL)
464  {
465 #else
466  fmHashTableEntry* pfmHashEntry = m_pChannelList->getFirst();
467  while (countRead > 0 && pfmHashEntry != NULL)
468  {
469  if (m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
470  {
471  countRead--;
472 #endif
473 #ifdef DELAY_USERS
474  if (pfmHashEntry->delayBucket > 0)
475  {
476 #endif
477  if (pfmHashEntry->pQueueSend->getSize() > 0)
478  {
479  bAktiv = true;
480  UINT32 len = sizeof(tQueueEntry);
481  if (pfmHashEntry->uAlreadySendPacketSize == -1)
482  {
483  pfmHashEntry->pQueueSend->get((UINT8*)&pfmHashEntry->oQueueEntry, &len);
484 #ifdef PAYMENT
485  //do not count control channel packets!
486  if (pfmHashEntry->oQueueEntry.packet.channel > 0 && pfmHashEntry->oQueueEntry.packet.channel < 256)
487  pfmHashEntry->bCountPacket = false;
488  else
489  pfmHashEntry->bCountPacket = true;
490 #endif
491  pfmHashEntry->pMuxSocket->prepareForSend(&(pfmHashEntry->oQueueEntry.packet));
492  pfmHashEntry->uAlreadySendPacketSize = 0;
493  }
494  len = MIXPACKET_SIZE - pfmHashEntry->uAlreadySendPacketSize;
495  ret = pfmHashEntry->pMuxSocket->getCASocket()->send(((UINT8*)&(pfmHashEntry->oQueueEntry)) + pfmHashEntry->uAlreadySendPacketSize, len);
496  if (ret > 0)
497  {
498  pfmHashEntry->uAlreadySendPacketSize += ret;
499  if (pfmHashEntry->uAlreadySendPacketSize == MIXPACKET_SIZE)
500  {
501 #ifdef PAYMENT
502  if (pfmHashEntry->bCountPacket)
503  {
504  // count packet for payment
505  if (CAAccountingInstance::handleJapPacket(pfmHashEntry) == 2)
506  {
507  goto NEXT_USER_WRITING;
508  }
509  }
510 #endif
511 #ifdef DELAY_USERS
512  pfmHashEntry->delayBucket--;
513 #endif
514  pfmHashEntry->uAlreadySendPacketSize = -1;
515 #ifdef LOG_PACKET_TIMES
516  if (!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
517  {
518  getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
519  m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry, CHANNEL_DATA, false);
520  }
521 #endif
522  }
523  }
524  }
525 #ifdef DELAY_USERS
526  }
527 #endif
528  //todo error handling
529 #ifdef HAVE_EPOLL
530  NEXT_USER_WRITING :
532 #else
533  }//if is socket signaled
534  NEXT_USER_WRITING:
535  pfmHashEntry = m_pChannelList->getNext();
536 #endif
537  }
538  if (!bAktiv)
539  msSleep(100);
540  }
541  //ERR:
542  //@todo move cleanup to clean() !
543  CAMsg::printMsg(LOG_CRIT, "Seams that we are restarting now!!\n");
544  m_bRestart = true;
545  m_pMuxOut->close();
546  for (UINT32 i = 0; i < m_nSocketsIn; i++)
547  m_arrSocketsIn[i].close();
548  //writng some bytes to the queue...
549  UINT8 b[sizeof(tQueueEntry) + 1];
550  m_pQueueSendToMix->add(b, sizeof(tQueueEntry) + 1);
551  //#if !defined(_DEBUG) && !defined(NO_LOOPACCEPTUSER)
552  CAMsg::printMsg(LOG_CRIT, "Wait for LoopAcceptUsers!\n");
554  //#endif
555  CAMsg::printMsg(LOG_CRIT, "Wait for LoopSendToMix!\n");
556  m_pthreadSendToMix->join(); //will not join if queue is empty (and so wating)!!!
557  CAMsg::printMsg(LOG_CRIT, "Wait for LoopReadFromMix!\n");
559 #ifdef LOG_PACKET_TIMES
560  CAMsg::printMsg(LOG_CRIT, "Wait for LoopLogPacketStats to terminate!\n");
561  m_pLogPacketStats->stop();
562 #endif
563  //waits until all login threads terminates....
564  // we have to be sure that the Accept thread was alread stoped!
565  m_pthreadsLogin->destroy(true);
566  CAMsg::printMsg(LOG_CRIT, "Before deleting CAFirstMixChannelList()!\n");
567  CAMsg::printMsg(LOG_CRIT, "Memeory usage before: %u\n", getMemoryUsage());
568  fmHashTableEntry* pHashEntry = m_pChannelList->getFirst();
569  while (pHashEntry != NULL)
570  {
571  CAMuxSocket * pMuxSocket = pHashEntry->pMuxSocket;
572  delete pHashEntry->pQueueSend;
573  pHashEntry->pQueueSend = NULL;
574  delete pHashEntry->pSymCipher;
575  pHashEntry->pSymCipher = NULL;
576 
578  while (pEntry != NULL)
579  {
580  delete pEntry->pCipher;
581  pEntry->pCipher = NULL;
582 
583  pEntry = m_pChannelList->getNextChannel(pEntry);
584  }
585  m_pChannelList->remove(pHashEntry->pMuxSocket);
586  pMuxSocket->close();
587  delete pMuxSocket;
588  pMuxSocket = NULL;
589  pHashEntry = m_pChannelList->getNext();
590  }
591  /* clean all vacant out-channels (the connection from the client was
592  * closed but we've waited for the CHANNEL-CLOSE from the last mix)
593  */
594  m_pChannelList->cleanVacantOutChannels();
595  CAMsg::printMsg(LOG_CRIT, "Memory usage after: %u\n", getMemoryUsage());
596  delete pQueueEntry;
597  pQueueEntry = NULL;
598  delete[]tmpBuff;
599  tmpBuff = NULL;
600  CAMsg::printMsg(LOG_CRIT, "Main Loop exited!!\n");
601 #endif // NEW_MIX_TYPE
602  return E_UNKNOWN;
603 }
#define RSA_SIZE
#define LOG_ENCRYPTED
Definition: CAMsg.hpp:46
#define KEY_SIZE
Definition: CASymCipher.hpp:31
SINT32 getcurrentTimeMillis(UINT64 &u64Time)
Gets the current system time in milliseconds.
Definition: CAUtil.cpp:252
SINT32 getcurrentTimeMicros(UINT64 &u64Time)
Gets the current system time in microseconds.
Definition: CAUtil.cpp:280
UINT32 getMemoryUsage()
Definition: CAUtil.cpp:443
SINT32 getRandom(UINT32 *val)
Gets 32 random bits.
Definition: CAUtil.cpp:346
SINT32 msSleep(UINT32 ms)
Sleeps ms milliseconds.
Definition: CAUtil.cpp:406
UINT32 diff64(const UINT64 &bigop, const UINT64 &smallop)
Definition: CAUtil.hpp:398
void setZero64(UINT64 &op1)
Definition: CAUtil.hpp:355
bool isZero64(UINT64 &op1)
Definition: CAUtil.hpp:464
void set64(UINT64 &op1, UINT32 op2)
Definition: CAUtil.hpp:321
#define SOCKET_ERROR
Definition: StdAfx.h:464
#define MAX_NEXT_MIX_QUEUE_SIZE
Definition: StdAfx.h:229
#define ASSERT(cond, msg)
Definition: StdAfx.h:546
signed int SINT32
Definition: basetypedefs.h:132
unsigned char UINT8
Definition: basetypedefs.h:135
unsigned int UINT32
Definition: basetypedefs.h:131
static SINT32 handleJapPacket(fmHashTableEntry *pHashEntry, bool a_bControlMessage, bool a_bMessageToJAP)
This should be called by the FirstMix for every incoming Jap packet.
bool proccessMixPacket(const MIXPACKET *pPacket)
fmChannelListEntry * getFirstChannelForSocket(CAMuxSocket *pMuxSocket)
Gets the first channel for a given connection.
void setDelayParameters(UINT32 unlimitTraffic, UINT32 bucketGrow, UINT32 intervall)
fmHashTableEntry * getFirst()
Gets the first connection of all connections in the list.
SINT32 addChannel(CAMuxSocket *pMuxSocket, HCHANNEL channelIn, CASymChannelCipher *pCipher, HCHANNEL *channelOut)
Adds a new channel for a given connection to the channel list.
fmChannelListEntry * get(CAMuxSocket *pMuxSocket, HCHANNEL channelIn)
Returns the information for a given Input-Channel-ID.
fmHashTableEntry * getNext()
Gets the next entry in the connections-list.
SINT32 removeChannel(CAMuxSocket *pMuxSocket, HCHANNEL channelIn)
Removes a single channel from the list.
fmChannelListEntry * getNextChannel(fmChannelListEntry *pEntry)
Gets the next channel for a given connection.
SINT32 remove(CAMuxSocket *pMuxSocket)
Removes all channels that belong to the given connection, and the connection itself, from the list.
CAThread * m_pthreadReadFromMix
Definition: CAFirstMix.hpp:507
CASocket ** m_arrSocketsIn
Definition: CAFirstMix.hpp:457
SINT32 incMixedPackets()
Definition: CAFirstMix.hpp:427
volatile bool m_bRestart
Definition: CAFirstMix.hpp:456
CAMuxSocket * m_pMuxOut
Definition: CAFirstMix.hpp:490
CAThread * m_pthreadAcceptUsers
Definition: CAFirstMix.hpp:504
CAQueue * m_pQueueSendToMix
Definition: CAFirstMix.hpp:448
CAIPList * m_pIPList
Definition: CAFirstMix.hpp:446
CASocketGroupEpoll * m_psocketgroupUsersWrite
Definition: CAFirstMix.hpp:467
volatile UINT32 m_nUser
Definition: CAFirstMix.hpp:454
CAThread * m_pthreadSendToMix
Definition: CAFirstMix.hpp:506
CASocketGroupEpoll * m_psocketgroupUsersRead
Definition: CAFirstMix.hpp:466
tUINT32withLock * m_PacketsPerCountryOUT
Definition: CAFirstMix.hpp:537
SINT32 decUsers(LP_fmHashTableEntry pHashEntry)
Definition: CAFirstMix.hpp:415
CAFirstMixChannelList * m_pChannelList
Definition: CAFirstMix.hpp:464
tUINT32withLock * m_PacketsPerCountryIN
Definition: CAFirstMix.hpp:536
UINT32 m_nSocketsIn
Definition: CAFirstMix.hpp:455
CAThreadPool * m_pthreadsLogin
Definition: CAFirstMix.hpp:505
CAQueue * m_pQueueReadFromMix
Definition: CAFirstMix.hpp:449
SINT32 removeIP(const UINT8 ip[4])
Removes the IP-Address from the list.
Definition: CAIPList.cpp:189
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
SOCKET getSocket()
Definition: CAMuxSocket.hpp:92
SINT32 close()
Closes the underlying socket.
SINT32 receive(MIXPACKET *pPacket)
Receives a whole MixPacket.
CASocket * getCASocket()
Definition: CAMuxSocket.hpp:84
SINT32 prepareForSend(MIXPACKET *inoutPacket)
SINT32 add(const void *buff, UINT32 size)
Adds data to the Queue.
Definition: CAQueue.cpp:76
SINT32 get(UINT8 *pbuff, UINT32 *psize)
Gets up to psize number of bytes from the Queue.
Definition: CAQueue.cpp:148
UINT32 getSize()
Returns the size of stored data in byte.
Definition: CAQueue.hpp:101
SINT32 remove(CASocket &s)
bool isSignaled(CASocket &s)
SINT32 add(CASocket &s)
Adds the socket s to the socket group.
virtual SINT32 send(const UINT8 *buff, UINT32 len)
Sends some data over the network.
Definition: CASocket.cpp:400
virtual SINT32 crypt2(const UINT8 *in, UINT8 *out, UINT32 len)=0
virtual SINT32 crypt1(const UINT8 *in, UINT8 *out, UINT32 len)=0
This class could be used for encryption/decryption of data (streams) with AES using 128bit CBC mode.
Definition: CASymCipher.hpp:42
SINT32 setKey(const UINT8 *key)
Sets the key for encryption.
Definition: CASymCipher.cpp:52
SINT32 join()
Waits for the main function to finish execution.
Definition: CAThread.cpp:187
SINT32 destroy(bool bWaitForFinish)
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_UNKNOWN
Definition: errorcodes.hpp:3
HCHANNEL channel
Definition: typedefs.hpp:117
UINT8 data[DATA_SIZE]
Definition: typedefs.hpp:121
UINT16 flags
Definition: typedefs.hpp:118
CASymChannelCipher * pCipher
CASymChannelCipher * pSymCipher
CAControlChannelDispatcher * pControlChannelDispatcher
volatile UINT32 delayBucket
CountryID of this IP Address.
Definition: typedefs.hpp:169
MIXPACKET packet
Definition: typedefs.hpp:170
#define CHANNEL_DATA
Definition: typedefs.hpp:42
#define MIXPACKET_SIZE
Definition: typedefs.hpp:40
#define CHANNEL_DUMMY
Definition: typedefs.hpp:50
UINT32 HCHANNEL
Definition: typedefs.hpp:34
#define CHANNEL_CLOSE
Definition: typedefs.hpp:47
struct t_queue_entry tQueueEntry
Definition: typedefs.hpp:188
UINT16 len
Definition: typedefs.hpp:0
#define CHANNEL_SIG_CRIME
Definition: typedefs.hpp:58
#define CHANNEL_OPEN
Definition: typedefs.hpp:43
#define DATA_SIZE
Definition: typedefs.hpp:69

References CASocketGroupEpoll::add(), CAQueue::add(), CAFirstMixChannelList::addChannel(), ASSERT, t_fmhashtableentry::bCountPacket, t_MixPacket::channel, CHANNEL_CLOSE, CHANNEL_DATA, CHANNEL_DUMMY, CHANNEL_OPEN, CHANNEL_SIG_CRIME, t_firstmixchannellist::channelIn, t_firstmixchannellist::channelOut, CAMuxSocket::close(), t_fmhashtableentry::countryID, CASymChannelCipher::crypt1(), CASymChannelCipher::crypt2(), t_MixPacket::data, DATA_SIZE, CAFirstMix::decUsers(), t_fmhashtableentry::delayBucket, CAThreadPool::destroy(), diff64(), E_SUCCESS, E_UNKNOWN, t_MixPacket::flags, CAFirstMixChannelList::get(), CAQueue::get(), CAMuxSocket::getCASocket(), getcurrentTimeMicros(), getcurrentTimeMillis(), CAFirstMixChannelList::getFirst(), CAFirstMixChannelList::getFirstChannelForSocket(), CASocketGroupEpoll::getFirstSignaledSocketData(), getMemoryUsage(), CAFirstMixChannelList::getNext(), CAFirstMixChannelList::getNextChannel(), CASocketGroupEpoll::getNextSignaledSocketData(), getRandom(), CAQueue::getSize(), CAMuxSocket::getSocket(), CAAccountingInstance::handleJapPacket(), t_fmhashtableentry::id, tUINT32withLock::inc(), CAFirstMix::incMixedPackets(), CASocketGroupEpoll::isSignaled(), isZero64(), CAThread::join(), KEY_SIZE, len, LOG_ENCRYPTED, CAFirstMix::m_arrSocketsIn, CAFirstMix::m_bRestart, CAFirstMix::m_nSocketsIn, CAFirstMix::m_nUser, CAFirstMix::m_PacketsPerCountryIN, CAFirstMix::m_PacketsPerCountryOUT, CAFirstMix::m_pChannelList, CAFirstMix::m_pIPList, CAFirstMix::m_pMuxOut, CAFirstMix::m_pQueueReadFromMix, CAFirstMix::m_pQueueSendToMix, CAFirstMix::m_psocketgroupUsersRead, CAFirstMix::m_psocketgroupUsersWrite, CAFirstMix::m_pthreadAcceptUsers, CAFirstMix::m_pthreadReadFromMix, CAFirstMix::m_pthreadSendToMix, CAFirstMix::m_pthreadsLogin, MAX_NEXT_MIX_QUEUE_SIZE, MIXPACKET_SIZE, msSleep(), t_fmhashtableentry::oQueueEntry, t_queue_entry::packet, t_firstmixchannellist::pCipher, t_fmhashtableentry::pControlChannelDispatcher, t_fmhashtableentry::peerIP, 
t_firstmixchannellist::pHead, t_fmhashtableentry::pMuxSocket, t_fmhashtableentry::pQueueSend, CAMuxSocket::prepareForSend(), CAMsg::printMsg(), CAControlChannelDispatcher::proccessMixPacket(), t_fmhashtableentry::pSymCipher, CAMuxSocket::receive(), CAFirstMixChannelList::remove(), CASocketGroupEpoll::remove(), CAFirstMixChannelList::removeChannel(), CAIPList::removeIP(), RSA_SIZE, CASocketGroupEpoll::select(), CASocket::send(), set64(), CAFirstMixChannelList::setDelayParameters(), CASymCipher::setKey(), setZero64(), SOCKET_ERROR, and t_fmhashtableentry::uAlreadySendPacketSize.

Here is the call graph for this function:

The documentation for this class was generated from the following files: