00001
00002
00003
00004
00005
00006
00007
00008 #include "reGlobalsExtern.h"
00009 #include "icatHighLevelRoutines.h"
00010 #include "rodsXmsg.h"
00011 #include "getXmsgTicket.h"
00012 #include "sendXmsg.h"
00013 #include "rcvXmsg.h"
00014
00015 #ifdef ADDR_64BITS
00016 #define CAST_PTR_INT (long int)
00017 #else
00018 #define CAST_PTR_INT
00019 #endif
00020
00021 static rcComm_t *xmsgServerConn = NULL;
00022 static rodsEnv myRodsXmsgEnv;
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 int msiXmsgServerConnect(msParam_t* outConnParam, ruleExecInfo_t *rei)
00056 {
00057
00058 rcComm_t *conn;
00059 rodsEnv myRodsEnv;
00060 rErrMsg_t errMsg;
00061 int status;
00062
00063 RE_TEST_MACRO (" Calling msiXmsgServerConnect");
00064
00065 status = getRodsEnv (&myRodsEnv);
00066 if (status < 0) {
00067 rodsLog (LOG_ERROR, "msiXmsgServerConnect: getRodsEnv failed:%i", status);
00068 return(status);
00069 }
00070 conn = rcConnectXmsg (&myRodsEnv, &errMsg);
00071 if (conn == NULL) {
00072 rodsLog (LOG_ERROR,
00073 "msiXmsgServerConnect: rcConnectXmsg failed:%i :%s\n", errMsg.status, errMsg.msg);
00074 return(errMsg.status);
00075 }
00076 status = clientLogin(conn);
00077 if (status != 0) {
00078 rodsLog (LOG_ERROR, "msiXmsgServerConnect: clientLogin failed:%i", status);
00079 return(status);
00080 }
00081
00082 outConnParam->inOutStruct = (void *) conn;
00083 outConnParam->type = (char *) strdup("RcComm_MS_T");
00084
00085 return(0);
00086
00087 }
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122 int msiXmsgCreateStream(msParam_t* inConnParam,
00123 msParam_t* inGgetXmsgTicketInpParam,
00124 msParam_t* outXmsgTicketInfoParam,
00125 ruleExecInfo_t *rei)
00126 {
00127
00128 rcComm_t *conn;
00129 getXmsgTicketInp_t *getXmsgTicketInp;
00130 xmsgTicketInfo_t *outXmsgTicketInfo = NULL;
00131 int status;
00132 int allocFlag = 0;
00133
00134 RE_TEST_MACRO (" Calling msiXmsgCreateStream");
00135
00136 if (inConnParam->inOutStruct == NULL) {
00137 rodsLog (LOG_ERROR,
00138 "msiXmsgCreateStream: input inConnParam is NULL");
00139 return (SYS_INTERNAL_NULL_INPUT_ERR);
00140 }
00141 conn = (rcComm_t *) inConnParam->inOutStruct;
00142
00143 if (inGgetXmsgTicketInpParam->inOutStruct != NULL && !(strcmp(inGgetXmsgTicketInpParam->type, STR_MS_T) == 0 && strcmp((char *)inGgetXmsgTicketInpParam->inOutStruct, "") == 0))
00144 getXmsgTicketInp = (getXmsgTicketInp_t *) inGgetXmsgTicketInpParam->inOutStruct;
00145 else {
00146 getXmsgTicketInp = (getXmsgTicketInp_t *) malloc(sizeof (getXmsgTicketInp_t));
00147 memset (getXmsgTicketInp, 0, sizeof (getXmsgTicketInp_t));
00148 allocFlag = 1;
00149 }
00150
00151 status = rcGetXmsgTicket (conn, getXmsgTicketInp, &outXmsgTicketInfo);
00152 if (status != 0) {
00153 rodsLog (LOG_ERROR, "msiXmsgCreateStream: rcGetXmsgTicket failed:%i", status);
00154 return(status);
00155 }
00156
00157 outXmsgTicketInfoParam->inOutStruct = (void *) outXmsgTicketInfo;
00158 outXmsgTicketInfoParam->type = (char *) strdup(XmsgTicketInfo_MS_T);
00159 if (allocFlag == 1)
00160 free(getXmsgTicketInp);
00161 return(0);
00162
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207 int msiCreateXmsgInp(msParam_t* inMsgNumber,
00208 msParam_t* inMsgType,
00209 msParam_t* inNumberOfReceivers,
00210 msParam_t* inMsg,
00211 msParam_t* inNumberOfDeliverySites,
00212 msParam_t* inDeliveryAddressList,
00213 msParam_t* inDeliveryPortList,
00214 msParam_t* inMiscInfo,
00215 msParam_t* inXmsgTicketInfoParam,
00216 msParam_t* outSendXmsgInpParam,
00217 ruleExecInfo_t *rei)
00218 {
00219
00220
00221 sendXmsgInp_t *sendXmsgInp;
00222 xmsgTicketInfo_t *xmsgTicketInfo;
00223
00224 if (inXmsgTicketInfoParam == NULL) {
00225 rodsLog (LOG_ERROR, "msiSendXmsg: input inXmsgTicketInfoParam is NULL");
00226 return (SYS_INTERNAL_NULL_INPUT_ERR);
00227 }
00228 xmsgTicketInfo = (xmsgTicketInfo_t *) inXmsgTicketInfoParam->inOutStruct;
00229
00230 sendXmsgInp = (sendXmsgInp_t *) malloc(sizeof (sendXmsgInp_t));
00231
00232
00233 sendXmsgInp->ticket.sendTicket = xmsgTicketInfo->sendTicket;
00234 sendXmsgInp->ticket.rcvTicket = xmsgTicketInfo->rcvTicket;
00235 sendXmsgInp->ticket.expireTime = xmsgTicketInfo->expireTime;
00236 sendXmsgInp->ticket.flag = xmsgTicketInfo->flag;
00237 if (!strcmp(inMsgNumber->type,STR_MS_T))
00238 sendXmsgInp->sendXmsgInfo.msgNumber = (uint) atoi( (char*)inMsgNumber->inOutStruct);
00239 else
00240 sendXmsgInp->sendXmsgInfo.msgNumber = (uint) CAST_PTR_INT inMsgNumber->inOutStruct;
00241 strcpy(sendXmsgInp->sendXmsgInfo.msgType, (char*)inMsgType->inOutStruct);
00242 if (!strcmp(inNumberOfReceivers->type,STR_MS_T))
00243 sendXmsgInp->sendXmsgInfo.numRcv = (uint) atoi( (char*)inNumberOfReceivers->inOutStruct);
00244 else
00245 sendXmsgInp->sendXmsgInfo.numRcv = (uint) CAST_PTR_INT inNumberOfReceivers->inOutStruct;
00246 sendXmsgInp->sendXmsgInfo.msg = strdup((char *) inMsg->inOutStruct);
00247 if (!strcmp(inNumberOfDeliverySites->type,STR_MS_T))
00248 sendXmsgInp->sendXmsgInfo.numDeli = (int) atoi( (char*)inNumberOfDeliverySites->inOutStruct);
00249 else
00250 sendXmsgInp->sendXmsgInfo.numDeli = (int) CAST_PTR_INT inNumberOfDeliverySites->inOutStruct;
00251 if (sendXmsgInp->sendXmsgInfo.numDeli == 0) {
00252 sendXmsgInp->sendXmsgInfo.deliAddress = NULL;
00253 sendXmsgInp->sendXmsgInfo.deliPort = NULL;
00254 }
00255 else {
00256 sendXmsgInp->sendXmsgInfo.deliAddress = (char**)inDeliveryAddressList->inOutStruct;
00257 sendXmsgInp->sendXmsgInfo.deliPort = (uint*)inDeliveryPortList->inOutStruct;
00258 }
00259 sendXmsgInp->sendXmsgInfo.miscInfo = strdup((char *) inMiscInfo->inOutStruct);
00260
00261 outSendXmsgInpParam->inOutStruct = (void *) sendXmsgInp;
00262 outSendXmsgInpParam->type = (char *) strdup(SendXmsgInp_MS_T);
00263 return(0);
00264
00265
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303 int msiSendXmsg(msParam_t* inConnParam,
00304 msParam_t* inSendXmsgInpParam,
00305 ruleExecInfo_t *rei)
00306 {
00307
00308 rcComm_t *conn;
00309 sendXmsgInp_t *sendXmsgInp;
00310 int status;
00311
00312
00313
00314 RE_TEST_MACRO (" Calling msiSendXmsg");
00315
00316 if (inConnParam->inOutStruct == NULL) {
00317 rodsLog (LOG_ERROR, "msiSendXmsg: input inConnParam is NULL");
00318 return (SYS_INTERNAL_NULL_INPUT_ERR);
00319 }
00320 conn = (rcComm_t *) inConnParam->inOutStruct;
00321
00322 if (inSendXmsgInpParam == NULL) {
00323 rodsLog (LOG_ERROR, "msiSendXmsg: input inSendXmsgInpParam is NULL");
00324 return (SYS_INTERNAL_NULL_INPUT_ERR);
00325 }
00326 sendXmsgInp = (sendXmsgInp_t *) inSendXmsgInpParam->inOutStruct;
00327
00328
00329 status = rcSendXmsg (conn, sendXmsgInp);
00330 if (status < 0) {
00331 rodsLog (LOG_ERROR, "msiSendXmsg: rcSendXmsg failed:%i", status);
00332 return(status);
00333 }
00334 return(status);
00335 }
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378 int msiRcvXmsg(msParam_t* inConnParam,
00379 msParam_t* inTicketNumber,
00380 msParam_t* inMsgNumber,
00381 msParam_t* outMsgType,
00382 msParam_t* outMsg,
00383 msParam_t* outSendUser,
00384 ruleExecInfo_t *rei)
00385 {
00386
00387 rcComm_t *conn;
00388 rcvXmsgInp_t rcvXmsgInp;
00389 rcvXmsgOut_t *rcvXmsgOut = NULL;
00390 xmsgTicketInfo_t *xmsgTicketInfo = NULL;
00391 int status;
00392
00393
00394 RE_TEST_MACRO (" Calling msiRcvXmsg");
00395
00396 if (inConnParam->inOutStruct == NULL) {
00397 rodsLog (LOG_ERROR, "msiRcvXmsg: input inConnParam is NULL");
00398 return (SYS_INTERNAL_NULL_INPUT_ERR);
00399 }
00400 conn = (rcComm_t *) inConnParam->inOutStruct;
00401
00402 memset (&rcvXmsgInp, 0, sizeof (rcvXmsgInp));
00403 if (!strcmp(inTicketNumber->type,XmsgTicketInfo_MS_T)) {
00404 xmsgTicketInfo = (xmsgTicketInfo_t *) inTicketNumber->inOutStruct;
00405 rcvXmsgInp.rcvTicket = xmsgTicketInfo->rcvTicket;
00406 }
00407 else if (!strcmp(inTicketNumber->type,STR_MS_T)) {
00408 rcvXmsgInp.rcvTicket = (uint) atoi( (char*)inTicketNumber->inOutStruct);
00409 }
00410 else
00411 rcvXmsgInp.rcvTicket = (uint) CAST_PTR_INT inTicketNumber->inOutStruct;
00412 if (!strcmp(inMsgNumber->type,STR_MS_T))
00413 rcvXmsgInp.msgNumber = (uint) atoi( (char*)inMsgNumber->inOutStruct);
00414 else
00415 rcvXmsgInp.msgNumber = (uint) CAST_PTR_INT inMsgNumber->inOutStruct;
00416
00417 status = rcRcvXmsg (conn, &rcvXmsgInp, &rcvXmsgOut);
00418 if (status < 0 || rcvXmsgOut == NULL ) {
00419 rodsLog (LOG_ERROR, "msiRcvXmsg: rcRcvXmsg failed:%i", status);
00420 return(status);
00421 }
00422
00423 outMsgType->inOutStruct = (void *) strdup(rcvXmsgOut->msgType);
00424 outMsgType->type = strdup(STR_MS_T);
00425 outMsg->inOutStruct = (void *) rcvXmsgOut->msg;
00426 outMsg->type = strdup(STR_MS_T);
00427 outSendUser->inOutStruct = (void *) strdup(rcvXmsgOut->sendUserName);
00428 outSendUser->type = strdup(STR_MS_T);
00429 return(status);
00430 }
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463 int msiXmsgServerDisConnect(msParam_t* inConnParam, ruleExecInfo_t *rei)
00464 {
00465
00466 rcComm_t *conn;
00467 int status;
00468
00469 RE_TEST_MACRO (" Calling msiXmsgServerDisConnect");
00470
00471 if (inConnParam->inOutStruct == NULL) {
00472 rodsLog (LOG_ERROR, "msiSendXmsg: input inConnParam is NULL");
00473 return (SYS_INTERNAL_NULL_INPUT_ERR);
00474 }
00475 conn = (rcComm_t *) inConnParam->inOutStruct;
00476 status = rcDisconnect(conn);
00477 if (status == 0)
00478 inConnParam->inOutStruct = NULL;
00479 return(status);
00480
00481 }
00482
00483
00484 int _writeXMsg(int streamId, char *hdr, char *msg)
00485 {
00486 int i;
00487 xmsgTicketInfo_t xmsgTicketInfo;
00488 sendXmsgInp_t sendXmsgInp;
00489 rcComm_t *conn;
00490 rErrMsg_t errMsg;
00491 char myHostName[MAX_NAME_LEN];
00492
00493
00494 if (xmsgServerConn == NULL) {
00495 i = getRodsEnv (&myRodsXmsgEnv);
00496 if (i < 0) {
00497 rodsLog (LOG_ERROR, "_writeXMsg: getRodsEnv failed:%i",i);
00498 return(i);
00499 }
00500 conn = rcConnectXmsg (&myRodsXmsgEnv, &errMsg);
00501 if (conn == NULL) {
00502 rodsLog (LOG_ERROR, "_writeXMsg: rcConnectXmsg failed:%i:%s",errMsg.status,errMsg.msg);
00503 return(errMsg.status);
00504 }
00505 i = clientLogin(conn);
00506 if (i != 0) {
00507 rodsLog (LOG_ERROR, "msiXmsgServerConnect: clientLogin failed:%i", i);
00508 rcDisconnect(conn);
00509 return(i);
00510 }
00511 xmsgServerConn = conn;
00512 }
00513 else {
00514 conn = xmsgServerConn;
00515 }
00516 myHostName[0] = '\0';
00517 gethostname (myHostName, MAX_NAME_LEN);
00518
00519 memset (&xmsgTicketInfo, 0, sizeof (xmsgTicketInfo));
00520 memset (&sendXmsgInp, 0, sizeof (sendXmsgInp));
00521 xmsgTicketInfo.sendTicket = streamId;
00522 xmsgTicketInfo.rcvTicket = streamId;
00523 xmsgTicketInfo.flag = 1;
00524 sendXmsgInp.ticket = xmsgTicketInfo;
00525 sendXmsgInp.sendXmsgInfo.numRcv = 1;
00526 sendXmsgInp.sendXmsgInfo.msgNumber = 0;
00527 snprintf(sendXmsgInp.sendXmsgInfo.msgType, HEADER_TYPE_LEN, "%s",hdr);
00528 snprintf(sendXmsgInp.sendAddr, NAME_LEN, "%s:%i", myHostName, getpid ());
00529 sendXmsgInp.sendXmsgInfo.msg = msg;
00530 i = rcSendXmsg (conn, &sendXmsgInp);
00531 if (i < 0)
00532 rodsLog (LOG_NOTICE,"_writeXmsg: Unable to send message to stream %i\n", streamId);
00533
00534 return(i);
00535 }
00536
00537 int _readXMsg(int streamId, char *condRead, int *msgNum, int *seqNum,
00538 char **hdr, char **msg, char **user, char **addr)
00539 {
00540 int i;
00541 rcvXmsgInp_t rcvXmsgInp;
00542 rcvXmsgOut_t *rcvXmsgOut = NULL;
00543 rcComm_t *conn;
00544 rErrMsg_t errMsg;
00545
00546 if (xmsgServerConn == NULL) {
00547 i = getRodsEnv (&myRodsXmsgEnv);
00548 if (i < 0) {
00549 rodsLog (LOG_ERROR, "_readXMsg: getRodsEnv failed:%i",i);
00550 return(i);
00551 }
00552 conn = rcConnectXmsg (&myRodsXmsgEnv, &errMsg);
00553 if (conn == NULL) {
00554 rodsLog (LOG_ERROR, "_readXMsg: rcConnectXmsg failed:%i:%s",errMsg.status,errMsg.msg);
00555 return(errMsg.status);
00556 }
00557 i = clientLogin(conn);
00558 if (i != 0) {
00559 rodsLog (LOG_ERROR, "msiXmsgServerConnect: clientLogin failed:%i", i);
00560 rcDisconnect(conn);
00561 return(i);
00562 }
00563 xmsgServerConn = conn;
00564 }
00565 else {
00566 conn = xmsgServerConn;
00567 }
00568 memset (&rcvXmsgInp, 0, sizeof (rcvXmsgInp));
00569 rcvXmsgInp.rcvTicket = streamId;
00570 rcvXmsgInp.msgNumber = 0;
00571 strncpy(rcvXmsgInp.msgCondition, condRead, MAX_NAME_LEN);
00572 i = rcRcvXmsg (conn, &rcvXmsgInp, &rcvXmsgOut);
00573 if (i < 0 || rcvXmsgOut == NULL ) {
00574
00575 rodsLog (LOG_NOTICE,"_readXmsg: Unable to receive message from stream %i\n", streamId);
00576 return(i);
00577 }
00578 *msgNum = rcvXmsgOut->msgNumber;
00579 *seqNum = rcvXmsgOut->seqNumber;
00580 *hdr = strdup(rcvXmsgOut->msgType);
00581 *msg = strdup(rcvXmsgOut->msg);
00582 *user = strdup(rcvXmsgOut->sendUserName);
00583 *addr = strdup(rcvXmsgOut->sendAddr);
00584
00585 return(i);
00586 }