Mixe for Privacy and Anonymity in the Internet
Macros | Functions
CAMiddleMix.cpp File Reference
#include "StdAfx.h"
#include "CAMiddleMix.hpp"
#include "CASingleSocketGroup.hpp"
#include "CAMsg.hpp"
#include "CACmdLnOptions.hpp"
#include "CASocketAddrINet.hpp"
#include "CASocketAddrUnix.hpp"
#include "CAThread.hpp"
#include "CAInfoService.hpp"
#include "CAUtil.hpp"
#include "CABase64.hpp"
#include "CAPool.hpp"
#include "xml/DOM_Output.hpp"
#include "CAStatusManager.hpp"
#include "CALibProxytest.hpp"
#include "CAControlChannelDispatcher.hpp"
#include "CASymChannelCipher.hpp"
#include "CASymChannelCipherFactory.hpp"
Include dependency graph for CAMiddleMix.cpp:

Go to the source code of this file.

Macros

#define MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS   2*KEY_SIZE
 
#define MIDDLE_MIX_ASYM_PADDING_SIZE   42
 
#define RETRIES   100
 
#define RETRYTIME   10
 

Functions

THREAD_RETURN mm_loopSendToMixAfter (void *param)
 UPSTREAM (to WEB) Take the packets from the Queue and write them to the Socket. More...
 
THREAD_RETURN mm_loopSendToMixBefore (void *param)
 DOWNSTREAM (to Client) Take the packets from the Queue and write them to the Socket. More...
 
THREAD_RETURN mm_loopReadFromMixBefore (void *param)
 
THREAD_RETURN mm_loopReadFromMixAfter (void *param)
 

Macro Definition Documentation

◆ MIDDLE_MIX_ASYM_PADDING_SIZE

#define MIDDLE_MIX_ASYM_PADDING_SIZE   42

Definition at line 1041 of file CAMiddleMix.cpp.

◆ MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS

#define MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS   2*KEY_SIZE

Definition at line 1040 of file CAMiddleMix.cpp.

◆ RETRIES

#define RETRIES   100

◆ RETRYTIME

#define RETRYTIME   10

Function Documentation

◆ mm_loopReadFromMixAfter()

THREAD_RETURN mm_loopReadFromMixAfter ( void *  param)

Note: the statements below appear to be disabled SGX MIX hand-off code that was picked up from comment blocks inside the function body; they are reproduced here for reference only.

SGX MIX: locksem(downstreamSemPreId, SN_EMPTY); memcpy(downstreamPreBuffer, pPoolEntry, sizeof(tPoolEntry)); unlocksem(downstreamSemPreId, SN_FULL);

locksem(downstreamSemPostId, SN_FULL); memcpy(pPoolEntry, downstreamPostBuffer, sizeof(tPoolEntry)); unlocksem(downstreamSemPostId, SN_EMPTY); getRandom(pMixPacket->data, DATA_SIZE);

SGX MIX: locksem(pMix->downstreamSemPreId, SN_EMPTY); memcpy(pMix->downstreamPreBuffer, pPoolEntry, sizeof(tPoolEntry)); unlocksem(pMix->downstreamSemPreId, SN_FULL);

locksem(pMix->downstreamSemPostId, SN_FULL); memcpy(pPoolEntry, pMix->downstreamPostBuffer, sizeof(tPoolEntry)); unlocksem(pMix->downstreamSemPostId, SN_EMPTY);

Definition at line 1254 of file CAMiddleMix.cpp.

1255  {
1256  CAMiddleMix* pMix = static_cast<CAMiddleMix*>(param);
1257  HCHANNEL channelIn;
1258  CASymChannelCipher* pCipher=NULL;
1259 
1260  tPoolEntry* pPoolEntry=new tPoolEntry;
1261  MIXPACKET* pMixPacket=&pPoolEntry->packet;
1262 
1263  SINT32 ret;
1264  CASingleSocketGroup oSocketGroup(false);
1265  oSocketGroup.add(*(pMix->m_pMuxOut));
1266 
1267  CAQueue* pQueueSendtoMix=pMix->m_pQueueSendToMixBefore;
1268 
1269 #ifdef USE_POOL
1270  CAPool* pPool=new CAPool(MIX_POOL_SIZE);
1271 #endif
1272  while(pMix->m_bRun)
1273  {
1274  if(pQueueSendtoMix->getSize()>MAX_READ_FROM_NEXT_MIX_QUEUE_SIZE)
1275  {
1276 #ifdef DEBUG
1277  CAMsg::printMsg(LOG_DEBUG,"CAMiddleMix::Queue next is full!\n");
1278 #endif
1279  msSleep(200);
1280  continue;
1281  }
1282  #ifndef USE_POOL
1283  ret=oSocketGroup.select(1000);
1284  #else
1285  ret=oSocketGroup.select(MIX_POOL_TIMEOUT);
1286  #endif
1287  if(ret!=1)
1288  {
1289  if(ret==E_TIMEDOUT)
1290  {
1291  #ifndef USE_POOL
1292  continue;
1293  #else
1294  pMixPacket->channel=DUMMY_CHANNEL;
1295  pMixPacket->flags=CHANNEL_DUMMY;
1296  getRandom(pMixPacket->data,DATA_SIZE);
1297  pPool->pool(pPoolEntry);
1298 /* SGX MIX
1299  locksem(downstreamSemPreId, SN_EMPTY);
1300  memcpy(downstreamPreBuffer,pPoolEntry, sizeof(tPoolEntry));
1301  unlocksem(downstreamSemPreId, SN_FULL);
1302 
1303  locksem(downstreamSemPostId, SN_FULL);
1304  memcpy(pPoolEntry,downstreamPostBuffer, sizeof(tPoolEntry));
1305  unlocksem(downstreamSemPostId, SN_EMPTY);
1306 */
1307  if(pMix->m_pMuxIn->send(pMixPacket)==SOCKET_ERROR)
1308  pMix->m_bRun=false;
1309  #endif
1310  }
1311  else
1312  {
1313  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixAfter -- Error on select() while waiting for data from next mix -- goto ERR!\n");
1314  pMix->m_bRun=false;
1316  }
1317  }
1318  else
1319  {
1320  ret=pMix->m_pMuxOut->receive(pMixPacket);
1321  if ((ret!=SOCKET_ERROR)&&(pMixPacket->flags & ~CHANNEL_ALLOWED_FLAGS))
1322  {
1323  CAMsg::printMsg(LOG_INFO,"loopReadFromMixAfter -- received a packet with invalid flags: %0X . Removing them.\n",(pMixPacket->flags & ~CHANNEL_ALLOWED_FLAGS));
1324  pMixPacket->flags&=CHANNEL_ALLOWED_FLAGS;
1325  }
1326  if(ret==SOCKET_ERROR)
1327  {
1328  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixAfter -- Error while receiving data from next mix. Reason: %s (%i)\n",
1330  pMix->m_bRun=false;
1332  }
1333  #ifdef USE_POOL
1334  else if(pMixPacket->channel==DUMMY_CHANNEL)
1335  {
1336  pMixPacket->flags=CHANNEL_DUMMY;
1337  getRandom(pMixPacket->data,DATA_SIZE);
1338  pPool->pool(pPoolEntry);
1348  if(pMix->m_pMuxIn->send(pMixPacket)==SOCKET_ERROR)
1349  pMix->m_bRun=false;
1350  }
1351  #endif
1352  #ifdef REPLAY_DETECTION
1353  else if(pMixPacket->channel==REPLAY_CONTROL_CHANNEL_ID)
1354  {
1355  pQueue->add(pPoolEntry,sizeof(tPoolEntry));
1356  }
1357  #endif
1358  else if(pMix->m_pMiddleMixChannelList->getOutToIn(&channelIn,pMixPacket->channel,&pCipher)==E_SUCCESS)
1359  {//connection found
1360 
1371 #ifdef LOG_CRIME
1372  HCHANNEL channelOut = pMixPacket->channel;
1373 #endif
1374  pMixPacket->channel=channelIn;
1375 #ifdef LOG_CRIME
1376  if((pMixPacket->flags&CHANNEL_SIG_CRIME)==CHANNEL_SIG_CRIME)
1377  {
1378  getRandom(pMixPacket->data,DATA_SIZE);
1379  //Log in and out channel number, to allow
1380  CAMsg::printMsg(LOG_CRIT,"Detecting crime activity - previous mix channel: %u, "
1381  "next mix channel: %u\n", channelIn, channelOut);
1382  }
1383  else
1384 #endif
1385  pCipher->crypt2(pMixPacket->data,pMixPacket->data,DATA_SIZE);
1386  pCipher->unlock();
1387  #ifdef USE_POOL
1388  pPool->pool(pPoolEntry);
1389  #endif
1390  if((pMixPacket->flags&CHANNEL_CLOSE)!=0)
1391  {//Channel close received --> remove channel from channel list
1392  pMix->m_pMiddleMixChannelList->remove(channelIn);
1393  }
1394  pMix->putMixPacketIntoQueueSendToMixBefore(pPoolEntry);
1395  }
1396  }
1397  }
1398 
1399  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixAfter -- Exiting clean ups...\n");
1400  pMix->m_bRun=false;
1401  UINT8 b[sizeof(tQueueEntry)+1];
1402  /* write bytes to the send queues to accelerate loop()-joins for the send threads*/
1403  if(pMix->m_pQueueSendToMixBefore!=NULL)
1404  {
1405  pMix->m_pQueueSendToMixBefore->add(b,sizeof(tQueueEntry)+1);
1406  }
1407  if(pMix->m_pQueueSendToMixAfter!=NULL)
1408  {
1409  pMix->m_pQueueSendToMixAfter->add(b,sizeof(tQueueEntry)+1);
1410  }
1411  delete pPoolEntry;
1412  pPoolEntry = NULL;
1413  #ifdef USE_POOL
1414  delete pPool;
1415  pPool = NULL;
1416  #endif
1417  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixAfter -- Now Exiting!\n");
1419  }
#define MONITORING_FIRE_NET_EVENT(e_type)
SINT32 getRandom(UINT32 *val)
Gets 32 random bits.
Definition: CAUtil.cpp:346
SINT32 msSleep(UINT32 ms)
Sleeps ms milliseconds.
Definition: CAUtil.cpp:406
#define GET_NET_ERROR
Definition: StdAfx.h:469
#define GET_NET_ERROR_STR(x)
Definition: StdAfx.h:471
#define MIX_POOL_TIMEOUT
Definition: StdAfx.h:245
#define SOCKET_ERROR
Definition: StdAfx.h:464
#define THREAD_RETURN_SUCCESS
Definition: StdAfx.h:542
#define MAX_READ_FROM_NEXT_MIX_QUEUE_SIZE
Definition: StdAfx.h:227
#define DUMMY_CHANNEL
Definition: StdAfx.h:246
#define MIX_POOL_SIZE
Definition: StdAfx.h:243
signed int SINT32
Definition: basetypedefs.h:132
unsigned char UINT8
Definition: basetypedefs.h:135
SINT32 unlock()
Unlocks the lockable object by decrementing a reference counter in a thread-safe manner.
Definition: CALockAble.hpp:67
static SINT32 printMsg(UINT32 typ, const char *format,...)
Writes a given message to the log.
Definition: CAMsg.cpp:251
This class implements the pool strategy of a Mix.
Definition: CAPool.hpp:43
SINT32 pool(tPoolEntry *pPoolEntry)
Definition: CAPool.cpp:83
This is a simple FIFO-Queue.
Definition: CAQueue.hpp:50
UINT32 getSize()
Returns the size of the stored data in bytes.
Definition: CAQueue.hpp:101
virtual SINT32 crypt2(const UINT8 *in, UINT8 *out, UINT32 len)=0
#define REPLAY_CONTROL_CHANNEL_ID
const SINT32 E_SUCCESS
Definition: errorcodes.hpp:2
#define E_TIMEDOUT
Definition: errorcodes.hpp:10
@ ev_net_nextConnectionClosed
CAMix * pMix
Definition: proxytest.cpp:75
HCHANNEL channel
Definition: typedefs.hpp:117
UINT8 data[DATA_SIZE]
Definition: typedefs.hpp:121
UINT16 flags
Definition: typedefs.hpp:118
Definition: typedefs.hpp:169
MIXPACKET packet
Definition: typedefs.hpp:170
#define CHANNEL_ALLOWED_FLAGS
Definition: typedefs.hpp:60
#define CHANNEL_DUMMY
Definition: typedefs.hpp:50
UINT32 HCHANNEL
Definition: typedefs.hpp:34
#define CHANNEL_CLOSE
Definition: typedefs.hpp:47
struct t_queue_entry tQueueEntry
Definition: typedefs.hpp:188
tQueueEntry tPoolEntry
Definition: typedefs.hpp:192
#define CHANNEL_SIG_CRIME
Definition: typedefs.hpp:58
#define DATA_SIZE
Definition: typedefs.hpp:69

◆ mm_loopReadFromMixBefore()

THREAD_RETURN mm_loopReadFromMixBefore ( void *  param)

Definition at line 1045 of file CAMiddleMix.cpp.

1046  {
1047  CAMiddleMix* pMix=(CAMiddleMix*)param;
1048  HCHANNEL channelOut = 0;
1049 #ifdef LOG_CRIME
1050  HCHANNEL channelIn = 0;
1051 #endif
1052  tPoolEntry* pPoolEntry=new tPoolEntry;
1053  MIXPACKET* pMixPacket=&pPoolEntry->packet;
1054 
1055  CAQueue* pQueue=pMix->m_pQueueSendToMixAfter;
1056 
1057  CASymChannelCipher* pCipher;
1058  SINT32 ret;
1059  UINT8* tmpRSABuff=new UINT8[RSA_SIZE];
1060  UINT32 rsaOutLen=RSA_SIZE;
1061  CASingleSocketGroup oSocketGroup(false);
1062  oSocketGroup.add(*(pMix->m_pMuxIn));
1063 
1064  #ifdef USE_POOL
1065  CAPool* pPool=new CAPool(MIX_POOL_SIZE);
1066  #endif
1067 
1068  while(pMix->m_bRun)
1069  {
1071  {
1072 #ifdef DEBUG
1073  CAMsg::printMsg(LOG_DEBUG,"CAFirstMix::Queue prev is full!\n");
1074 #endif
1075  msSleep(200);
1076  continue;
1077  }
1078  #ifndef USE_POOL
1079  ret=oSocketGroup.select(1000);
1080  #else
1081  ret=oSocketGroup.select(MIX_POOL_TIMEOUT);
1082  #endif
1083  if(ret!=1)
1084  {
1085  if(ret==E_TIMEDOUT)
1086  {
1087  #ifndef USE_POOL
1088  continue;
1089  #else
1090  pMixPacket->channel=DUMMY_CHANNEL;
1091  pMixPacket->flags=CHANNEL_DUMMY;
1092  getRandom(pMixPacket->data,DATA_SIZE);
1093  pPool->pool(pPoolEntry);
1094 /*
1095  locksem(upstreamSemPreId, SN_EMPTY);
1096  memcpy(upstreamPreBuffer,pPoolEntry, sizeof(tPoolEntry));
1097  unlocksem(upstreamSemPreId, SN_FULL);
1098 
1099  locksem(upstreamSemPostId, SN_FULL);
1100  memcpy(pPoolEntry,upstreamPostBuffer, sizeof(tPoolEntry));
1101  unlocksem(upstreamSemPostId, SN_EMPTY);
1102 */
1103  if(m_pMuxOut->send(pMixPacket)==SOCKET_ERROR)
1104  pMix->m_bRun=false;
1105  #endif
1106  }
1107  else
1108  {
1109  CAMsg::printMsg(LOG_CRIT,"loopUpStream -- Fehler bei select() -- goto ERR!\n");
1110  pMix->m_bRun=false;
1112  }
1113  }
1114  else
1115  {
1116  ret=pMix->m_pMuxIn->receive(pMixPacket);
1117 
1118  if ((ret!=SOCKET_ERROR)&&(pMixPacket->flags & ~CHANNEL_ALLOWED_FLAGS))
1119  {
1120  CAMsg::printMsg(LOG_INFO,"loopUpStream received a packet with invalid flags: %0X . Removing them.\n",(pMixPacket->flags & ~CHANNEL_ALLOWED_FLAGS));
1121  pMixPacket->flags&=CHANNEL_ALLOWED_FLAGS;
1122  }
1123 
1124  if(ret==SOCKET_ERROR)
1125  {
1126 
1127  CAMsg::printMsg(LOG_CRIT,"Fehler beim Empfangen -- Exiting!\n");
1128  pMix->m_bRun=false;
1130  }
1131  #ifdef USE_POOL
1132  else if(pMixPacket->channel==DUMMY_CHANNEL)
1133  {
1134  pMixPacket->flags=CHANNEL_DUMMY;
1135  getRandom(pMixPacket->data,DATA_SIZE);
1136  pPool->pool(pPoolEntry);
1137 /* SGX MIX locksem(upstreamSemPreId, SN_EMPTY);
1138  memcpy(upstreamPreBuffer,pPoolEntry, sizeof(tPoolEntry));
1139  unlocksem(upstreamSemPreId, SN_FULL);
1140  locksem(upstreamSemPostId, SN_FULL);
1141  memcpy(pPoolEntry,upstreamPostBuffer, sizeof(tPoolEntry));
1142  unlocksem(upstreamSemPostId, SN_EMPTY);
1143 */
1144  if(pMix->m_pMuxOut->send(pMixPacket)==SOCKET_ERROR)
1145  pMix->m_bRun=false;
1146  }
1147  #endif
1148 
1149  else //receive successful
1150  {
1151 
1152 /* locksem(pMix->upstreamSemPreId, SN_EMPTY);
1153  memcpy(pMix->upstreamPreBuffer,pPoolEntry, sizeof(tPoolEntry));
1154 SGX MIX unlocksem(pMix->upstreamSemPreId, SN_FULL);
1155  locksem(pMix->upstreamSemPostId, SN_FULL);
1156  memcpy(pPoolEntry,pMix->upstreamPostBuffer, sizeof(tPoolEntry));
1157  unlocksem(pMix->upstreamSemPostId, SN_EMPTY);
1158  if(pMixPacket->channel==0) continue;
1159  #ifdef LOG_CRIME
1160  crimeSurveillanceUpstream(pMixPacket, channelIn);
1161  #endif
1162  pQueue->add(pPoolEntry,sizeof(tPoolEntry));
1163 */
1164 #ifdef LOG_CRIME
1165  channelIn = pMixPacket->channel;
1166 #endif
1167  if(pMix->m_pMiddleMixChannelList->getInToOut(pMixPacket->channel,&channelOut,&pCipher)!=E_SUCCESS)
1168  {//new connection ?
1169  if(pMixPacket->flags & CHANNEL_OPEN) //if not channel-open flag set -->drop this packet
1170  {
1171  #ifdef _DEBUG
1172  CAMsg::printMsg(LOG_DEBUG,"New Connection from previous Mix!\n");
1173  #endif
1174  if (pMix->m_pRSA->decryptOAEP(pMixPacket->data, tmpRSABuff, &rsaOutLen) != E_SUCCESS)
1175  {
1176  CAMsg::printMsg(LOG_ERR, "Received wrongly decrypted Channel Open Packet!\n");
1177  continue;
1178  }
1179  #ifdef REPLAY_DETECTION
1180  // replace time(NULL) with the real timestamp ()
1181  // packet-timestamp + m_u64ReferenceTime
1182  UINT32 stamp=((UINT32)(tmpRSABuff[13]<<16)+(UINT32)(tmpRSABuff[14]<<8)+(UINT32)(tmpRSABuff[15]))*REPLAY_BASE;
1183  if(pMix->m_pReplayDB->insert(tmpRSABuff,stamp+pMix->m_u64ReferenceTime)!=E_SUCCESS)
1184 // if(pMix->m_pReplayDB->insert(tmpRSABuff,time(NULL))!=E_SUCCESS)
1185  {
1186  CAMsg::printMsg(LOG_INFO,"Replay: Duplicate packet ignored.\n");
1187  continue;
1188  }
1189  #endif
1190 
1191  pCipher=CASymChannelCipherFactory::createCipher(CALibProxytest::getOptions()->getSymChannelCipherAlgorithm());
1192  pCipher->setKeys(tmpRSABuff,MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS);
1193  pCipher->crypt1(pMixPacket->data+RSA_SIZE,
1194  pMixPacket->data+rsaOutLen-MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS,
1196  memcpy(pMixPacket->data,tmpRSABuff+MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS,rsaOutLen-MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS);
1198  pMix->m_pMiddleMixChannelList->add(pMixPacket->channel,pCipher,&channelOut);
1199  pMixPacket->channel=channelOut;
1200  #ifdef USE_POOL
1201  pPool->pool(pPoolEntry);
1202  #endif
1203  #ifdef LOG_CRIME
1204  crimeSurveillanceUpstream(pMixPacket, channelIn);
1205  #endif
1206  pQueue->add(pPoolEntry,sizeof(tPoolEntry));
1207  }
1208  }
1209  else
1210  {//established connection
1211  pCipher->crypt1(pMixPacket->data,pMixPacket->data,DATA_SIZE);
1212  pCipher->unlock();
1213  #ifdef USE_POOL
1214  pPool->pool(pPoolEntry);
1215  #endif
1216  if((pMixPacket->flags&CHANNEL_CLOSE)!=0)
1217  {//Channel close received --> remove channel from channel list
1218  pMix->m_pMiddleMixChannelList->remove(pMixPacket->channel);
1219  }
1220  pMixPacket->channel=channelOut;
1221  #ifdef LOG_CRIME
1222  crimeSurveillanceUpstream(pMixPacket, channelIn);
1223  #endif
1224  pQueue->add(pPoolEntry,sizeof(tPoolEntry));
1225  }
1226  }
1227  }
1228  }
1229 
1230  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixBefore -- Exiting clean ups...\n");
1231  pMix->m_bRun=false;
1232  UINT8 b[sizeof(tQueueEntry)+1];
1233  /* write bytes to the send queues to accelerate loop()-joins for the send threads*/
1234  if(pMix->m_pQueueSendToMixBefore!=NULL)
1235  {
1236  pMix->m_pQueueSendToMixBefore->add(b,sizeof(tQueueEntry)+1);
1237  }
1238  if(pMix->m_pQueueSendToMixAfter!=NULL)
1239  {
1240  pMix->m_pQueueSendToMixAfter->add(b,sizeof(tQueueEntry)+1);
1241  }
1242  delete[] tmpRSABuff;
1243  tmpRSABuff = NULL;
1244  delete pPoolEntry;
1245  pPoolEntry = NULL;
1246  #ifdef USE_POOL
1247  delete pPool;
1248  pPool = NULL;
1249  #endif
1250  CAMsg::printMsg(LOG_CRIT,"loopReadFromMixBefore -- Now Exiting!\n");
1252  }
#define RSA_SIZE
#define MIDDLE_MIX_ASYM_PADDING_SIZE
#define MIDDLE_MIX_SIZE_OF_SYMMETRIC_KEYS
#define REPLAY_BASE
#define MAX_READ_FROM_PREV_MIX_QUEUE_SIZE
Definition: StdAfx.h:226
unsigned int UINT32
Definition: basetypedefs.h:131
static CACmdLnOptions * getOptions()
SINT32 add(const void *buff, UINT32 size)
Adds data to the Queue.
Definition: CAQueue.cpp:76
static CASymChannelCipher * createCipher(SYMCHANNELCIPHER_ALGORITHM alg)
virtual SINT32 setKeys(const UINT8 *key, UINT32 keysize)=0
Sets the keys for crypt1() and crypt2() either to the same key (if keysize==KEY_SIZE) or to different...
virtual SINT32 crypt1(const UINT8 *in, UINT8 *out, UINT32 len)=0
@ ev_net_prevConnectionClosed
#define CHANNEL_OPEN
Definition: typedefs.hpp:43

◆ mm_loopSendToMixAfter()

THREAD_RETURN mm_loopSendToMixAfter ( void *  param)

UPSTREAM (to WEB) Take the packets from the Queue and write them to the Socket.

Definition at line 920 of file CAMiddleMix.cpp.

921  {
922  INIT_STACK;
923  BEGIN_STACK("CAFirstMix::fm_loopSendToMixAfter");
924 
925  CAMiddleMix* pMiddleMix = static_cast<CAMiddleMix*>(param);
926  CAQueue* pQueue = pMiddleMix->m_pQueueSendToMixAfter;
927  CAMuxSocket* pMuxSocket=pMiddleMix->m_pMuxOut;
928 
929  UINT32 len;
930  SINT32 ret;
931  tPoolEntry* pPoolEntry=new tPoolEntry;
932  MIXPACKET* pMixPacket=&pPoolEntry->packet;
933  UINT32 u32KeepAliveSendInterval=pMiddleMix->m_u32KeepAliveSendInterval2;
934  while(pMiddleMix->m_bRun)
935  {
936  len=sizeof(tPoolEntry);
937  ret=pQueue->getOrWait((UINT8*)pPoolEntry,&len,u32KeepAliveSendInterval);
938  if(!(pMiddleMix->m_bRun))
939  {
940  CAMsg::printMsg(LOG_INFO,"SendToMixAfter thread: was interrupted.\n");
942  break;
943  }
944  if(ret==E_TIMEDOUT)
945  {//send a dummy as keep-alive-traffic
946  pMixPacket->flags=CHANNEL_DUMMY;
947  pMixPacket->channel=DUMMY_CHANNEL;
948  getRandom(pMixPacket->data,DATA_SIZE);
949  }
950  else if(ret!=E_SUCCESS||len!=sizeof(tQueueEntry))
951  {
953  CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMixAfter - Error in dequeueing MixPaket\n");
954  CAMsg::printMsg(LOG_ERR,"ret=%i len=%i\n",ret,len);
955  break;
956  }
957  if((pMuxSocket->send(pMixPacket)!=MIXPACKET_SIZE))
958  {
960  CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMixAfter - Error in sending MixPaket\n");
961  break;
962  }
963 #ifdef LOG_PACKET_TIMES
964  if(!isZero64(pPoolEntry->timestamp_proccessing_start))
965  {
966  getcurrentTimeMicros(pPoolEntry->timestamp_proccessing_end);
967  pMiddleMix->m_pLogPacketStats->addToTimeingStats(*pPoolEntry, pMixPacket->flags, true);
968  }
969 #endif
970  }
971  pMiddleMix->m_bRun = false;
972  delete pPoolEntry;
973  pPoolEntry = NULL;
974  FINISH_STACK("CAFirstMix::fm_loopSendToMixAfter");
975 
976  CAMsg::printMsg(LOG_DEBUG,"Exiting Thread SendToMixAfter\n");
978  }
#define INIT_STACK
Definition: CAThread.hpp:48
#define BEGIN_STACK(methodName)
Definition: CAThread.hpp:49
#define FINISH_STACK(methodName)
Definition: CAThread.hpp:50
SINT32 getcurrentTimeMicros(UINT64 &u64Time)
Gets the current system time in microseconds.
Definition: CAUtil.cpp:280
bool isZero64(UINT64 &op1)
Definition: CAUtil.hpp:464
UINT32 m_u32KeepAliveSendInterval2
CAQueue * m_pQueueSendToMixAfter
volatile bool m_bRun
CAMuxSocket * m_pMuxOut
SINT32 send(MIXPACKET *pPacket)
Sends a MixPacket over the Network.
SINT32 getOrWait(UINT8 *pbuff, UINT32 *psize)
Gets data from the Queue or waits until some data is available, if the Queue is empty.
Definition: CAQueue.cpp:209
#define MIXPACKET_SIZE
Definition: typedefs.hpp:40
UINT16 len
Definition: typedefs.hpp:0

◆ mm_loopSendToMixBefore()

THREAD_RETURN mm_loopSendToMixBefore ( void *  param)

DOWNSTREAM (to Client) Take the packets from the Queue and write them to the Socket.

Definition at line 981 of file CAMiddleMix.cpp.

982  {
983  INIT_STACK;
984  BEGIN_STACK("CAFirstMix::fm_loopSendToMixBefore");
985 
986  CAMiddleMix* pMiddleMix = static_cast<CAMiddleMix*>(param);
987  CAQueue* pQueue=pMiddleMix->m_pQueueSendToMixBefore;
988  CAMuxSocket* pMuxSocket=pMiddleMix->m_pMuxIn;
989 
990  UINT32 len;
991  SINT32 ret;
992  tPoolEntry* pPoolEntry=new tPoolEntry;
993  MIXPACKET* pMixPacket=&pPoolEntry->packet;
994  UINT32 u32KeepAliveSendInterval=pMiddleMix->m_u32KeepAliveSendInterval;
995  while(pMiddleMix->m_bRun)
996  {
997  len=sizeof(tPoolEntry);
998  ret=pQueue->getOrWait((UINT8*)pPoolEntry,&len,u32KeepAliveSendInterval);
999  if(!(pMiddleMix->m_bRun))
1000  {
1001  CAMsg::printMsg(LOG_INFO,"SendToMixBefore thread: was interrupted.\n");
1002  break;
1003  }
1004  if(ret==E_TIMEDOUT)
1005  {//send a dummy as keep-alive-traffic
1006  pMixPacket->flags=CHANNEL_DUMMY;
1007  pMixPacket->channel=DUMMY_CHANNEL;
1008  getRandom(pMixPacket->data,DATA_SIZE);
1009  }
1010  else if(ret!=E_SUCCESS||len!=sizeof(tQueueEntry))
1011  {
1013  CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMixBefore - Error in dequeueing MixPaket\n");
1014  CAMsg::printMsg(LOG_ERR,"ret=%i len=%i\n",ret,len);
1015  break;
1016  }
1017  if((pMuxSocket->send(pMixPacket)!=MIXPACKET_SIZE))
1018  {
1020  CAMsg::printMsg(LOG_ERR,"CAFirstMix::lm_loopSendToMixBefore - Error in sending MixPaket\n");
1021  break;
1022  }
1023 #ifdef LOG_PACKET_TIMES
1024  if(!isZero64(pPoolEntry->timestamp_proccessing_start))
1025  {
1026  getcurrentTimeMicros(pPoolEntry->timestamp_proccessing_end);
1027  pMiddleMix->m_pLogPacketStats->addToTimeingStats(*pPoolEntry,pMixPacket->flags,true);
1028  }
1029 #endif
1030  }
1031  pMiddleMix->m_bRun = false;
1032  delete pPoolEntry;
1033  pPoolEntry = NULL;
1034  FINISH_STACK("CAFirstMix::fm_loopSendToMixBefore");
1035 
1036  CAMsg::printMsg(LOG_DEBUG,"Exiting Thread SendToMixBefore\n");
1038  }
CAQueue * m_pQueueSendToMixBefore
CAMuxSocket * m_pMuxIn
UINT32 m_u32KeepAliveSendInterval
Definition: CAMix.hpp:187