00001
00002
00003
00004
00005
00006
00007
00008 #ifndef windows_platform
00009 #include <sys/wait.h>
00010 #endif
00011
00012
00013 #include "miscServerFunct.h"
00014 #include "dataObjOpen.h"
00015 #include "dataObjLseek.h"
00016 #include "dataObjOpr.h"
00017 #include "dataObjClose.h"
00018 #include "dataObjWrite.h"
00019 #include "dataObjRead.h"
00020 #include "rcPortalOpr.h"
00021 #include "initServer.h"
00022 #ifdef PARA_OPR
00023 #ifdef USE_BOOST
00024 #include <boost/thread/thread.hpp>
00025 #else
00026 #include <pthread.h>
00027 #endif
00028 #endif
00029 #if !defined(solaris_platform)
00030 char *__loc1;
00031 #endif
00032 #include "rsGlobalExtern.h"
00033 #include "rcGlobalExtern.h"
00034
00035 #include "eirods_stacktrace.h"
00036
00037 int
00038 svrToSvrConnectNoLogin (rsComm_t *rsComm, rodsServerHost_t *rodsServerHost)
00039 {
00040 rErrMsg_t errMsg;
00041 int reconnFlag;
00042
00043
00044 if (rodsServerHost->conn == NULL) {
00045 if (getenv (RECONNECT_ENV) != NULL) {
00046 reconnFlag = RECONN_TIMEOUT;
00047 } else {
00048 reconnFlag = NO_RECONN;
00049 }
00050 rodsServerHost->conn = _rcConnect (rodsServerHost->hostName->name,
00051 ((zoneInfo_t *) rodsServerHost->zoneInfo)->portNum,
00052 rsComm->myEnv.rodsUserName, rsComm->myEnv.rodsZone,
00053 rsComm->clientUser.userName, rsComm->clientUser.rodsZone, &errMsg,
00054 rsComm->connectCnt, reconnFlag);
00055
00056 if (rodsServerHost->conn == NULL) {
00057 if (errMsg.status < 0) {
00058 return (errMsg.status);
00059 } else {
00060 return (SYS_SVR_TO_SVR_CONNECT_FAILED - errno);
00061 }
00062 }
00063 }
00064
00065 return (rodsServerHost->localFlag);
00066 }
00067
00068 int
00069 svrToSvrConnect (rsComm_t *rsComm, rodsServerHost_t *rodsServerHost)
00070 {
00071 int status;
00072
00073 status = svrToSvrConnectNoLogin (rsComm, rodsServerHost);
00074
00075 if (status < 0) {
00076 return status;
00077 }
00078
00079 status = clientLogin (rodsServerHost->conn);
00080 if (status < 0) {
00081 rodsLog (LOG_NOTICE,
00082 "svrToSvrConnect: clientLogin to %s failed",
00083 rodsServerHost->hostName->name);
00084 return (status);
00085 } else {
00086 return (rodsServerHost->localFlag);
00087 }
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 int
00101 setupSrvPortalForParaOpr (rsComm_t *rsComm, dataOprInp_t *dataOprInp,
00102 int oprType, portalOprOut_t **portalOprOut)
00103 {
00104 portalOprOut_t *myDataObjPutOut;
00105 int portalSock;
00106 int proto;
00107
00108 #ifdef RBUDP_TRANSFER
00109 if (getValByKey (&dataOprInp->condInput, RBUDP_TRANSFER_KW) != NULL) {
00110 proto = SOCK_DGRAM;
00111 } else {
00112 proto = SOCK_STREAM;
00113 }
00114 #else
00115 proto = SOCK_STREAM;
00116 #endif
00117
00118 myDataObjPutOut = (portalOprOut_t *) malloc (sizeof (portalOprOut_t));
00119 memset (myDataObjPutOut, 0, sizeof (portalOprOut_t));
00120
00121 *portalOprOut = myDataObjPutOut;
00122
00123 if (getValByKey (&dataOprInp->condInput, STREAMING_KW) != NULL ||
00124 proto == SOCK_DGRAM) {
00125
00126 myDataObjPutOut->numThreads = 1;
00127 } else {
00128 myDataObjPutOut->numThreads = getNumThreads (rsComm,
00129 dataOprInp->dataSize, dataOprInp->numThreads,
00130 &dataOprInp->condInput,
00131
00132 getValByKey (&dataOprInp->condInput, RESC_HIER_STR_KW), NULL);
00133 }
00134
00135 if (myDataObjPutOut->numThreads == 0) {
00136 return 0;
00137 } else {
00138 portalOpr_t *myPortalOpr;
00139
00140
00141 portalSock = createSrvPortal (rsComm, &myDataObjPutOut->portList,
00142 proto);
00143 if (portalSock < 0) {
00144 rodsLog (LOG_NOTICE,
00145 "setupSrvPortalForParaOpr: createSrvPortal error, ststus = %d",
00146 portalSock);
00147 myDataObjPutOut->status = portalSock;
00148 return portalSock;
00149 }
00150 myPortalOpr = rsComm->portalOpr =
00151 (portalOpr_t *) malloc (sizeof (portalOpr_t));
00152 myPortalOpr->oprType = oprType;
00153 myPortalOpr->portList = myDataObjPutOut->portList;
00154 myPortalOpr->dataOprInp = *dataOprInp;
00155 memset (&dataOprInp->condInput, 0, sizeof (dataOprInp->condInput));
00156 myPortalOpr->dataOprInp.numThreads = myDataObjPutOut->numThreads;
00157 }
00158 return (0);
00159 }
00160
00161
00162
00163
00164
00165
00166 int
00167 createSrvPortal (rsComm_t *rsComm, portList_t *thisPortList, int proto)
00168 {
00169 int lsock = -1;
00170 int lport = 0;
00171 char *laddr = NULL;
00172 int udpsock = -1;
00173 int udpport = 0;
00174 char *udpaddr = NULL;
00175
00176
00177 if (proto != SOCK_DGRAM && proto != SOCK_STREAM) {
00178 rodsLog (LOG_ERROR,
00179 "createSrvPortal: invalid input protocol %d", proto);
00180 return SYS_INVALID_PROTOCOL_TYPE;
00181 }
00182
00183 if ((lsock = svrSockOpenForInConn (rsComm, &lport, &laddr,
00184 SOCK_STREAM)) < 0) {
00185 rodsLog (LOG_ERROR,
00186 "createSrvPortal: svrSockOpenForInConn failed: status=%d",
00187 lsock);
00188 return lsock;
00189 }
00190
00191 thisPortList->sock = lsock;
00192 thisPortList->cookie = random ();
00193 if (ProcessType == CLIENT_PT) {
00194 rstrcpy (thisPortList->hostAddr, laddr, LONG_NAME_LEN);
00195 } else {
00196 struct hostent *hostEnt;
00197
00198 if (LocalServerHost != NULL &&
00199 strcmp (LocalServerHost->hostName->name, "localhost") != 0 &&
00200 (hostEnt = gethostbyname (LocalServerHost->hostName->name)) != NULL){
00201 rstrcpy (thisPortList->hostAddr, hostEnt->h_name, LONG_NAME_LEN);
00202 } else {
00203 rstrcpy (thisPortList->hostAddr, laddr, LONG_NAME_LEN);
00204 }
00205 }
00206 free (laddr);
00207 thisPortList->portNum = lport;
00208 thisPortList->windowSize = rsComm->windowSize;
00209
00210 listen (lsock, SOMAXCONN);
00211
00212 if (proto == SOCK_DGRAM) {
00213 if ((udpsock = svrSockOpenForInConn (rsComm, &udpport, &udpaddr,
00214 SOCK_DGRAM)) < 0) {
00215 rodsLog (LOG_ERROR,
00216 "setupSrvPortal- sockOpenForInConn of SOCK_DGRAM failed: stat=%d",
00217 udpsock);
00218 CLOSE_SOCK (lsock);
00219 return udpsock;
00220 } else {
00221 addUdpPortToPortList (thisPortList, udpport);
00222 addUdpSockToPortList (thisPortList, udpsock);
00223 }
00224 }
00225 free (udpaddr);
00226
00227 return (lsock);
00228 }
00229
00230 int
00231 acceptSrvPortal (rsComm_t *rsComm, portList_t *thisPortList)
00232 {
00233 int myFd = -1;
00234 int myCookie;
00235 int nbytes;
00236 fd_set basemask;
00237 int nSockets, nSelected;
00238 int lsock = getTcpSockFromPortList (thisPortList);
00239 struct timeval selectTimeout;
00240
00241 nSockets = lsock + 1;
00242 FD_ZERO(&basemask);
00243 FD_SET(lsock, &basemask);
00244
00245
00246
00247 selectTimeout.tv_sec = SELECT_TIMEOUT_FOR_CONN;
00248 selectTimeout.tv_usec = 0;
00249
00250 while ((nSelected = select(nSockets, &basemask,
00251 (fd_set *) NULL, (fd_set *) NULL, &selectTimeout)) < 0) {
00252 if (errno == EINTR) {
00253 rodsLog (LOG_ERROR, "acceptSrvPortal: select interrupted\n");
00254 continue;
00255 }
00256 rodsLog (LOG_ERROR, "acceptSrvPortal: select select failed, errno = %d",
00257 errno);
00258 }
00259 myFd = accept (lsock, 0, 0);
00260 if (myFd < 0) {
00261 rodsLog (LOG_NOTICE,
00262 "acceptSrvPortal() -- accept() failed: errno=%d",
00263 errno);
00264 return SYS_SOCK_ACCEPT_ERR - errno;
00265 } else {
00266 rodsSetSockOpt (myFd, rsComm->windowSize);
00267 }
00268 #ifdef _WIN32
00269 nbytes = recv (myFd,&myCookie,sizeof(myCookie),0);
00270 #else
00271 nbytes = read (myFd, &myCookie,sizeof (myCookie));
00272 #endif
00273 myCookie = ntohl (myCookie);
00274 if (nbytes != sizeof (myCookie) || myCookie != thisPortList->cookie) {
00275 rodsLog (LOG_NOTICE,
00276 "acceptSrvPortal: cookie err, bytes read=%d,cookie=%d,inCookie=%d",
00277 nbytes, thisPortList->cookie, myCookie);
00278 CLOSE_SOCK (myFd);
00279 return SYS_PORT_COOKIE_ERR;
00280 }
00281 return (myFd);
00282 }
00283
00284 int
00285 svrPortalPutGet (rsComm_t *rsComm)
00286 {
00287 portalOpr_t *myPortalOpr;
00288 dataOprInp_t *dataOprInp;
00289 portList_t *thisPortList;
00290 rodsLong_t size0, size1, offset0;
00291 int lsock, portalFd;
00292 int i;
00293 int numThreads;
00294 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
00295 #ifdef PARA_OPR
00296 #ifdef USE_BOOST
00297 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
00298 #else
00299 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
00300 #endif
00301 #endif
00302 int oprType;
00303 int flags = 0;
00304 int retVal = 0;
00305
00306 myPortalOpr = rsComm->portalOpr;
00307
00308 if (myPortalOpr == NULL) {
00309 rodsLog (LOG_NOTICE, "svrPortalPut: NULL myPortalOpr");
00310 return (SYS_INTERNAL_NULL_INPUT_ERR);
00311 }
00312
00313 thisPortList = &myPortalOpr->portList;
00314 if (thisPortList == NULL) {
00315 rodsLog (LOG_NOTICE, "svrPortalPut: NULL portList");
00316 return (SYS_INTERNAL_NULL_INPUT_ERR);
00317 }
00318
00319 if (getUdpPortFromPortList (thisPortList) != 0) {
00320 #ifdef RBUDP_TRANSFER
00321
00322 retVal = svrPortalPutGetRbudp (rsComm);
00323 return retVal;
00324 #else
00325 return SYS_UDP_NO_SUPPORT_ERR;
00326 #endif
00327 }
00328
00329 oprType = myPortalOpr->oprType;
00330 dataOprInp = &myPortalOpr->dataOprInp;
00331
00332 if (getValByKey (&dataOprInp->condInput, STREAMING_KW)!= NULL) {
00333 flags |= STREAMING_FLAG;
00334 }
00335
00336 numThreads = dataOprInp->numThreads;
00337
00338 if (numThreads <= 0 || numThreads > MAX_NUM_CONFIG_TRAN_THR) {
00339 rodsLog (LOG_NOTICE,
00340 "svrPortalPut: numThreads %d out of range");
00341 return (SYS_INTERNAL_NULL_INPUT_ERR);
00342 }
00343
00344 memset (myInput, 0, sizeof (myInput));
00345 #ifdef PARA_OPR
00346 memset (tid, 0, sizeof (tid));
00347 #endif
00348
00349 size0 = dataOprInp->dataSize / numThreads;
00350 size1 = dataOprInp->dataSize - size0 * (numThreads - 1);
00351 offset0 = dataOprInp->offset;
00352
00353 lsock = getTcpSockFromPortList (thisPortList);
00354
00355
00356 portalFd = acceptSrvPortal (rsComm, thisPortList);
00357 if (portalFd < 0) {
00358 rodsLog (LOG_NOTICE,
00359 "svrPortalPut: acceptSrvPortal error. errno = %d",
00360 errno);
00361
00362 CLOSE_SOCK (lsock);
00363
00364 return (portalFd);
00365 }
00366
00367 if (oprType == PUT_OPR) {
00368 fillPortalTransferInp (&myInput[0], rsComm,
00369 portalFd, dataOprInp->destL3descInx, 0, dataOprInp->destRescTypeInx,
00370 0, size0, offset0, flags);
00371 } else {
00372 fillPortalTransferInp (&myInput[0], rsComm,
00373 dataOprInp->srcL3descInx, portalFd, dataOprInp->srcRescTypeInx, 0,
00374 0, size0, offset0, flags);
00375 }
00376
00377 if (numThreads == 1) {
00378 if (oprType == PUT_OPR) {
00379 partialDataPut (&myInput[0]);
00380 } else {
00381 partialDataGet (&myInput[0]);
00382 }
00383 CLOSE_SOCK (lsock);
00384
00385 return (myInput[0].status);
00386 } else {
00387 #ifdef PARA_OPR
00388 rodsLong_t mySize = 0;
00389 rodsLong_t myOffset = 0;
00390
00391 for (i = 1; i < numThreads; i++) {
00392 int l3descInx;
00393
00394 portalFd = acceptSrvPortal (rsComm, thisPortList);
00395 if (portalFd < 0) {
00396 rodsLog (LOG_NOTICE,
00397 "svrPortalPut: acceptSrvPortal error. errno = %d",
00398 errno);
00399
00400 CLOSE_SOCK (lsock);
00401
00402 return (portalFd);
00403 }
00404 myOffset += size0;
00405 if (i < numThreads - 1) {
00406 mySize = size0;
00407 } else {
00408 mySize = size1;
00409 }
00410
00411 if (oprType == PUT_OPR) {
00412
00413 l3descInx = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
00414 dataOprInp->destL3descInx, O_WRONLY);
00415 fillPortalTransferInp (&myInput[i], rsComm,
00416 portalFd, l3descInx, 0, dataOprInp->destRescTypeInx,
00417 i, mySize, myOffset, flags);
00418 #ifdef USE_BOOST
00419 tid[i] = new boost::thread( partialDataPut, &myInput[i] );
00420 #else
00421 pthread_create (&tid[i], pthread_attr_default,
00422 (void *(*)(void *)) partialDataPut, (void *) &myInput[i]);
00423 #endif
00424
00425 } else {
00426 l3descInx = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
00427 dataOprInp->srcL3descInx, O_RDONLY);
00428 fillPortalTransferInp (&myInput[i], rsComm,
00429 l3descInx, portalFd, dataOprInp->srcRescTypeInx, 0,
00430 i, mySize, myOffset, flags);
00431 #ifdef USE_BOOST
00432 tid[i] = new boost::thread( partialDataGet, &myInput[i] );
00433 #else
00434 pthread_create (&tid[i], pthread_attr_default,
00435 (void *(*)(void *)) partialDataGet, (void *) &myInput[i]);
00436 #endif
00437 }
00438 }
00439
00440
00441
00442 if (oprType == PUT_OPR) {
00443 #ifdef USE_BOOST
00444 tid[0] = new boost::thread( partialDataPut, &myInput[0] );
00445 #else
00446 pthread_create (&tid[0], pthread_attr_default,
00447 (void *(*)(void *)) partialDataPut, (void *) &myInput[0]);
00448 #endif
00449 } else {
00450 #ifdef USE_BOOST
00451 tid[0] = new boost::thread( partialDataGet, &myInput[0] );
00452 #else
00453 pthread_create (&tid[0], pthread_attr_default,
00454 (void *(*)(void *)) partialDataGet, (void *) &myInput[0]);
00455 #endif
00456 }
00457
00458 for ( i = 0; i < numThreads; i++) {
00459 if (tid[i] != 0)
00460 #ifdef USE_BOOST
00461 tid[i]->join();
00462 #else
00463 pthread_join (tid[i], NULL);
00464 #endif
00465 if (myInput[i].status < 0) {
00466 retVal = myInput[i].status;
00467 }
00468 }
00469 CLOSE_SOCK (lsock);
00470 return (retVal);
00471
00472 #else
00473 CLOSE_SOCK (lsock);
00474 return (SYS_PARA_OPR_NO_SUPPORT);
00475 #endif
00476 }
00477 }
00478
00479 int
00480 fillPortalTransferInp (portalTransferInp_t *myInput, rsComm_t *rsComm,
00481 int srcFd, int destFd, int srcRescTypeInx, int destRescTypeInx,
00482 int threadNum, rodsLong_t size, rodsLong_t offset, int flags)
00483 {
00484 if (myInput == NULL)
00485 return (SYS_INTERNAL_NULL_INPUT_ERR);
00486
00487 myInput->rsComm = rsComm;
00488 myInput->destFd = destFd;
00489 myInput->srcFd = srcFd;
00490 myInput->destRescTypeInx = destRescTypeInx;
00491 myInput->srcRescTypeInx = srcRescTypeInx;
00492
00493 myInput->threadNum = threadNum;
00494 myInput->size = size;
00495 myInput->offset = offset;
00496 myInput->flags = flags;
00497
00498 return (0);
00499 }
00500
00501
00502 void
00503 partialDataPut (portalTransferInp_t *myInput)
00504 {
00505 int destL3descInx, srcFd, destRescTypeInx;
00506 char *buf;
00507 int bytesWritten;
00508 rodsLong_t bytesToGet;
00509 rodsLong_t myOffset = 0;
00510
00511 #ifdef PARA_TIMING
00512 time_t startTime, afterSeek, afterTransfer,
00513 endTime;
00514 startTime=time (0);
00515 #endif
00516
00517 if (myInput == NULL) {
00518 rodsLog (LOG_SYS_FATAL, "partialDataPut: NULL myInput");
00519 return;
00520 }
00521
00522 myInput->status = 0;
00523 destL3descInx = myInput->destFd;
00524 srcFd = myInput->srcFd;
00525 destRescTypeInx = myInput->destRescTypeInx;
00526
00527 if (myInput->offset != 0) {
00528 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
00529 destL3descInx, myInput->offset, SEEK_SET);
00530 if (myOffset < 0) {
00531 myInput->status = myOffset;
00532 rodsLog (LOG_NOTICE,
00533 "_partialDataPut: _objSeek error, status = %d ",
00534 myInput->status);
00535 if (myInput->threadNum > 0)
00536 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00537 CLOSE_SOCK (srcFd);
00538 return;
00539 }
00540 }
00541 buf = (char*)malloc (TRANS_BUF_SZ);
00542
00543 #ifdef PARA_TIMING
00544 afterSeek=time(0);
00545 #endif
00546
00547 bytesToGet = myInput->size;
00548
00549 while (bytesToGet > 0) {
00550 int toread0;
00551 int bytesRead;
00552
00553 #ifdef PARA_TIMING
00554 time_t tstart, tafterRead, tafterWrite;
00555 tstart=time(0);
00556 #endif
00557 if (myInput->flags & STREAMING_FLAG) {
00558 toread0 = bytesToGet;
00559 } else if (bytesToGet > TRANS_SZ) {
00560 toread0 = TRANS_SZ;
00561 } else {
00562 toread0 = bytesToGet;
00563 }
00564
00565 myInput->status = sendTranHeader (srcFd, PUT_OPR, myInput->flags,
00566 myOffset, toread0);
00567
00568 if (myInput->status < 0) {
00569 rodsLog (LOG_NOTICE,
00570 "partialDataPut: sendTranHeader error. status = %d",
00571 myInput->status);
00572 if (myInput->threadNum > 0)
00573 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00574 CLOSE_SOCK (srcFd);
00575 free (buf);
00576 return;
00577 }
00578
00579 while (toread0 > 0) {
00580 int toread1;
00581
00582 if (toread0 > TRANS_BUF_SZ) {
00583 toread1 = TRANS_BUF_SZ;
00584 } else {
00585 toread1 = toread0;
00586 }
00587 bytesRead = myRead (srcFd, buf, toread1, SOCK_TYPE, NULL, NULL);
00588
00589 #ifdef PARA_TIMING
00590 tafterRead=time(0);
00591 #endif
00592 if (bytesRead == toread1) {
00593 if ((bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
00594 destL3descInx, buf, bytesRead)) != bytesRead) {
00595 rodsLog (LOG_NOTICE,
00596 "_partialDataPut:Bytes written %d don't match read %d",
00597 bytesWritten, bytesRead);
00598
00599 if (bytesWritten < 0) {
00600 myInput->status = bytesWritten;
00601 } else {
00602 myInput->status = SYS_COPY_LEN_ERR;
00603 }
00604 break;
00605 }
00606 bytesToGet -= bytesWritten;
00607 toread0 -= bytesWritten;
00608 myOffset += bytesWritten;
00609 } else if (bytesRead < 0) {
00610 myInput->status = bytesRead;
00611 break;
00612 } else {
00613 rodsLog (LOG_NOTICE,
00614 "_partialDataPut: toread %d bytes, %d bytes read, errno = %d",
00615 toread1, bytesRead, errno);
00616 myInput->status = SYS_COPY_LEN_ERR;
00617 break;
00618 }
00619 #ifdef PARA_TIMING
00620 tafterWrite=time(0);
00621 rodsLog (LOG_NOTICE,
00622 "Thr %d: sz=%d netReadTm=%d diskWriteTm=%d",
00623 myInput->threadNum, bytesWritten, tafterRead-tstart,
00624 tafterWrite-tafterRead);
00625 #endif
00626 }
00627 if (myInput->status < 0)
00628 break;
00629 }
00630 #ifdef PARA_TIMING
00631 afterTransfer=time(0);
00632 #endif
00633 free (buf);
00634 sendTranHeader (srcFd, DONE_OPR, 0, 0, 0);
00635 if (myInput->threadNum > 0)
00636 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00637 mySockClose (srcFd);
00638 #ifdef PARA_TIMING
00639 endTime=time(0);
00640 rodsLog (LOG_NOTICE,
00641 "Thr %d: seekTm=%d transTm=%d endTm=%d",
00642 myInput->threadInx,
00643 afterSeek-afterConn, afterTransfer-afterSeek, endTime-afterTransfer);
00644 #endif
00645 return;
00646 }
00647
00648 void
00649 partialDataGet (portalTransferInp_t *myInput)
00650 {
00651 int srcL3descInx, destFd, srcRescTypeInx;
00652 char *buf;
00653 int bytesWritten;
00654 rodsLong_t bytesToGet;
00655 rodsLong_t myOffset = 0;
00656
00657 #ifdef PARA_TIMING
00658 time_t startTime, afterSeek, afterTransfer,
00659 endTime;
00660 startTime=time (0);
00661 #endif
00662
00663 if (myInput == NULL) {
00664 rodsLog (LOG_SYS_FATAL, "partialDataGet: NULL myInput");
00665 return;
00666 }
00667
00668 myInput->status = 0;
00669 srcL3descInx = myInput->srcFd;
00670 destFd = myInput->destFd;
00671 srcRescTypeInx = myInput->srcRescTypeInx;
00672
00673 if (myInput->offset != 0) {
00674 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
00675 srcL3descInx, myInput->offset, SEEK_SET);
00676 if (myOffset < 0) {
00677 myInput->status = myOffset;
00678 rodsLog (LOG_NOTICE,
00679 "_partialDataGet: _objSeek error, status = %d ",
00680 myInput->status);
00681 if (myInput->threadNum > 0)
00682 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00683 CLOSE_SOCK (destFd);
00684 return;
00685 }
00686 }
00687 buf = (char*)malloc (TRANS_BUF_SZ);
00688
00689 #ifdef PARA_TIMING
00690 afterSeek=time(0);
00691 #endif
00692
00693 bytesToGet = myInput->size;
00694
00695 while (bytesToGet > 0) {
00696 int toread0;
00697 int bytesRead;
00698
00699 #ifdef PARA_TIMING
00700 time_t tstart, tafterRead, tafterWrite;
00701 tstart=time(0);
00702 #endif
00703 if (myInput->flags & STREAMING_FLAG) {
00704 toread0 = bytesToGet;
00705 } else if (bytesToGet > TRANS_SZ) {
00706 toread0 = TRANS_SZ;
00707 } else {
00708 toread0 = bytesToGet;
00709 }
00710
00711 myInput->status = sendTranHeader (destFd, GET_OPR, myInput->flags,
00712 myOffset, toread0);
00713
00714 if (myInput->status < 0) {
00715 rodsLog (LOG_NOTICE,
00716 "partialDataGet: sendTranHeader error. status = %d",
00717 myInput->status);
00718 if (myInput->threadNum > 0)
00719 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00720 CLOSE_SOCK (destFd);
00721 free (buf);
00722 return;
00723 }
00724
00725 while (toread0 > 0) {
00726 int toread1;
00727
00728 if (toread0 > TRANS_BUF_SZ) {
00729 toread1 = TRANS_BUF_SZ;
00730 } else {
00731 toread1 = toread0;
00732 }
00733 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
00734 srcL3descInx, buf, toread1);
00735
00736 #ifdef PARA_TIMING
00737 tafterRead=time(0);
00738 #endif
00739 if (bytesRead == toread1) {
00740 if ((bytesWritten = myWrite (destFd, buf, bytesRead,
00741 SOCK_TYPE, NULL))
00742 != bytesRead) {
00743 rodsLog (LOG_NOTICE,
00744 "_partialDataGet:Bytes written %d don't match read %d",
00745 bytesWritten, bytesRead);
00746
00747 if (bytesWritten < 0) {
00748 myInput->status = bytesWritten;
00749 } else {
00750 myInput->status = SYS_COPY_LEN_ERR;
00751 }
00752 break;
00753 }
00754 bytesToGet -= bytesWritten;
00755 toread0 -= bytesWritten;
00756 myOffset += bytesWritten;
00757 } else if (bytesRead < 0) {
00758 myInput->status = bytesRead;
00759 break;
00760 } else {
00761 rodsLog (LOG_NOTICE,
00762 "_partialDataGet: toread %d bytes, %d bytes read",
00763 toread1, bytesRead);
00764 myInput->status = SYS_COPY_LEN_ERR;
00765 break;
00766 }
00767 #ifdef PARA_TIMING
00768 tafterWrite=time(0);
00769 rodsLog (LOG_NOTICE,
00770 "Thr %d: sz=%d netReadTm=%d diskWriteTm=%d",
00771 myInput->threadNum, bytesWritten, tafterRead-tstart,
00772 tafterWrite-tafterRead);
00773 #endif
00774 }
00775 if (myInput->status < 0)
00776 break;
00777 }
00778 #ifdef PARA_TIMING
00779 afterTransfer=time(0);
00780 #endif
00781 free (buf);
00782 sendTranHeader (destFd, DONE_OPR, 0, 0, 0);
00783 if (myInput->threadNum > 0)
00784 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00785 CLOSE_SOCK (destFd);
00786 #ifdef PARA_TIMING
00787 endTime=time(0);
00788 rodsLog (LOG_NOTICE,
00789 "Thr %d: seekTm=%d transTm=%d endTm=%d",
00790 myInput->threadInx,
00791 afterSeek-afterConn, afterTransfer-afterSeek, endTime-afterTransfer);
00792 #endif
00793 return;
00794 }
00795
00796 void
00797 remToLocPartialCopy (portalTransferInp_t *myInput)
00798 {
00799 transferHeader_t myHeader;
00800 int destL3descInx, srcFd, destRescTypeInx;
00801 void *buf;
00802 rodsLong_t curOffset = 0;
00803 rodsLong_t myOffset = 0;
00804 int toRead, bytesRead, bytesWritten;
00805
00806 if (myInput == NULL) {
00807 rodsLog (LOG_NOTICE,
00808 "remToLocPartialCopy: NULL input");
00809 return;
00810 }
00811 #ifdef PARA_DEBUG
00812 printf ("remToLocPartialCopy: thread %d at start\n", myInput->threadNum);
00813 #endif
00814
00815 myInput->status = 0;
00816 destL3descInx = myInput->destFd;
00817 srcFd = myInput->srcFd;
00818 destRescTypeInx = myInput->destRescTypeInx;
00819 myInput->bytesWritten = 0;
00820
00821 buf = malloc (TRANS_BUF_SZ);
00822
00823 while (myInput->status >= 0) {
00824 rodsLong_t toGet;
00825
00826 myInput->status = rcvTranHeader (srcFd, &myHeader);
00827
00828 #ifdef PARA_DEBUG
00829 printf ("remToLocPartialCopy: thread %d after rcvTranHeader\n",
00830 myInput->threadNum);
00831 printf ("remToLocPartialCopy: thread %d header offset %lld, len %lld\n",
00832 myInput->threadNum, myHeader.offset, myHeader.length);
00833
00834 #endif
00835
00836 if (myInput->status < 0) {
00837 break;
00838 }
00839
00840 if (myHeader.oprType == DONE_OPR) {
00841 break;
00842 }
00843 if (myHeader.offset != curOffset) {
00844 curOffset = myHeader.offset;
00845 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
00846 destL3descInx, myHeader.offset, SEEK_SET);
00847 if (myOffset < 0) {
00848 myInput->status = myOffset;
00849 rodsLog (LOG_NOTICE,
00850 "remToLocPartialCopy: _objSeek error, status = %d ",
00851 myInput->status);
00852 break;
00853 }
00854 }
00855
00856 toGet = myHeader.length;
00857 while (toGet > 0) {
00858
00859 if (toGet > TRANS_BUF_SZ) {
00860 toRead = TRANS_BUF_SZ;
00861 } else {
00862 toRead = toGet;
00863 }
00864
00865 bytesRead = myRead (srcFd, buf, toRead,
00866 SOCK_TYPE, NULL, NULL);
00867 if (bytesRead != toRead) {
00868 if (bytesRead < 0) {
00869 myInput->status = bytesRead;
00870 rodsLogError (LOG_ERROR, bytesRead,
00871 "remToLocPartialCopy: copy error for %lld", bytesRead);
00872 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
00873 myInput->status = SYS_COPY_LEN_ERR - errno;
00874 rodsLog (LOG_ERROR,
00875 "remToLocPartialCopy: toGet %lld, bytesRead %d",
00876 toGet, bytesRead);
00877 }
00878 break;
00879 }
00880
00881 bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
00882 destL3descInx, buf, bytesRead);
00883
00884 if (bytesWritten != bytesRead) {
00885 rodsLog (LOG_NOTICE,
00886 "_partialDataPut:Bytes written %d don't match read %d",
00887 bytesWritten, bytesRead);
00888
00889 if (bytesWritten < 0) {
00890 myInput->status = bytesWritten;
00891 } else {
00892 myInput->status = SYS_COPY_LEN_ERR;
00893 }
00894 break;
00895 }
00896
00897 toGet -= bytesWritten;
00898 }
00899 curOffset += myHeader.length;
00900 myInput->bytesWritten += myHeader.length;
00901 }
00902
00903 free (buf);
00904 if (myInput->threadNum > 0)
00905 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00906 CLOSE_SOCK (srcFd);
00907 }
00908
00909
00910
00911
00912 #ifdef RBUDP_TRANSFER
00913 int
00914 rbudpRemLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
00915 {
00916 portalOprOut_t *portalOprOut;
00917 dataOprInp_t *dataOprInp;
00918 rodsLong_t dataSize;
00919 int oprType;
00920 int veryVerbose, sendRate, packetSize;
00921 char *tmpStr;
00922 int status;
00923
00924 if (dataCopyInp == NULL) {
00925 rodsLog (LOG_NOTICE,
00926 "rbudpRemLocCopy: NULL dataCopyInp input");
00927 return (SYS_INTERNAL_NULL_INPUT_ERR);
00928 }
00929 portalOprOut = &dataCopyInp->portalOprOut;
00930 dataOprInp = &dataCopyInp->dataOprInp;
00931 oprType = dataOprInp->oprType;
00932 dataSize = dataOprInp->dataSize;
00933
00934 if (getValByKey (&dataOprInp->condInput, VERY_VERBOSE_KW) != NULL) {
00935 veryVerbose = 2;
00936 } else {
00937 veryVerbose = 0;
00938 }
00939
00940 if ((tmpStr = getValByKey (&dataOprInp->condInput,
00941 RBUDP_PACK_SIZE_KW)) != NULL) {
00942 packetSize = atoi (tmpStr);
00943 } else {
00944 packetSize = DEF_UDP_PACKET_SIZE;
00945 }
00946
00947 if (oprType == COPY_TO_LOCAL_OPR) {
00948 int destL3descInx = dataOprInp->destL3descInx;
00949
00950 status = getFileToPortalRbudp (portalOprOut, NULL,
00951 FileDesc[destL3descInx].fd, dataSize,
00952 veryVerbose, packetSize);
00953 } else {
00954 int srcL3descInx = dataOprInp->srcL3descInx;
00955
00956 if ((tmpStr = getValByKey (&dataOprInp->condInput,
00957 RBUDP_SEND_RATE_KW)) != NULL) {
00958 sendRate = atoi (tmpStr);
00959 } else {
00960 sendRate = DEF_UDP_SEND_RATE;
00961 }
00962 status = putFileToPortalRbudp (portalOprOut, NULL, NULL,
00963 FileDesc[srcL3descInx].fd, dataSize,
00964 veryVerbose, sendRate, packetSize);
00965 }
00966 return (status);
00967 }
00968 #endif
00969
00970
00971
00972
00973 int
00974 remLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
00975 {
00976 portalOprOut_t *portalOprOut;
00977 dataOprInp_t *dataOprInp;
00978 portList_t *myPortList;
00979 int i, sock, myFd;
00980 int numThreads;
00981 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
00982 #ifndef windows_platform
00983 #ifdef USE_BOOST
00984 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
00985 #else
00986 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
00987 #endif
00988 #endif
00989 int retVal = 0;
00990 rodsLong_t dataSize;
00991 int oprType;
00992
00993 if (dataCopyInp == NULL) {
00994 rodsLog (LOG_NOTICE,
00995 "remLocCopy: NULL dataCopyInp input");
00996 return (SYS_INTERNAL_NULL_INPUT_ERR);
00997 }
00998
00999 portalOprOut = &dataCopyInp->portalOprOut;
01000 numThreads = portalOprOut->numThreads;
01001 if (numThreads == 0) {
01002 retVal = singleRemLocCopy (rsComm, dataCopyInp);
01003 return retVal;
01004 }
01005
01006 dataOprInp = &dataCopyInp->dataOprInp;
01007 oprType = dataOprInp->oprType;
01008 dataSize = dataOprInp->dataSize;
01009
01010 if (getUdpPortFromPortList (&portalOprOut->portList) != 0) {
01011
01012 #ifdef RBUDP_TRANSFER
01013 retVal = rbudpRemLocCopy (rsComm, dataCopyInp);
01014 return (retVal);
01015 #else
01016 return (SYS_UDP_NO_SUPPORT_ERR);
01017 #endif
01018 }
01019
01020 if (numThreads > MAX_NUM_CONFIG_TRAN_THR || numThreads <= 0) {
01021 rodsLog (LOG_NOTICE,
01022 "remLocCopy: numThreads %d out of range",
01023 numThreads);
01024 return (SYS_INVALID_PORTAL_OPR);
01025 }
01026
01027
01028 myPortList = &portalOprOut->portList;
01029
01030 #ifndef windows_platform
01031 memset (tid, 0, sizeof (tid));
01032 #endif
01033 memset (myInput, 0, sizeof (myInput));
01034
01035 sock = connectToRhostPortal (myPortList->hostAddr,
01036 myPortList->portNum, myPortList->cookie, rsComm->windowSize);
01037 if (sock < 0) {
01038 return (sock);
01039 }
01040
01041 if (oprType == COPY_TO_LOCAL_OPR) {
01042 fillPortalTransferInp (&myInput[0], rsComm,
01043 sock, dataOprInp->destL3descInx, 0, dataOprInp->destRescTypeInx,
01044 0, 0, 0, 0);
01045 } else {
01046 fillPortalTransferInp (&myInput[0], rsComm,
01047 dataOprInp->srcL3descInx, sock, dataOprInp->srcRescTypeInx, 0,
01048 0, 0, 0, 0);
01049 }
01050
01051 if (numThreads == 1) {
01052 if (getValByKey (&dataOprInp->condInput,
01053 NO_CHK_COPY_LEN_KW) != NULL) {
01054 myInput[0].flags = NO_CHK_COPY_LEN_FLAG;
01055 }
01056 if (oprType == COPY_TO_LOCAL_OPR) {
01057 remToLocPartialCopy (&myInput[0]);
01058 } else {
01059 locToRemPartialCopy (&myInput[0]);
01060 }
01061 if (myInput[0].status < 0) {
01062 return (myInput[0].status);
01063 } else {
01064 if (myInput[0].bytesWritten == dataSize) {
01065 return (0);
01066 } else {
01067 rodsLog (LOG_NOTICE,
01068 "remLocCopy:bytesWritten %lld dataSize %lld mismatch",
01069 myInput[0].bytesWritten, dataSize);
01070 return (SYS_COPY_LEN_ERR);
01071 }
01072 }
01073 } else {
01074 #ifdef PARA_OPR
01075 rodsLong_t totalWritten = 0;
01076
01077 for (i = 1; i < numThreads; i++) {
01078 sock = connectToRhostPortal (myPortList->hostAddr,
01079 myPortList->portNum, myPortList->cookie, rsComm->windowSize);
01080 if (sock < 0) {
01081 return (sock);
01082 }
01083 if (oprType == COPY_TO_LOCAL_OPR) {
01084 myFd = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
01085 dataOprInp->destL3descInx, O_WRONLY);
01086 if (myFd < 0) {
01087 retVal = myFd;
01088 rodsLog (LOG_NOTICE,
01089 "remLocCopy: cannot open file, status = %d",
01090 myFd);
01091 CLOSE_SOCK (sock);
01092 continue;
01093 }
01094
01095 fillPortalTransferInp (&myInput[i], rsComm,
01096 sock, myFd, 0, dataOprInp->destRescTypeInx,
01097 i, 0, 0, 0);
01098
01099 #ifdef USE_BOOST
01100 tid[i] = new boost::thread( remToLocPartialCopy, &myInput[i] );
01101 #else
01102 pthread_create (&tid[i], pthread_attr_default,
01103 (void *(*)(void *)) remToLocPartialCopy, (void *) &myInput[i]);
01104 #endif
01105 } else {
01106 myFd = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
01107 dataOprInp->srcL3descInx, O_RDONLY);
01108 if (myFd < 0) {
01109 retVal = myFd;
01110 rodsLog (LOG_NOTICE,
01111 "remLocCopy: cannot open file, status = %d",
01112 myFd);
01113 CLOSE_SOCK (sock);
01114 continue;
01115 }
01116
01117 fillPortalTransferInp (&myInput[i], rsComm,
01118 myFd, sock, dataOprInp->destRescTypeInx, 0,
01119 i, 0, 0, 0);
01120
01121 #ifdef USE_BOOST
01122 tid[i] = new boost::thread( locToRemPartialCopy, &myInput[i] );
01123 #else
01124 pthread_create (&tid[i], pthread_attr_default,
01125 (void *(*)(void *)) locToRemPartialCopy, (void *) &myInput[i]);
01126 #endif
01127 }
01128 }
01129
01130 if (oprType == COPY_TO_LOCAL_OPR) {
01131 #ifdef USE_BOOST
01132 tid[0] = new boost::thread( remToLocPartialCopy,&myInput[0] );
01133 #else
01134 pthread_create (&tid[0], pthread_attr_default,
01135 (void *(*)(void *)) remToLocPartialCopy, (void *) &myInput[0]);
01136 #endif
01137 } else {
01138 #ifdef USE_BOOST
01139 tid[0] = new boost::thread( locToRemPartialCopy, &myInput[0] );
01140 #else
01141 pthread_create (&tid[0], pthread_attr_default,
01142 (void *(*)(void *)) locToRemPartialCopy, (void *) &myInput[0]);
01143 #endif
01144 }
01145
01146
01147 if (retVal < 0) {
01148 return (retVal);
01149 }
01150
01151 for ( i = 0; i < numThreads; i++) {
01152 if (tid[i] != 0) {
01153 #ifdef USE_BOOST
01154 tid[i]->join();
01155 #else
01156 pthread_join (tid[i], NULL);
01157 #endif
01158 }
01159 totalWritten += myInput[i].bytesWritten;
01160 if (myInput[i].status < 0) {
01161 retVal = myInput[i].status;
01162 }
01163 }
01164 if (retVal < 0) {
01165 return (retVal);
01166 } else {
01167 if (dataSize <= 0 || totalWritten == dataSize) {
01168 return (0);
01169 } else {
01170 rodsLog (LOG_NOTICE,
01171 "remLocCopy: totalWritten %lld dataSize %lld mismatch",
01172 totalWritten, dataSize);
01173 return (SYS_COPY_LEN_ERR);
01174 }
01175 }
01176 #else
01177 return (SYS_PARA_OPR_NO_SUPPORT);
01178 #endif
01179 }
01180 }
01181
01182 int
01183 sameHostCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
01184 {
01185 dataOprInp_t *dataOprInp;
01186 int i, out_fd, in_fd;
01187 int numThreads;
01188 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
01189 #ifndef windows_platform
01190 #ifdef USE_BOOST
01191 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
01192 #else
01193 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
01194 #endif
01195 #endif
01196 int retVal = 0;
01197 rodsLong_t dataSize;
01198 rodsLong_t size0, size1, offset0;
01199
01200 if (dataCopyInp == NULL) {
01201 rodsLog (LOG_NOTICE,
01202 "sameHostCopy: NULL dataCopyInp input");
01203 return (SYS_INTERNAL_NULL_INPUT_ERR);
01204 }
01205
01206 dataOprInp = &dataCopyInp->dataOprInp;
01207
01208 numThreads = dataOprInp->numThreads;
01209
01210 dataSize = dataOprInp->dataSize;
01211
01212 if (numThreads == 0) {
01213 numThreads = 1;
01214 } else if (numThreads > MAX_NUM_CONFIG_TRAN_THR || numThreads < 0) {
01215 rodsLog (LOG_NOTICE,
01216 "sameHostCopy: numThreads %d out of range",
01217 numThreads);
01218 return (SYS_INVALID_PORTAL_OPR);
01219 }
01220
01221 #ifndef windows_platform
01222 memset (tid, 0, sizeof (tid));
01223 #endif
01224 memset (myInput, 0, sizeof (myInput));
01225
01226 size0 = dataOprInp->dataSize / numThreads;
01227 size1 = dataOprInp->dataSize - size0 * (numThreads - 1);
01228 offset0 = dataOprInp->offset;
01229
01230 fillPortalTransferInp (&myInput[0], rsComm,
01231 dataOprInp->srcL3descInx, dataOprInp->destL3descInx,
01232 dataOprInp->srcRescTypeInx, dataOprInp->destRescTypeInx,
01233 0, size0, offset0, 0);
01234
01235 if (numThreads == 1) {
01236 if (getValByKey (&dataOprInp->condInput,
01237 NO_CHK_COPY_LEN_KW) != NULL) {
01238 myInput[0].flags = NO_CHK_COPY_LEN_FLAG;
01239 }
01240 sameHostPartialCopy (&myInput[0]);
01241 return (myInput[0].status);
01242 } else {
01243 #ifdef PARA_OPR
01244 rodsLong_t totalWritten = 0;
01245 rodsLong_t mySize = 0;
01246 rodsLong_t myOffset = 0;
01247
01248 for (i = 1; i < numThreads; i++) {
01249 myOffset += size0;
01250 if (i < numThreads - 1) {
01251 mySize = size0;
01252 } else {
01253 mySize = size1;
01254 }
01255
01256 out_fd = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
01257 dataOprInp->destL3descInx, O_WRONLY);
01258 if (out_fd < 0) {
01259 retVal = out_fd;
01260 rodsLog (LOG_NOTICE,
01261 "sameHostCopy: cannot open dest file, status = %d",
01262 out_fd);
01263 continue;
01264 }
01265
01266 in_fd = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
01267 dataOprInp->srcL3descInx, O_RDONLY);
01268 if (in_fd < 0) {
01269 retVal = out_fd;
01270 rodsLog (LOG_NOTICE,
01271 "sameHostCopy: cannot open src file, status = %d", in_fd);
01272 continue;
01273 }
01274 fillPortalTransferInp (&myInput[i], rsComm,
01275 in_fd, out_fd,
01276 dataOprInp->srcRescTypeInx, dataOprInp->destRescTypeInx,
01277 i, mySize, myOffset, 0);
01278
01279 #ifdef USE_BOOST
01280 tid[i] = new boost::thread( sameHostPartialCopy, &myInput[i] );
01281 #else
01282 pthread_create (&tid[i], pthread_attr_default,
01283 (void *(*)(void *)) sameHostPartialCopy, (void *) &myInput[i]);
01284 #endif
01285 }
01286
01287 #ifdef USE_BOOST
01288 tid[0] = new boost::thread( sameHostPartialCopy, &myInput[0] );
01289 #else
01290 pthread_create (&tid[0], pthread_attr_default,
01291 (void *(*)(void *)) sameHostPartialCopy, (void *) &myInput[0]);
01292 #endif
01293
01294 if (retVal < 0) {
01295 return (retVal);
01296 }
01297
01298 for ( i = 0; i < numThreads; i++) {
01299 if (tid[i] != 0) {
01300 #ifdef USE_BOOST
01301 tid[i]->join();
01302 #else
01303 pthread_join (tid[i], NULL);
01304 #endif
01305 }
01306 totalWritten += myInput[i].bytesWritten;
01307 if (myInput[i].status < 0) {
01308 retVal = myInput[i].status;
01309 }
01310 }
01311 if (retVal < 0) {
01312 return (retVal);
01313 } else {
01314 if (dataSize <= 0 || totalWritten == dataSize) {
01315 return (0);
01316 } else {
01317 rodsLog (LOG_NOTICE,
01318 "sameHostCopy: totalWritten %lld dataSize %lld mismatch",
01319 totalWritten, dataSize);
01320 return (SYS_COPY_LEN_ERR);
01321 }
01322 }
01323 #else
01324 return (SYS_PARA_OPR_NO_SUPPORT);
01325 #endif
01326 }
01327 }
01328
01329 void
01330 sameHostPartialCopy (portalTransferInp_t *myInput)
01331 {
01332 int destL3descInx, srcL3descInx, destRescTypeInx, srcRescTypeInx;
01333 void *buf;
01334 rodsLong_t myOffset = 0;
01335 rodsLong_t toCopy;
01336 int bytesRead, bytesWritten;
01337
01338 if (myInput == NULL) {
01339 rodsLog (LOG_NOTICE,
01340 "onsameHostPartialCopy: NULL input");
01341 return;
01342 }
01343 #ifdef PARA_DEBUG
01344 printf ("onsameHostPartialCopy: thread %d at start\n", myInput->threadNum);
01345 #endif
01346
01347 myInput->status = 0;
01348 destL3descInx = myInput->destFd;
01349 srcL3descInx = myInput->srcFd;
01350 destRescTypeInx = myInput->destRescTypeInx;
01351 srcRescTypeInx = myInput->srcRescTypeInx;
01352 myInput->bytesWritten = 0;
01353
01354 if (myInput->offset != 0) {
01355 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
01356 destL3descInx, myInput->offset, SEEK_SET);
01357 if (myOffset < 0) {
01358 myInput->status = myOffset;
01359 rodsLog (LOG_NOTICE,
01360 "sameHostPartialCopy: _objSeek error, status = %d ",
01361 myInput->status);
01362 if (myInput->threadNum > 0) {
01363 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01364 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01365 }
01366 return;
01367 }
01368 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
01369 srcL3descInx, myInput->offset, SEEK_SET);
01370 if (myOffset < 0) {
01371 myInput->status = myOffset;
01372 rodsLog (LOG_NOTICE,
01373 "sameHostPartialCopy: _objSeek error, status = %d ",
01374 myInput->status);
01375 if (myInput->threadNum > 0) {
01376 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01377 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01378 }
01379 return;
01380 }
01381 }
01382
01383 buf = malloc (TRANS_BUF_SZ);
01384
01385 toCopy = myInput->size;
01386
01387 while (toCopy > 0) {
01388 int toRead;
01389
01390 if (toCopy > TRANS_BUF_SZ) {
01391 toRead = TRANS_BUF_SZ;
01392 } else {
01393 toRead = toCopy;
01394 }
01395
01396 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
01397 srcL3descInx, buf, toRead);
01398
01399 if (bytesRead <= 0) {
01400 if (bytesRead < 0) {
01401 myInput->status = bytesRead;
01402 rodsLogError (LOG_ERROR, bytesRead,
01403 "sameHostPartialCopy: copy error for %lld", bytesRead);
01404 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
01405 myInput->status = SYS_COPY_LEN_ERR - errno;
01406 rodsLog (LOG_ERROR,
01407 "sameHostPartialCopy: toCopy %lld, bytesRead %d",
01408 toCopy, bytesRead);
01409 }
01410 break;
01411 }
01412
01413 bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
01414 destL3descInx, buf, bytesRead);
01415
01416 if (bytesWritten != bytesRead) {
01417 rodsLog (LOG_NOTICE,
01418 "sameHostPartialCopy:Bytes written %d don't match read %d",
01419 bytesWritten, bytesRead);
01420
01421 if (bytesWritten < 0) {
01422 myInput->status = bytesWritten;
01423 } else {
01424 myInput->status = SYS_COPY_LEN_ERR;
01425 }
01426 break;
01427 }
01428
01429 toCopy -= bytesWritten;
01430 myInput->bytesWritten += bytesWritten;
01431 }
01432
01433 free (buf);
01434 if (myInput->threadNum > 0) {
01435 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01436 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01437 }
01438 }
01439
01440 void
01441 locToRemPartialCopy (portalTransferInp_t *myInput)
01442 {
01443 transferHeader_t myHeader;
01444 int srcL3descInx, destFd, srcRescTypeInx;
01445 void *buf;
01446 rodsLong_t curOffset = 0;
01447 rodsLong_t myOffset = 0;
01448 int toRead, bytesRead, bytesWritten;
01449
01450 if (myInput == NULL) {
01451 rodsLog (LOG_NOTICE,
01452 "locToRemPartialCopy: NULL input");
01453 return;
01454 }
01455 #ifdef PARA_DEBUG
01456 printf ("locToRemPartialCopy: thread %d at start\n", myInput->threadNum);
01457 #endif
01458
01459 myInput->status = 0;
01460 srcL3descInx = myInput->srcFd;
01461 destFd = myInput->destFd;
01462 srcRescTypeInx = myInput->srcRescTypeInx;
01463 myInput->bytesWritten = 0;
01464
01465 buf = malloc (TRANS_BUF_SZ);
01466
01467 while (myInput->status >= 0) {
01468 rodsLong_t toGet;
01469
01470 myInput->status = rcvTranHeader (destFd, &myHeader);
01471
01472 #ifdef PARA_DEBUG
01473 printf ("locToRemPartialCopy: thread %d after rcvTranHeader\n",
01474 myInput->threadNum);
01475 #endif
01476
01477 if (myInput->status < 0) {
01478 break;
01479 }
01480
01481 if (myHeader.oprType == DONE_OPR) {
01482 break;
01483 }
01484 #ifdef PARA_DEBUG
01485 printf ("locToRemPartialCopy:thread %d header offset %lld, len %lld",
01486 myInput->threadNum, myHeader.offset, myHeader.length);
01487 #endif
01488
01489 if (myHeader.offset != curOffset) {
01490 curOffset = myHeader.offset;
01491 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
01492 srcL3descInx, myHeader.offset, SEEK_SET);
01493 if (myOffset < 0) {
01494 myInput->status = myOffset;
01495 rodsLog (LOG_NOTICE,
01496 "locToRemPartialCopy: _objSeek error, status = %d ",
01497 myInput->status);
01498 break;
01499 }
01500 }
01501
01502 toGet = myHeader.length;
01503 while (toGet > 0) {
01504
01505 if (toGet > TRANS_BUF_SZ) {
01506 toRead = TRANS_BUF_SZ;
01507 } else {
01508 toRead = toGet;
01509 }
01510
01511 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
01512 srcL3descInx, buf, toRead);
01513
01514 if (bytesRead != toRead) {
01515 if (bytesRead < 0) {
01516 myInput->status = bytesRead;
01517 rodsLogError (LOG_ERROR, bytesRead,
01518 "locToRemPartialCopy: copy error for %lld", bytesRead);
01519 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
01520 myInput->status = SYS_COPY_LEN_ERR - errno;
01521 rodsLog (LOG_ERROR,
01522 "locToRemPartialCopy: toGet %lld, bytesRead %d",
01523 toGet, bytesRead);
01524 }
01525 break;
01526 }
01527
01528 bytesWritten = myWrite (destFd, buf, bytesRead,
01529 SOCK_TYPE, NULL);
01530
01531
01532 if (bytesWritten != bytesRead) {
01533 rodsLog (LOG_NOTICE,
01534 "_partialDataPut:Bytes written %d don't match read %d",
01535 bytesWritten, bytesRead);
01536
01537 if (bytesWritten < 0) {
01538 myInput->status = bytesWritten;
01539 } else {
01540 myInput->status = SYS_COPY_LEN_ERR;
01541 }
01542 break;
01543 }
01544
01545 toGet -= bytesWritten;
01546 }
01547 curOffset += myHeader.length;
01548 myInput->bytesWritten += myHeader.length;
01549 }
01550
01551 free (buf);
01552 if (myInput->threadNum > 0)
01553 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01554 CLOSE_SOCK (destFd);
01555 }
01556
01557
01558
01559
01560
01561
01562
01563 void
01564 getZoneServerId(char *zoneName, char *zoneSID) {
01565 zoneInfo_t *tmpZoneInfo;
01566 rodsServerHost_t *tmpRodsServerHost;
01567 int i;
01568 int zoneNameLen=0;
01569 char *localZoneName=NULL;
01570 char matchStr[MAX_NAME_LEN+2];
01571
01572 if (zoneName!=NULL) zoneNameLen=strlen(zoneName);
01573 if (zoneNameLen==0) {
01574 strncpy(zoneSID, localSID, MAX_PASSWORD_LEN);
01575 return;
01576 }
01577
01578
01579 tmpZoneInfo = ZoneInfoHead;
01580 while (tmpZoneInfo != NULL) {
01581 tmpRodsServerHost = (rodsServerHost_t *) tmpZoneInfo->masterServerHost;
01582 if (tmpRodsServerHost->rcatEnabled == LOCAL_ICAT) {
01583 localZoneName = tmpZoneInfo->zoneName;
01584 }
01585 tmpZoneInfo = tmpZoneInfo->next;
01586 }
01587
01588
01589 if (localZoneName!=NULL) {
01590 if (strncmp(localZoneName, zoneName, MAX_NAME_LEN)==0) {
01591 strncpy(zoneSID, localSID, MAX_PASSWORD_LEN);
01592 return;
01593 }
01594 }
01595
01596
01597 strncpy(matchStr, zoneName, MAX_NAME_LEN);
01598 strncat(matchStr, "-", MAX_NAME_LEN);
01599 for (i=0;i<MAX_FED_RSIDS;i++) {
01600 if (strncmp(matchStr, remoteSID[i], zoneNameLen+1)==0) {
01601 strncpy(zoneSID, (char*)&remoteSID[i][zoneNameLen+1],
01602 MAX_PASSWORD_LEN);
01603 return;
01604 }
01605 }
01606
01607 zoneSID[0]='\0';
01608 return;
01609 }
01610
01611 int
01612 isUserPrivileged(rsComm_t *rsComm)
01613 {
01614
01615 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
01616 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
01617 }
01618 if (rsComm->proxyUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
01619 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
01620 }
01621
01622 return(0);
01623 }
01624
#ifndef solaris_platform
/*
 * regcmp - stand-in used when not building for solaris_platform.
 * Ignores its arguments and always returns NULL.
 */
char *regcmp (char *pat, char *end)
{
    (void) pat;
    (void) end;
    return NULL;
}

/*
 * regex - stand-in used when not building for solaris_platform.
 * Ignores all of its arguments and always returns NULL.
 */
char *regex (char *rec, char *text, ...)
{
    (void) rec;
    (void) text;
    return NULL;
}
#endif
01636
01637
01638
01639 int
01640 #ifdef __cplusplus
01641 intNoSupport( ... )
01642 #else
01643 intNoSupport()
01644 #endif
01645 {
01646 return SYS_NOT_SUPPORTED;
01647 }
01648
01649 rodsLong_t
01650 #ifdef __cplusplus
01651 longNoSupport( ... )
01652 #else
01653 longNoSupport()
01654 #endif
01655 {
01656 return (rodsLong_t) SYS_NOT_SUPPORTED;
01657 }
01658
#ifdef RBUDP_TRANSFER
/*
 * svrPortalPutGetRbudp - server side of an RBUDP put/get on the portal
 * described by rsComm->portalOpr.
 *
 * Accepts the TCP control connection, closes the listening socket, reads
 * the peer's UDP port (sent in network byte order) from the control
 * connection, then either receives into the destination descriptor
 * (PUT_OPR, getfileByFd) or sends from the source descriptor (GET_OPR,
 * sendfileByFd).  Packet size and send rate come from RBUDP_PACK_SIZE_KW
 * and RBUDP_SEND_RATE_KW in condInput, falling back to
 * DEF_UDP_PACKET_SIZE and DEF_UDP_SEND_RATE.
 *
 * Returns the transfer status; failures of the transfer itself are offset
 * by SYS_UDP_TRANSFER_ERR.  Other errors: SYS_INTERNAL_NULL_INPUT_ERR,
 * the negative accept status, SYS_UDP_CONNECT_ERR, USER_RODS_HOSTNAME_ERR.
 */
int
svrPortalPutGetRbudp (rsComm_t *rsComm)
{
    portalOpr_t *myPortalOpr;
    portList_t *thisPortList;
    int lsock;
    int tcpSock, udpSockfd;
    int udpPortBuf;
    int status;
#if defined(aix_platform)
    socklen_t laddrlen = sizeof(struct sockaddr);
#elif defined(windows_platform)
    int laddrlen = sizeof(struct sockaddr);
#else
    uint laddrlen = sizeof(struct sockaddr);
#endif
    int packetSize;
    char *tmpStr;
    int verbose;

    myPortalOpr = rsComm->portalOpr;

    if (myPortalOpr == NULL) {
        rodsLog (LOG_NOTICE, "svrPortalPutGetRbudp: NULL myPortalOpr");
        return (SYS_INTERNAL_NULL_INPUT_ERR);
    }

    /* Address of an embedded member: it cannot be NULL once myPortalOpr
     * has been checked. */
    thisPortList = &myPortalOpr->portList;

    lsock = getTcpSockFromPortList (thisPortList);

    tcpSock = acceptSrvPortal (rsComm, thisPortList);
    if (tcpSock < 0) {
        /* log before closing so errno still belongs to the accept */
        rodsLog (LOG_NOTICE,
          "svrPortalPutGetRbudp: acceptSrvPortal error. errno = %d",
          errno);
        CLOSE_SOCK (lsock);
        return (tcpSock);
    }
    CLOSE_SOCK (lsock);

    status = readn (tcpSock, (char *) &udpPortBuf, sizeof (udpPortBuf));
    if (status != sizeof (udpPortBuf)) {
        rodsLog (LOG_ERROR,
          "svrPortalPutGetRbudp: readn error. toread %d, bytes read %d ",
          (int) sizeof (udpPortBuf), status);
        /* the accepted control socket is not handed to anyone yet */
        CLOSE_SOCK (tcpSock);
        return (SYS_UDP_CONNECT_ERR);
    }

    if ((tmpStr = getValByKey (&myPortalOpr->dataOprInp.condInput,
      RBUDP_PACK_SIZE_KW)) != NULL) {
        packetSize = atoi (tmpStr);
    } else {
        packetSize = DEF_UDP_PACKET_SIZE;
    }

    if (getValByKey (&myPortalOpr->dataOprInp.condInput, VERY_VERBOSE_KW) !=
      NULL) {
        verbose = 2;
    } else {
        verbose = 0;
    }

    udpSockfd = getUdpSockFromPortList (thisPortList);

    checkbuf (udpSockfd, UDPSOCKBUF, verbose);

    /* NOTE(review): for an oprType other than PUT_OPR/GET_OPR neither
     * branch runs and the readn() byte count is returned unchanged. */
    if (myPortalOpr->oprType == PUT_OPR) {
        rbudpReceiver_t rbudpReceiver;
        int destL3descInx = myPortalOpr->dataOprInp.destL3descInx;

        bzero (&rbudpReceiver, sizeof (rbudpReceiver));
        rbudpReceiver.rbudpBase.verbose = verbose;
        rbudpReceiver.rbudpBase.udpSockBufSize = UDPSOCKBUF;
        rbudpReceiver.rbudpBase.tcpPort = getTcpPortFromPortList (thisPortList);
        rbudpReceiver.rbudpBase.tcpSockfd = tcpSock;
        rbudpReceiver.rbudpBase.udpSockfd = udpSockfd;
        rbudpReceiver.rbudpBase.hasTcpSock = 0;
        rbudpReceiver.rbudpBase.udpRemotePort = ntohl (udpPortBuf);

        /* The UDP peer is the host on the other end of the control
         * connection, on the port it announced. */
        if (getpeername (tcpSock,
          (struct sockaddr *) &rbudpReceiver.rbudpBase.udpServerAddr,
          &laddrlen) < 0) {
            rodsLog (LOG_NOTICE,
              "svrPortalPutGetRbudp() - getpeername() failed: errno=%d",
              errno);
            recvClose (&rbudpReceiver);
            return (USER_RODS_HOSTNAME_ERR);
        }

        rbudpReceiver.rbudpBase.udpServerAddr.sin_port =
          htons (rbudpReceiver.rbudpBase.udpRemotePort);

        status = getfileByFd (&rbudpReceiver, FileDesc[destL3descInx].fd,
          packetSize);

        if (status < 0) {
            rodsLog (LOG_ERROR,
              "svrPortalPutGetRbudp: getfileByFd error for %s",
              FileDesc[destL3descInx].fileName);
            status += SYS_UDP_TRANSFER_ERR;
        }
        recvClose (&rbudpReceiver);
    } else if (myPortalOpr->oprType == GET_OPR) {
        int sendRate;
        rbudpSender_t rbudpSender;
        int srcL3descInx = myPortalOpr->dataOprInp.srcL3descInx;

        bzero (&rbudpSender, sizeof (rbudpSender));
        rbudpSender.rbudpBase.verbose = verbose;
        rbudpSender.rbudpBase.udpSockBufSize = UDPSOCKBUF;
        rbudpSender.rbudpBase.tcpPort = getTcpPortFromPortList (thisPortList);
        rbudpSender.rbudpBase.tcpSockfd = tcpSock;
        rbudpSender.rbudpBase.udpSockfd = udpSockfd;
        rbudpSender.rbudpBase.hasTcpSock = 0;
        rbudpSender.rbudpBase.udpRemotePort = ntohl (udpPortBuf);

        if (getpeername (tcpSock,
          (struct sockaddr *) &rbudpSender.rbudpBase.udpServerAddr,
          &laddrlen) < 0) {
            rodsLog (LOG_NOTICE,
              "svrPortalPutGetRbudp() - getpeername() failed: errno=%d",
              errno);
            sendClose (&rbudpSender);
            return (USER_RODS_HOSTNAME_ERR);
        }
        rbudpSender.rbudpBase.udpServerAddr.sin_port =
          htons (rbudpSender.rbudpBase.udpRemotePort);

        if ((tmpStr = getValByKey (&myPortalOpr->dataOprInp.condInput,
          RBUDP_SEND_RATE_KW)) != NULL) {
            sendRate = atoi (tmpStr);
        } else {
            sendRate = DEF_UDP_SEND_RATE;
        }

        status = sendfileByFd( &rbudpSender, sendRate, packetSize,
          FileDesc[srcL3descInx].fd );

        if (status < 0) {
            rodsLog (LOG_ERROR,
              "svrPortalPutGetRbudp: sendfile error for %s",
              FileDesc[srcL3descInx].fileName);
            status += SYS_UDP_TRANSFER_ERR;
        }
        sendClose (&rbudpSender);
    }

    return (status);
}
#endif
01811 #ifndef windows_platform
/*
 * reconnManager - service loop for client reconnection requests.
 *
 * Listens on rsComm->reconnSock and, for each accepted connection, reads
 * a reconnect message with readReconMsg() and compares its cookie with
 * rsComm->cookie.  On a match it records the client's state and the new
 * socket in rsComm under rsComm->lock, waits on rsComm->cond while the
 * agent is in SENDING_STATE, replies with the agent's state through
 * sendReconnMsg() and, when the agent is in PROCESSING_STATE, calls
 * svrSwitchConnect().
 *
 * Returns immediately when rsComm is NULL or reconnSock <= 0.  Otherwise
 * it only returns after select() fails with an errno other than EINTR, or
 * after accept() has failed more than MAX_RECON_ERROR_CNT times.
 */
01812 void
01813 reconnManager (rsComm_t *rsComm)
01814 {
01815 fd_set basemask;
01816 int nSockets, nSelected;
01817 struct sockaddr_in remoteAddr;
01818 socklen_t len;
01819 int newSock, status;
01820 reconnMsg_t *reconnMsg;
01821 int acceptFailCnt = 0;
01822
01823 if (rsComm == NULL || rsComm->reconnSock <= 0) {
01824 return;
01825 }
01826
/* NOTE(review): the return value of listen() is not checked. */
01827 listen (rsComm->reconnSock, 1);
01828
01829 nSockets = rsComm->reconnSock + 1;
01830 FD_ZERO(&basemask);
01831 FD_SET(rsComm->reconnSock, &basemask);
01832
01833 while (1) {
/* Block until the listening socket is readable; retry when interrupted. */
01834 while ((nSelected = select (nSockets, &basemask,
01835 (fd_set *) NULL, (fd_set *) NULL, NULL)) < 0) {
01836 if (errno == EINTR) {
01837 rodsLog (LOG_NOTICE, "reconnManager: select interrupted\n");
01838 continue;
01839 } else {
01840 rodsLog (LOG_ERROR, "reconnManager: select failed, errno = %d",
01841 errno);
/* Unrecoverable select error: close the listening socket under the lock and exit. */
01842 #ifdef USE_BOOST
01843 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
01844 #else
01845 pthread_mutex_lock (&rsComm->lock);
01846 #endif
01847 close (rsComm->reconnSock);
01848 rsComm->reconnSock = 0;
01849 #ifdef USE_BOOST
01850 boost_lock.unlock();
01851 #else
01852 pthread_mutex_unlock (&rsComm->lock);
01853 #endif
01854 return;
01855 }
01856 }
01857
01858 len = sizeof (remoteAddr);
01859 bzero (&remoteAddr, sizeof (remoteAddr));
01860 newSock = accept (rsComm->reconnSock, (struct sockaddr *) &remoteAddr,
01861 &len);
01862 if (newSock < 0) {
/* acceptFailCnt is cumulative; it is never reset after a successful accept. */
01863 acceptFailCnt++;
01864 rodsLog (LOG_ERROR,
01865 "reconnManager: accept for sock %d failed, errno = %d",
01866 rsComm->reconnSock, errno);
01867 if (acceptFailCnt > MAX_RECON_ERROR_CNT) {
01868 rodsLog (LOG_ERROR,
01869 "reconnManager: accept failed cnt > 10, reconnManager exit");
/* NOTE(review): this path stores -1 without taking rsComm->lock, while the
 * select-failure path above stores 0 under the lock. */
01870 close (rsComm->reconnSock);
01871 rsComm->reconnSock = -1;
01872 rsComm->reconnPort = 0;
01873 return;
01874 } else {
01875 continue;
01876 }
01877 }
/* Reject connections that cannot be read or that present the wrong cookie. */
01878 if ((status = readReconMsg (newSock, &reconnMsg)) < 0) {
01879 rodsLog (LOG_ERROR,
01880 "reconnManager: readReconMsg error, status = %d", status);
01881 close (newSock);
01882 continue;
01883 } else if (reconnMsg->cookie != rsComm->cookie) {
01884 rodsLog (LOG_ERROR,
01885 "reconnManager: cookie mistach, got = %d vs %d",
01886 reconnMsg->cookie, rsComm->cookie);
01887 close (newSock);
01888 free (reconnMsg);
01889 continue;
01890 }
01891
/* From here to the end of the iteration rsComm is updated under rsComm->lock. */
01892 #ifdef USE_BOOST
01893 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01894 #else
01895 pthread_mutex_lock (&rsComm->lock);
01896 #endif
01897 rsComm->clientState = reconnMsg->procState;
01898 rsComm->reconnectedSock = newSock;
01899
/* Wait on the condition variable for as long as the agent is sending. */
01900 while (rsComm->agentState == SENDING_STATE) {
01901
01902 rsComm->reconnThrState = CONN_WAIT_STATE;
01903 #ifdef USE_BOOST
01904 rsComm->cond->wait( boost_lock );
01905 #else
01906 pthread_cond_wait (&rsComm->cond, &rsComm->lock);
01907 #endif
01908 }
01909
01910 rsComm->reconnThrState = PROCESSING_STATE;
/* The received message buffer is reused for the reply.
 * NOTE(review): only sizeof (procState_t) bytes are cleared, not the whole
 * reconnMsg_t - confirm this is intended. */
01911 bzero (reconnMsg, sizeof (procState_t));
01912 reconnMsg->procState = rsComm->agentState;
01913 status = sendReconnMsg (newSock, reconnMsg);
01914 free (reconnMsg);
01915 if (status < 0) {
01916 rodsLog (LOG_ERROR,
01917 "reconnManager: sendReconnMsg error. status = %d",
01918 status);
01919 close (newSock);
01920 rsComm->reconnectedSock = 0;
01921 #ifdef USE_BOOST
01922 boost_lock.unlock();
01923 #else
01924 pthread_mutex_unlock (&rsComm->lock);
01925 #endif
01926 continue;
01927 }
/* The switch is performed here only when the agent is in PROCESSING_STATE. */
01928 if (rsComm->agentState == PROCESSING_STATE) {
01929 rodsLog (LOG_NOTICE,
01930 "reconnManager: svrSwitchConnect. cliState = %d,agState=%d",
01931 rsComm->clientState, rsComm->agentState);
01932 svrSwitchConnect (rsComm);
01933 }
01934 #ifdef USE_BOOST
01935 boost_lock.unlock();
01936 #else
01937 pthread_mutex_unlock (&rsComm->lock);
01938 #endif
01939 }
01940 }
01941
01942 int
01943 svrChkReconnAtSendStart (rsComm_t *rsComm)
01944 {
01945 if (rsComm->reconnSock > 0) {
01946
01947 #ifdef USE_BOOST
01948 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01949 #else
01950 pthread_mutex_lock (&rsComm->lock);
01951 #endif
01952 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
01953
01954 rodsLog (LOG_NOTICE,
01955 "svrChkReconnAtSendStart: ThrState = CONN_WAIT_STATE, agentState=%d",
01956 rsComm->agentState);
01957 rsComm->agentState = PROCESSING_STATE;
01958 #ifdef USE_BOOST
01959 rsComm->cond->notify_all();
01960 #else
01961 pthread_cond_signal (&rsComm->cond);
01962 #endif
01963 }
01964 svrSwitchConnect (rsComm);
01965 rsComm->agentState = SENDING_STATE;
01966 #ifdef USE_BOOST
01967 boost_lock.unlock();
01968 #else
01969 pthread_mutex_unlock (&rsComm->lock);
01970 #endif
01971 }
01972 return 0;
01973 }
01974
01975 int
01976 svrChkReconnAtSendEnd (rsComm_t *rsComm)
01977 {
01978 if (rsComm->reconnSock > 0) {
01979
01980 #ifdef USE_BOOST
01981 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01982 #else
01983 pthread_mutex_lock (&rsComm->lock);
01984 #endif
01985 rsComm->agentState = PROCESSING_STATE;
01986 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
01987
01988 #ifdef USE_BOOST
01989 rsComm->cond->wait( boost_lock );
01990 #else
01991 pthread_cond_signal (&rsComm->cond);
01992 #endif
01993 }
01994 #ifdef USE_BOOST
01995 boost_lock.unlock();
01996 #else
01997 pthread_mutex_unlock (&rsComm->lock);
01998 #endif
01999 }
02000 return 0;
02001 }
02002
02003 int
02004 svrChkReconnAtReadStart (rsComm_t *rsComm)
02005 {
02006 if (rsComm->reconnSock > 0) {
02007
02008 #ifdef USE_BOOST
02009 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
02010 #else
02011 pthread_mutex_lock (&rsComm->lock);
02012 #endif
02013 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02014
02015 rodsLog (LOG_NOTICE,
02016 "svrChkReconnAtReadStart: ThrState = CONN_WAIT_STATE, agentState=%d",
02017 rsComm->agentState);
02018 rsComm->agentState = PROCESSING_STATE;
02019 #ifdef USE_BOOST
02020 rsComm->cond->wait( boost_lock );
02021 #else
02022 pthread_cond_signal (&rsComm->cond);
02023 #endif
02024 }
02025 svrSwitchConnect (rsComm);
02026 rsComm->agentState = RECEIVING_STATE;
02027 #ifdef USE_BOOST
02028 boost_lock.unlock();
02029 #else
02030 pthread_mutex_unlock (&rsComm->lock);
02031 #endif
02032 }
02033 return 0;
02034 }
02035
02036 int
02037 svrChkReconnAtReadEnd (rsComm_t *rsComm)
02038 {
02039 if (rsComm->reconnSock > 0) {
02040
02041 #ifdef USE_BOOST
02042 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
02043 rsComm->agentState = PROCESSING_STATE;
02044 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02045 rsComm->cond->notify_all();
02046 }
02047 boost_lock.unlock();
02048 #else
02049 pthread_mutex_lock (&rsComm->lock);
02050 rsComm->agentState = PROCESSING_STATE;
02051 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02052 pthread_cond_signal (&rsComm->cond);
02053 }
02054 pthread_mutex_unlock (&rsComm->lock);
02055 #endif
02056 }
02057 return 0;
02058 }
02059
02060 #endif
02061
02062 int
02063 svrSockOpenForInConn (rsComm_t *rsComm, int *portNum, char **addr, int proto)
02064 {
02065 int status;
02066
02067 status = sockOpenForInConn (rsComm, portNum, addr, proto);
02068 if (status < 0) return status;
02069
02070 if (addr != NULL && *addr != NULL &&
02071 (strcmp (*addr, "127.0.0.1") == 0 || strcmp (*addr, "0.0.0.0") == 0 ||
02072 strcmp (*addr, "localhost") == 0)) {
02073
02074 char *myaddr;
02075
02076 myaddr = getLocalSvrAddr ();
02077 if (myaddr != NULL) {
02078 free (*addr);
02079 *addr = strdup (myaddr);
02080 } else {
02081 rodsLog (LOG_NOTICE,
02082 "svrSockOpenForInConn: problem resolving local host addr %s",
02083 *addr);
02084 }
02085 }
02086 return status;
02087 }
02088
02089 char *
02090 getLocalSvrAddr ()
02091 {
02092 char *myHost;
02093 myHost = _getSvrAddr (LocalServerHost);
02094 return myHost;
02095 }
02096
02097 char *
02098 _getSvrAddr (rodsServerHost_t *rodsServerHost)
02099 {
02100 hostName_t *tmpHostName;
02101
02102 if (rodsServerHost == NULL) return NULL;
02103
02104 tmpHostName = rodsServerHost->hostName;
02105 while (tmpHostName != NULL) {
02106 if (strcmp (tmpHostName->name, "localhost") != 0 &&
02107 strcmp (tmpHostName->name, "127.0.0.1") != 0 &&
02108 strcmp (tmpHostName->name, "0.0.0.0") != 0 &&
02109 strchr (tmpHostName->name, '.') != NULL) {
02110 return (tmpHostName->name);
02111 }
02112 tmpHostName = tmpHostName->next;
02113 }
02114 return (NULL);
02115 }
02116
02117 char *
02118 getSvrAddr (rodsServerHost_t *rodsServerHost)
02119 {
02120 char *myHost;
02121
02122 myHost = _getSvrAddr (rodsServerHost);
02123 if (myHost == NULL) {
02124
02125 myHost = rodsServerHost->hostName->name;
02126 }
02127 return myHost;
02128 }
02129
02130 int
02131 setLocalSrvAddr (char *outLocalAddr)
02132 {
02133 char *myHost;
02134
02135 if (outLocalAddr == NULL) return USER__NULL_INPUT_ERR;
02136
02137 myHost = getSvrAddr (LocalServerHost);
02138
02139 if (myHost != NULL) {
02140 rstrcpy (outLocalAddr, myHost, NAME_LEN);
02141 return 0;
02142 } else {
02143 return SYS_INVALID_SERVER_HOST;
02144 }
02145 }
02146
02147 int
02148 forkAndExec (char *av[])
02149 {
02150 int childPid = 0;
02151 int status = -1;
02152 int childStatus = 0;
02153
02154
02155 #ifndef windows_platform
02156 childPid = RODS_FORK ();
02157
02158 if (childPid == 0) {
02159
02160 execv(av[0], av);
02161
02162 exit(1);
02163 } else if (childPid < 0) {
02164 rodsLog (LOG_ERROR,
02165 "exectar: RODS_FORK failed. errno = %d", errno);
02166 return (SYS_FORK_ERROR);
02167 }
02168
02169
02170
02171 status = waitpid (childPid, &childStatus, 0);
02172 if (status >= 0 && childStatus != 0) {
02173 rodsLog (LOG_ERROR,
02174 "forkAndExec: waitpid status = %d, childStatus = %d",
02175 status, childStatus);
02176 status = EXEC_CMD_ERROR;
02177 }
02178 #else
02179 rodsLog (LOG_ERROR,
02180 "forkAndExec: fork and exec not supported");
02181
02182 status = SYS_NOT_SUPPORTED;
02183 #endif
02184 return status;
02185 }
02186
02187 int
02188 singleRemLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02189 {
02190 dataOprInp_t *dataOprInp;
02191 int status = 0;
02192 int oprType;
02193
02194 if (dataCopyInp == NULL) {
02195 rodsLog (LOG_NOTICE,
02196 "remLocCopy: NULL dataCopyInp input");
02197 return (SYS_INTERNAL_NULL_INPUT_ERR);
02198 }
02199 dataOprInp = &dataCopyInp->dataOprInp;
02200 oprType = dataOprInp->oprType;
02201
02202 if (oprType == COPY_TO_LOCAL_OPR) {
02203 status = singleRemToLocCopy (rsComm, dataCopyInp);
02204 } else {
02205 status = singleLocToRemCopy (rsComm, dataCopyInp);
02206 }
02207 return status;
02208 }
02209
02210 int
02211 singleRemToLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02212 {
02213 dataOprInp_t *dataOprInp;
02214 rodsLong_t dataSize;
02215 int l1descInx;
02216 int destL3descInx;
02217 int destRescTypeInx;
02218 bytesBuf_t dataObjReadInpBBuf;
02219 openedDataObjInp_t dataObjReadInp;
02220 int bytesWritten, bytesRead;
02221 rodsLong_t totalWritten = 0;
02222
02223
02224 if (dataCopyInp == NULL) {
02225 rodsLog (LOG_NOTICE,
02226 "singleRemToLocCopy: NULL dataCopyInp input");
02227 return (SYS_INTERNAL_NULL_INPUT_ERR);
02228 }
02229 dataOprInp = &dataCopyInp->dataOprInp;
02230 l1descInx = dataCopyInp->portalOprOut.l1descInx;
02231 destL3descInx = dataOprInp->destL3descInx;
02232 destRescTypeInx = dataOprInp->destRescTypeInx;
02233 dataSize = dataOprInp->dataSize;
02234
02235 bzero (&dataObjReadInp, sizeof (dataObjReadInp));
02236 dataObjReadInpBBuf.buf = malloc (TRANS_BUF_SZ);
02237 dataObjReadInpBBuf.len = dataObjReadInp.len = TRANS_BUF_SZ;
02238 dataObjReadInp.l1descInx = l1descInx;
02239 while ((bytesRead = rsDataObjRead (rsComm, &dataObjReadInp,
02240 &dataObjReadInpBBuf)) > 0) {
02241 bytesWritten = _l3Write (rsComm, destRescTypeInx,
02242 destL3descInx, dataObjReadInpBBuf.buf, bytesRead);
02243
02244 if (bytesWritten != bytesRead) {
02245 rodsLog (LOG_ERROR,
02246 "singleRemToLocCopy: Read %d bytes, Wrote %d bytes.\n ",
02247 bytesRead, bytesWritten);
02248 free (dataObjReadInpBBuf.buf);
02249 return (SYS_COPY_LEN_ERR);
02250 } else {
02251 totalWritten += bytesWritten;
02252 }
02253 }
02254 free (dataObjReadInpBBuf.buf);
02255 if (dataSize <= 0 || totalWritten == dataSize ||
02256 getValByKey (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW) != NULL) {
02257 return (0);
02258 } else {
02259 rodsLog (LOG_ERROR,
02260 "singleRemToLocCopy: totalWritten %lld dataSize %lld mismatch",
02261 totalWritten, dataSize);
02262 return (SYS_COPY_LEN_ERR);
02263 }
02264 }
02265
02266 int
02267 singleLocToRemCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02268 {
02269 dataOprInp_t *dataOprInp;
02270 rodsLong_t dataSize;
02271 int l1descInx;
02272 int srcL3descInx;
02273 int srcRescTypeInx;
02274 bytesBuf_t dataObjWriteInpBBuf;
02275 openedDataObjInp_t dataObjWriteInp;
02276 int bytesWritten, bytesRead;
02277 rodsLong_t totalWritten = 0;
02278
02279
02280 if (dataCopyInp == NULL) {
02281 rodsLog (LOG_NOTICE,
02282 "singleRemToLocCopy: NULL dataCopyInp input");
02283 return (SYS_INTERNAL_NULL_INPUT_ERR);
02284 }
02285 dataOprInp = &dataCopyInp->dataOprInp;
02286 l1descInx = dataCopyInp->portalOprOut.l1descInx;
02287 srcL3descInx = dataOprInp->srcL3descInx;
02288 srcRescTypeInx = dataOprInp->srcRescTypeInx;
02289 dataSize = dataOprInp->dataSize;
02290
02291 bzero (&dataObjWriteInp, sizeof (dataObjWriteInp));
02292 dataObjWriteInpBBuf.buf = malloc (TRANS_BUF_SZ);
02293 dataObjWriteInpBBuf.len = 0;
02294 dataObjWriteInp.l1descInx = l1descInx;
02295
02296 while ((bytesRead = _l3Read (rsComm, srcRescTypeInx,
02297 srcL3descInx, dataObjWriteInpBBuf.buf, TRANS_BUF_SZ)) > 0) {
02298 dataObjWriteInp.len = dataObjWriteInpBBuf.len = bytesRead;
02299 bytesWritten = rsDataObjWrite (rsComm, &dataObjWriteInp,
02300 &dataObjWriteInpBBuf);
02301 if (bytesWritten != bytesRead) {
02302 rodsLog (LOG_ERROR,
02303 "singleLocToRemCopy: Read %d bytes, Wrote %d bytes.\n ",
02304 bytesRead, bytesWritten);
02305 free (dataObjWriteInpBBuf.buf);
02306 return (SYS_COPY_LEN_ERR);
02307 } else {
02308 totalWritten += bytesWritten;
02309 }
02310 }
02311 free (dataObjWriteInpBBuf.buf);
02312 if (dataSize <= 0 || totalWritten == dataSize ||
02313 getValByKey (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW) != NULL) {
02314 return (0);
02315 } else {
02316 rodsLog (LOG_ERROR,
02317 "singleLocToRemCopy: totalWritten %lld dataSize %lld mismatch",
02318 totalWritten, dataSize);
02319 return (SYS_COPY_LEN_ERR);
02320 }
02321 }
02322
02323
02324
02325
02326
02327 int
02328 readStartupPack (int sock, startupPack_t **startupPack, struct timeval *tv)
02329 {
02330 int status;
02331 msgHeader_t myHeader;
02332 bytesBuf_t inputStructBBuf, bsBBuf, errorBBuf;
02333
02334 status = readMsgHeader (sock, &myHeader, tv);
02335
02336 if (status < 0) {
02337 rodsLogError (LOG_NOTICE, status,
02338 "readStartupPack: readMsgHeader error. status = %d", status);
02339 return (status);
02340 }
02341
02342 if (myHeader.msgLen > (int) sizeof (startupPack_t) * 2 ||
02343 myHeader.msgLen <= 0) {
02344 rodsLog (LOG_NOTICE,
02345 "readStartupPack: problem with myHeader.msgLen = %d",
02346 myHeader.msgLen);
02347 return (SYS_HEADER_READ_LEN_ERR);
02348 }
02349
02350 memset (&bsBBuf, 0, sizeof (bytesBuf_t));
02351 status = readMsgBody (sock, &myHeader, &inputStructBBuf, &bsBBuf,
02352 &errorBBuf, XML_PROT, tv);
02353 if (status < 0) {
02354 rodsLogError (LOG_NOTICE, status,
02355 "readStartupPack: readMsgBody error. status = %d", status);
02356 return (status);
02357 }
02358
02359
02360
02361 if (strcmp (myHeader.type, RODS_CONNECT_T) != 0) {
02362 if (inputStructBBuf.buf != NULL)
02363 free (inputStructBBuf.buf);
02364 if (bsBBuf.buf != NULL)
02365 free (inputStructBBuf.buf);
02366 if (errorBBuf.buf != NULL)
02367 free (inputStructBBuf.buf);
02368 rodsLog (LOG_NOTICE,
02369 "readStartupPack: wrong mag type - %s, expect %s",
02370 myHeader.type, RODS_CONNECT_T);
02371 return (SYS_HEADER_TPYE_LEN_ERR);
02372 }
02373
02374 if (myHeader.bsLen != 0) {
02375 if (bsBBuf.buf != NULL)
02376 free (inputStructBBuf.buf);
02377 rodsLog (LOG_NOTICE, "readStartupPack: myHeader.bsLen = %d is not 0",
02378 myHeader.bsLen);
02379 }
02380
02381 if (myHeader.errorLen != 0) {
02382 if (errorBBuf.buf != NULL)
02383 free (inputStructBBuf.buf);
02384 rodsLog (LOG_NOTICE,
02385 "readStartupPack: myHeader.errorLen = %d is not 0",
02386 myHeader.errorLen);
02387 }
02388
02389
02390 status = unpackStruct (inputStructBBuf.buf, (void **) startupPack,
02391 "StartupPack_PI", RodsPackTable, XML_PROT);
02392
02393 clearBBuf (&inputStructBBuf);
02394
02395 if (status >= 0) {
02396 if ((*startupPack)->clientUser[0] != '\0' &&
02397 (*startupPack)->clientRodsZone[0] == '\0') {
02398 char *zoneName;
02399
02400 if ((zoneName = getLocalZoneName ()) != NULL) {
02401 rstrcpy ((*startupPack)->clientRodsZone, zoneName, NAME_LEN);
02402 }
02403 }
02404 if ((*startupPack)->proxyUser[0] != '\0' &&
02405 (*startupPack)->proxyRodsZone[0] == '\0') {
02406 char *zoneName;
02407
02408 if ((zoneName = getLocalZoneName ()) != NULL) {
02409 rstrcpy ((*startupPack)->proxyRodsZone, zoneName, NAME_LEN);
02410 }
02411 }
02412 } else {
02413 rodsLogError (LOG_NOTICE, status,
02414 "readStartupPack:unpackStruct error. status = %d",
02415 status);
02416 }
02417
02418 return (status);
02419 }
02420
02421
02422 #ifdef RUN_SERVER_AS_ROOT
02423
02424
02425
02426
02427
02428
02429 int
02430 initServiceUser()
02431 {
02432 #ifndef windows_platform
02433 char *serviceUser;
02434 struct passwd *pwent;
02435
02436 serviceUser = getenv("irodsServiceUser");
02437 if (serviceUser == NULL || getuid() != 0) {
02438
02439
02440 return (0);
02441 }
02442
02443
02444
02445 errno = 0;
02446 pwent = getpwnam(serviceUser);
02447 if (pwent) {
02448 ServiceUid = pwent->pw_uid;
02449 return (changeToServiceUser());
02450 }
02451
02452 if (errno) {
02453 rodsLogError(LOG_ERROR, SYS_USER_RETRIEVE_ERR,
02454 "setServiceUser: error in getpwnam %s, errno = %d",
02455 serviceUser, errno);
02456 return (SYS_USER_RETRIEVE_ERR - errno);
02457 }
02458 else {
02459 rodsLogError(LOG_ERROR, SYS_USER_RETRIEVE_ERR,
02460 "setServiceUser: user %s doesn't exist", serviceUser);
02461 return (SYS_USER_RETRIEVE_ERR);
02462 }
02463 #else
02464 return (0);
02465 #endif
02466 }
02467
02468
02469
02470 int
02471 isServiceUserSet()
02472 {
02473 if (ServiceUid) {
02474 return 1;
02475 }
02476 else {
02477 return 0;
02478 }
02479 }
02480
02481
02482
02483
02484 int
02485 changeToRootUser()
02486 {
02487 int prev_errno, my_errno;
02488
02489 if (!isServiceUserSet()) {
02490
02491 return (0);
02492 }
02493
02494 #ifndef windows_platform
02495
02496
02497
02498 prev_errno = errno;
02499 if (seteuid(0) == -1) {
02500 my_errno = errno;
02501 errno = prev_errno;
02502 rodsLogError(LOG_ERROR, SYS_USER_NO_PERMISSION - my_errno,
02503 "changeToRootUser: can't change to root user id");
02504 return (SYS_USER_NO_PERMISSION - my_errno);
02505 }
02506 #endif
02507
02508 return (0);
02509 }
02510
02511
02512
02513
02514
02515 int
02516 changeToServiceUser()
02517 {
02518 int prev_errno, my_errno;
02519
02520 if (!isServiceUserSet()) {
02521
02522 return (0);
02523 }
02524
02525 #ifndef windows_platform
02526 prev_errno = errno;
02527 if (seteuid(ServiceUid) == -1) {
02528 my_errno = errno;
02529 errno = prev_errno;
02530 rodsLogError(LOG_ERROR, SYS_USER_NO_PERMISSION - my_errno,
02531 "changeToServiceUser: can't change to service user id");
02532 return (SYS_USER_NO_PERMISSION - my_errno);
02533 }
02534 #endif
02535
02536 return (0);
02537 }
02538
02539 #endif
02540
02541