Mixe for Privacy and Anonymity in the Internet
CAFirstMixA.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright (c) 2000, The JAP-Team
00003 All rights reserved.
00004 Redistribution and use in source and binary forms, with or without modification,
00005 are permitted provided that the following conditions are met:
00006 
00007   - Redistributions of source code must retain the above copyright notice,
00008     this list of conditions and the following disclaimer.
00009 
00010   - Redistributions in binary form must reproduce the above copyright notice,
00011     this list of conditions and the following disclaimer in the documentation and/or
00012     other materials provided with the distribution.
00013 
00014   - Neither the name of the University of Technology Dresden, Germany nor the names of its contributors
00015     may be used to endorse or promote products derived from this software without specific
00016     prior written permission.
00017 
00018 
00019 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
00020 OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00021 AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS
00022 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00023 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
00024 OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00025 IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00026 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE
00027 */
00028 #include "../StdAfx.h"
00029 #ifndef ONLY_LOCAL_PROXY
00030 #include "CAFirstMixA.hpp"
00031 #include "../CALibProxytest.hpp"
00032 #include "../CAThread.hpp"
00033 #include "../CASingleSocketGroup.hpp"
00034 #include "../CAInfoService.hpp"
00035 #include "../CAPool.hpp"
00036 #include "../CACmdLnOptions.hpp"
00037 #include "../CAAccountingInstance.hpp"
00038 #include "../CAStatusManager.hpp"
00039 #ifdef HAVE_EPOLL
00040   #include "../CASocketGroupEpoll.hpp"
00041 #endif
00042 
00043 void CAFirstMixA::shutDown()
00044 {
00045   m_bIsShuttingDown = true;
00046 
00047 #ifdef PAYMENT
00048   UINT32 connectionsClosed = 0;
00049   fmHashTableEntry* timeoutHashEntry;
00050 
00051 
00052   /* make sure no reconnect is possible when shutting down */
00053   if(m_pthreadAcceptUsers!=NULL)
00054   {
00055     CAMsg::printMsg(LOG_CRIT,"Wait for LoopAcceptUsers!\n");
00056     m_bRestart=true;
00057     m_pthreadAcceptUsers->join();
00058     delete m_pthreadAcceptUsers;
00059   }
00060   m_pthreadAcceptUsers=NULL;
00061 
00062   if(m_pInfoService != NULL)
00063   {
00064     CAMsg::printMsg(LOG_DEBUG,"Shutting down infoservice.\n");
00065     m_pInfoService->stop();
00066   }
00067 
00068   if(m_pChannelList!=NULL) // may happen if mixes did not yet connect to each other
00069   {
00070     while ((timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL)
00071     {
00072       CAMsg::printMsg(LOG_DEBUG,"Shutting down, closing client connection.\n");
00073       connectionsClosed++;
00074       closeConnection(timeoutHashEntry);
00075     }
00076     CAMsg::printMsg(LOG_DEBUG,"Closed %i client connections.\n", connectionsClosed);
00077   }
00078 #endif
00079   m_bRestart = true;
00080   m_bIsShuttingDown = false;
00081 }
00082 
00083 
00084 
00085 SINT32 CAFirstMixA::closeConnection(fmHashTableEntry* pHashEntry)
00086 {
00087   if (pHashEntry == NULL)
00088   {
00089     return E_UNKNOWN;
00090   }
00091 
00092   INIT_STACK;
00093   BEGIN_STACK("CAFirstMixA::closeConnection");
00094 
00095 
00096   fmChannelListEntry* pEntry;
00097   tQueueEntry* pQueueEntry = new tQueueEntry;
00098   MIXPACKET* pMixPacket=&pQueueEntry->packet;
00099 
00100   #ifdef LOG_TRAFFIC_PER_USER
00101     UINT64 current_time;
00102     getcurrentTimeMillis(current_time);
00103     CAMsg::printMsg(LOG_DEBUG,"Removing Connection wiht ID: %Lu -- login time [ms] %Lu -- logout time [ms] %Lu -- Traffic was: IN: %u  --  OUT: %u\n",pHashEntry->id,pHashEntry->timeCreated,current_time,pHashEntry->trafficIn,pHashEntry->trafficOut);
00104   #endif
00105   m_pIPList->removeIP(pHashEntry->peerIP);
00106 
00107   m_psocketgroupUsersRead->remove(*(pHashEntry->pMuxSocket));
00108   m_psocketgroupUsersWrite->remove(*(pHashEntry->pMuxSocket));
00109   pEntry = m_pChannelList->getFirstChannelForSocket(pHashEntry->pMuxSocket);
00110 
00111   while(pEntry!=NULL)
00112   {
00113     getRandom(pMixPacket->data,DATA_SIZE);
00114     pMixPacket->flags=CHANNEL_CLOSE;
00115     pMixPacket->channel=pEntry->channelOut;
00116     #ifdef LOG_PACKET_TIMES
00117       setZero64(pQueueEntry->timestamp_proccessing_start);
00118     #endif
00119     m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00120     delete pEntry->pCipher;
00121     pEntry->pCipher = NULL;
00122     pEntry=m_pChannelList->getNextChannel(pEntry);
00123 #ifdef CH_LOG_STUDY
00124     nrOfChOpMutex->lock();
00125     currentOpenedChannels--;
00126     nrOfChOpMutex->unlock();
00127 #endif //CH_LOG_STUDY
00128   }
00129   ASSERT(pHashEntry->pQueueSend!=NULL,"Send queue is NULL");
00130   delete pHashEntry->pQueueSend;
00131   pHashEntry->pQueueSend = NULL;
00132   delete pHashEntry->pSymCipher;
00133   pHashEntry->pSymCipher = NULL;
00134 
00135   #ifdef COUNTRY_STATS
00136     decUsers(pHashEntry);
00137   #else
00138     decUsers();
00139   #endif
00140 
00141   CAMuxSocket* pMuxSocket = pHashEntry->pMuxSocket;
00142   // Save the socket - its pointer will be deleted in this method!!! Crazy mad programming...
00143   m_pChannelList->remove(pHashEntry->pMuxSocket);
00144   delete pMuxSocket;
00145   pMuxSocket = NULL;
00146   //pHashEntry->pMuxSocket = NULL; // not needed now, but maybe in the future...
00147 
00148   delete pQueueEntry;
00149   pQueueEntry = NULL;
00150 
00151   FINISH_STACK("CAFirstMixA::closeConnection");
00152 
00153   return E_SUCCESS;
00154 }
00155 
/* Size in bytes of the symmetric key material at the start of a packet's data
 * (2*KEY_SIZE -- presumably two keys of KEY_SIZE bytes each; confirm against
 * CASymCipher::setKeys). The replacement list is parenthesised so the macro
 * expands as one value inside any surrounding expression (e.g. next to
 * '/' or '%'). */
#define FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS (2*KEY_SIZE)
00157 
00158 SINT32 CAFirstMixA::loop()
00159   {
00160 #ifndef NEW_MIX_TYPE
00161 #ifdef DELAY_USERS
00162     m_pChannelList->setDelayParameters( CALibProxytest::getOptions()->getDelayChannelUnlimitTraffic(),
00163                                       CALibProxytest::getOptions()->getDelayChannelBucketGrow(),
00164                                       CALibProxytest::getOptions()->getDelayChannelBucketGrowIntervall());
00165 #endif
00166 
00167   //  CASingleSocketGroup osocketgroupMixOut;
00168     SINT32 countRead=0;
00169     //#ifdef LOG_PACKET_TIMES
00170     //  tPoolEntry* pPoolEntry=new tPoolEntry;
00171     //  MIXPACKET* pMixPacket=&pPoolEntry->mixpacket;
00172     //#else
00173     tQueueEntry* pQueueEntry = new tQueueEntry;
00174     MIXPACKET* pMixPacket=&pQueueEntry->packet;
00175     //#endif
00176     m_nUser=0;
00177     SINT32 ret;
00178     //osocketgroupMixOut.add(*m_pMuxOut);
00179 
00180     UINT8* tmpBuff=new UINT8[sizeof(tQueueEntry)];
00181     CAMsg::printMsg(LOG_DEBUG,"Starting Message Loop... \n");
00182     bool bAktiv;
00183     UINT8 rsaBuff[RSA_SIZE];
00184 
00185 #ifdef LOG_TRAFFIC_PER_USER
00186     UINT64 current_time;
00187     UINT32 diff_time;
00188     CAMsg::printMsg(LOG_DEBUG,"Channel log formats:\n");
00189     CAMsg::printMsg(LOG_DEBUG,"1. Close received from user (times in micros) - 1:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open),PacketsOut (only data),ChannelDuration (open packet received --> close packet put into send queue to next mix)\n");
00190     CAMsg::printMsg(LOG_DEBUG,"2. Channel close from Mix(times in micros)- 2.:Channel-ID,Connection-ID,Channel open timestamp (microseconds),PacketsIn (only data and open), PacketsOut (only data),ChannelDuration (open packet received)--> close packet put into send queue to next user\n");
00191 #endif
00192 
00193 #ifdef _DEBUG
00194     CAThread* pLogThread=new CAThread((UINT8*)"CAFirstMixA - LogLoop");
00195     pLogThread->setMainLoop(fm_loopLog);
00196     pLogThread->start(this);
00197 #endif
00198 
00199 #ifdef LOG_CRIME
00200     CAIPAddrWithNetmask* surveillanceIPs = CALibProxytest::getOptions()->getCrimeSurveillanceIPs();
00201     UINT32 nrOfSurveillanceIPs = CALibProxytest::getOptions()->getNrOfCrimeSurveillanceIPs();
00202 #endif
00203 //    CAThread threadReadFromUsers;
00204 //    threadReadFromUsers.setMainLoop(loopReadFromUsers);
00205 //    threadReadFromUsers.start(this);
00206 
00207     while(!m_bRestart) /* the main mix loop as long as there are things that are not handled by threads. */
00208       {
00209 
00210         bAktiv=false;
00211 
00212 //LOOP_START:
00213 #ifdef PAYMENT
00214         // while checking if there are connections to close: synch with login threads
00215         m_pmutexLogin->lock();
00216         checkUserConnections();
00217         m_pmutexLogin->unlock();
00218 #endif
00219 //First Step
00220 //Checking for new connections
00221 // Now in a separate Thread....
00222 
00223 // Second Step
00224 // Checking for data from users
00225 // Now in a separate Thread (see loopReadFromUsers())
00226 //Only proccess user data, if queue to next mix is not to long!!
00227 
00228         if(m_pQueueSendToMix->getSize()<MAX_NEXT_MIX_QUEUE_SIZE)
00229           {
00230             countRead=m_psocketgroupUsersRead->select(/*false,*/0);       // how many JAP<->mix connections have received data from their coresponding JAP
00231             if(countRead>0)
00232               bAktiv=true;
00233 #ifdef HAVE_EPOLL
00234             //if we have epoll we do not need to search the whole list
00235             //of connected JAPs to find the ones who have sent data
00236             //as epoll will return ONLY these connections.
00237             fmHashTableEntry* pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getFirstSignaledSocketData();
00238             while(pHashEntry!=NULL)
00239               {
00240                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00241 #else
00242             //if we do not have epoll we have to go to the whole
00243             //list of open connections to find the ones which
00244             //actually have sent some data
00245             fmHashTableEntry* pHashEntry=m_pChannelList->getFirst();
00246             while(pHashEntry!=NULL&&countRead>0)                      // iterate through all connections as long as there is at least one active left
00247               {
00248                 CAMuxSocket* pMuxSocket=pHashEntry->pMuxSocket;
00249                 if(m_psocketgroupUsersRead->isSignaled(*pMuxSocket))  // if this one seems to have data
00250                   {
00251 #endif
00252 /*#ifdef DELAY_USERS
00253  * Don't delay upstream
00254                 if( m_pChannelList->hasDelayBuckets(pHashEntry->delayBucketID) )
00255                 {
00256 #endif*/
00257                     countRead--;
00258                     ret=pMuxSocket->receive(pMixPacket,0);
00259 
00260                     #if defined LOG_PACKET_TIMES||defined(LOG_CHANNEL)
00261                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start);
00262                       set64(pQueueEntry->timestamp_proccessing_start_OP,pQueueEntry->timestamp_proccessing_start);
00263                     #endif
00264                     #ifdef DATA_RETENTION_LOG
00265                       pQueueEntry->dataRetentionLogEntry.t_in=htonl(time(NULL));
00266                     #endif
00267                     if(ret<0&&ret!=E_AGAIN/*||pHashEntry->accessUntil<time()*/)
00268                     {
00269                       // remove dead connections
00270                       closeConnection(pHashEntry);
00271                     }
00272                     else if(ret==MIXPACKET_SIZE)                      // we've read enough data for a whole mix packet. nice!
00273                       {
00274 #ifdef PAYMENT
00275                         if (pHashEntry->bRecoverTimeout)
00276                         {
00277                           // renew the timeout only if recovery is allowed
00278                           m_pChannelList->pushTimeoutEntry(pHashEntry);
00279                         }
00280 #endif
00281                         #ifdef LOG_TRAFFIC_PER_USER
00282                           pHashEntry->trafficIn++;
00283                         #endif
00284                         #ifdef COUNTRY_STATS
00285                           m_PacketsPerCountryIN[pHashEntry->countryID].inc();
00286                         #endif
00287                         //New control channel code...!
00288                         if(pMixPacket->channel>0&&pMixPacket->channel<256)
00289                         {
00290                           if (pHashEntry->pControlChannelDispatcher->proccessMixPacket(pMixPacket))
00291                           {
00292                             goto NEXT_USER;
00293                           }
00294                           else
00295                           {
00296                             CAMsg::printMsg(LOG_DEBUG, "Control channel packet is invalid and could not be processed!\n");
00297                             closeConnection(pHashEntry);
00298                             goto NEXT_USER;
00299                           }
00300                         }
00301 #ifdef PAYMENT
00302                         if(accountTrafficUpstream(pHashEntry) != E_SUCCESS) goto NEXT_USER;
00303 #endif
00304                         if(pMixPacket->flags==CHANNEL_DUMMY) // just a dummy to keep the connection alife in e.g. NAT gateways
00305                         {
00306                           CAMsg::printMsg(LOG_DEBUG,"received dummy traffic\n");
00307                           getRandom(pMixPacket->data,DATA_SIZE);
00308                           #ifdef LOG_PACKET_TIMES
00309                             setZero64(pQueueEntry->timestamp_proccessing_start);
00310                           #endif
00311                           #ifdef LOG_TRAFFIC_PER_USER
00312                             pHashEntry->trafficOut++;
00313                           #endif
00314                           #ifdef COUNTRY_STATS
00315                             m_PacketsPerCountryOUT[pHashEntry->countryID].inc();
00316                           #endif
00317                           pHashEntry->pQueueSend->add(pQueueEntry,sizeof(tQueueEntry));
00318                           #ifdef HAVE_EPOLL
00319                             //m_psocketgroupUsersWrite->add(*pMuxSocket,pHashEntry);
00320                           #else
00321                             m_psocketgroupUsersWrite->add(*pMuxSocket);
00322                           #endif
00323                         }
00324                         else if(pMixPacket->flags==CHANNEL_CLOSE)     // closing one mix-channel (not the JAP<->mix connection!)
00325                         {
00326                           fmChannelListEntry* pEntry;
00327                           pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
00328                           if(pEntry!=NULL)
00329                           {
00330                             pMixPacket->channel=pEntry->channelOut;
00331                             getRandom(pMixPacket->data,DATA_SIZE);
00332                             #ifdef LOG_PACKET_TIMES
00333                               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00334                             #endif
00335                             m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00336                             /* Don't delay upstream
00337                             #ifdef DELAY_USERS
00338                             m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00339                             #endif*/
00340                             #ifdef LOG_CHANNEL
00341                               //pEntry->packetsInFromUser++;
00342                               getcurrentTimeMicros(current_time);
00343                               diff_time=diff64(current_time,pEntry->timeCreated);
00344                               CAMsg::printMsg(LOG_DEBUG,"1:%u,%Lu,%Lu,%u,%u,%u\n",
00345                                                         pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
00346                                                         diff_time);
00347                             #endif
00348                             delete pEntry->pCipher;              // forget the symetric key of this connection
00349                             pEntry->pCipher = NULL;
00350                             m_pChannelList->removeChannel(pMuxSocket,pEntry->channelIn);
00351 
00352                             #ifdef CH_LOG_STUDY
00353                             nrOfChOpMutex->lock();
00354                             currentOpenedChannels--;
00355                             nrOfChOpMutex->unlock();
00356                             #endif //CH_LOG_STUDY
00357                           }
00358                           #ifdef _DEBUG
00359                           else
00360                           {
00361 //                            CAMsg::printMsg(LOG_DEBUG,"Invalid ID to close from Browser!\n");
00362                           }
00363                           #endif
00364                         }
00365                         else                                         // finally! a normal mix packet
00366                         {
00367                           CASymCipher* pCipher=NULL;
00368                           fmChannelListEntry* pEntry;
00369                           pEntry=m_pChannelList->get(pMuxSocket,pMixPacket->channel);
00370                           if(pEntry!=NULL&&pMixPacket->flags==CHANNEL_DATA)
00371                           {
00372                             pMixPacket->channel=pEntry->channelOut;
00373                             pCipher=pEntry->pCipher;
00374                             pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00375                                                  // queue the packet for sending to the next mix.
00376                             #ifdef LOG_PACKET_TIMES
00377                               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00378                             #endif
00379 
00380                             //check if this IP must be logged due to crime detection
00381                             #ifdef LOG_CRIME
00382                               crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
00383                             #endif
00384                             m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00385                             /* Don't delay upstream
00386                             #ifdef DELAY_USERS
00387                             m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00388                             #endif*/
00389                             incMixedPackets();
00390                             #ifdef LOG_CHANNEL
00391                               pEntry->packetsInFromUser++;
00392                             #endif
00393                           }
00394                           else if(pEntry==NULL&&pMixPacket->flags==CHANNEL_OPEN)  // open a new mix channel
00395                           { // stefan: muesste das nicht vor die behandlung von CHANNEL_DATA? oder gilt OPEN => !DATA ?
00396                             //es gilt: open -> data
00397                             pHashEntry->pSymCipher->crypt1(pMixPacket->data,rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00398                             #ifdef REPLAY_DETECTION
00399                             // replace time(NULL) with the real timestamp ()
00400                             // packet-timestamp*REPLAY_BASE + m_u64ReferenceTime
00401                               if(m_pReplayDB->insert(rsaBuff,time(NULL))!=E_SUCCESS)
00402                               {
00403                                 CAMsg::printMsg(LOG_INFO,"Replay: Duplicate packet ignored.\n");
00404                                 continue;
00405                               }
00406                             #endif
00407                             pCipher= new CASymCipher();
00408                             pCipher->setKeys(rsaBuff,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00409                             for(int i=0;i<16;i++)
00410                               rsaBuff[i]=0xFF;
00411                             pCipher->setIV2(rsaBuff);
00412                             pCipher->crypt1(pMixPacket->data+FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS,pMixPacket->data,DATA_SIZE-FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00413                             getRandom(pMixPacket->data+DATA_SIZE-FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS,FIRST_MIX_SIZE_OF_SYMMETRIC_KEYS);
00414                             #if defined (LOG_CHANNEL) ||defined(DATA_RETENTION_LOG)
00415                               HCHANNEL tmpC=pMixPacket->channel;
00416                             #endif
00417 
00418                             HCHANNEL inChannel = pMixPacket->channel;
00419                             if(m_pChannelList->addChannel(pMuxSocket,pMixPacket->channel,pCipher,&pMixPacket->channel)!=E_SUCCESS)
00420                             { //todo move up ?
00421                               delete pCipher;
00422                               pCipher = NULL;
00423                             }
00424                             else
00425                             {
00426                               #ifdef CH_LOG_STUDY
00427                               nrOfChOpMutex->lock();
00428                               if(pHashEntry->channelOpenedLastIntervalTS !=
00429                                 lastLogTime)
00430                               {
00431                                 pHashEntry->channelOpenedLastIntervalTS =
00432                                   lastLogTime;
00433                                 nrOfOpenedChannels++;
00434                               }
00435                               currentOpenedChannels++;
00436                               nrOfChOpMutex->unlock();
00437                               #endif //CH_LOG_STUDY
00438 
00439                               #ifdef LOG_PACKET_TIMES
00440                                 getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00441                               #endif
00442                               #ifdef LOG_CHANNEL
00443                                 fmChannelListEntry* pTmpEntry=m_pChannelList->get(pMuxSocket,tmpC);
00444                                 pTmpEntry->packetsInFromUser++;
00445                                 set64(pTmpEntry->timeCreated,pQueueEntry->timestamp_proccessing_start);
00446                               #endif
00447                               #ifdef DATA_RETENTION_LOG
00448                                 pQueueEntry->dataRetentionLogEntry.entity.first.channelid=htonl(pMixPacket->channel);
00449                                 fmChannelListEntry* pTmpEntry1=m_pChannelList->get(pMuxSocket,tmpC);
00450                                 memcpy(pQueueEntry->dataRetentionLogEntry.entity.first.ip_in,pTmpEntry1->pHead->peerIP,4);
00451                                 pQueueEntry->dataRetentionLogEntry.entity.first.port_in=(UINT16)pTmpEntry1->pHead->peerPort;
00452                                 pQueueEntry->dataRetentionLogEntry.entity.first.port_in=htons(pQueueEntry->dataRetentionLogEntry.entity.first.port_in);
00453                               #endif
00454 
00455                               //check if this IP must be logged due to crime detection
00456                               #ifdef LOG_CRIME
00457 
00458                                 pEntry=m_pChannelList->get(pMuxSocket, inChannel);
00459                                 if(pEntry != NULL)
00460                                 {
00461                                   crimeSurveillance(surveillanceIPs, nrOfSurveillanceIPs, pEntry->pHead->peerIP,pEntry->pHead->peerPort, pMixPacket);
00462                                 }
00463                               #endif
00464                               m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00465                               /* Don't delay upstream
00466                               #ifdef DELAY_USERS
00467                               m_pChannelList->decDelayBuckets(pHashEntry->delayBucketID);
00468                               #endif*/
00469                               incMixedPackets();
00470                               #ifdef _DEBUG
00471 //                                      CAMsg::printMsg(LOG_DEBUG,"Added out channel: %u\n",pMixPacket->channel);
00472                               #endif
00473                             }
00474                           }
00475                         }
00476                       }
00477 /*#ifdef DELAY_USERS
00478                   }
00479 #endif*/
00480                 #ifdef HAVE_EPOLL
00481 NEXT_USER:
00482                   pHashEntry=(fmHashTableEntry*)m_psocketgroupUsersRead->getNextSignaledSocketData();
00483                 #else
00484                   }//if is signaled
00485 NEXT_USER:
00486                   pHashEntry=m_pChannelList->getNext();
00487                 #endif
00488               }
00489               if(countRead>0)
00490               {
00491                 CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::loop() - read from user --> countRead >0 after processing all connections!\n");
00492               }
00493           }
00494 //Third step
00495 //Sending to next mix
00496 
00497 // Now in a separate Thread (see loopSendToMix())
00498 
00499 //Step 4
00500 //Step 4a Receiving from mix to queue now in a separate thread
00501 //Step 4b Processing MixPackets received from Mix
00502 //todo check for error!!!
00503         countRead=m_nUser+1;
00504         while(countRead>0&&m_pQueueReadFromMix->getSize()>=sizeof(tQueueEntry))
00505           {
00506             bAktiv=true;
00507             countRead--;
00508             ret=sizeof(tQueueEntry);
00509             m_pQueueReadFromMix->get((UINT8*)pQueueEntry,(UINT32*)&ret);
00510             #ifdef LOG_PACKET_TIMES
00511               getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_start_OP);
00512             #endif
00513             if(pMixPacket->flags==CHANNEL_CLOSE) //close event
00514               {
00515                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00516 //                  CAMsg::printMsg(LOG_DEBUG,"Closing Channel: %u ...\n",pMixPacket->channel);
00517                 #endif
00518                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00519                 if(pEntry!=NULL)
00520                   {
00521                     /* a hack to solve the SSL problem:
00522                      * set channel of downstream packet to in channel after they are dequeued
00523                      * from pEntry->pQueueSend so we can retrieve the channel entry to decrement
00524                      * the per channel count of enqueued downstream bytes.
00525                      */
00526                     #ifndef SSL_HACK
00527                     pMixPacket->channel=pEntry->channelIn;
00528                     #endif
00529 
00530                     pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00531                     //getRandom(pMixPacket->data,DATA_SIZE);
00532                     #ifdef LOG_PACKET_TIMES
00533                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00534                     #endif
00535                     pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
00536                     #ifdef LOG_TRAFFIC_PER_USER
00537                       pEntry->pHead->trafficOut++;
00538                     #endif
00539                     #ifdef COUNTRY_STATS
00540                       m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00541                     #endif
00542                     #ifdef SSL_HACK
00543                       /* a hack to solve the SSL problem:
00544                        * per channel count of enqueued downstream bytes
00545                        */
00546                       pEntry->downStreamBytes += sizeof(tQueueEntry);
00547                     #endif
00548                     #ifdef LOG_CHANNEL
00549                       pEntry->packetsOutToUser++;
00550                       getcurrentTimeMicros(current_time);
00551                       diff_time=diff64(current_time,pEntry->timeCreated);
00552                       CAMsg::printMsg(LOG_DEBUG,"2:%u,%Lu,%Lu,%u,%u,%u\n",
00553                                                 pEntry->channelIn,pEntry->pHead->id,pEntry->timeCreated,pEntry->packetsInFromUser,pEntry->packetsOutToUser,
00554                                                 diff_time);
00555                     #endif
00556 
00557                     #ifdef HAVE_EPOLL
00558                       //m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
00559                     #else
00560                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
00561                     #endif
00562 
00563 
00564                     #ifndef SSL_HACK
00565                       delete pEntry->pCipher;              // forget the symetric key of this connection
00566                       pEntry->pCipher = NULL;
00567                       m_pChannelList->removeChannel(pEntry->pHead->pMuxSocket, pEntry->channelIn);
00568                     #ifdef CH_LOG_STUDY
00569                       nrOfChOpMutex->lock();
00570                       currentOpenedChannels--;
00571                       nrOfChOpMutex->unlock();
00572                     #endif
00573                     /* a hack to solve the SSL problem:
00574                      * remove channel after the close packet is enqueued
00575                      * from pEntry->pQueueSend
00576                      */
00577                     #endif
00578                   }
00579                   else
00580                   {
00581                     #ifdef DEBUG
00582                       CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: close channel -> client but channel does not exist.\n");
00583                     #endif
00584                   }
00585 
00586               }
00587             else
00588               {//flag !=close
00589                 #if defined(_DEBUG) && !defined(__MIX_TEST)
00590 //                  CAMsg::printMsg(LOG_DEBUG,"Sending Data to Browser!\n");
00591                 #endif
00592                 fmChannelList* pEntry=m_pChannelList->get(pMixPacket->channel);
00593 
00594                 if(pEntry!=NULL)
00595                   {
00596                     #ifdef LOG_CRIME
00597                       if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
00598                         {
00599                           //UINT32 id=(pMixPacket->flags>>8)&0x000000FF;
00600                           int log=LOG_ENCRYPTED;
00601                           if(!CALibProxytest::getOptions()->isEncryptedLogEnabled())
00602                             log=LOG_CRIT;
00603                           CAMsg::printMsg(log,"Detecting crime activity - next mix channel: %u -- "
00604                               "In-IP is: %u.%u.%u.%u \n", pMixPacket->channel,
00605                               pEntry->pHead->peerIP[0],
00606                               pEntry->pHead->peerIP[1],
00607                               pEntry->pHead->peerIP[2],
00608                               pEntry->pHead->peerIP[3]);
00609                           continue;
00610                         }
00611                     #endif
00612 
00613                     /* a hack to solve the SSL problem:
00614                      * same as CHANNEL_CLOSE packets
00615                      */
00616                     #ifndef SSL_HACK
00617                     pMixPacket->channel=pEntry->channelIn;
00618                     #endif
00619 
00620                     pEntry->pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
00621 
00622                     #ifdef LOG_PACKET_TIMES
00623                       getcurrentTimeMicros(pQueueEntry->timestamp_proccessing_end_OP);
00624                     #endif
00625                     pEntry->pHead->pQueueSend->add(pQueueEntry, sizeof(tQueueEntry));
00626                     /*CAMsg::printMsg(
00627                         LOG_INFO,"adding data packet to queue: %x, queue size: %u bytes\n",
00628                         pEntry->pHead->pQueueSend, pEntry->pHead->pQueueSend->getSize());*/
00629                     #ifdef LOG_TRAFFIC_PER_USER
00630                       pEntry->pHead->trafficOut++;
00631                     #endif
00632                     #ifdef COUNTRY_STATS
00633                       m_PacketsPerCountryOUT[pEntry->pHead->countryID].inc();
00634                     #endif
00635                     #ifdef LOG_CHANNEL
00636                       pEntry->packetsOutToUser++;
00637                     #endif
00638                     #ifdef SSL_HACK
00639                       /* a hack to solve the SSL problem:
00640                        * per channel count of downstream packets in bytes
00641                        */
00642                       pEntry->downStreamBytes += sizeof(tQueueEntry);
00643                     #endif
00644 
00645                     #ifdef HAVE_EPOLL
00646                       /*int epret = m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket,pEntry->pHead);
00647                       if(epret == E_UNKNOWN)
00648                       {
00649                         epret=errno;
00650                         CAMsg::printMsg(LOG_INFO,"epoll_add returns: %s (return value: %d) \n", strerror(epret), epret);
00651                       }*/
00652                       #else
00653                       m_psocketgroupUsersWrite->add(*pEntry->pHead->pMuxSocket);
00654                     #endif
00655 
00656                     incMixedPackets();
00657                   }
00658                 else
00659                   {
00660                     #ifdef _DEBUG
00661                       if(pMixPacket->flags!=CHANNEL_DUMMY)
00662                         {
00663 /*                          CAMsg::printMsg(LOG_DEBUG,"Error Sending Data to Browser -- "
00664                               "Channel-Id %u not valid!\n",pMixPacket->channel
00665                             );*/
00666                           #ifdef LOG_CHANNEL
00667                             CAMsg::printMsg(LOG_INFO,"Packet late arrive for channel: %u\n",pMixPacket->channel);
00668                           #endif
00669                         }
00670                     #endif
00671                   }
00672               }
00673           }
00674 
00677         bAktiv = sendToUsers();
00678 
00679         if(!bAktiv)
00680           msSleep(100);
00681       }
00682 //ERR:
00683     CAMsg::printMsg(LOG_CRIT,"Seems that we are restarting now!!\n");
00684     m_bRunLog=false;
00685     //clean();
00686     delete pQueueEntry;
00687     pQueueEntry = NULL;
00688     delete []tmpBuff;
00689     tmpBuff = NULL;
00690 #ifdef _DEBUG
00691     pLogThread->join();
00692     delete pLogThread;
00693     pLogThread = NULL;
00694 #endif
00695     CAMsg::printMsg(LOG_CRIT,"Main Loop exited!!\n");
00696 #endif//NEW_MIX_TYPE
00697     return E_UNKNOWN;
00698   }
00699 #endif //ONLY_LOCAL_PROXY
00700 
00701 /* last part of the main loop:
00702  * return true if the loop when at least one packet was sent
00703  * or false otherwise.
00704  */
00705 bool CAFirstMixA::sendToUsers()
00706 {
00707   SINT32 countRead = m_psocketgroupUsersWrite->select(/*true,*/0);
00708   tQueueEntry *packetToSend = NULL;
00709   SINT32 packetSize = sizeof(tQueueEntry);
00710   CAQueue *controlMessageUserQueue = NULL;
00711   CAQueue *dataMessageUserQueue = NULL;
00712 
00713   CAQueue *processedQueue = NULL; /* one the above queues that should be used for processing*/
00714   UINT32 extractSize = 0;
00715   bool bAktiv = false;
00716   UINT32 iSocketErrors = 0;
00717 
00718 /* Cyclic polling: gets all open sockets that will not block when invoking send()
00719  * but will only send at most one packet. After that control is returned to loop()
00720  */
00721 #ifdef HAVE_EPOLL
00722   fmHashTableEntry* pfmHashEntry=
00723     (fmHashTableEntry*) m_psocketgroupUsersWrite->getFirstSignaledSocketData();
00724 
00725   while(pfmHashEntry != NULL)
00726   {
00727 
00728 #else
00729   fmHashTableEntry* pfmHashEntry=m_pChannelList->getFirst();
00730   while( (countRead > 0) && (pfmHashEntry != NULL) )
00731   {
00732     if(m_psocketgroupUsersWrite->isSignaled(*pfmHashEntry->pMuxSocket))
00733     {
00734       countRead--;
00735 #endif
00736       /* loop turn init */
00737       extractSize = 0;
00738       processedQueue = NULL;
00739       packetToSend = &(pfmHashEntry->oQueueEntry);
00740       controlMessageUserQueue = pfmHashEntry->pControlMessageQueue;
00741       dataMessageUserQueue = pfmHashEntry->pQueueSend;
00742 
00743       //Control messages have a higher priority.
00744       if(controlMessageUserQueue->getSize() > 0)
00745       {
00746         processedQueue = controlMessageUserQueue;
00747         pfmHashEntry->bCountPacket = false;
00748       }
00749       else if( (dataMessageUserQueue->getSize() > 0)
00750 #ifdef DELAY_USERS
00751           && m_pChannelList->hasDelayBuckets(pfmHashEntry->delayBucketID)
00752 #endif
00753       )
00754       {
00755         processedQueue = dataMessageUserQueue;
00756         pfmHashEntry->bCountPacket = true;
00757       }
00758 
00759       if(processedQueue != NULL)
00760       {
00761         extractSize = packetSize;
00762         bAktiv=true;
00763 
00764         if(pfmHashEntry->uAlreadySendPacketSize == -1)
00765         {
00766           processedQueue->get((UINT8*) packetToSend, &extractSize);
00767 
00768           /* Hack for SSL BUG */
00769 #ifdef SSL_HACK
00770           finishPacket(pfmHashEntry);
00771 #endif //SSL_HACK
00772           pfmHashEntry->pMuxSocket->prepareForSend(&(packetToSend->packet));
00773           pfmHashEntry->uAlreadySendPacketSize = 0;
00774         }
00775       }
00776 
00777       if( (extractSize > 0) || (pfmHashEntry->uAlreadySendPacketSize > 0) )
00778       {
00779         SINT32 len =  MIXPACKET_SIZE - pfmHashEntry->uAlreadySendPacketSize;
00780         UINT8* packetToSendOffset = ((UINT8*)&(packetToSend->packet)) + pfmHashEntry->uAlreadySendPacketSize;
00781         CASocket* clientSocket = pfmHashEntry->pMuxSocket->getCASocket();
00782 
00783         SINT32 ret = clientSocket->send(packetToSendOffset, len);
00784 
00785         if(ret > 0)
00786         {
00787 #ifdef PAYMENT
00788           SINT32 accounting = E_SUCCESS;
00789 #endif
00790           pfmHashEntry->uAlreadySendPacketSize += ret;
00791 
00792           if(pfmHashEntry->uAlreadySendPacketSize == MIXPACKET_SIZE)
00793           {
00794             #ifdef DELAY_USERS
00795             if(processedQueue != controlMessageUserQueue)
00796             {
00797               m_pChannelList->decDelayBuckets(pfmHashEntry->delayBucketID);
00798             }
00799             #endif
00800 
00801             #ifdef LOG_PACKET_TIMES
00802               if(!isZero64(pfmHashEntry->oQueueEntry.timestamp_proccessing_start))
00803                 {
00804                   getcurrentTimeMicros(pfmHashEntry->oQueueEntry.timestamp_proccessing_end);
00805                   m_pLogPacketStats->addToTimeingStats(pfmHashEntry->oQueueEntry,CHANNEL_DATA,false);
00806                 }
00807             #endif
00808             pfmHashEntry->uAlreadySendPacketSize=-1;
00809 #ifdef PAYMENT
00810             /* count this packet for accounting */
00811             accounting = accountTrafficDownstream(pfmHashEntry);
00812 #endif
00813           }
00814 
00815         }
00816         else if(ret<0&&ret!=E_AGAIN)
00817         {
00818           iSocketErrors++;
00819           // if (iSocketErrors == 1) // show debug message only at the first error; otherwise, the log may get huge
00820           {
00821             SOCKET sock=clientSocket->getSocket();
00822             CAMsg::printMsg(LOG_DEBUG,"CAFirstMixA::sendtoUser() - send error %d on socket %d. Reason: %s (%i)\n", ret, sock, GET_NET_ERROR_STR(GET_NET_ERROR), GET_NET_ERROR);
00823           }
00824           // kick the user out - these only happens in extreme situations...
00825           closeConnection(pfmHashEntry);
00826         }
00827         //TODO error handling
00828       }
00829 
00830 #ifdef HAVE_EPOLL
00831     pfmHashEntry=(fmHashTableEntry*)m_psocketgroupUsersWrite->getNextSignaledSocketData();
00832   }
00833 #else
00834     }//if is socket signaled
00835     pfmHashEntry=m_pChannelList->getNext();
00836   }
00837 #endif
00838   if (iSocketErrors > 1)
00839   {
00840     CAMsg::printMsg(LOG_ERR, "CAFirstMixA::sendtoUser() - %d send errors on a socket occured!\n", iSocketErrors);
00841   }
00842   
00843   return bAktiv;
00844 }
00845 
#ifdef PAYMENT
/**
 * Runs the accounting check for a packet received from a user (upstream)
 * and reacts to the result reported by CAAccountingInstance::handleJapPacket().
 *
 * @param pHashEntry connection entry of the user that sent the packet
 * @return E_SUCCESS if the packet may be processed further,
 *         E_UNKNOWN if the packet must not be forwarded (the connection is
 *         either marked for a forced kickout or has already been closed)
 */
SINT32 CAFirstMixA::accountTrafficUpstream(fmHashTableEntry* pHashEntry)
{
  const SINT32 handleResult = CAAccountingInstance::handleJapPacket(pHashEntry, false, false);

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_CONNECTION_OK)
  {
    // renew the timeout
    //pHashEntry->bRecoverTimeout = true;
    m_pChannelList->pushTimeoutEntry(pHashEntry);
    return E_SUCCESS;
  }

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_PREPARE_FOR_CLOSING_CONNECTION)
  {
    // do not forward this packet
    pHashEntry->bRecoverTimeout = false;
    m_pChannelList->setKickoutForced(pHashEntry, KICKOUT_FORCED);
    // %p is the matching conversion for a pointer argument (%x expects an unsigned int)
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: 1. setting bRecover timout to false for entry %p!\n", (void*) pHashEntry);
    //m_pChannelList->pushTimeoutEntry(pHashEntry);
    //don't let any upstream data messages pass for this user.
    return E_UNKNOWN;
  }

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_CLOSE_CONNECTION)
  {
    // kickout this user - he deserves it...
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: kickout upstream!\n");
    closeConnection(pHashEntry);
    return E_UNKNOWN;
  }

  //Any other result (e.g. HANDLE_PACKET_CONNECTION_UNCHECKED or
  //HANDLE_PACKET_HOLD_CONNECTION) does not require any further processing here.
  return E_SUCCESS;
}
#endif
00886 
#ifdef PAYMENT
/**
 * Runs the accounting check for a packet that was completely sent to a user
 * (downstream) and reacts to the result reported by
 * CAAccountingInstance::handleJapPacket().
 *
 * @param pfmHashEntry connection entry of the user the packet was sent to;
 *        its bCountPacket flag is passed on inverted as second argument of
 *        handleJapPacket()
 * @return ERR_INTERN_SOCKET_CLOSED if the user's connection was closed by this
 *         call, E_SUCCESS in all other cases
 */
SINT32 CAFirstMixA::accountTrafficDownstream(fmHashTableEntry* pfmHashEntry)
{
  // count packet for payment
  const SINT32 handleResult =
    CAAccountingInstance::handleJapPacket(pfmHashEntry, !(pfmHashEntry->bCountPacket), true);

  if (handleResult == CAAccountingInstance::HANDLE_PACKET_CONNECTION_OK)
  {
    // renew the timeout
    //pfmHashEntry->bRecoverTimeout = true;
    m_pChannelList->pushTimeoutEntry(pfmHashEntry);
  }
  else if (handleResult == CAAccountingInstance::HANDLE_PACKET_PREPARE_FOR_CLOSING_CONNECTION)
  {
    // when all control messages are sent the users connection will be closed
    //pfmHashEntry->bRecoverTimeout = false;
    m_pChannelList->setKickoutForced(pfmHashEntry, KICKOUT_FORCED);
    //m_pChannelList->pushTimeoutEntry(pfmHashEntry);
  }
  else if (handleResult == CAAccountingInstance::HANDLE_PACKET_CLOSE_CONNECTION)
  {
    // close users connection immediately
    // (the format string has no conversion, so no further argument is passed)
    CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: Closing JAP connection due to illegal payment status!\n");
    closeConnection(pfmHashEntry);
    return ERR_INTERN_SOCKET_CLOSED;
  }
  //Any other result (e.g. HANDLE_PACKET_CONNECTION_UNCHECKED or
  //HANDLE_PACKET_HOLD_CONNECTION) does not require any further processing here.
  return E_SUCCESS;
}
#endif
00922 
00923 void CAFirstMixA::notifyAllUserChannels(fmHashTableEntry *pfmHashEntry, UINT16 flags)
00924 {
00925   if(pfmHashEntry == NULL) 
00926     return;
00927   fmChannelListEntry* pEntry = m_pChannelList->getFirstChannelForSocket(pfmHashEntry->pMuxSocket);
00928   tQueueEntry* pQueueEntry=new tQueueEntry;
00929   MIXPACKET *notifyPacket = &(pQueueEntry->packet);
00930   memset(notifyPacket, 0, MIXPACKET_SIZE);
00931 
00932   notifyPacket->flags = flags;
00933   while(pEntry != NULL)
00934   {
00935     if(pEntry->bIsSuspended)
00936     {
00937       notifyPacket->channel = pEntry->channelOut;
00938       getRandom(notifyPacket->data,DATA_SIZE);
00939 #ifdef _DEBUG
00940       CAMsg::printMsg(LOG_INFO,"Sent flags %u for channel: %u\n", flags, notifyPacket->channel);
00941 #endif
00942       m_pQueueSendToMix->add(pQueueEntry, sizeof(tQueueEntry));
00943       pEntry->bIsSuspended = false;
00944     }
00945     pEntry=m_pChannelList->getNextChannel(pEntry);
00946   }
00947   pfmHashEntry->cSuspend=0;
00948   delete pQueueEntry;
00949 }
00950 
//@todo: not a reliable solution. We still have to find the bug that causes SSL connections to be reset
//while large downloads are performed by the same user (only occurs in cascades with more than two mixes)
#ifdef SSL_HACK

/**
 * Post-processes the packet stored in pfmHashEntry->oQueueEntry right before
 * it is sent to the user: the packet's channel id is looked up in the channel
 * list and replaced by the channelIn id of the entry found, and the
 * per-channel downstream byte counter is reduced by the size of one queue
 * entry. If the packet is a CHANNEL_CLOSE packet the channel's cipher is
 * deleted and the channel is removed from the channel list.
 * Nothing is done if the channel is not known.
 *
 * @param pfmHashEntry connection entry of the user that owns the packet
 */
void CAFirstMixA::finishPacket(fmHashTableEntry *pfmHashEntry)
{
  MIXPACKET& rPacket = pfmHashEntry->oQueueEntry.packet;
  fmChannelList* pChannel = m_pChannelList->get(rPacket.channel);
  if(pChannel == NULL)
  {
    // unknown channel: leave the packet untouched
    return;
  }

  rPacket.channel = pChannel->channelIn;
  pChannel->downStreamBytes -= sizeof(tQueueEntry);
#ifdef DEBUG
  CAMsg::printMsg(LOG_DEBUG, "CAFirstMixA: channels of current packet, in: %u, out: %u, count: %u, flags: 0x%x\n",
      pChannel->channelIn, pChannel->channelOut, pChannel->downStreamBytes,
      rPacket.flags);
#endif

  if(rPacket.flags != CHANNEL_CLOSE)
  {
    return;
  }

  // the close packet is about to leave: now the channel can be dropped
  delete pChannel->pCipher;
  pChannel->pCipher = NULL;
  m_pChannelList->removeChannel(pfmHashEntry->pMuxSocket, pChannel->channelIn);
#ifdef CH_LOG_STUDY
  nrOfChOpMutex->lock();
  currentOpenedChannels--;
  nrOfChOpMutex->unlock();
#endif //CH_LOG_STUDY
}

#endif //SSL_HACK
00983 
#ifdef PAYMENT
/**
 * Walks once through the timeout list of the channel list and closes user
 * connections that are either timed out or marked for a forced kickout.
 *
 * Entries are popped one by one; every entry that is not closed is pushed
 * back. The first pushed-back entry is remembered so that the walk stops as
 * soon as this entry is popped again.
 *
 * A user marked for a forced kickout gets his downstream data queue emptied.
 * His connection is closed as soon as his control message queue is empty or
 * his kickoutSendRetries counter is used up; until then the counter is
 * decremented on every call so that pending control messages can still be
 * delivered.
 */
void CAFirstMixA::checkUserConnections()
{
  // check the timeout for all connections
  fmHashTableEntry* timeoutHashEntry;
  fmHashTableEntry* firstIteratorEntry = NULL;
  bool currentEntryKickoutForced = false;
  /* this check also includes forced kickouts which have not bRecoverTimeout set. */
  while ( (timeoutHashEntry = m_pChannelList->popTimeoutEntry(true)) != NULL )
  {
    currentEntryKickoutForced = m_pChannelList->isKickoutForced(timeoutHashEntry);
    if(firstIteratorEntry == timeoutHashEntry)
    {
      // we are back at the first entry that was pushed back: one full round is done
      m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
      break;
    }

    if (!currentEntryKickoutForced)
    {
      //CAMsg::printMsg(LOG_ERR, "%p\n, ", timeoutHashEntry);
      if(m_pChannelList->isTimedOut(timeoutHashEntry) )
      {
        CAMsg::printMsg(LOG_DEBUG,"Client connection closed due to timeout.\n");
        closeConnection(timeoutHashEntry);
        continue;
      }
    }
    else
    {
      //A user to be kicked out: empty his downstream data queue.
      timeoutHashEntry->pQueueSend->clean();

      if( (timeoutHashEntry->pControlMessageQueue->getSize() == 0) ||
        (timeoutHashEntry->kickoutSendRetries <= 0) )
      {
        // %p is the matching conversion for a pointer argument (%x expects an unsigned int)
        CAMsg::printMsg(LOG_WARNING, "Kickout immediately owner %p!\n", (void*) timeoutHashEntry);
        UINT32 authFlags = CAAccountingInstance::getAuthFlags(timeoutHashEntry);
        if (authFlags > 0)
        {
          CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout! Payment auth flags: %u\n", authFlags);
        }
        else
        {
          CAMsg::printMsg(LOG_WARNING,"Client connection closed due to forced timeout!\n");
        }
        //CAAccountingInstance::setPrepaidBytesToZero(timeoutHashEntry->pAccountingInfo);
        closeConnection(timeoutHashEntry);
        continue;
      }
      else
      {
        //Note this counter is initialized by calling CAFirstMixChannelList::add
        //and accessed by this thread; both never run concurrently.
        //So we can avoid locking.
        timeoutHashEntry->kickoutSendRetries--;
        CAMsg::printMsg(LOG_INFO, "Size of control message queue for user to be kicked out: %u bytes, retries %d.\n",
            timeoutHashEntry->pControlMessageQueue->getSize(), timeoutHashEntry->kickoutSendRetries);
      }
      // Let the client obtain all his remaining control message packets
      // (which in most cases contain the error message with the kickout reason).
      CAMsg::printMsg(LOG_WARNING,"A kickout is supposed to happen. Let the user get his %u control message bytes before...\n",
          timeoutHashEntry->pControlMessageQueue->getSize());
    }
    if(firstIteratorEntry == NULL)
    {
      firstIteratorEntry = timeoutHashEntry;
    }
    m_pChannelList->pushTimeoutEntry(timeoutHashEntry, currentEntryKickoutForced);
  }
}
#endif
01055 
#ifdef LOG_CRIME
/**
 * Checks whether the peer address of a user matches one of the configured
 * surveillance addresses. On the first match a log message is written and the
 * CHANNEL_SIG_CRIME flag is set in the given packet.
 *
 * @param surveillanceIPs array of addresses to compare against (may be NULL)
 * @param nrOfSurveillanceIPs number of elements in surveillanceIPs
 * @param peerIP address of the user; its first four bytes are logged
 * @param peerPort port of the user, only used for the log message
 * @param pMixPacket packet that gets flagged if the address matches
 */
void CAFirstMixA::crimeSurveillance(CAIPAddrWithNetmask* surveillanceIPs, UINT32 nrOfSurveillanceIPs,UINT8 *peerIP,SINT32 peerPort, MIXPACKET *pMixPacket)
{
  if( (surveillanceIPs == NULL) || (nrOfSurveillanceIPs == 0) )
  {
    // nothing configured, nothing to check
    return;
  }

  UINT32 idx = 0;
  while(idx < nrOfSurveillanceIPs)
  {
    if(!surveillanceIPs[idx].equals(peerIP))
    {
      idx++;
      continue;
    }
    // first match wins: log it, flag the packet and stop searching
    CAMsg::printMsg(LOG_CRIT,"Crime detection: User surveillance, IP %u.%u.%u.%u Port %i with next mix channel %u\n",peerIP[0], peerIP[1], peerIP[2], peerIP[3],peerPort,pMixPacket->channel);
    pMixPacket->flags |= CHANNEL_SIG_CRIME;
    return;
  }
}
#endif