00001
00002
00003
00004
00005
00006
00007 #ifndef windows_platform
00008 #include <pthread.h>
00009 #endif
00010 #include "xmsgLib.h"
00011 #include "rsApiHandler.h"
00012 #include "reGlobalsExtern.h"
00013 #include "miscServerFunct.h"
00014
00015 #ifndef windows_platform
00016 #ifdef USE_BOOST
00017 #include <boost/thread/thread.hpp>
00018 #include <boost/thread/mutex.hpp>
00019 #include <boost/thread/condition.hpp>
00020 boost::mutex ReqQueCondMutex;
00021 boost::condition_variable ReqQueCond;
00022 boost::thread* ProcReqThread[ NUM_XMSG_THR ];
00023 boost::mutex MessQueCondMutex;
00024 #else
00025 pthread_mutex_t ReqQueCondMutex;
00026 pthread_cond_t ReqQueCond;
00027 pthread_t ProcReqThread[NUM_XMSG_THR];
00028 pthread_mutex_t MessQueCondMutex;
00029 #endif
00030 #endif
00031
00032 xmsgReq_t *XmsgReqHead = NULL;
00033 xmsgReq_t *XmsgReqTail = NULL;
00034
00035 ticketHashQue_t XmsgHashQue[NUM_HASH_SLOT];
00036 xmsgQue_t XmsgQue;
00037
00038 static msParamArray_t XMsgMsParamArray;
00039 int
00040 initThreadEnv ()
00041 {
00042 #ifndef windows_platform
00043 #ifndef USE_BOOST
00044 pthread_mutex_init (&ReqQueCondMutex, NULL);
00045 pthread_cond_init (&ReqQueCond, NULL);
00046 pthread_mutex_init (&MessQueCondMutex, NULL);
00047 #endif
00048 #endif
00049
00050 return (0);
00051 }
00052
00053
00054 int
00055 addXmsgToQues(irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct) {
00056
00057 int status;
00058
00059 #ifndef windows_platform
00060 #ifdef USE_BOOST
00061 MessQueCondMutex.lock();
00062 #else
00063 pthread_mutex_lock (&MessQueCondMutex);
00064 #endif
00065 #endif
00066
00067 addXmsgToXmsgQue (irodsXmsg, &XmsgQue);
00068 status = addXmsgToTicketMsgStruct (irodsXmsg, ticketMsgStruct);
00069
00070 #ifndef windows_platform
00071 #ifdef USE_BOOST
00072 MessQueCondMutex.unlock();
00073 #else
00074 pthread_mutex_unlock (&MessQueCondMutex);
00075 #endif
00076 #endif
00077
00078
00079 return(status);
00080
00081 }
00082
00083 int
00084 addXmsgToXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00085 {
00086
00087 if (xmsg == NULL || xmsgQue == NULL) {
00088 rodsLog (LOG_ERROR,
00089 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00090 return (SYS_INTERNAL_NULL_INPUT_ERR);
00091 }
00092
00093 xmsg->next = xmsg->prev = NULL;
00094
00095 if (xmsgQue->head == NULL) {
00096 xmsgQue->head = xmsgQue->tail = xmsg;
00097 } else {
00098
00099 xmsgQue->head->prev = xmsg;
00100 xmsg->next = xmsgQue->head;
00101 xmsgQue->head = xmsg;
00102 }
00103
00104 return (0);
00105 }
00106
00107 int
00108 rmXmsgFromXmsgQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00109 {
00110 if (xmsg == NULL || xmsgQue == NULL) {
00111 rodsLog (LOG_ERROR,
00112 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00113 return (SYS_INTERNAL_NULL_INPUT_ERR);
00114 }
00115
00116 if (xmsg->prev == NULL) {
00117
00118 xmsgQue->head = xmsg->next;
00119 } else {
00120 xmsg->prev->next = xmsg->next;
00121 }
00122
00123 if (xmsg->next == NULL) {
00124
00125 xmsgQue->tail = xmsg->prev;
00126 } else {
00127 xmsg->next->prev = xmsg->prev;
00128 }
00129
00130 xmsg->prev = xmsg->next = NULL;
00131
00132 return (0);
00133 }
00134
00135 int
00136 rmXmsgFromXmsgTcketQue (irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
00137 {
00138 if (xmsg == NULL || xmsgQue == NULL) {
00139 rodsLog (LOG_ERROR,
00140 "addXmsgToQue: input xmsg or xmsgQue is NULL");
00141 return (SYS_INTERNAL_NULL_INPUT_ERR);
00142 }
00143
00144 if (xmsg->tprev == NULL) {
00145
00146 xmsgQue->head = xmsg->tnext;
00147 } else {
00148 xmsg->tprev->tnext = xmsg->tnext;
00149 }
00150
00151 if (xmsg->tnext == NULL) {
00152
00153 xmsgQue->tail = xmsg->tprev;
00154 } else {
00155 xmsg->tnext->tprev = xmsg->tprev;
00156 }
00157
00158 xmsg->tprev = xmsg->tnext = NULL;
00159
00160 return (0);
00161 }
00162
00163 int
00164 addXmsgToTicketMsgStruct (irodsXmsg_t *xmsg,
00165 ticketMsgStruct_t *ticketMsgStruct)
00166 {
00167 if (xmsg == NULL || ticketMsgStruct == NULL) {
00168 rodsLog (LOG_ERROR,
00169 "addXmsgToTicketMsgStruct: input xmsg or ticketMsgStruct is NULL");
00170 return (SYS_INTERNAL_NULL_INPUT_ERR);
00171 }
00172
00173
00174 if (xmsg->sendTime + INC_EXPIRE_INT > ticketMsgStruct->ticket.expireTime) {
00175 ticketMsgStruct->ticket.expireTime = xmsg->sendTime + INC_EXPIRE_INT;
00176 }
00177
00178 if (ticketMsgStruct->xmsgQue.head == NULL) {
00179 ticketMsgStruct->xmsgQue.head = ticketMsgStruct->xmsgQue.tail = xmsg;
00180 xmsg->tnext = xmsg->tprev = NULL;
00181 } else {
00182
00183 ticketMsgStruct->xmsgQue.tail->tnext = xmsg;
00184 xmsg->tprev = ticketMsgStruct->xmsgQue.tail;
00185 ticketMsgStruct->xmsgQue.tail = xmsg;
00186 xmsg->tnext = NULL;
00187 }
00188 xmsg->ticketMsgStruct = ticketMsgStruct;
00189 xmsg->seqNumber = ticketMsgStruct->nxtSeqNumber;
00190 ticketMsgStruct->nxtSeqNumber = ticketMsgStruct->nxtSeqNumber + 1;
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203 return (xmsg->seqNumber);
00204 }
00205
00206 int checkMsgCondition(irodsXmsg_t *irodsXmsg, char *msgCond)
00207 {
00208 char condStr[MAX_NAME_LEN * 2], res[MAX_NAME_LEN * 2];
00209
00210 if (msgCond == NULL || strlen(msgCond) == 0)
00211 return(0);
00212
00213 strcpy(condStr,msgCond);
00214
00215 XMsgMsParamArray.msParam[0]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->msgType;
00216 XMsgMsParamArray.msParam[1]->inOutStruct =(char *) irodsXmsg->sendUserName;
00217 XMsgMsParamArray.msParam[2]->inOutStruct =(char *) irodsXmsg->sendAddr;
00218 XMsgMsParamArray.msParam[3]->inOutStruct =(char *) irodsXmsg->sendXmsgInfo->miscInfo;
00219 * (int *) XMsgMsParamArray.msParam[4]->inOutStruct = (int) irodsXmsg->sendXmsgInfo->msgNumber;
00220 * (int *) XMsgMsParamArray.msParam[5]->inOutStruct = (int) irodsXmsg->seqNumber;
00221 * (int *) XMsgMsParamArray.msParam[6]->inOutStruct = (int) irodsXmsg->sendTime;
00222
00223 if(strcmp(condStr, "") == 0) {
00224 return 0;
00225 }
00226 int ret;
00227 #ifdef RULE_ENGINE_N
00228 int grdf[2];
00229 disableReDebugger(grdf);
00230 ret = !(computeExpression(condStr, &XMsgMsParamArray, NULL, 0, res) == 0);
00231 enableReDebugger(grdf);
00232 #else
00233 int i = replaceMsParams(condStr, &XMsgMsParamArray);
00234 if(i!=0) {
00235 return 1;
00236 }
00237 ret = !computeExpression(condStr, NULL, 0, res);
00238 #endif
00239 return ret;
00240
00241 }
00242
00243
00244
00245
00246
00247 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00248 {
00249 int status,i ;
00250 irodsXmsg_t *tmpIrodsXmsg;
00251 ticketMsgStruct_t *ticketMsgStruct;
00252 int rcvTicket;
00253 char *msgCond;
00254
00255 rcvTicket = rcvXmsgInp->rcvTicket;
00256 msgCond = rcvXmsgInp->msgCondition;
00257
00258 if (outIrodsXmsg == NULL) {
00259 rodsLog (LOG_ERROR,
00260 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00261 return (SYS_INTERNAL_NULL_INPUT_ERR);
00262 }
00263
00264
00265
00266 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00267
00268 if (status < 0) {
00269 return status;
00270 }
00271
00272
00273
00274 #ifndef windows_platform
00275 #ifdef USE_BOOST
00276 MessQueCondMutex.lock();
00277 #else
00278 pthread_mutex_lock (&MessQueCondMutex);
00279 #endif
00280 #endif
00281
00282 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00283
00284 if (tmpIrodsXmsg == NULL) {
00285 #ifndef windows_platform
00286 #ifdef USE_BOOST
00287 MessQueCondMutex.unlock();
00288 #else
00289 pthread_mutex_unlock (&MessQueCondMutex);
00290 #endif
00291 #endif
00292 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00293 }
00294
00295 while (tmpIrodsXmsg != NULL) {
00296
00297 i = checkMsgCondition(tmpIrodsXmsg, msgCond);
00298 if (i == 0)
00299 break;
00300 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00301 }
00302
00303 *outIrodsXmsg = tmpIrodsXmsg;
00304 if (tmpIrodsXmsg == NULL) {
00305 #ifndef windows_platform
00306 #ifdef USE_BOOST
00307 MessQueCondMutex.unlock();
00308 #else
00309 pthread_mutex_unlock (&MessQueCondMutex);
00310 #endif
00311 #endif
00312 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00313 } else {
00314 return 0;
00315 }
00316 }
00317
00318
00319 #ifdef AAAAA
00320 int getIrodsXmsg (rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
00321 {
00322 int status,i ;
00323 irodsXmsg_t *tmpIrodsXmsg, *prevIrodsXmsg;
00324 ticketMsgStruct_t *ticketMsgStruct;
00325 int rcvTicket;
00326 char *msgCond;
00327
00328 rcvTicket = rcvXmsgInp->rcvTicket;
00329 msgCond = rcvXmsgInp->msgCondition;
00330
00331 if (outIrodsXmsg == NULL) {
00332 rodsLog (LOG_ERROR,
00333 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00334 return (SYS_INTERNAL_NULL_INPUT_ERR);
00335 }
00336
00337
00338
00339 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00340
00341 if (status < 0) {
00342 return status;
00343 }
00344
00345
00346
00347
00348 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00349 prevIrodsXmsg = NULL;
00350
00351 while (tmpIrodsXmsg != NULL) {
00352 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00353
00354 #ifndef windows_platform
00355 pthread_mutex_lock (&MessQueCondMutex);
00356 #endif
00357 if (prevIrodsXmsg == NULL ) {
00358 if (ticketMsgStruct->xmsgQue.head == tmpIrodsXmsg ) {
00359 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00360 break;
00361 }
00362 }
00363 else {
00364 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00365 #ifndef windows_platform
00366 pthread_mutex_unlock (&MessQueCondMutex);
00367 #endif
00368 continue;
00369 }
00370 }
00371 else if ( prevIrodsXmsg->tnext == tmpIrodsXmsg) {
00372 if ((i = checkMsgCondition(tmpIrodsXmsg, msgCond)) == 0 ) {
00373 break;
00374 }
00375 }
00376
00377 #ifndef windows_platform
00378 pthread_mutex_unlock (&MessQueCondMutex);
00379 #endif
00380
00381 }
00382 prevIrodsXmsg = tmpIrodsXmsg;
00383 tmpIrodsXmsg = prevIrodsXmsg->tnext;
00384
00385 }
00386
00387 *outIrodsXmsg = tmpIrodsXmsg;
00388 if (tmpIrodsXmsg == NULL) {
00389 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00390 } else {
00391 return 0;
00392 }
00393 }
00394 #endif
00395
00396 int
00397 getIrodsXmsgByMsgNum (int rcvTicket, int msgNumber,
00398 irodsXmsg_t **outIrodsXmsg)
00399 {
00400 int status;
00401 irodsXmsg_t *tmpIrodsXmsg;
00402 ticketMsgStruct_t *ticketMsgStruct;
00403
00404 if (outIrodsXmsg == NULL) {
00405 rodsLog (LOG_ERROR,
00406 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL");
00407 return (SYS_INTERNAL_NULL_INPUT_ERR);
00408 }
00409
00410
00411
00412 status = getTicketMsgStructByTicket (rcvTicket, &ticketMsgStruct);
00413
00414 if (status < 0) {
00415 return status;
00416 }
00417
00418
00419
00420 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00421
00422 if (msgNumber != ANY_MSG_NUMBER) {
00423 while (tmpIrodsXmsg != NULL) {
00424 if ((int) tmpIrodsXmsg->sendXmsgInfo->msgNumber == msgNumber) break;
00425 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00426 }
00427 }
00428 *outIrodsXmsg = tmpIrodsXmsg;
00429 if (tmpIrodsXmsg == NULL) {
00430 return SYS_NO_XMSG_FOR_MSG_NUMBER;
00431 } else {
00432 return 0;
00433 }
00434 }
00435
00436 int
00437 addTicketToHQue (xmsgTicketInfo_t *ticket, ticketHashQue_t *ticketHQue)
00438 {
00439 int status;
00440
00441 ticketMsgStruct_t *tmpTicketMsgStruct;
00442
00443 if (ticket == NULL || ticketHQue == NULL) {
00444 rodsLog (LOG_ERROR,
00445 "addTicketToHQue: input ticket or ticketHQue is NULL");
00446 return (SYS_INTERNAL_NULL_INPUT_ERR);
00447 }
00448
00449 tmpTicketMsgStruct = (ticketMsgStruct_t*)calloc (1, sizeof (ticketMsgStruct_t));
00450
00451
00452
00453 tmpTicketMsgStruct->ticket = *ticket;
00454 status = addTicketMsgStructToHQue (tmpTicketMsgStruct, ticketHQue);
00455
00456 if (status < 0) {
00457 free (tmpTicketMsgStruct);
00458 }
00459
00460 return (status);
00461 }
00462
00463 int
00464 addTicketMsgStructToHQue (ticketMsgStruct_t *ticketMsgStruct,
00465 ticketHashQue_t *ticketHQue)
00466 {
00467 ticketMsgStruct_t *tmpTicketMsgStruct;
00468
00469 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00470 rodsLog (LOG_ERROR,
00471 "addTicketMsgStructToHQue: ticketMsgStruct or ticketHQue is NULL");
00472 return (SYS_INTERNAL_NULL_INPUT_ERR);
00473 }
00474
00475 ticketMsgStruct->hnext = ticketMsgStruct->hprev = NULL;
00476 ticketMsgStruct->nxtSeqNumber = 0;
00477 ticketMsgStruct->ticketHashQue = ticketHQue;
00478
00479 if (ticketHQue->head == NULL) {
00480 ticketHQue->head = ticketHQue->tail = ticketMsgStruct;
00481 return (0);
00482 }
00483
00484
00485
00486 tmpTicketMsgStruct = ticketHQue->head;
00487 while (tmpTicketMsgStruct != NULL) {
00488 if (ticketMsgStruct->ticket.rcvTicket ==
00489 tmpTicketMsgStruct->ticket.rcvTicket) {
00490 return (SYS_DUPLICATE_XMSG_TICKET);
00491 } else if (ticketMsgStruct->ticket.rcvTicket >
00492 tmpTicketMsgStruct->ticket.rcvTicket) {
00493 break;
00494 } else {
00495 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00496 }
00497 }
00498 if (tmpTicketMsgStruct == NULL) {
00499
00500 ticketHQue->tail->hnext = ticketMsgStruct;
00501 ticketMsgStruct->hprev = ticketHQue->tail;
00502 ticketHQue->tail = ticketMsgStruct;
00503 } else if (tmpTicketMsgStruct == ticketHQue->head) {
00504
00505 ticketHQue->head->hprev = ticketMsgStruct;
00506 ticketMsgStruct->hnext = ticketHQue->head;
00507 ticketHQue->head = ticketMsgStruct;
00508 } else {
00509
00510 ticketMsgStruct->hprev = tmpTicketMsgStruct->hprev;
00511 ticketMsgStruct->hnext = tmpTicketMsgStruct;
00512 tmpTicketMsgStruct->hprev->hnext = ticketMsgStruct;
00513 tmpTicketMsgStruct->hprev = tmpTicketMsgStruct;
00514 }
00515
00516 return (0);
00517 }
00518
00519 int
00520 rmTicketMsgStructFromHQue (ticketMsgStruct_t *ticketMsgStruct,
00521 ticketHashQue_t *ticketHQue)
00522 {
00523 if (ticketMsgStruct == NULL || ticketHQue == NULL) {
00524 rodsLog (LOG_ERROR,
00525 "rmTicketMsgStructFromHQue: ticketMsgStruct or ticketHQue is NULL");
00526 return (SYS_INTERNAL_NULL_INPUT_ERR);
00527 }
00528
00529 if (ticketMsgStruct->hprev == NULL) {
00530
00531 ticketHQue->head = ticketMsgStruct->hnext;
00532 } else {
00533 ticketMsgStruct->hprev->hnext = ticketMsgStruct->hnext;
00534 }
00535
00536 if (ticketMsgStruct->hnext == NULL) {
00537
00538 ticketHQue->tail = ticketMsgStruct->hprev;
00539 } else {
00540 ticketMsgStruct->hnext->hprev = ticketMsgStruct->hprev;
00541 }
00542
00543 ticketMsgStruct->hprev = ticketMsgStruct->hnext = NULL;
00544
00545 return (0);
00546 }
00547
00548
00549
00550 int
00551 addReqToQue (int sock)
00552 {
00553 xmsgReq_t *myXmsgReq;
00554
00555 myXmsgReq = (xmsgReq_t*)calloc (1, sizeof (xmsgReq_t));
00556
00557 myXmsgReq->sock = sock;
00558
00559 #ifndef windows_platform
00560 #ifdef USE_BOOST
00561 ReqQueCondMutex.lock();
00562 #else
00563 pthread_mutex_lock (&ReqQueCondMutex);
00564 #endif
00565 #endif
00566
00567 if (XmsgReqHead == NULL) {
00568 XmsgReqHead = myXmsgReq;
00569 XmsgReqTail = myXmsgReq;
00570 } else {
00571
00572
00573
00574
00575
00576
00577
00578 XmsgReqTail->next = myXmsgReq;
00579 XmsgReqTail = myXmsgReq;
00580 }
00581
00582 #ifndef windows_platform
00583 #ifdef USE_BOOST
00584 ReqQueCond.notify_all();
00585 ReqQueCondMutex.unlock();
00586 #else
00587 pthread_cond_signal (&ReqQueCond);
00588 pthread_mutex_unlock (&ReqQueCondMutex);
00589 #endif
00590 #endif
00591
00592 return (0);
00593 }
00594
00595 xmsgReq_t *
00596 getReqFromQue ()
00597 {
00598 xmsgReq_t *myXmsgReq = NULL;
00599
00600 while (myXmsgReq == NULL) {
00601 #ifndef windows_platform
00602 #ifdef USE_BOOST
00603 ReqQueCondMutex.lock();
00604 #else
00605 pthread_mutex_lock (&ReqQueCondMutex);
00606 #endif
00607 #endif
00608 if (XmsgReqHead != NULL) {
00609 myXmsgReq = XmsgReqHead;
00610 XmsgReqHead = XmsgReqHead->next;
00611 #ifndef windows_platform
00612 #ifdef USE_BOOST
00613 ReqQueCondMutex.unlock();
00614 #else
00615 pthread_mutex_unlock (&ReqQueCondMutex);
00616 #endif
00617 #endif
00618 break;
00619 }
00620
00621 #ifndef windows_platform
00622 #ifdef USE_BOOST
00623 boost::unique_lock<boost::mutex> boost_lock( ReqQueCondMutex );
00624 ReqQueCond.wait( boost_lock );
00625 #else
00626 pthread_cond_wait (&ReqQueCond, &ReqQueCondMutex);
00627 #endif
00628 #endif
00629 if (XmsgReqHead == NULL) {
00630 #ifndef windows_platform
00631 #ifdef USE_BOOST
00632 boost_lock.unlock();
00633 #else
00634 pthread_mutex_unlock (&ReqQueCondMutex);
00635 #endif
00636 #endif
00637 continue;
00638 } else {
00639 myXmsgReq = XmsgReqHead;
00640 XmsgReqHead = XmsgReqHead->next;
00641 #ifndef windows_platform
00642 #ifdef USE_BOOST
00643 boost_lock.unlock();
00644 #else
00645 pthread_mutex_unlock (&ReqQueCondMutex);
00646 #endif
00647 #endif
00648 break;
00649 }
00650 }
00651
00652 return (myXmsgReq);
00653 }
00654
00655 int
00656 startXmsgThreads ()
00657 {
00658 int status = 0;
00659 #ifndef windows_platform
00660 int i;
00661 for (i = 0; i < NUM_XMSG_THR; i++) {
00662 #ifdef USE_BOOST
00663 ProcReqThread[i] = new boost::thread( procReqRoutine );
00664 #else
00665 status = pthread_create(&ProcReqThread[i], NULL,
00666 (void *(*)(void *)) procReqRoutine, (void *) NULL);
00667 #endif
00668 }
00669 #endif
00670
00671 return (status);
00672 }
00673
00674 void
00675 procReqRoutine ()
00676 {
00677 xmsgReq_t *myXmsgReq = NULL;
00678 startupPack_t *startupPack;
00679 rsComm_t rsComm;
00680 int status;
00681 fd_set sockMask;
00682 struct timeval msgTimeout;
00683
00684 while (1) {
00685 myXmsgReq = getReqFromQue ();
00686 if (myXmsgReq == NULL) {
00687
00688 continue;
00689 }
00690
00691 status = readStartupPack (myXmsgReq->sock, &startupPack, NULL);
00692 if (status < 0) {
00693 rodsLog (LOG_ERROR,
00694 "procReqRoutine: readStartupPack error, status = %d", status);
00695 free (myXmsgReq);
00696 continue;
00697 }
00698 memset (&rsComm, 0, sizeof (rsComm));
00699 initRsCommWithStartupPack (&rsComm, startupPack);
00700
00701 if (startupPack != NULL) free (startupPack);
00702
00703 rsComm.sock = myXmsgReq->sock;
00704 status = sendVersion (rsComm.sock, 0, 0, NULL, 0);
00705
00706 if (status < 0) {
00707 sendVersion (rsComm.sock, SYS_AGENT_INIT_ERR, 0, NULL, 0);
00708 free (myXmsgReq);
00709 continue;
00710 }
00711 FD_ZERO(&sockMask);
00712 memset (&msgTimeout, 0, sizeof (msgTimeout));
00713 msgTimeout.tv_sec = REQ_MSG_TIMEOUT_TIME;
00714 while (1) {
00715 int numSock;
00716
00717 FD_SET (rsComm.sock, &sockMask);
00718 while ((numSock = select (rsComm.sock + 1, &sockMask,
00719 (fd_set *) NULL, (fd_set *) NULL, &msgTimeout)) < 0) {
00720 if (errno == EINTR) {
00721 rodsLog (LOG_NOTICE,
00722 "procReqRoutine: select() interrupted");
00723 FD_SET(rsComm.sock, &sockMask);
00724 continue;
00725 } else {
00726 break;
00727 }
00728 }
00729 if (numSock < 0) break;
00730 status = readAndProcClientMsg (&rsComm, 0);
00731 if (status < 0) break;
00732 }
00733 close (rsComm.sock);
00734 free (myXmsgReq);
00735 }
00736 }
00737
00738
00739
00740
00741
00742 int
00743 ticketHashFunc (uint rcvTicket)
00744 {
00745 int mySlot = rcvTicket % NUM_HASH_SLOT;
00746
00747 return (mySlot);
00748 }
00749
00750 int
00751 initXmsgHashQue ()
00752 {
00753
00754 xmsgTicketInfo_t *outXmsgTicketInfo;
00755 time_t thisTime;
00756 int hashSlotNum;
00757
00758 memset (XmsgHashQue, 0, NUM_HASH_SLOT * sizeof (ticketHashQue_t));
00759 memset (&XmsgQue, 0, sizeof (XmsgQue));
00760
00761
00762
00763 thisTime = time (NULL);
00764
00765 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00766 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00767 outXmsgTicketInfo->rcvTicket = 1;
00768 outXmsgTicketInfo->sendTicket = 1;
00769 outXmsgTicketInfo->flag = 1;
00770 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00771 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00772 free( outXmsgTicketInfo );
00773 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00774 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00775 outXmsgTicketInfo->rcvTicket = 2;
00776 outXmsgTicketInfo->sendTicket = 2;
00777 outXmsgTicketInfo->flag = 1;
00778 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00779 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00780 free( outXmsgTicketInfo );
00781
00782 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00783 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00784 outXmsgTicketInfo->rcvTicket = 3;
00785 outXmsgTicketInfo->sendTicket = 3;
00786 outXmsgTicketInfo->flag = 1;
00787 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00788 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00789 free( outXmsgTicketInfo );
00790
00791 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00792 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00793 outXmsgTicketInfo->rcvTicket = 4;
00794 outXmsgTicketInfo->sendTicket = 4;
00795 outXmsgTicketInfo->flag = 1;
00796 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00797 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00798 free( outXmsgTicketInfo );
00799
00800 outXmsgTicketInfo = (xmsgTicketInfo_t*)calloc (1, sizeof (xmsgTicketInfo_t));
00801 outXmsgTicketInfo->expireTime = thisTime + (MAX_EXPIRE_INT * 500);
00802 outXmsgTicketInfo->rcvTicket = 5;
00803 outXmsgTicketInfo->sendTicket = 5;
00804 outXmsgTicketInfo->flag = 1;
00805 hashSlotNum = ticketHashFunc (outXmsgTicketInfo->rcvTicket);
00806 addTicketToHQue (outXmsgTicketInfo, &XmsgHashQue[hashSlotNum]);
00807 free( outXmsgTicketInfo );
00808
00809 addMsParam(&XMsgMsParamArray, "*XHDR",STR_MS_T, NULL,NULL);
00810 addMsParam(&XMsgMsParamArray, "*XUSER",STR_MS_T, NULL,NULL);
00811 addMsParam(&XMsgMsParamArray, "*XADDR",STR_MS_T, NULL,NULL);
00812 addMsParam(&XMsgMsParamArray, "*XMISC",STR_MS_T, NULL,NULL);
00813 addIntParamToArray(&XMsgMsParamArray, "*XMSGNUM",0);
00814 addIntParamToArray(&XMsgMsParamArray, "*XSEQNUM",0);
00815 addIntParamToArray(&XMsgMsParamArray, "*XTIME",0);
00816
00817
00818
00819
00820
00821 return (0);
00822 }
00823
00824 int
00825 getTicketMsgStructByTicket (uint rcvTicket,
00826 ticketMsgStruct_t **outTicketMsgStruct)
00827 {
00828 int hashSlotNum;
00829 ticketMsgStruct_t *tmpTicketMsgStruct;
00830
00831 hashSlotNum = ticketHashFunc (rcvTicket);
00832
00833 tmpTicketMsgStruct = XmsgHashQue[hashSlotNum].head;
00834
00835 while (tmpTicketMsgStruct != NULL) {
00836 if (rcvTicket == tmpTicketMsgStruct->ticket.rcvTicket) {
00837 *outTicketMsgStruct = tmpTicketMsgStruct;
00838 return 0;
00839 } else if (rcvTicket > tmpTicketMsgStruct->ticket.rcvTicket) {
00840 *outTicketMsgStruct = NULL;
00841 return SYS_UNMATCHED_XMSG_TICKET;
00842 } else {
00843 tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
00844 }
00845 }
00846
00847
00848 *outTicketMsgStruct = NULL;
00849 return SYS_UNMATCHED_XMSG_TICKET;
00850 }
00851
00852 int
00853 _rsRcvXmsg (irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut)
00854 {
00855 sendXmsgInfo_t *sendXmsgInfo;
00856 ticketMsgStruct_t *ticketMsgStruct;
00857
00858 if (irodsXmsg == NULL || rcvXmsgOut == NULL) {
00859 rodsLog (LOG_ERROR,
00860 "_rsRcvXmsg: input irodsXmsg or rcvXmsgOut is NULL");
00861 #ifndef windows_platform
00862 #ifdef USE_BOOST
00863 MessQueCondMutex.unlock();
00864 #else
00865 pthread_mutex_unlock (&MessQueCondMutex);
00866 #endif
00867 #endif
00868 return (SYS_INTERNAL_NULL_INPUT_ERR);
00869 }
00870
00871 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00872 ticketMsgStruct = (ticketMsgStruct_t*)irodsXmsg->ticketMsgStruct;
00873
00874
00875
00876
00877 sendXmsgInfo = irodsXmsg->sendXmsgInfo;
00878
00879 sendXmsgInfo->numRcv--;
00880
00881 if (sendXmsgInfo->numRcv <= 0 && sendXmsgInfo->numDeli <= 0) {
00882
00883 rcvXmsgOut->msg = sendXmsgInfo->msg;
00884 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00885 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00886 sendXmsgInfo->msg = NULL;
00887 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00888 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00889 NAME_LEN);
00890 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00891 NAME_LEN);
00892 rmXmsgFromXmsgQue (irodsXmsg, &XmsgQue);
00893 rmXmsgFromXmsgTcketQue (irodsXmsg, &ticketMsgStruct->xmsgQue);
00894 clearSendXmsgInfo (sendXmsgInfo);
00895
00896 free(sendXmsgInfo);
00897
00898 free (irodsXmsg);
00899
00900
00901
00902
00903
00904
00905
00906
00907 } else {
00908 rcvXmsgOut->msg = strdup (sendXmsgInfo->msg);
00909 rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
00910 rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
00911 rstrcpy (rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN);
00912 rstrcpy (rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
00913 NAME_LEN);
00914 rstrcpy (rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
00915 NAME_LEN);
00916 }
00917 #ifndef windows_platform
00918 #ifdef USE_BOOST
00919 MessQueCondMutex.unlock();
00920 #else
00921 pthread_mutex_unlock (&MessQueCondMutex);
00922 #endif
00923 #endif
00924 return (0);
00925 }
00926
00927 int
00928 clearOneXMessage(ticketMsgStruct_t *ticketMsgStruct, int seqNum)
00929 {
00930
00931
00932 irodsXmsg_t *tmpIrodsXmsg;
00933
00934 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00935 while (tmpIrodsXmsg != NULL) {
00936 if ((int) tmpIrodsXmsg->seqNumber == seqNum) {
00937 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00938 rmXmsgFromXmsgTcketQue (tmpIrodsXmsg,&ticketMsgStruct->xmsgQue);
00939 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00940 free(tmpIrodsXmsg->sendXmsgInfo);
00941 free (tmpIrodsXmsg);
00942 return(0);
00943 }
00944 tmpIrodsXmsg = tmpIrodsXmsg->tnext;
00945 }
00946
00947
00948 return(0);
00949 }
00950
00951 int
00952 clearAllXMessages(ticketMsgStruct_t *ticketMsgStruct)
00953 {
00954
00955 irodsXmsg_t *tmpIrodsXmsg, *tmpIrodsXmsg2;
00956
00957 tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
00958 while (tmpIrodsXmsg != NULL) {
00959 tmpIrodsXmsg2 = tmpIrodsXmsg->tnext;
00960 rmXmsgFromXmsgQue (tmpIrodsXmsg, &XmsgQue);
00961 clearSendXmsgInfo (tmpIrodsXmsg->sendXmsgInfo);
00962
00963 free(tmpIrodsXmsg->sendXmsgInfo);
00964
00965 free (tmpIrodsXmsg);
00966 tmpIrodsXmsg = tmpIrodsXmsg2;
00967 }
00968
00969 ticketMsgStruct->xmsgQue.head = NULL;
00970 ticketMsgStruct->xmsgQue.tail = NULL;
00971 return(0);
00972 }
00973