00001
00002
00003
00004
00005
00006
00007 #include "xmsgLib.h"
00008 #include "rsApiHandler.h"
00009 #include "reGlobalsExtern.h"
00010 #include "miscServerFunct.h"
00011
00012 #include <boost/thread/thread.hpp>
00013 #include <boost/thread/mutex.hpp>
00014 #include <boost/thread/condition.hpp>
00015
00016 boost::mutex ReqQueCondMutex;
00017 boost::condition_variable ReqQueCond;
00018 boost::thread* ProcReqThread[ NUM_XMSG_THR ];
00019 boost::mutex MessQueCondMutex;
00020
00021
00022
00023 #include "eirods_network_factory.h"
00024
00025
00026 xmsgReq_t *XmsgReqHead = NULL;
00027 xmsgReq_t *XmsgReqTail = NULL;
00028
00029 ticketHashQue_t XmsgHashQue[NUM_HASH_SLOT];
00030 xmsgQue_t XmsgQue;
00031
00032 static msParamArray_t XMsgMsParamArray;
00033 int
00034 initThreadEnv () {
00035 return (0);
00036 }
00037
00038
00039 int
00040 addXmsgToQues(irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct) {
00041
00042 int status;
00043
00044 MessQueCondMutex.lock();
00045
00046 addXmsgToXmsgQue (irodsXmsg, &XmsgQue);
00047 status = addXmsgToTicketMsgStruct (irodsXmsg, ticketMsgStruct);
00048
00049 MessQueCondMutex.unlock();
00050
00051 return(status);
00052
00053 }
00054
00055 int
00056 addXmsgToXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00057 {
00058
00059 if (xmsg == NULL || xmsgQue == NULL) {
00060 rodsLog (LOG_ERROR,
00061 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00062 return (SYS_INTERNAL_NULL_INPUT_ERR);
00063 }
00064
00065 xmsg->next = xmsg->prev = NULL;
00066
00067 if (xmsgQue->head == NULL) {
00068 xmsgQue->head = xmsgQue->tail = xmsg;
00069 } else {
00070
00071 xmsgQue->head->prev = xmsg;
00072 xmsg->next = xmsgQue->head;
00073 xmsgQue->head = xmsg;
00074 }
00075
00076 return (0);
00077 }
00078
00079 int
00080 rmXmsgFromXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00081 {
00082 if (xmsg == NULL || xmsgQue == NULL) {
00083 rodsLog (LOG_ERROR,
00084 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00085 return (SYS_INTERNAL_NULL_INPUT_ERR);
00086 }
00087
00088 if (xmsg->prev == NULL) {
00089
00090 xmsgQue->head = xmsg->next;
00091 } else {
00092 xmsg->prev->next = xmsg->next;
00093 }
00094
00095 if (xmsg->next == NULL) {
00096
00097 xmsgQue->tail = xmsg->prev;
00098 } else {
00099 xmsg->next->prev = xmsg->prev;
00100 }
00101
00102 xmsg->prev = xmsg->next = NULL;
00103
00104 return (0);
00105 }
00106
00107 int
00108 rmXmsgFromXmsgTcketQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00109 {
00110 if (xmsg == NULL || xmsgQue == NULL) {
00111 rodsLog (LOG_ERROR,
00112 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00113 return (SYS_INTERNAL_NULL_INPUT_ERR);
00114 }
00115
00116 if (xmsg->tprev == NULL) {
00117
00118 xmsgQue->head = xmsg->tnext;
00119 } else {
00120 xmsg->tprev->tnext = xmsg->tnext;
00121 }
00122
00123 if (xmsg->tnext == NULL) {
00124
00125 xmsgQue->tail = xmsg->tprev;
00126 } else {
00127 xmsg->tnext->tprev = xmsg->tprev;
00128 }
00129
00130 xmsg->tprev = xmsg->tnext = NULL;
00131
00132 return (0);
00133 }
00134
00135 int
00136 addXmsgToTicketMsgStruct (irodsXmsg_t *xmsg,
00137 ticketMsgStruct_t *ticketMsgStruct)
00138 {
00139 if (xmsg == NULL || ticketMsgStruct == NULL) {
00140 rodsLog (LOG_ERROR,
00141 "addXmsgToTicketMsgStruct: input xmsg or ticketMsgStruct is NULL");
00142 return (SYS_INTERNAL_NULL_INPUT_ERR);
00143 }
00144
00145
00146 if (xmsg->sendTime + INC_EXPIRE_INT > ticketMsgStruct->ticket.expireTime) {
00147 ticketMsgStruct->ticket.expireTime = xmsg->sendTime + INC_EXPIRE_INT;
00148 }
00149
00150 if (ticketMsgStruct->xmsgQue.head == NULL) {
00151 ticketMsgStruct->xmsgQue.head = ticketMsgStruct->xmsgQue.tail = xmsg;
00152 xmsg->tnext = xmsg->tprev = NULL;
00153 } else {
00154
00155 ticketMsgStruct->xmsgQue.tail->tnext = xmsg;
00156 xmsg->tprev = ticketMsgStruct->xmsgQue.tail;
00157 ticketMsgStruct->xmsgQue.tail = xmsg;
00158 xmsg->tnext = NULL;
00159 }
00160 xmsg->ticketMsgStruct = ticketMsgStruct;
00161 xmsg->seqNumber = ticketMsgStruct->nxtSeqNumber;
00162 ticketMsgStruct->nxtSeqNumber = ticketMsgStruct->nxtSeqNumber + 1;
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175 return (xmsg->seqNumber);
00176 }
00177
00178 int checkMsgCondition(irodsXmsg_t *irodsXmsg, char *msgCond)
00179 {
00180 char condStr[MAX_NAME_LEN * 2], res[MAX_NAME_LEN * 2];
00181
00182 if (msgCond == NULL || strlen(msgCond) == 0)
00183 return(0);
00184
00185 strcpy(condStr,msgCond);
00186
00187 XMsgMsParamArray.msParam[0]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->msgType;
00188 XMsgMsParamArray.msParam[1]->inOutStruct =(char *) irodsXmsg->sendUserName;
00189 XMsgMsParamArray.msParam[2]->inOutStruct =(char *) irodsXmsg->sendAddr;
00190 XMsgMsParamArray.msParam[3]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->miscInfo;
00191 * (int *) XMsgMsParamArray.msParam[4]->inOutStruct = (int) irodsXmsg->sendXmsgInfo->msgNumber;
00192 * (int *) XMsgMsParamArray.msParam[5]->inOutStruct = (int) irodsXmsg->seqNumber;
00193 * (int *) XMsgMsParamArray.msParam[6]->inOutStruct = (int) irodsXmsg->sendTime;
00194
00195 if(strcmp(condStr, "") == 0) {
00196 return 0;
00197 }
00198 int ret;
00199 int grdf[2];
00200 disableReDebugger(grdf);
00201 ret = !(computeExpression(condStr, &XMsgMsParamArray, NULL, 0, res) == 0);
00202 enableReDebugger(grdf);
00203 return ret;
00204
00205 }
00206
00207
00208
00209
00210
00211 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00212 {
00213 int status,i ;
00214 irodsXmsg_t *tmpIrodsXmsg;
00215 ticketMsgStruct_t *ticketMsgStruct;
00216 int rcvTicket;
00217 char *msgCond;
00218
00219 rcvTicket = rcvXmsgInp->rcvTicket;
00220 msgCond = rcvXmsgInp->msgCondition;
00221
00222 if (outIrodsXmsg == NULL) {
00223 rodsLog (LOG_ERROR,
00224 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00225 return (SYS_INTERNAL_NULL_INPUT_ERR);
00226 }
00227
00228
00229
00230 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00231
00232 if (status < 0) {
00233 return status;
00234 }
00235
00236
00237
00238 MessQueCondMutex.lock();
00239
00240 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00241
00242 if (tmpIrodsXmsg == NULL) {
00243 MessQueCondMutex.unlock();
00244 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00245 }
00246
00247 while (tmpIrodsXmsg != NULL) {
00248
00249 i = checkMsgCondition(tmpIrodsXmsg, msgCond);
00250 if (i == 0)
00251 break;
00252 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00253 }
00254
00255 *outIrodsXmsg = tmpIrodsXmsg;
00256 if (tmpIrodsXmsg == NULL) {
00257 MessQueCondMutex.unlock();
00258 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00259 } else {
00260 return 0;
00261 }
00262 }
00263
00264
00265 #ifdef AAAAA
00266 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00267 {
00268 int status,i ;
00269 irodsXmsg_t *tmpIrodsXmsg, *prevIrodsXmsg;
00270 ticketMsgStruct_t *ticketMsgStruct;
00271 int rcvTicket;
00272 char *msgCond;
00273
00274 rcvTicket = rcvXmsgInp->rcvTicket;
00275 msgCond = rcvXmsgInp->msgCondition;
00276
00277 if (outIrodsXmsg == NULL) {
00278 rodsLog (LOG_ERROR,
00279 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00280 return (SYS_INTERNAL_NULL_INPUT_ERR);
00281 }
00282
00283
00284
00285 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00286
00287 if (status < 0) {
00288 return status;
00289 }
00290
00291
00292
00293
00294 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00295 prevIrodsXmsg = NULL;
00296
00297 while (tmpIrodsXmsg != NULL) {
00298 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00299
00300 #ifndef windows_platform
00301 pthread_mutex_lock (&MessQueCondMutex);
00302 #endif
00303 if (prevIrodsXmsg == NULL ) {
00304 if (ticketMsgStruct->xmsgQue.head == tmpIrodsXmsg ) {
00305 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00306 break;
00307 }
00308 }
00309 else {
00310 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00311 #ifndef windows_platform
00312 pthread_mutex_unlock (&MessQueCondMutex);
00313 #endif
00314 continue;
00315 }
00316 }
00317 else if ( prevIrodsXmsg->tnext == tmpIrodsXmsg) {
00318 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00319 break;
00320 }
00321 }
00322
00323 #ifndef windows_platform
00324 pthread_mutex_unlock (&MessQueCondMutex);
00325 #endif
00326
00327 }
00328 prevIrodsXmsg = tmpIrodsXmsg;
00329 tmpIrodsXmsg = prevIrodsXmsg->tnext;
00330
00331 }
00332
00333 *outIrodsXmsg = tmpIrodsXmsg;
00334 if (tmpIrodsXmsg == NULL) {
00335 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00336 } else {
00337 return 0;
00338 }
00339 }
00340 #endif
00341
00342 int
00343 getIrodsXmsgByMsgNum (int rcvTicket, int msgNumber,
00344 irodsXmsg_t **outIrodsXmsg)
00345 {
00346 int status;
00347 irodsXmsg_t *tmpIrodsXmsg;
00348 ticketMsgStruct_t *ticketMsgStruct;
00349
00350 if (outIrodsXmsg == NULL) {
00351 rodsLog (LOG_ERROR,
00352 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00353 return (SYS_INTERNAL_NULL_INPUT_ERR);
00354 }
00355
00356
00357
00358 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00359
00360 if (status < 0) {
00361 return status;
00362 }
00363
00364
00365
00366 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00367
00368 if (msgNumber != ANY_MSG_NUMBER) {
00369 while (tmpIrodsXmsg != NULL) {
00370 if ((int) tmpIrodsXmsg->sendXmsgInfo->msgNumber == msgNumber) break;
00371 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00372 }
00373 }
00374 *outIrodsXmsg = tmpIrodsXmsg;
00375 if (tmpIrodsXmsg == NULL) {
00376 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00377 } else {
00378 return 0;
00379 }
00380 }
00381
00382 int
00383 addTicketToHQue (xmsgTicketInfo_t *ticket, ticketHashQue_t *ticketHQue)
00384 {
00385 int status;
00386
00387 ticketMsgStruct_t *tmpTicketMsgStruct;
00388
00389 if (ticket == NULL || ticketHQue == NULL) {
00390 rodsLog (LOG_ERROR,
00391 "addTicketToHQue: input ticket or ticketHQue is NULL");
00392 return (SYS_INTERNAL_NULL_INPUT_ERR);
00393 }
00394
00395 tmpTicketMsgStruct = (ticketMsgStruct_t*)calloc (1, sizeof (ticketMsgStruct_t));
00396
00397
00398
00399 tmpTicketMsgStruct->ticket = *ticket;
00400 status = addTicketMsgStructToHQue (tmpTicketMsgStruct, ticketHQue);
00401
00402 if (status < 0) {
00403 free (tmpTicketMsgStruct);
00404 }
00405
00406 return (status);
00407 }
00408
00409 int
00410 addTicketMsgStructToHQue (ticketMsgStruct_t *ticketMsgStruct,
00411 ticketHashQue_t *ticketHQue)
00412 {
00413 ticketMsgStruct_t *tmpTicketMsgStruct;
00414
00415 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00416 rodsLog (LOG_ERROR,
00417 "addTicketMsgStructToHQue: ticketMsgStruct or ticketHQue is NULL");
00418 return (SYS_INTERNAL_NULL_INPUT_ERR);
00419 }
00420
00421 ticketMsgStruct->hnext = ticketMsgStruct->hprev = NULL;
00422 ticketMsgStruct->nxtSeqNumber = 0;
00423 ticketMsgStruct->ticketHashQue = ticketHQue;
00424
00425 if (ticketHQue->head == NULL) {
00426 ticketHQue->head = ticketHQue->tail = ticketMsgStruct;
00427 return (0);
00428 }
00429
00430
00431
00432 tmpTicketMsgStruct = ticketHQue->head;
00433 while (tmpTicketMsgStruct != NULL) {
00434 if (ticketMsgStruct->ticket.rcvTicket ==
00435 tmpTicketMsgStruct->ticket.rcvTicket) {
00436 return (SYS_DUPLICATE_XMSG_TICKET);
00437 } else if (ticketMsgStruct->ticket.rcvTicket >
00438 tmpTicketMsgStruct->ticket.rcvTicket) {
00439 break;
00440 } else {
00441 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00442 }
00443 }
00444 if (tmpTicketMsgStruct == NULL) {
00445
00446 ticketHQue->tail->hnext = ticketMsgStruct;
00447 ticketMsgStruct->hprev = ticketHQue->tail;
00448 ticketHQue->tail = ticketMsgStruct;
00449 } else if (tmpTicketMsgStruct == ticketHQue->head) {
00450
00451 ticketHQue->head->hprev = ticketMsgStruct;
00452 ticketMsgStruct->hnext = ticketHQue->head;
00453 ticketHQue->head = ticketMsgStruct;
00454 } else {
00455
00456 ticketMsgStruct->hprev = tmpTicketMsgStruct->hprev;
00457 ticketMsgStruct->hnext = tmpTicketMsgStruct;
00458 tmpTicketMsgStruct->hprev->hnext = ticketMsgStruct;
00459 tmpTicketMsgStruct->hprev = tmpTicketMsgStruct;
00460 }
00461
00462 return (0);
00463 }
00464
00465 int
00466 rmTicketMsgStructFromHQue (ticketMsgStruct_t *ticketMsgStruct,
00467 ticketHashQue_t *ticketHQue)
00468 {
00469 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00470 rodsLog (LOG_ERROR,
00471 "rmTicketMsgStructFromHQue: ticketMsgStruct or ticketHQue is NULL");
00472 return (SYS_INTERNAL_NULL_INPUT_ERR);
00473 }
00474
00475 if (ticketMsgStruct->hprev == NULL) {
00476
00477 ticketHQue->head = ticketMsgStruct->hnext;
00478 } else {
00479 ticketMsgStruct->hprev->hnext = ticketMsgStruct->hnext;
00480 }
00481
00482 if (ticketMsgStruct->hnext == NULL) {
00483
00484 ticketHQue->tail = ticketMsgStruct->hprev;
00485 } else {
00486 ticketMsgStruct->hnext->hprev = ticketMsgStruct->hprev;
00487 }
00488
00489 ticketMsgStruct->hprev = ticketMsgStruct->hnext = NULL;
00490
00491 return (0);
00492 }
00493
00494
00495
00496 int
00497 addReqToQue (int sock)
00498 {
00499 xmsgReq_t *myXmsgReq;
00500
00501 myXmsgReq = (xmsgReq_t*)calloc (1, sizeof (xmsgReq_t));
00502
00503 myXmsgReq->sock = sock;
00504
00505 ReqQueCondMutex.lock();
00506
00507 if (XmsgReqHead == NULL) {
00508 XmsgReqHead = myXmsgReq;
00509 XmsgReqTail = myXmsgReq;
00510 } else {
00511
00512
00513
00514
00515
00516
00517
00518 XmsgReqTail->next = myXmsgReq;
00519 XmsgReqTail = myXmsgReq;
00520 }
00521
00522 ReqQueCond.notify_all();
00523 ReqQueCondMutex.unlock();
00524
00525 return (0);
00526 }
00527
00528 xmsgReq_t *
00529 getReqFromQue ()
00530 {
00531 xmsgReq_t *myXmsgReq = NULL;
00532
00533 while (myXmsgReq == NULL) {
00534 ReqQueCondMutex.lock();
00535 if (XmsgReqHead != NULL) {
00536 myXmsgReq = XmsgReqHead;
00537 XmsgReqHead = XmsgReqHead->next;
00538 ReqQueCondMutex.unlock();
00539 break;
00540 }
00541
00542 boost::unique_lock<boost::mutex> boost_lock( ReqQueCondMutex );
00543 ReqQueCond.wait( boost_lock );
00544 if (XmsgReqHead == NULL) {
00545 boost_lock.unlock();
00546 continue;
00547 } else {
00548 myXmsgReq = XmsgReqHead;
00549 XmsgReqHead = XmsgReqHead->next;
00550 boost_lock.unlock();
00551 break;
00552 }
00553 }
00554
00555 return (myXmsgReq);
00556 }
00557
00558 int
00559 startXmsgThreads ()
00560 {
00561 int status = 0;
00562 int i;
00563 for (i = 0; i < NUM_XMSG_THR; i++) {
00564 ProcReqThread[i] = new boost::thread( procReqRoutine );
00565 }
00566
00567 return (status);
00568 }
00569
00570 void
00571 procReqRoutine ()
00572 {
00573 xmsgReq_t *myXmsgReq = NULL;
00574 startupPack_t *startupPack;
00575 rsComm_t rsComm;
00576 int status;
00577 fd_set sockMask;
00578 struct timeval msgTimeout;
00579
00580 while (1) {
00581 myXmsgReq = getReqFromQue ();
00582 if (myXmsgReq == NULL) {
00583
00584 continue;
00585 }
00586
00587 memset (&rsComm, 0, sizeof (rsComm));
00588 rsComm.sock = myXmsgReq->sock;
00589
00590
00591
00592 eirods::network_object_ptr net_obj;
00593 eirods::error ret = eirods::network_factory( &rsComm, net_obj );
00594 if( !ret.ok() ) {
00595 eirods::log( PASS( ret ) );
00596 }
00597
00598 status = readStartupPack( net_obj, &startupPack, NULL );
00599 if (status < 0) {
00600 rodsLog (LOG_ERROR,
00601 "procReqRoutine: readStartupPack error, status = %d", status);
00602 free (myXmsgReq);
00603 continue;
00604 }
00605 initRsCommWithStartupPack (&rsComm, startupPack);
00606
00607 if (startupPack != NULL) free (startupPack);
00608
00609
00610
00611 ret = sendVersion ( net_obj, 0, 0, NULL, 0);
00612 if( !ret.ok() ) {
00613 sendVersion ( net_obj, SYS_AGENT_INIT_ERR, 0, NULL, 0);
00614 free (myXmsgReq);
00615 continue;
00616 }
00617 FD_ZERO(&sockMask);
00618 memset (&msgTimeout, 0, sizeof (msgTimeout));
00619 msgTimeout.tv_sec = REQ_MSG_TIMEOUT_TIME;
00620 while (1) {
00621 int numSock;
00622
00623 FD_SET (rsComm.sock, &sockMask);
00624 while ((numSock = select (rsComm.sock + 1, &sockMask,
00625 (fd_set *) NULL, (fd_set *) NULL, &msgTimeout)) < 0) {
00626 if (errno == EINTR) {
00627 rodsLog (LOG_NOTICE,
00628 "procReqRoutine: select() interrupted");
00629 FD_SET(rsComm.sock, &sockMask);
00630 continue;
00631 } else {
00632 break;
00633 }
00634 }
00635 if (numSock < 0) break;
00636 status = readAndProcClientMsg (&rsComm, 0);
00637 if (status < 0) break;
00638 }
00639 close (rsComm.sock);
00640 free (myXmsgReq);
00641 }
00642 }
00643
00644
00645
00646
00647
00648 int
00649 ticketHashFunc (uint rcvTicket)
00650 {
00651 int mySlot = rcvTicket % NUM_HASH_SLOT;
00652
00653 return (mySlot);
00654 }
00655
00656 int
00657 initXmsgHashQue ()
00658 {
00659
00660 xmsgTicketInfo_t *outXmsgTicketInfo;
00661 time_t thisTime;
00662 int hashSlotNum;
00663
00664 memset (XmsgHashQue, 0, NUM_HASH_SLOT * sizeof (ticketHashQue_t));
00665 memset (&XmsgQue, 0, sizeof (XmsgQue));
00666
00667
00668
00669 thisTime = time (NULL);
00670
00671 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00672 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00673 outXmsgTicketInfo->rcvTicket = 1;
00674 outXmsgTicketInfo->sendTicket = 1;
00675 outXmsgTicketInfo->flag = 1;
00676 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00677 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00678 free( outXmsgTicketInfo );
00679 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00680 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00681 outXmsgTicketInfo->rcvTicket = 2;
00682 outXmsgTicketInfo->sendTicket = 2;
00683 outXmsgTicketInfo->flag = 1;
00684 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00685 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00686 free( outXmsgTicketInfo );
00687
00688 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00689 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00690 outXmsgTicketInfo->rcvTicket = 3;
00691 outXmsgTicketInfo->sendTicket = 3;
00692 outXmsgTicketInfo->flag = 1;
00693 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00694 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00695 free( outXmsgTicketInfo );
00696
00697 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00698 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00699 outXmsgTicketInfo->rcvTicket = 4;
00700 outXmsgTicketInfo->sendTicket = 4;
00701 outXmsgTicketInfo->flag = 1;
00702 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00703 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00704 free( outXmsgTicketInfo );
00705
00706 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00707 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00708 outXmsgTicketInfo->rcvTicket = 5;
00709 outXmsgTicketInfo->sendTicket = 5;
00710 outXmsgTicketInfo->flag = 1;
00711 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00712 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00713 free( outXmsgTicketInfo );
00714
00715 addMsParam(&XMsgMsParamArray, "*XHDR",STR_MS_T, NULL,NULL);
00716 addMsParam(&XMsgMsParamArray, "*XUSER",STR_MS_T, NULL,NULL);
00717 addMsParam(&XMsgMsParamArray, "*XADDR",STR_MS_T, NULL,NULL);
00718 addMsParam(&XMsgMsParamArray, "*XMISC",STR_MS_T, NULL,NULL);
00719 addIntParamToArray(&XMsgMsParamArray, "*XMSGNUM",0);
00720 addIntParamToArray(&XMsgMsParamArray, "*XSEQNUM",0);
00721 addIntParamToArray(&XMsgMsParamArray, "*XTIME",0);
00722
00723
00724
00725
00726
00727 return (0);
00728 }
00729
00730 int
00731 getTicketMsgStructByTicket (uint rcvTicket,
00732 ticketMsgStruct_t **outTicketMsgStruct)
00733 {
00734 int hashSlotNum;
00735 ticketMsgStruct_t *tmpTicketMsgStruct;
00736
00737 hashSlotNum = ticketHashFunc (rcvTicket);
00738
00739 tmpTicketMsgStruct = XmsgHashQue[hashSlotNum].head;
00740
00741 while (tmpTicketMsgStruct != NULL) {
00742 if (rcvTicket == tmpTicketMsgStruct->ticket.rcvTicket) {
00743 *outTicketMsgStruct = tmpTicketMsgStruct;
00744 return 0;
00745 } else if (rcvTicket > tmpTicketMsgStruct->ticket.rcvTicket) {
00746 *outTicketMsgStruct = NULL;
00747 return SYS_UNMATCHED_XMSG_TICKET;
00748 } else {
00749 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00750 }
00751 }
00752
00753
00754 *outTicketMsgStruct = NULL;
00755 return SYS_UNMATCHED_XMSG_TICKET;
00756 }
00757
00758 int
00759 _rsRcvXmsg (irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut)
00760 {
00761 sendXmsgInfo_t *sendXmsgInfo;
00762 ticketMsgStruct_t *ticketMsgStruct;
00763
00764 if (irodsXmsg == NULL || rcvXmsgOut == NULL) {
00765 rodsLog (LOG_ERROR,
00766 "_rsRcvXmsg: input irodsXmsg or rcvXmsgOut is NULL");
00767 MessQueCondMutex.unlock();
00768 return (SYS_INTERNAL_NULL_INPUT_ERR);
00769 }
00770
00771 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00772 ticketMsgStruct = (ticketMsgStruct_t*)irodsXmsg->ticketMsgStruct;
00773
00774
00775
00776
00777 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00778
00779 sendXmsgInfo->numRcv--;
00780
00781 if (sendXmsgInfo->numRcv <= 0 && sendXmsgInfo->numDeli <= 0) {
00782
00783 rcvXmsgOut->msg = sendXmsgInfo->msg;
00784 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00785 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00786 sendXmsgInfo->msg = NULL;
00787 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00788 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00789 NAME_LEN);
00790 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00791 NAME_LEN);
00792 rmXmsgFromXmsgQue (irodsXmsg, &XmsgQue);
00793 rmXmsgFromXmsgTcketQue (irodsXmsg, &ticketMsgStruct->xmsgQue);
00794 clearSendXmsgInfo (sendXmsgInfo);
00795
00796 free(sendXmsgInfo);
00797
00798 free (irodsXmsg);
00799
00800
00801
00802
00803
00804
00805
00806
00807 } else {
00808 rcvXmsgOut->msg = strdup (sendXmsgInfo->msg);
00809 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00810 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00811 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00812 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00813 NAME_LEN);
00814 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00815 NAME_LEN);
00816 }
00817 MessQueCondMutex.unlock();
00818 return (0);
00819 }
00820
00821 int
00822 clearOneXMessage(ticketMsgStruct_t *ticketMsgStruct, int seqNum)
00823 {
00824
00825
00826 irodsXmsg_t *tmpIrodsXmsg;
00827
00828 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00829 while (tmpIrodsXmsg != NULL) {
00830 if ((int) tmpIrodsXmsg->seqNumber == seqNum) {
00831 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00832 rmXmsgFromXmsgTcketQue (tmpIrodsXmsg,&ticketMsgStruct->xmsgQue);
00833 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00834 free(tmpIrodsXmsg->sendXmsgInfo);
00835 free (tmpIrodsXmsg);
00836 return(0);
00837 }
00838 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00839 }
00840
00841
00842 return(0);
00843 }
00844
00845 int
00846 clearAllXMessages(ticketMsgStruct_t *ticketMsgStruct)
00847 {
00848
00849 irodsXmsg_t *tmpIrodsXmsg, *tmpIrodsXmsg2;
00850
00851 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00852 while (tmpIrodsXmsg != NULL) {
00853 tmpIrodsXmsg2 = tmpIrodsXmsg->tnext;
00854 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00855 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00856
00857 free(tmpIrodsXmsg->sendXmsgInfo);
00858
00859 free (tmpIrodsXmsg);
00860 tmpIrodsXmsg = tmpIrodsXmsg2;
00861 }
00862
00863 ticketMsgStruct->xmsgQue.head = NULL;
00864 ticketMsgStruct->xmsgQue.tail = NULL;
00865 return(0);
00866 }
00867