00001
00002
00003
00004
00005
00006
00007 #ifndef windows_platform
00008 #include <pthread.h>
00009 #endif
00010 #include "xmsgLib.h"
00011 #include "rsApiHandler.h"
00012 #include "reGlobalsExtern.h"
00013 #include "miscServerFunct.h"
00014
00015 #ifndef windows_platform
00016 #ifdef USE_BOOST
00017 #include <boost/thread/thread.hpp>
00018 #include <boost/thread/mutex.hpp>
00019 #include <boost/thread/condition.hpp>
00020 boost::mutex ReqQueCondMutex;
00021 boost::condition_variable ReqQueCond;
00022 boost::thread* ProcReqThread[ NUM_XMSG_THR ];
00023 boost::mutex MessQueCondMutex;
00024 #else
00025 pthread_mutex_t ReqQueCondMutex;
00026 pthread_cond_t ReqQueCond;
00027 pthread_t ProcReqThread[NUM_XMSG_THR];
00028 pthread_mutex_t MessQueCondMutex;
00029 #endif
00030 #endif
00031
00032 xmsgReq_t *XmsgReqHead = NULL;
00033 xmsgReq_t *XmsgReqTail = NULL;
00034
00035 ticketHashQue_t XmsgHashQue[NUM_HASH_SLOT];
00036 xmsgQue_t XmsgQue;
00037
00038 static msParamArray_t XMsgMsParamArray;
00039 int
00040 initThreadEnv ()
00041 {
00042 #ifndef windows_platform
00043 #ifndef USE_BOOST
00044 pthread_mutex_init (&ReqQueCondMutex, NULL);
00045 pthread_cond_init (&ReqQueCond, NULL);
00046 pthread_mutex_init (&MessQueCondMutex, NULL);
00047 #endif
00048 #endif
00049
00050 return (0);
00051 }
00052
00053
00054 int
00055 addXmsgToQues(irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct) {
00056
00057 int status;
00058
00059 #ifndef windows_platform
00060 #ifdef USE_BOOST
00061 MessQueCondMutex.lock();
00062 #else
00063 pthread_mutex_lock (&MessQueCondMutex);
00064 #endif
00065 #endif
00066
00067 addXmsgToXmsgQue (irodsXmsg, &XmsgQue);
00068 status = addXmsgToTicketMsgStruct (irodsXmsg, ticketMsgStruct);
00069
00070 #ifndef windows_platform
00071 #ifdef USE_BOOST
00072 MessQueCondMutex.unlock();
00073 #else
00074 pthread_mutex_unlock (&MessQueCondMutex);
00075 #endif
00076 #endif
00077
00078
00079 return(status);
00080
00081 }
00082
00083 int
00084 addXmsgToXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00085 {
00086
00087 if (xmsg == NULL || xmsgQue == NULL) {
00088 rodsLog (LOG_ERROR,
00089 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00090 return (SYS_INTERNAL_NULL_INPUT_ERR);
00091 }
00092
00093 xmsg->next = xmsg->prev = NULL;
00094
00095 if (xmsgQue->head == NULL) {
00096 xmsgQue->head = xmsgQue->tail = xmsg;
00097 } else {
00098
00099 xmsgQue->head->prev = xmsg;
00100 xmsg->next = xmsgQue->head;
00101 xmsgQue->head = xmsg;
00102 }
00103
00104 return (0);
00105 }
00106
00107 int
00108 rmXmsgFromXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00109 {
00110 if (xmsg == NULL || xmsgQue == NULL) {
00111 rodsLog (LOG_ERROR,
00112 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00113 return (SYS_INTERNAL_NULL_INPUT_ERR);
00114 }
00115
00116 if (xmsg->prev == NULL) {
00117
00118 xmsgQue->head = xmsg->next;
00119 } else {
00120 xmsg->prev->next = xmsg->next;
00121 }
00122
00123 if (xmsg->next == NULL) {
00124
00125 xmsgQue->tail = xmsg->prev;
00126 } else {
00127 xmsg->next->prev = xmsg->prev;
00128 }
00129
00130 xmsg->prev = xmsg->next = NULL;
00131
00132 return (0);
00133 }
00134
00135 int
00136 rmXmsgFromXmsgTcketQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00137 {
00138 if (xmsg == NULL || xmsgQue == NULL) {
00139 rodsLog (LOG_ERROR,
00140 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00141 return (SYS_INTERNAL_NULL_INPUT_ERR);
00142 }
00143
00144 if (xmsg->tprev == NULL) {
00145
00146 xmsgQue->head = xmsg->tnext;
00147 } else {
00148 xmsg->tprev->tnext = xmsg->tnext;
00149 }
00150
00151 if (xmsg->tnext == NULL) {
00152
00153 xmsgQue->tail = xmsg->tprev;
00154 } else {
00155 xmsg->tnext->tprev = xmsg->tprev;
00156 }
00157
00158 xmsg->tprev = xmsg->tnext = NULL;
00159
00160 return (0);
00161 }
00162
00163 int
00164 addXmsgToTicketMsgStruct (irodsXmsg_t *xmsg,
00165 ticketMsgStruct_t *ticketMsgStruct)
00166 {
00167 if (xmsg == NULL || ticketMsgStruct == NULL) {
00168 rodsLog (LOG_ERROR,
00169 "addXmsgToTicketMsgStruct: input xmsg or ticketMsgStruct is NULL");
00170 return (SYS_INTERNAL_NULL_INPUT_ERR);
00171 }
00172
00173
00174 if (xmsg->sendTime + INC_EXPIRE_INT > ticketMsgStruct->ticket.expireTime) {
00175 ticketMsgStruct->ticket.expireTime = xmsg->sendTime + INC_EXPIRE_INT;
00176 }
00177
00178 if (ticketMsgStruct->xmsgQue.head == NULL) {
00179 ticketMsgStruct->xmsgQue.head = ticketMsgStruct->xmsgQue.tail = xmsg;
00180 xmsg->tnext = xmsg->tprev = NULL;
00181 } else {
00182
00183 ticketMsgStruct->xmsgQue.tail->tnext = xmsg;
00184 xmsg->tprev = ticketMsgStruct->xmsgQue.tail;
00185 ticketMsgStruct->xmsgQue.tail = xmsg;
00186 xmsg->tnext = NULL;
00187 }
00188 xmsg->ticketMsgStruct = ticketMsgStruct;
00189 xmsg->seqNumber = ticketMsgStruct->nxtSeqNumber;
00190 ticketMsgStruct->nxtSeqNumber = ticketMsgStruct->nxtSeqNumber + 1;
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203 return (xmsg->seqNumber);
00204 }
00205
00206 int checkMsgCondition(irodsXmsg_t *irodsXmsg, char *msgCond)
00207 {
00208 char condStr[MAX_NAME_LEN * 2], res[MAX_NAME_LEN * 2];
00209
00210 if (msgCond == NULL || strlen(msgCond) == 0)
00211 return(0);
00212
00213 strcpy(condStr,msgCond);
00214
00215 XMsgMsParamArray.msParam[0]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->msgType;
00216 XMsgMsParamArray.msParam[1]->inOutStruct =(char *) irodsXmsg->sendUserName;
00217 XMsgMsParamArray.msParam[2]->inOutStruct =(char *) irodsXmsg->sendAddr;
00218 XMsgMsParamArray.msParam[3]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->miscInfo;
00219 * (int *) XMsgMsParamArray.msParam[4]->inOutStruct = (int) irodsXmsg->sendXmsgInfo->msgNumber;
00220 * (int *) XMsgMsParamArray.msParam[5]->inOutStruct = (int) irodsXmsg->seqNumber;
00221 * (int *) XMsgMsParamArray.msParam[6]->inOutStruct = (int) irodsXmsg->sendTime;
00222
00223 if(strcmp(condStr, "") == 0) {
00224 return 0;
00225 }
00226 int ret;
00227 int grdf[2];
00228 disableReDebugger(grdf);
00229 ret = !(computeExpression(condStr, &XMsgMsParamArray, NULL, 0, res) == 0);
00230 enableReDebugger(grdf);
00231 return ret;
00232
00233 }
00234
00235
00236
00237
00238
00239 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00240 {
00241 int status,i ;
00242 irodsXmsg_t *tmpIrodsXmsg;
00243 ticketMsgStruct_t *ticketMsgStruct;
00244 int rcvTicket;
00245 char *msgCond;
00246
00247 rcvTicket = rcvXmsgInp->rcvTicket;
00248 msgCond = rcvXmsgInp->msgCondition;
00249
00250 if (outIrodsXmsg == NULL) {
00251 rodsLog (LOG_ERROR,
00252 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00253 return (SYS_INTERNAL_NULL_INPUT_ERR);
00254 }
00255
00256
00257
00258 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00259
00260 if (status < 0) {
00261 return status;
00262 }
00263
00264
00265
00266 #ifndef windows_platform
00267 #ifdef USE_BOOST
00268 MessQueCondMutex.lock();
00269 #else
00270 pthread_mutex_lock (&MessQueCondMutex);
00271 #endif
00272 #endif
00273
00274 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00275
00276 if (tmpIrodsXmsg == NULL) {
00277 #ifndef windows_platform
00278 #ifdef USE_BOOST
00279 MessQueCondMutex.unlock();
00280 #else
00281 pthread_mutex_unlock (&MessQueCondMutex);
00282 #endif
00283 #endif
00284 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00285 }
00286
00287 while (tmpIrodsXmsg != NULL) {
00288
00289 i = checkMsgCondition(tmpIrodsXmsg, msgCond);
00290 if (i == 0)
00291 break;
00292 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00293 }
00294
00295 *outIrodsXmsg = tmpIrodsXmsg;
00296 if (tmpIrodsXmsg == NULL) {
00297 #ifndef windows_platform
00298 #ifdef USE_BOOST
00299 MessQueCondMutex.unlock();
00300 #else
00301 pthread_mutex_unlock (&MessQueCondMutex);
00302 #endif
00303 #endif
00304 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00305 } else {
00306 return 0;
00307 }
00308 }
00309
00310
00311 #ifdef AAAAA
00312 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00313 {
00314 int status,i ;
00315 irodsXmsg_t *tmpIrodsXmsg, *prevIrodsXmsg;
00316 ticketMsgStruct_t *ticketMsgStruct;
00317 int rcvTicket;
00318 char *msgCond;
00319
00320 rcvTicket = rcvXmsgInp->rcvTicket;
00321 msgCond = rcvXmsgInp->msgCondition;
00322
00323 if (outIrodsXmsg == NULL) {
00324 rodsLog (LOG_ERROR,
00325 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00326 return (SYS_INTERNAL_NULL_INPUT_ERR);
00327 }
00328
00329
00330
00331 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00332
00333 if (status < 0) {
00334 return status;
00335 }
00336
00337
00338
00339
00340 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00341 prevIrodsXmsg = NULL;
00342
00343 while (tmpIrodsXmsg != NULL) {
00344 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00345
00346 #ifndef windows_platform
00347 pthread_mutex_lock (&MessQueCondMutex);
00348 #endif
00349 if (prevIrodsXmsg == NULL ) {
00350 if (ticketMsgStruct->xmsgQue.head == tmpIrodsXmsg ) {
00351 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00352 break;
00353 }
00354 }
00355 else {
00356 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00357 #ifndef windows_platform
00358 pthread_mutex_unlock (&MessQueCondMutex);
00359 #endif
00360 continue;
00361 }
00362 }
00363 else if ( prevIrodsXmsg->tnext == tmpIrodsXmsg) {
00364 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00365 break;
00366 }
00367 }
00368
00369 #ifndef windows_platform
00370 pthread_mutex_unlock (&MessQueCondMutex);
00371 #endif
00372
00373 }
00374 prevIrodsXmsg = tmpIrodsXmsg;
00375 tmpIrodsXmsg = prevIrodsXmsg->tnext;
00376
00377 }
00378
00379 *outIrodsXmsg = tmpIrodsXmsg;
00380 if (tmpIrodsXmsg == NULL) {
00381 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00382 } else {
00383 return 0;
00384 }
00385 }
00386 #endif
00387
00388 int
00389 getIrodsXmsgByMsgNum (int rcvTicket, int msgNumber,
00390 irodsXmsg_t **outIrodsXmsg)
00391 {
00392 int status;
00393 irodsXmsg_t *tmpIrodsXmsg;
00394 ticketMsgStruct_t *ticketMsgStruct;
00395
00396 if (outIrodsXmsg == NULL) {
00397 rodsLog (LOG_ERROR,
00398 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00399 return (SYS_INTERNAL_NULL_INPUT_ERR);
00400 }
00401
00402
00403
00404 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00405
00406 if (status < 0) {
00407 return status;
00408 }
00409
00410
00411
00412 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00413
00414 if (msgNumber != ANY_MSG_NUMBER) {
00415 while (tmpIrodsXmsg != NULL) {
00416 if ((int) tmpIrodsXmsg->sendXmsgInfo->msgNumber == msgNumber) break;
00417 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00418 }
00419 }
00420 *outIrodsXmsg = tmpIrodsXmsg;
00421 if (tmpIrodsXmsg == NULL) {
00422 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00423 } else {
00424 return 0;
00425 }
00426 }
00427
00428 int
00429 addTicketToHQue (xmsgTicketInfo_t *ticket, ticketHashQue_t *ticketHQue)
00430 {
00431 int status;
00432
00433 ticketMsgStruct_t *tmpTicketMsgStruct;
00434
00435 if (ticket == NULL || ticketHQue == NULL) {
00436 rodsLog (LOG_ERROR,
00437 "addTicketToHQue: input ticket or ticketHQue is NULL");
00438 return (SYS_INTERNAL_NULL_INPUT_ERR);
00439 }
00440
00441 tmpTicketMsgStruct = (ticketMsgStruct_t*)calloc (1, sizeof (ticketMsgStruct_t));
00442
00443
00444
00445 tmpTicketMsgStruct->ticket = *ticket;
00446 status = addTicketMsgStructToHQue (tmpTicketMsgStruct, ticketHQue);
00447
00448 if (status < 0) {
00449 free (tmpTicketMsgStruct);
00450 }
00451
00452 return (status);
00453 }
00454
00455 int
00456 addTicketMsgStructToHQue (ticketMsgStruct_t *ticketMsgStruct,
00457 ticketHashQue_t *ticketHQue)
00458 {
00459 ticketMsgStruct_t *tmpTicketMsgStruct;
00460
00461 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00462 rodsLog (LOG_ERROR,
00463 "addTicketMsgStructToHQue: ticketMsgStruct or ticketHQue is NULL");
00464 return (SYS_INTERNAL_NULL_INPUT_ERR);
00465 }
00466
00467 ticketMsgStruct->hnext = ticketMsgStruct->hprev = NULL;
00468 ticketMsgStruct->nxtSeqNumber = 0;
00469 ticketMsgStruct->ticketHashQue = ticketHQue;
00470
00471 if (ticketHQue->head == NULL) {
00472 ticketHQue->head = ticketHQue->tail = ticketMsgStruct;
00473 return (0);
00474 }
00475
00476
00477
00478 tmpTicketMsgStruct = ticketHQue->head;
00479 while (tmpTicketMsgStruct != NULL) {
00480 if (ticketMsgStruct->ticket.rcvTicket ==
00481 tmpTicketMsgStruct->ticket.rcvTicket) {
00482 return (SYS_DUPLICATE_XMSG_TICKET);
00483 } else if (ticketMsgStruct->ticket.rcvTicket >
00484 tmpTicketMsgStruct->ticket.rcvTicket) {
00485 break;
00486 } else {
00487 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00488 }
00489 }
00490 if (tmpTicketMsgStruct == NULL) {
00491
00492 ticketHQue->tail->hnext = ticketMsgStruct;
00493 ticketMsgStruct->hprev = ticketHQue->tail;
00494 ticketHQue->tail = ticketMsgStruct;
00495 } else if (tmpTicketMsgStruct == ticketHQue->head) {
00496
00497 ticketHQue->head->hprev = ticketMsgStruct;
00498 ticketMsgStruct->hnext = ticketHQue->head;
00499 ticketHQue->head = ticketMsgStruct;
00500 } else {
00501
00502 ticketMsgStruct->hprev = tmpTicketMsgStruct->hprev;
00503 ticketMsgStruct->hnext = tmpTicketMsgStruct;
00504 tmpTicketMsgStruct->hprev->hnext = ticketMsgStruct;
00505 tmpTicketMsgStruct->hprev = tmpTicketMsgStruct;
00506 }
00507
00508 return (0);
00509 }
00510
00511 int
00512 rmTicketMsgStructFromHQue (ticketMsgStruct_t *ticketMsgStruct,
00513 ticketHashQue_t *ticketHQue)
00514 {
00515 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00516 rodsLog (LOG_ERROR,
00517 "rmTicketMsgStructFromHQue: ticketMsgStruct or ticketHQue is NULL");
00518 return (SYS_INTERNAL_NULL_INPUT_ERR);
00519 }
00520
00521 if (ticketMsgStruct->hprev == NULL) {
00522
00523 ticketHQue->head = ticketMsgStruct->hnext;
00524 } else {
00525 ticketMsgStruct->hprev->hnext = ticketMsgStruct->hnext;
00526 }
00527
00528 if (ticketMsgStruct->hnext == NULL) {
00529
00530 ticketHQue->tail = ticketMsgStruct->hprev;
00531 } else {
00532 ticketMsgStruct->hnext->hprev = ticketMsgStruct->hprev;
00533 }
00534
00535 ticketMsgStruct->hprev = ticketMsgStruct->hnext = NULL;
00536
00537 return (0);
00538 }
00539
00540
00541
00542 int
00543 addReqToQue (int sock)
00544 {
00545 xmsgReq_t *myXmsgReq;
00546
00547 myXmsgReq = (xmsgReq_t*)calloc (1, sizeof (xmsgReq_t));
00548
00549 myXmsgReq->sock = sock;
00550
00551 #ifndef windows_platform
00552 #ifdef USE_BOOST
00553 ReqQueCondMutex.lock();
00554 #else
00555 pthread_mutex_lock (&ReqQueCondMutex);
00556 #endif
00557 #endif
00558
00559 if (XmsgReqHead == NULL) {
00560 XmsgReqHead = myXmsgReq;
00561 XmsgReqTail = myXmsgReq;
00562 } else {
00563
00564
00565
00566
00567
00568
00569
00570 XmsgReqTail->next = myXmsgReq;
00571 XmsgReqTail = myXmsgReq;
00572 }
00573
00574 #ifndef windows_platform
00575 #ifdef USE_BOOST
00576 ReqQueCond.notify_all();
00577 ReqQueCondMutex.unlock();
00578 #else
00579 pthread_cond_signal (&ReqQueCond);
00580 pthread_mutex_unlock (&ReqQueCondMutex);
00581 #endif
00582 #endif
00583
00584 return (0);
00585 }
00586
00587 xmsgReq_t *
00588 getReqFromQue ()
00589 {
00590 xmsgReq_t *myXmsgReq = NULL;
00591
00592 while (myXmsgReq == NULL) {
00593 #ifndef windows_platform
00594 #ifdef USE_BOOST
00595 ReqQueCondMutex.lock();
00596 #else
00597 pthread_mutex_lock (&ReqQueCondMutex);
00598 #endif
00599 #endif
00600 if (XmsgReqHead != NULL) {
00601 myXmsgReq = XmsgReqHead;
00602 XmsgReqHead = XmsgReqHead->next;
00603 #ifndef windows_platform
00604 #ifdef USE_BOOST
00605 ReqQueCondMutex.unlock();
00606 #else
00607 pthread_mutex_unlock (&ReqQueCondMutex);
00608 #endif
00609 #endif
00610 break;
00611 }
00612
00613 #ifndef windows_platform
00614 #ifdef USE_BOOST
00615 boost::unique_lock<boost::mutex> boost_lock( ReqQueCondMutex );
00616 ReqQueCond.wait( boost_lock );
00617 #else
00618 pthread_cond_wait (&ReqQueCond, &ReqQueCondMutex);
00619 #endif
00620 #endif
00621 if (XmsgReqHead == NULL) {
00622 #ifndef windows_platform
00623 #ifdef USE_BOOST
00624 boost_lock.unlock();
00625 #else
00626 pthread_mutex_unlock (&ReqQueCondMutex);
00627 #endif
00628 #endif
00629 continue;
00630 } else {
00631 myXmsgReq = XmsgReqHead;
00632 XmsgReqHead = XmsgReqHead->next;
00633 #ifndef windows_platform
00634 #ifdef USE_BOOST
00635 boost_lock.unlock();
00636 #else
00637 pthread_mutex_unlock (&ReqQueCondMutex);
00638 #endif
00639 #endif
00640 break;
00641 }
00642 }
00643
00644 return (myXmsgReq);
00645 }
00646
00647 int
00648 startXmsgThreads ()
00649 {
00650 int status = 0;
00651 #ifndef windows_platform
00652 int i;
00653 for (i = 0; i < NUM_XMSG_THR; i++) {
00654 #ifdef USE_BOOST
00655 ProcReqThread[i] = new boost::thread( procReqRoutine );
00656 #else
00657 status = pthread_create(&ProcReqThread[i], NULL,
00658 (void *(*)(void *)) procReqRoutine, (void *) NULL);
00659 #endif
00660 }
00661 #endif
00662
00663 return (status);
00664 }
00665
00666 void
00667 procReqRoutine ()
00668 {
00669 xmsgReq_t *myXmsgReq = NULL;
00670 startupPack_t *startupPack;
00671 rsComm_t rsComm;
00672 int status;
00673 fd_set sockMask;
00674 struct timeval msgTimeout;
00675
00676 while (1) {
00677 myXmsgReq = getReqFromQue ();
00678 if (myXmsgReq == NULL) {
00679
00680 continue;
00681 }
00682
00683 status = readStartupPack (myXmsgReq->sock, &startupPack, NULL);
00684 if (status < 0) {
00685 rodsLog (LOG_ERROR,
00686 "procReqRoutine: readStartupPack error, status = %d", status);
00687 free (myXmsgReq);
00688 continue;
00689 }
00690 memset (&rsComm, 0, sizeof (rsComm));
00691 initRsCommWithStartupPack (&rsComm, startupPack);
00692
00693 if (startupPack != NULL) free (startupPack);
00694
00695 rsComm.sock = myXmsgReq->sock;
00696 status = sendVersion (rsComm.sock, 0, 0, NULL, 0);
00697
00698 if (status < 0) {
00699 sendVersion (rsComm.sock, SYS_AGENT_INIT_ERR, 0, NULL, 0);
00700 free (myXmsgReq);
00701 continue;
00702 }
00703 FD_ZERO(&sockMask);
00704 memset (&msgTimeout, 0, sizeof (msgTimeout));
00705 msgTimeout.tv_sec = REQ_MSG_TIMEOUT_TIME;
00706 while (1) {
00707 int numSock;
00708
00709 FD_SET (rsComm.sock, &sockMask);
00710 while ((numSock = select (rsComm.sock + 1, &sockMask,
00711 (fd_set *) NULL, (fd_set *) NULL, &msgTimeout)) < 0) {
00712 if (errno == EINTR) {
00713 rodsLog (LOG_NOTICE,
00714 "procReqRoutine: select() interrupted");
00715 FD_SET(rsComm.sock, &sockMask);
00716 continue;
00717 } else {
00718 break;
00719 }
00720 }
00721 if (numSock < 0) break;
00722 status = readAndProcClientMsg (&rsComm, 0);
00723 if (status < 0) break;
00724 }
00725 close (rsComm.sock);
00726 free (myXmsgReq);
00727 }
00728 }
00729
00730
00731
00732
00733
00734 int
00735 ticketHashFunc (uint rcvTicket)
00736 {
00737 int mySlot = rcvTicket % NUM_HASH_SLOT;
00738
00739 return (mySlot);
00740 }
00741
00742 int
00743 initXmsgHashQue ()
00744 {
00745
00746 xmsgTicketInfo_t *outXmsgTicketInfo;
00747 time_t thisTime;
00748 int hashSlotNum;
00749
00750 memset (XmsgHashQue, 0, NUM_HASH_SLOT * sizeof (ticketHashQue_t));
00751 memset (&XmsgQue, 0, sizeof (XmsgQue));
00752
00753
00754
00755 thisTime = time (NULL);
00756
00757 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00758 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00759 outXmsgTicketInfo->rcvTicket = 1;
00760 outXmsgTicketInfo->sendTicket = 1;
00761 outXmsgTicketInfo->flag = 1;
00762 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00763 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00764 free( outXmsgTicketInfo );
00765 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00766 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00767 outXmsgTicketInfo->rcvTicket = 2;
00768 outXmsgTicketInfo->sendTicket = 2;
00769 outXmsgTicketInfo->flag = 1;
00770 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00771 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00772 free( outXmsgTicketInfo );
00773
00774 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00775 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00776 outXmsgTicketInfo->rcvTicket = 3;
00777 outXmsgTicketInfo->sendTicket = 3;
00778 outXmsgTicketInfo->flag = 1;
00779 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00780 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00781 free( outXmsgTicketInfo );
00782
00783 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00784 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00785 outXmsgTicketInfo->rcvTicket = 4;
00786 outXmsgTicketInfo->sendTicket = 4;
00787 outXmsgTicketInfo->flag = 1;
00788 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00789 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00790 free( outXmsgTicketInfo );
00791
00792 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00793 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00794 outXmsgTicketInfo->rcvTicket = 5;
00795 outXmsgTicketInfo->sendTicket = 5;
00796 outXmsgTicketInfo->flag = 1;
00797 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00798 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00799 free( outXmsgTicketInfo );
00800
00801 addMsParam(&XMsgMsParamArray, "*XHDR",STR_MS_T, NULL,NULL);
00802 addMsParam(&XMsgMsParamArray, "*XUSER",STR_MS_T, NULL,NULL);
00803 addMsParam(&XMsgMsParamArray, "*XADDR",STR_MS_T, NULL,NULL);
00804 addMsParam(&XMsgMsParamArray, "*XMISC",STR_MS_T, NULL,NULL);
00805 addIntParamToArray(&XMsgMsParamArray, "*XMSGNUM",0);
00806 addIntParamToArray(&XMsgMsParamArray, "*XSEQNUM",0);
00807 addIntParamToArray(&XMsgMsParamArray, "*XTIME",0);
00808
00809
00810
00811
00812
00813 return (0);
00814 }
00815
00816 int
00817 getTicketMsgStructByTicket (uint rcvTicket,
00818 ticketMsgStruct_t **outTicketMsgStruct)
00819 {
00820 int hashSlotNum;
00821 ticketMsgStruct_t *tmpTicketMsgStruct;
00822
00823 hashSlotNum = ticketHashFunc (rcvTicket);
00824
00825 tmpTicketMsgStruct = XmsgHashQue[hashSlotNum].head;
00826
00827 while (tmpTicketMsgStruct != NULL) {
00828 if (rcvTicket == tmpTicketMsgStruct->ticket.rcvTicket) {
00829 *outTicketMsgStruct = tmpTicketMsgStruct;
00830 return 0;
00831 } else if (rcvTicket > tmpTicketMsgStruct->ticket.rcvTicket) {
00832 *outTicketMsgStruct = NULL;
00833 return SYS_UNMATCHED_XMSG_TICKET;
00834 } else {
00835 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00836 }
00837 }
00838
00839
00840 *outTicketMsgStruct = NULL;
00841 return SYS_UNMATCHED_XMSG_TICKET;
00842 }
00843
00844 int
00845 _rsRcvXmsg (irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut)
00846 {
00847 sendXmsgInfo_t *sendXmsgInfo;
00848 ticketMsgStruct_t *ticketMsgStruct;
00849
00850 if (irodsXmsg == NULL || rcvXmsgOut == NULL) {
00851 rodsLog (LOG_ERROR,
00852 "_rsRcvXmsg: input irodsXmsg or rcvXmsgOut is NULL");
00853 #ifndef windows_platform
00854 #ifdef USE_BOOST
00855 MessQueCondMutex.unlock();
00856 #else
00857 pthread_mutex_unlock (&MessQueCondMutex);
00858 #endif
00859 #endif
00860 return (SYS_INTERNAL_NULL_INPUT_ERR);
00861 }
00862
00863 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00864 ticketMsgStruct = (ticketMsgStruct_t*)irodsXmsg->ticketMsgStruct;
00865
00866
00867
00868
00869 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00870
00871 sendXmsgInfo->numRcv--;
00872
00873 if (sendXmsgInfo->numRcv <= 0 && sendXmsgInfo->numDeli <= 0) {
00874
00875 rcvXmsgOut->msg = sendXmsgInfo->msg;
00876 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00877 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00878 sendXmsgInfo->msg = NULL;
00879 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00880 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00881 NAME_LEN);
00882 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00883 NAME_LEN);
00884 rmXmsgFromXmsgQue (irodsXmsg, &XmsgQue);
00885 rmXmsgFromXmsgTcketQue (irodsXmsg, &ticketMsgStruct->xmsgQue);
00886 clearSendXmsgInfo (sendXmsgInfo);
00887
00888 free(sendXmsgInfo);
00889
00890 free (irodsXmsg);
00891
00892
00893
00894
00895
00896
00897
00898
00899 } else {
00900 rcvXmsgOut->msg = strdup (sendXmsgInfo->msg);
00901 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00902 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00903 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00904 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00905 NAME_LEN);
00906 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00907 NAME_LEN);
00908 }
00909 #ifndef windows_platform
00910 #ifdef USE_BOOST
00911 MessQueCondMutex.unlock();
00912 #else
00913 pthread_mutex_unlock (&MessQueCondMutex);
00914 #endif
00915 #endif
00916 return (0);
00917 }
00918
00919 int
00920 clearOneXMessage(ticketMsgStruct_t *ticketMsgStruct, int seqNum)
00921 {
00922
00923
00924 irodsXmsg_t *tmpIrodsXmsg;
00925
00926 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00927 while (tmpIrodsXmsg != NULL) {
00928 if ((int) tmpIrodsXmsg->seqNumber == seqNum) {
00929 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00930 rmXmsgFromXmsgTcketQue (tmpIrodsXmsg,&ticketMsgStruct->xmsgQue);
00931 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00932 free(tmpIrodsXmsg->sendXmsgInfo);
00933 free (tmpIrodsXmsg);
00934 return(0);
00935 }
00936 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00937 }
00938
00939
00940 return(0);
00941 }
00942
00943 int
00944 clearAllXMessages(ticketMsgStruct_t *ticketMsgStruct)
00945 {
00946
00947 irodsXmsg_t *tmpIrodsXmsg, *tmpIrodsXmsg2;
00948
00949 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00950 while (tmpIrodsXmsg != NULL) {
00951 tmpIrodsXmsg2 = tmpIrodsXmsg->tnext;
00952 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00953 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00954
00955 free(tmpIrodsXmsg->sendXmsgInfo);
00956
00957 free (tmpIrodsXmsg);
00958 tmpIrodsXmsg = tmpIrodsXmsg2;
00959 }
00960
00961 ticketMsgStruct->xmsgQue.head = NULL;
00962 ticketMsgStruct->xmsgQue.tail = NULL;
00963 return(0);
00964 }
00965