00001
00002
00003
00004
00005
00006
00007
00008 #ifndef windows_platform
00009 #include <sys/wait.h>
00010 #endif
00011
00012
00013 #include "miscServerFunct.h"
00014 #include "dataObjOpen.h"
00015 #include "dataObjLseek.h"
00016 #include "dataObjOpr.h"
00017 #include "dataObjClose.h"
00018 #include "dataObjWrite.h"
00019 #include "dataObjRead.h"
00020 #include "rcPortalOpr.h"
00021 #include "initServer.h"
00022 #ifdef PARA_OPR
00023 #ifdef USE_BOOST
00024 #include <boost/thread/thread.hpp>
00025 #else
00026 #include <pthread.h>
00027 #endif
00028 #endif
00029 #if !defined(solaris_platform)
00030 char *__loc1;
00031 #endif
00032 #include "rsGlobalExtern.h"
00033 #include "rcGlobalExtern.h"
00034
00035 int
00036 svrToSvrConnectNoLogin (rsComm_t *rsComm, rodsServerHost_t *rodsServerHost)
00037 {
00038 rErrMsg_t errMsg;
00039 int reconnFlag;
00040
00041
00042 if (rodsServerHost->conn == NULL) {
00043 if (getenv (RECONNECT_ENV) != NULL) {
00044 reconnFlag = RECONN_TIMEOUT;
00045 } else {
00046 reconnFlag = NO_RECONN;
00047 }
00048 rodsServerHost->conn = _rcConnect (rodsServerHost->hostName->name,
00049 ((zoneInfo_t *) rodsServerHost->zoneInfo)->portNum,
00050 rsComm->myEnv.rodsUserName, rsComm->myEnv.rodsZone,
00051 rsComm->clientUser.userName, rsComm->clientUser.rodsZone, &errMsg,
00052 rsComm->connectCnt, reconnFlag);
00053
00054 if (rodsServerHost->conn == NULL) {
00055 if (errMsg.status < 0) {
00056 return (errMsg.status);
00057 } else {
00058 return (SYS_SVR_TO_SVR_CONNECT_FAILED - errno);
00059 }
00060 }
00061 }
00062
00063 return (rodsServerHost->localFlag);
00064 }
00065
00066 int
00067 svrToSvrConnect (rsComm_t *rsComm, rodsServerHost_t *rodsServerHost)
00068 {
00069 int status;
00070
00071 status = svrToSvrConnectNoLogin (rsComm, rodsServerHost);
00072
00073 if (status < 0) {
00074 return status;
00075 }
00076
00077 status = clientLogin (rodsServerHost->conn);
00078 if (status < 0) {
00079 rodsLog (LOG_NOTICE,
00080 "svrToSvrConnect: clientLogin to %s failed",
00081 rodsServerHost->hostName->name);
00082 return (status);
00083 } else {
00084 return (rodsServerHost->localFlag);
00085 }
00086 }
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098 int
00099 setupSrvPortalForParaOpr (rsComm_t *rsComm, dataOprInp_t *dataOprInp,
00100 int oprType, portalOprOut_t **portalOprOut)
00101 {
00102 portalOprOut_t *myDataObjPutOut;
00103 int portalSock;
00104 int proto;
00105
00106 #ifdef RBUDP_TRANSFER
00107 if (getValByKey (&dataOprInp->condInput, RBUDP_TRANSFER_KW) != NULL) {
00108 proto = SOCK_DGRAM;
00109 } else {
00110 proto = SOCK_STREAM;
00111 }
00112 #else
00113 proto = SOCK_STREAM;
00114 #endif
00115
00116 myDataObjPutOut = (portalOprOut_t *) malloc (sizeof (portalOprOut_t));
00117 memset (myDataObjPutOut, 0, sizeof (portalOprOut_t));
00118
00119 *portalOprOut = myDataObjPutOut;
00120
00121 if (getValByKey (&dataOprInp->condInput, STREAMING_KW) != NULL ||
00122 proto == SOCK_DGRAM) {
00123
00124 myDataObjPutOut->numThreads = 1;
00125 } else {
00126 myDataObjPutOut->numThreads = getNumThreads (rsComm,
00127 dataOprInp->dataSize, dataOprInp->numThreads,
00128 &dataOprInp->condInput,
00129 getValByKey (&dataOprInp->condInput, RESC_NAME_KW), NULL);
00130 }
00131
00132 if (myDataObjPutOut->numThreads == 0) {
00133 return 0;
00134 } else {
00135 portalOpr_t *myPortalOpr;
00136
00137
00138 portalSock = createSrvPortal (rsComm, &myDataObjPutOut->portList,
00139 proto);
00140 if (portalSock < 0) {
00141 rodsLog (LOG_NOTICE,
00142 "setupSrvPortalForParaOpr: createSrvPortal error, ststus = %d",
00143 portalSock);
00144 myDataObjPutOut->status = portalSock;
00145 return portalSock;
00146 }
00147 myPortalOpr = rsComm->portalOpr =
00148 (portalOpr_t *) malloc (sizeof (portalOpr_t));
00149 myPortalOpr->oprType = oprType;
00150 myPortalOpr->portList = myDataObjPutOut->portList;
00151 myPortalOpr->dataOprInp = *dataOprInp;
00152 memset (&dataOprInp->condInput, 0, sizeof (dataOprInp->condInput));
00153 myPortalOpr->dataOprInp.numThreads = myDataObjPutOut->numThreads;
00154 }
00155 return (0);
00156 }
00157
00158
00159
00160
00161
00162
00163 int
00164 createSrvPortal (rsComm_t *rsComm, portList_t *thisPortList, int proto)
00165 {
00166 int lsock = -1;
00167 int lport = 0;
00168 char *laddr = NULL;
00169 int udpsock = -1;
00170 int udpport = 0;
00171 char *udpaddr = NULL;
00172
00173
00174 if (proto != SOCK_DGRAM && proto != SOCK_STREAM) {
00175 rodsLog (LOG_ERROR,
00176 "createSrvPortal: invalid input protocol %d", proto);
00177 return SYS_INVALID_PROTOCOL_TYPE;
00178 }
00179
00180 if ((lsock = svrSockOpenForInConn (rsComm, &lport, &laddr,
00181 SOCK_STREAM)) < 0) {
00182 rodsLog (LOG_ERROR,
00183 "createSrvPortal: svrSockOpenForInConn failed: status=%d",
00184 lsock);
00185 return lsock;
00186 }
00187
00188 thisPortList->sock = lsock;
00189 thisPortList->cookie = random ();
00190 if (ProcessType == CLIENT_PT) {
00191 rstrcpy (thisPortList->hostAddr, laddr, LONG_NAME_LEN);
00192 } else {
00193 struct hostent *hostEnt;
00194
00195 if (LocalServerHost != NULL &&
00196 strcmp (LocalServerHost->hostName->name, "localhost") != 0 &&
00197 (hostEnt = gethostbyname (LocalServerHost->hostName->name)) != NULL){
00198 rstrcpy (thisPortList->hostAddr, hostEnt->h_name, LONG_NAME_LEN);
00199 } else {
00200 rstrcpy (thisPortList->hostAddr, laddr, LONG_NAME_LEN);
00201 }
00202 }
00203 free (laddr);
00204 thisPortList->portNum = lport;
00205 thisPortList->windowSize = rsComm->windowSize;
00206
00207 listen (lsock, SOMAXCONN);
00208
00209 if (proto == SOCK_DGRAM) {
00210 if ((udpsock = svrSockOpenForInConn (rsComm, &udpport, &udpaddr,
00211 SOCK_DGRAM)) < 0) {
00212 rodsLog (LOG_ERROR,
00213 "setupSrvPortal- sockOpenForInConn of SOCK_DGRAM failed: stat=%d",
00214 udpsock);
00215 CLOSE_SOCK (lsock);
00216 return udpsock;
00217 } else {
00218 addUdpPortToPortList (thisPortList, udpport);
00219 addUdpSockToPortList (thisPortList, udpsock);
00220 }
00221 }
00222 free (udpaddr);
00223
00224 return (lsock);
00225 }
00226
00227 int
00228 acceptSrvPortal (rsComm_t *rsComm, portList_t *thisPortList)
00229 {
00230 int myFd = -1;
00231 int myCookie;
00232 int nbytes;
00233 fd_set basemask;
00234 int nSockets, nSelected;
00235 int lsock = getTcpSockFromPortList (thisPortList);
00236 struct timeval selectTimeout;
00237
00238 nSockets = lsock + 1;
00239 FD_ZERO(&basemask);
00240 FD_SET(lsock, &basemask);
00241
00242
00243
00244 selectTimeout.tv_sec = SELECT_TIMEOUT_FOR_CONN;
00245 selectTimeout.tv_usec = 0;
00246
00247 while ((nSelected = select(nSockets, &basemask,
00248 (fd_set *) NULL, (fd_set *) NULL, &selectTimeout)) < 0) {
00249 if (errno == EINTR) {
00250 rodsLog (LOG_ERROR, "acceptSrvPortal: select interrupted\n");
00251 continue;
00252 }
00253 rodsLog (LOG_ERROR, "acceptSrvPortal: select select failed, errno = %d",
00254 errno);
00255 }
00256 myFd = accept (lsock, 0, 0);
00257 if (myFd < 0) {
00258 rodsLog (LOG_NOTICE,
00259 "acceptSrvPortal() -- accept() failed: errno=%d",
00260 errno);
00261 return SYS_SOCK_ACCEPT_ERR - errno;
00262 } else {
00263 rodsSetSockOpt (myFd, rsComm->windowSize);
00264 }
00265 #ifdef _WIN32
00266 nbytes = recv (myFd,&myCookie,sizeof(myCookie),0);
00267 #else
00268 nbytes = read (myFd, &myCookie,sizeof (myCookie));
00269 #endif
00270 myCookie = ntohl (myCookie);
00271 if (nbytes != sizeof (myCookie) || myCookie != thisPortList->cookie) {
00272 rodsLog (LOG_NOTICE,
00273 "acceptSrvPortal: cookie err, bytes read=%d,cookie=%d,inCookie=%d",
00274 nbytes, thisPortList->cookie, myCookie);
00275 CLOSE_SOCK (myFd);
00276 return SYS_PORT_COOKIE_ERR;
00277 }
00278 return (myFd);
00279 }
00280
00281 int
00282 svrPortalPutGet (rsComm_t *rsComm)
00283 {
00284 portalOpr_t *myPortalOpr;
00285 dataOprInp_t *dataOprInp;
00286 portList_t *thisPortList;
00287 rodsLong_t size0, size1, offset0;
00288 int lsock, portalFd;
00289 int i;
00290 int numThreads;
00291 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
00292 #ifdef PARA_OPR
00293 #ifdef USE_BOOST
00294 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
00295 #else
00296 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
00297 #endif
00298 #endif
00299 int oprType;
00300 int flags = 0;
00301 int retVal = 0;
00302
00303 myPortalOpr = rsComm->portalOpr;
00304
00305 if (myPortalOpr == NULL) {
00306 rodsLog (LOG_NOTICE, "svrPortalPut: NULL myPortalOpr");
00307 return (SYS_INTERNAL_NULL_INPUT_ERR);
00308 }
00309
00310 thisPortList = &myPortalOpr->portList;
00311 if (thisPortList == NULL) {
00312 rodsLog (LOG_NOTICE, "svrPortalPut: NULL portList");
00313 return (SYS_INTERNAL_NULL_INPUT_ERR);
00314 }
00315
00316 if (getUdpPortFromPortList (thisPortList) != 0) {
00317 #ifdef RBUDP_TRANSFER
00318
00319 retVal = svrPortalPutGetRbudp (rsComm);
00320 return retVal;
00321 #else
00322 return SYS_UDP_NO_SUPPORT_ERR;
00323 #endif
00324 }
00325
00326 oprType = myPortalOpr->oprType;
00327 dataOprInp = &myPortalOpr->dataOprInp;
00328
00329 if (getValByKey (&dataOprInp->condInput, STREAMING_KW)!= NULL) {
00330 flags |= STREAMING_FLAG;
00331 }
00332
00333 numThreads = dataOprInp->numThreads;
00334
00335 if (numThreads <= 0 || numThreads > MAX_NUM_CONFIG_TRAN_THR) {
00336 rodsLog (LOG_NOTICE,
00337 "svrPortalPut: numThreads %d out of range");
00338 return (SYS_INTERNAL_NULL_INPUT_ERR);
00339 }
00340
00341 memset (myInput, 0, sizeof (myInput));
00342 #ifdef PARA_OPR
00343 memset (tid, 0, sizeof (tid));
00344 #endif
00345
00346 size0 = dataOprInp->dataSize / numThreads;
00347 size1 = dataOprInp->dataSize - size0 * (numThreads - 1);
00348 offset0 = dataOprInp->offset;
00349
00350 lsock = getTcpSockFromPortList (thisPortList);
00351
00352
00353 portalFd = acceptSrvPortal (rsComm, thisPortList);
00354 if (portalFd < 0) {
00355 rodsLog (LOG_NOTICE,
00356 "svrPortalPut: acceptSrvPortal error. errno = %d",
00357 errno);
00358
00359 CLOSE_SOCK (lsock);
00360
00361 return (portalFd);
00362 }
00363
00364 if (oprType == PUT_OPR) {
00365 fillPortalTransferInp (&myInput[0], rsComm,
00366 portalFd, dataOprInp->destL3descInx, 0, dataOprInp->destRescTypeInx,
00367 0, size0, offset0, flags);
00368 } else {
00369 fillPortalTransferInp (&myInput[0], rsComm,
00370 dataOprInp->srcL3descInx, portalFd, dataOprInp->srcRescTypeInx, 0,
00371 0, size0, offset0, flags);
00372 }
00373
00374 if (numThreads == 1) {
00375 if (oprType == PUT_OPR) {
00376 partialDataPut (&myInput[0]);
00377 } else {
00378 partialDataGet (&myInput[0]);
00379 }
00380 CLOSE_SOCK (lsock);
00381
00382 return (myInput[0].status);
00383 } else {
00384 #ifdef PARA_OPR
00385 rodsLong_t mySize = 0;
00386 rodsLong_t myOffset = 0;
00387
00388 for (i = 1; i < numThreads; i++) {
00389 int l3descInx;
00390
00391 portalFd = acceptSrvPortal (rsComm, thisPortList);
00392 if (portalFd < 0) {
00393 rodsLog (LOG_NOTICE,
00394 "svrPortalPut: acceptSrvPortal error. errno = %d",
00395 errno);
00396
00397 CLOSE_SOCK (lsock);
00398
00399 return (portalFd);
00400 }
00401 myOffset += size0;
00402 if (i < numThreads - 1) {
00403 mySize = size0;
00404 } else {
00405 mySize = size1;
00406 }
00407
00408 if (oprType == PUT_OPR) {
00409
00410 l3descInx = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
00411 dataOprInp->destL3descInx, O_WRONLY);
00412 fillPortalTransferInp (&myInput[i], rsComm,
00413 portalFd, l3descInx, 0, dataOprInp->destRescTypeInx,
00414 i, mySize, myOffset, flags);
00415 #ifdef USE_BOOST
00416 tid[i] = new boost::thread( partialDataPut, &myInput[i] );
00417 #else
00418 pthread_create (&tid[i], pthread_attr_default,
00419 (void *(*)(void *)) partialDataPut, (void *) &myInput[i]);
00420 #endif
00421
00422 } else {
00423 l3descInx = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
00424 dataOprInp->srcL3descInx, O_RDONLY);
00425 fillPortalTransferInp (&myInput[i], rsComm,
00426 l3descInx, portalFd, dataOprInp->srcRescTypeInx, 0,
00427 i, mySize, myOffset, flags);
00428 #ifdef USE_BOOST
00429 tid[i] = new boost::thread( partialDataGet, &myInput[i] );
00430 #else
00431 pthread_create (&tid[i], pthread_attr_default,
00432 (void *(*)(void *)) partialDataGet, (void *) &myInput[i]);
00433 #endif
00434 }
00435 }
00436
00437
00438
00439 if (oprType == PUT_OPR) {
00440 #ifdef USE_BOOST
00441 tid[0] = new boost::thread( partialDataPut, &myInput[0] );
00442 #else
00443 pthread_create (&tid[0], pthread_attr_default,
00444 (void *(*)(void *)) partialDataPut, (void *) &myInput[0]);
00445 #endif
00446 } else {
00447 #ifdef USE_BOOST
00448 tid[0] = new boost::thread( partialDataGet, &myInput[0] );
00449 #else
00450 pthread_create (&tid[0], pthread_attr_default,
00451 (void *(*)(void *)) partialDataGet, (void *) &myInput[0]);
00452 #endif
00453 }
00454
00455 for ( i = 0; i < numThreads; i++) {
00456 if (tid[i] != 0)
00457 #ifdef USE_BOOST
00458 tid[i]->join();
00459 #else
00460 pthread_join (tid[i], NULL);
00461 #endif
00462 if (myInput[i].status < 0) {
00463 retVal = myInput[i].status;
00464 }
00465 }
00466 CLOSE_SOCK (lsock);
00467 return (retVal);
00468
00469 #else
00470 CLOSE_SOCK (lsock);
00471 return (SYS_PARA_OPR_NO_SUPPORT);
00472 #endif
00473 }
00474 }
00475
00476 int
00477 fillPortalTransferInp (portalTransferInp_t *myInput, rsComm_t *rsComm,
00478 int srcFd, int destFd, int srcRescTypeInx, int destRescTypeInx,
00479 int threadNum, rodsLong_t size, rodsLong_t offset, int flags)
00480 {
00481 if (myInput == NULL)
00482 return (SYS_INTERNAL_NULL_INPUT_ERR);
00483
00484 myInput->rsComm = rsComm;
00485 myInput->destFd = destFd;
00486 myInput->srcFd = srcFd;
00487 myInput->destRescTypeInx = destRescTypeInx;
00488 myInput->srcRescTypeInx = srcRescTypeInx;
00489
00490 myInput->threadNum = threadNum;
00491 myInput->size = size;
00492 myInput->offset = offset;
00493 myInput->flags = flags;
00494
00495 return (0);
00496 }
00497
00498
00499 void
00500 partialDataPut (portalTransferInp_t *myInput)
00501 {
00502 int destL3descInx, srcFd, destRescTypeInx;
00503 char *buf;
00504 int bytesWritten;
00505 rodsLong_t bytesToGet;
00506 rodsLong_t myOffset = 0;
00507
00508 #ifdef PARA_TIMING
00509 time_t startTime, afterSeek, afterTransfer,
00510 endTime;
00511 startTime=time (0);
00512 #endif
00513
00514 if (myInput == NULL) {
00515 rodsLog (LOG_SYS_FATAL, "partialDataPut: NULL myInput");
00516 return;
00517 }
00518
00519 myInput->status = 0;
00520 destL3descInx = myInput->destFd;
00521 srcFd = myInput->srcFd;
00522 destRescTypeInx = myInput->destRescTypeInx;
00523
00524 if (myInput->offset != 0) {
00525 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
00526 destL3descInx, myInput->offset, SEEK_SET);
00527 if (myOffset < 0) {
00528 myInput->status = myOffset;
00529 rodsLog (LOG_NOTICE,
00530 "_partialDataPut: _objSeek error, status = %d ",
00531 myInput->status);
00532 if (myInput->threadNum > 0)
00533 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00534 CLOSE_SOCK (srcFd);
00535 return;
00536 }
00537 }
00538 buf = (char*)malloc (TRANS_BUF_SZ);
00539
00540 #ifdef PARA_TIMING
00541 afterSeek=time(0);
00542 #endif
00543
00544 bytesToGet = myInput->size;
00545
00546 while (bytesToGet > 0) {
00547 int toread0;
00548 int bytesRead;
00549
00550 #ifdef PARA_TIMING
00551 time_t tstart, tafterRead, tafterWrite;
00552 tstart=time(0);
00553 #endif
00554 if (myInput->flags & STREAMING_FLAG) {
00555 toread0 = bytesToGet;
00556 } else if (bytesToGet > TRANS_SZ) {
00557 toread0 = TRANS_SZ;
00558 } else {
00559 toread0 = bytesToGet;
00560 }
00561
00562 myInput->status = sendTranHeader (srcFd, PUT_OPR, myInput->flags,
00563 myOffset, toread0);
00564
00565 if (myInput->status < 0) {
00566 rodsLog (LOG_NOTICE,
00567 "partialDataPut: sendTranHeader error. status = %d",
00568 myInput->status);
00569 if (myInput->threadNum > 0)
00570 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00571 CLOSE_SOCK (srcFd);
00572 free (buf);
00573 return;
00574 }
00575
00576 while (toread0 > 0) {
00577 int toread1;
00578
00579 if (toread0 > TRANS_BUF_SZ) {
00580 toread1 = TRANS_BUF_SZ;
00581 } else {
00582 toread1 = toread0;
00583 }
00584 bytesRead = myRead (srcFd, buf, toread1, SOCK_TYPE, NULL, NULL);
00585
00586 #ifdef PARA_TIMING
00587 tafterRead=time(0);
00588 #endif
00589 if (bytesRead == toread1) {
00590 if ((bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
00591 destL3descInx, buf, bytesRead)) != bytesRead) {
00592 rodsLog (LOG_NOTICE,
00593 "_partialDataPut:Bytes written %d don't match read %d",
00594 bytesWritten, bytesRead);
00595
00596 if (bytesWritten < 0) {
00597 myInput->status = bytesWritten;
00598 } else {
00599 myInput->status = SYS_COPY_LEN_ERR;
00600 }
00601 break;
00602 }
00603 bytesToGet -= bytesWritten;
00604 toread0 -= bytesWritten;
00605 myOffset += bytesWritten;
00606 } else if (bytesRead < 0) {
00607 myInput->status = bytesRead;
00608 break;
00609 } else {
00610 rodsLog (LOG_NOTICE,
00611 "_partialDataPut: toread %d bytes, %d bytes read, errno = %d",
00612 toread1, bytesRead, errno);
00613 myInput->status = SYS_COPY_LEN_ERR;
00614 break;
00615 }
00616 #ifdef PARA_TIMING
00617 tafterWrite=time(0);
00618 rodsLog (LOG_NOTICE,
00619 "Thr %d: sz=%d netReadTm=%d diskWriteTm=%d",
00620 myInput->threadNum, bytesWritten, tafterRead-tstart,
00621 tafterWrite-tafterRead);
00622 #endif
00623 }
00624 if (myInput->status < 0)
00625 break;
00626 }
00627 #ifdef PARA_TIMING
00628 afterTransfer=time(0);
00629 #endif
00630 free (buf);
00631 sendTranHeader (srcFd, DONE_OPR, 0, 0, 0);
00632 if (myInput->threadNum > 0)
00633 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00634 mySockClose (srcFd);
00635 #ifdef PARA_TIMING
00636 endTime=time(0);
00637 rodsLog (LOG_NOTICE,
00638 "Thr %d: seekTm=%d transTm=%d endTm=%d",
00639 myInput->threadInx,
00640 afterSeek-afterConn, afterTransfer-afterSeek, endTime-afterTransfer);
00641 #endif
00642 return;
00643 }
00644
00645 void
00646 partialDataGet (portalTransferInp_t *myInput)
00647 {
00648 int srcL3descInx, destFd, srcRescTypeInx;
00649 char *buf;
00650 int bytesWritten;
00651 rodsLong_t bytesToGet;
00652 rodsLong_t myOffset = 0;
00653
00654 #ifdef PARA_TIMING
00655 time_t startTime, afterSeek, afterTransfer,
00656 endTime;
00657 startTime=time (0);
00658 #endif
00659
00660 if (myInput == NULL) {
00661 rodsLog (LOG_SYS_FATAL, "partialDataGet: NULL myInput");
00662 return;
00663 }
00664
00665 myInput->status = 0;
00666 srcL3descInx = myInput->srcFd;
00667 destFd = myInput->destFd;
00668 srcRescTypeInx = myInput->srcRescTypeInx;
00669
00670 if (myInput->offset != 0) {
00671 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
00672 srcL3descInx, myInput->offset, SEEK_SET);
00673 if (myOffset < 0) {
00674 myInput->status = myOffset;
00675 rodsLog (LOG_NOTICE,
00676 "_partialDataGet: _objSeek error, status = %d ",
00677 myInput->status);
00678 if (myInput->threadNum > 0)
00679 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00680 CLOSE_SOCK (destFd);
00681 return;
00682 }
00683 }
00684 buf = (char*)malloc (TRANS_BUF_SZ);
00685
00686 #ifdef PARA_TIMING
00687 afterSeek=time(0);
00688 #endif
00689
00690 bytesToGet = myInput->size;
00691
00692 while (bytesToGet > 0) {
00693 int toread0;
00694 int bytesRead;
00695
00696 #ifdef PARA_TIMING
00697 time_t tstart, tafterRead, tafterWrite;
00698 tstart=time(0);
00699 #endif
00700 if (myInput->flags & STREAMING_FLAG) {
00701 toread0 = bytesToGet;
00702 } else if (bytesToGet > TRANS_SZ) {
00703 toread0 = TRANS_SZ;
00704 } else {
00705 toread0 = bytesToGet;
00706 }
00707
00708 myInput->status = sendTranHeader (destFd, GET_OPR, myInput->flags,
00709 myOffset, toread0);
00710
00711 if (myInput->status < 0) {
00712 rodsLog (LOG_NOTICE,
00713 "partialDataGet: sendTranHeader error. status = %d",
00714 myInput->status);
00715 if (myInput->threadNum > 0)
00716 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00717 CLOSE_SOCK (destFd);
00718 free (buf);
00719 return;
00720 }
00721
00722 while (toread0 > 0) {
00723 int toread1;
00724
00725 if (toread0 > TRANS_BUF_SZ) {
00726 toread1 = TRANS_BUF_SZ;
00727 } else {
00728 toread1 = toread0;
00729 }
00730 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
00731 srcL3descInx, buf, toread1);
00732
00733 #ifdef PARA_TIMING
00734 tafterRead=time(0);
00735 #endif
00736 if (bytesRead == toread1) {
00737 if ((bytesWritten = myWrite (destFd, buf, bytesRead,
00738 SOCK_TYPE, NULL))
00739 != bytesRead) {
00740 rodsLog (LOG_NOTICE,
00741 "_partialDataGet:Bytes written %d don't match read %d",
00742 bytesWritten, bytesRead);
00743
00744 if (bytesWritten < 0) {
00745 myInput->status = bytesWritten;
00746 } else {
00747 myInput->status = SYS_COPY_LEN_ERR;
00748 }
00749 break;
00750 }
00751 bytesToGet -= bytesWritten;
00752 toread0 -= bytesWritten;
00753 myOffset += bytesWritten;
00754 } else if (bytesRead < 0) {
00755 myInput->status = bytesRead;
00756 break;
00757 } else {
00758 rodsLog (LOG_NOTICE,
00759 "_partialDataGet: toread %d bytes, %d bytes read",
00760 toread1, bytesRead);
00761 myInput->status = SYS_COPY_LEN_ERR;
00762 break;
00763 }
00764 #ifdef PARA_TIMING
00765 tafterWrite=time(0);
00766 rodsLog (LOG_NOTICE,
00767 "Thr %d: sz=%d netReadTm=%d diskWriteTm=%d",
00768 myInput->threadNum, bytesWritten, tafterRead-tstart,
00769 tafterWrite-tafterRead);
00770 #endif
00771 }
00772 if (myInput->status < 0)
00773 break;
00774 }
00775 #ifdef PARA_TIMING
00776 afterTransfer=time(0);
00777 #endif
00778 free (buf);
00779 sendTranHeader (destFd, DONE_OPR, 0, 0, 0);
00780 if (myInput->threadNum > 0)
00781 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
00782 CLOSE_SOCK (destFd);
00783 #ifdef PARA_TIMING
00784 endTime=time(0);
00785 rodsLog (LOG_NOTICE,
00786 "Thr %d: seekTm=%d transTm=%d endTm=%d",
00787 myInput->threadInx,
00788 afterSeek-afterConn, afterTransfer-afterSeek, endTime-afterTransfer);
00789 #endif
00790 return;
00791 }
00792
00793 void
00794 remToLocPartialCopy (portalTransferInp_t *myInput)
00795 {
00796 transferHeader_t myHeader;
00797 int destL3descInx, srcFd, destRescTypeInx;
00798 void *buf;
00799 rodsLong_t curOffset = 0;
00800 rodsLong_t myOffset = 0;
00801 int toRead, bytesRead, bytesWritten;
00802
00803 if (myInput == NULL) {
00804 rodsLog (LOG_NOTICE,
00805 "remToLocPartialCopy: NULL input");
00806 return;
00807 }
00808 #ifdef PARA_DEBUG
00809 printf ("remToLocPartialCopy: thread %d at start\n", myInput->threadNum);
00810 #endif
00811
00812 myInput->status = 0;
00813 destL3descInx = myInput->destFd;
00814 srcFd = myInput->srcFd;
00815 destRescTypeInx = myInput->destRescTypeInx;
00816 myInput->bytesWritten = 0;
00817
00818 buf = malloc (TRANS_BUF_SZ);
00819
00820 while (myInput->status >= 0) {
00821 rodsLong_t toGet;
00822
00823 myInput->status = rcvTranHeader (srcFd, &myHeader);
00824
00825 #ifdef PARA_DEBUG
00826 printf ("remToLocPartialCopy: thread %d after rcvTranHeader\n",
00827 myInput->threadNum);
00828 printf ("remToLocPartialCopy: thread %d header offset %lld, len %lld\n",
00829 myInput->threadNum, myHeader.offset, myHeader.length);
00830
00831 #endif
00832
00833 if (myInput->status < 0) {
00834 break;
00835 }
00836
00837 if (myHeader.oprType == DONE_OPR) {
00838 break;
00839 }
00840 if (myHeader.offset != curOffset) {
00841 curOffset = myHeader.offset;
00842 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
00843 destL3descInx, myHeader.offset, SEEK_SET);
00844 if (myOffset < 0) {
00845 myInput->status = myOffset;
00846 rodsLog (LOG_NOTICE,
00847 "remToLocPartialCopy: _objSeek error, status = %d ",
00848 myInput->status);
00849 break;
00850 }
00851 }
00852
00853 toGet = myHeader.length;
00854 while (toGet > 0) {
00855
00856 if (toGet > TRANS_BUF_SZ) {
00857 toRead = TRANS_BUF_SZ;
00858 } else {
00859 toRead = toGet;
00860 }
00861
00862 bytesRead = myRead (srcFd, buf, toRead,
00863 SOCK_TYPE, NULL, NULL);
00864 if (bytesRead != toRead) {
00865 if (bytesRead < 0) {
00866 myInput->status = bytesRead;
00867 rodsLogError (LOG_ERROR, bytesRead,
00868 "remToLocPartialCopy: copy error for %lld", bytesRead);
00869 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
00870 myInput->status = SYS_COPY_LEN_ERR - errno;
00871 rodsLog (LOG_ERROR,
00872 "remToLocPartialCopy: toGet %lld, bytesRead %d",
00873 toGet, bytesRead);
00874 }
00875 break;
00876 }
00877
00878 bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
00879 destL3descInx, buf, bytesRead);
00880
00881 if (bytesWritten != bytesRead) {
00882 rodsLog (LOG_NOTICE,
00883 "_partialDataPut:Bytes written %d don't match read %d",
00884 bytesWritten, bytesRead);
00885
00886 if (bytesWritten < 0) {
00887 myInput->status = bytesWritten;
00888 } else {
00889 myInput->status = SYS_COPY_LEN_ERR;
00890 }
00891 break;
00892 }
00893
00894 toGet -= bytesWritten;
00895 }
00896 curOffset += myHeader.length;
00897 myInput->bytesWritten += myHeader.length;
00898 }
00899
00900 free (buf);
00901 if (myInput->threadNum > 0)
00902 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
00903 CLOSE_SOCK (srcFd);
00904 }
00905
00906
00907
00908
#ifdef RBUDP_TRANSFER
/* rbudpRemLocCopy - RBUDP variant of the remote/local copy. For
 * COPY_TO_LOCAL_OPR the data is fetched with getFileToPortalRbudp() into
 * the destination file descriptor; for any other oprType it is sent with
 * putFileToPortalRbudp() from the source file descriptor.
 *
 * The packet size and (for sending) the send rate come from
 * RBUDP_PACK_SIZE_KW / RBUDP_SEND_RATE_KW in condInput, falling back to
 * DEF_UDP_PACKET_SIZE / DEF_UDP_SEND_RATE. VERY_VERBOSE_KW selects a
 * verbosity level of 2 instead of 0.
 *
 * Return - status of the RBUDP call, or SYS_INTERNAL_NULL_INPUT_ERR when
 *          dataCopyInp is NULL.
 */
int
rbudpRemLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
{
    portalOprOut_t *portal;
    dataOprInp_t *oprInp;
    rodsLong_t nBytes;
    int opr;
    int verbosity, rate, pktSize;
    char *kwVal;
    int l3Inx;

    if (dataCopyInp == NULL) {
        rodsLog (LOG_NOTICE,
          "rbudpRemLocCopy: NULL dataCopyInp input");
        return (SYS_INTERNAL_NULL_INPUT_ERR);
    }

    portal = &dataCopyInp->portalOprOut;
    oprInp = &dataCopyInp->dataOprInp;
    opr = oprInp->oprType;
    nBytes = oprInp->dataSize;

    verbosity =
      (getValByKey (&oprInp->condInput, VERY_VERBOSE_KW) != NULL) ? 2 : 0;

    kwVal = getValByKey (&oprInp->condInput, RBUDP_PACK_SIZE_KW);
    pktSize = (kwVal != NULL) ? atoi (kwVal) : DEF_UDP_PACKET_SIZE;

    if (opr == COPY_TO_LOCAL_OPR) {
        /* receive into the local destination file */
        l3Inx = oprInp->destL3descInx;
        return (getFileToPortalRbudp (portal, NULL,
          FileDesc[l3Inx].fd, nBytes,
          verbosity, pktSize));
    }

    /* send from the local source file; the rate only matters here */
    l3Inx = oprInp->srcL3descInx;
    kwVal = getValByKey (&oprInp->condInput, RBUDP_SEND_RATE_KW);
    rate = (kwVal != NULL) ? atoi (kwVal) : DEF_UDP_SEND_RATE;

    return (putFileToPortalRbudp (portal, NULL, NULL,
      FileDesc[l3Inx].fd, nBytes,
      verbosity, rate, pktSize));
}
#endif
00966
00967
00968
00969
00970 int
00971 remLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
00972 {
00973 portalOprOut_t *portalOprOut;
00974 dataOprInp_t *dataOprInp;
00975 portList_t *myPortList;
00976 int i, sock, myFd;
00977 int numThreads;
00978 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
00979 #ifndef windows_platform
00980 #ifdef USE_BOOST
00981 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
00982 #else
00983 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
00984 #endif
00985 #endif
00986 int retVal = 0;
00987 rodsLong_t dataSize;
00988 int oprType;
00989
00990 if (dataCopyInp == NULL) {
00991 rodsLog (LOG_NOTICE,
00992 "remLocCopy: NULL dataCopyInp input");
00993 return (SYS_INTERNAL_NULL_INPUT_ERR);
00994 }
00995
00996 portalOprOut = &dataCopyInp->portalOprOut;
00997 numThreads = portalOprOut->numThreads;
00998 if (numThreads == 0) {
00999 retVal = singleRemLocCopy (rsComm, dataCopyInp);
01000 return retVal;
01001 }
01002
01003 dataOprInp = &dataCopyInp->dataOprInp;
01004 oprType = dataOprInp->oprType;
01005 dataSize = dataOprInp->dataSize;
01006
01007 if (getUdpPortFromPortList (&portalOprOut->portList) != 0) {
01008
01009 #ifdef RBUDP_TRANSFER
01010 retVal = rbudpRemLocCopy (rsComm, dataCopyInp);
01011 return (retVal);
01012 #else
01013 return (SYS_UDP_NO_SUPPORT_ERR);
01014 #endif
01015 }
01016
01017 if (numThreads > MAX_NUM_CONFIG_TRAN_THR || numThreads <= 0) {
01018 rodsLog (LOG_NOTICE,
01019 "remLocCopy: numThreads %d out of range",
01020 numThreads);
01021 return (SYS_INVALID_PORTAL_OPR);
01022 }
01023
01024
01025 myPortList = &portalOprOut->portList;
01026
01027 #ifndef windows_platform
01028 memset (tid, 0, sizeof (tid));
01029 #endif
01030 memset (myInput, 0, sizeof (myInput));
01031
01032 sock = connectToRhostPortal (myPortList->hostAddr,
01033 myPortList->portNum, myPortList->cookie, rsComm->windowSize);
01034 if (sock < 0) {
01035 return (sock);
01036 }
01037
01038 if (oprType == COPY_TO_LOCAL_OPR) {
01039 fillPortalTransferInp (&myInput[0], rsComm,
01040 sock, dataOprInp->destL3descInx, 0, dataOprInp->destRescTypeInx,
01041 0, 0, 0, 0);
01042 } else {
01043 fillPortalTransferInp (&myInput[0], rsComm,
01044 dataOprInp->srcL3descInx, sock, dataOprInp->srcRescTypeInx, 0,
01045 0, 0, 0, 0);
01046 }
01047
01048 if (numThreads == 1) {
01049 if (getValByKey (&dataOprInp->condInput,
01050 NO_CHK_COPY_LEN_KW) != NULL) {
01051 myInput[0].flags = NO_CHK_COPY_LEN_FLAG;
01052 }
01053 if (oprType == COPY_TO_LOCAL_OPR) {
01054 remToLocPartialCopy (&myInput[0]);
01055 } else {
01056 locToRemPartialCopy (&myInput[0]);
01057 }
01058 if (myInput[0].status < 0) {
01059 return (myInput[0].status);
01060 } else {
01061 if (myInput[0].bytesWritten == dataSize) {
01062 return (0);
01063 } else {
01064 rodsLog (LOG_NOTICE,
01065 "remLocCopy:bytesWritten %lld dataSize %lld mismatch",
01066 myInput[0].bytesWritten, dataSize);
01067 return (SYS_COPY_LEN_ERR);
01068 }
01069 }
01070 } else {
01071 #ifdef PARA_OPR
01072 rodsLong_t totalWritten = 0;
01073
01074 for (i = 1; i < numThreads; i++) {
01075 sock = connectToRhostPortal (myPortList->hostAddr,
01076 myPortList->portNum, myPortList->cookie, rsComm->windowSize);
01077 if (sock < 0) {
01078 return (sock);
01079 }
01080 if (oprType == COPY_TO_LOCAL_OPR) {
01081 myFd = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
01082 dataOprInp->destL3descInx, O_WRONLY);
01083 if (myFd < 0) {
01084 retVal = myFd;
01085 rodsLog (LOG_NOTICE,
01086 "remLocCopy: cannot open file, status = %d",
01087 myFd);
01088 CLOSE_SOCK (sock);
01089 continue;
01090 }
01091
01092 fillPortalTransferInp (&myInput[i], rsComm,
01093 sock, myFd, 0, dataOprInp->destRescTypeInx,
01094 i, 0, 0, 0);
01095
01096 #ifdef USE_BOOST
01097 tid[i] = new boost::thread( remToLocPartialCopy, &myInput[i] );
01098 #else
01099 pthread_create (&tid[i], pthread_attr_default,
01100 (void *(*)(void *)) remToLocPartialCopy, (void *) &myInput[i]);
01101 #endif
01102 } else {
01103 myFd = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
01104 dataOprInp->srcL3descInx, O_RDONLY);
01105 if (myFd < 0) {
01106 retVal = myFd;
01107 rodsLog (LOG_NOTICE,
01108 "remLocCopy: cannot open file, status = %d",
01109 myFd);
01110 CLOSE_SOCK (sock);
01111 continue;
01112 }
01113
01114 fillPortalTransferInp (&myInput[i], rsComm,
01115 myFd, sock, dataOprInp->destRescTypeInx, 0,
01116 i, 0, 0, 0);
01117
01118 #ifdef USE_BOOST
01119 tid[i] = new boost::thread( locToRemPartialCopy, &myInput[i] );
01120 #else
01121 pthread_create (&tid[i], pthread_attr_default,
01122 (void *(*)(void *)) locToRemPartialCopy, (void *) &myInput[i]);
01123 #endif
01124 }
01125 }
01126
01127 if (oprType == COPY_TO_LOCAL_OPR) {
01128 #ifdef USE_BOOST
01129 tid[0] = new boost::thread( remToLocPartialCopy,&myInput[0] );
01130 #else
01131 pthread_create (&tid[0], pthread_attr_default,
01132 (void *(*)(void *)) remToLocPartialCopy, (void *) &myInput[0]);
01133 #endif
01134 } else {
01135 #ifdef USE_BOOST
01136 tid[0] = new boost::thread( locToRemPartialCopy, &myInput[0] );
01137 #else
01138 pthread_create (&tid[0], pthread_attr_default,
01139 (void *(*)(void *)) locToRemPartialCopy, (void *) &myInput[0]);
01140 #endif
01141 }
01142
01143
01144 if (retVal < 0) {
01145 return (retVal);
01146 }
01147
01148 for ( i = 0; i < numThreads; i++) {
01149 if (tid[i] != 0) {
01150 #ifdef USE_BOOST
01151 tid[i]->join();
01152 #else
01153 pthread_join (tid[i], NULL);
01154 #endif
01155 }
01156 totalWritten += myInput[i].bytesWritten;
01157 if (myInput[i].status < 0) {
01158 retVal = myInput[i].status;
01159 }
01160 }
01161 if (retVal < 0) {
01162 return (retVal);
01163 } else {
01164 if (dataSize <= 0 || totalWritten == dataSize) {
01165 return (0);
01166 } else {
01167 rodsLog (LOG_NOTICE,
01168 "remLocCopy: totalWritten %lld dataSize %lld mismatch",
01169 totalWritten, dataSize);
01170 return (SYS_COPY_LEN_ERR);
01171 }
01172 }
01173 #else
01174 return (SYS_PARA_OPR_NO_SUPPORT);
01175 #endif
01176 }
01177 }
01178
01179 int
01180 sameHostCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
01181 {
01182 dataOprInp_t *dataOprInp;
01183 int i, out_fd, in_fd;
01184 int numThreads;
01185 portalTransferInp_t myInput[MAX_NUM_CONFIG_TRAN_THR];
01186 #ifndef windows_platform
01187 #ifdef USE_BOOST
01188 boost::thread* tid[MAX_NUM_CONFIG_TRAN_THR];
01189 #else
01190 pthread_t tid[MAX_NUM_CONFIG_TRAN_THR];
01191 #endif
01192 #endif
01193 int retVal = 0;
01194 rodsLong_t dataSize;
01195 rodsLong_t size0, size1, offset0;
01196
01197 if (dataCopyInp == NULL) {
01198 rodsLog (LOG_NOTICE,
01199 "sameHostCopy: NULL dataCopyInp input");
01200 return (SYS_INTERNAL_NULL_INPUT_ERR);
01201 }
01202
01203 dataOprInp = &dataCopyInp->dataOprInp;
01204
01205 numThreads = dataOprInp->numThreads;
01206
01207 dataSize = dataOprInp->dataSize;
01208
01209 if (numThreads == 0) {
01210 numThreads = 1;
01211 } else if (numThreads > MAX_NUM_CONFIG_TRAN_THR || numThreads < 0) {
01212 rodsLog (LOG_NOTICE,
01213 "sameHostCopy: numThreads %d out of range",
01214 numThreads);
01215 return (SYS_INVALID_PORTAL_OPR);
01216 }
01217
01218 #ifndef windows_platform
01219 memset (tid, 0, sizeof (tid));
01220 #endif
01221 memset (myInput, 0, sizeof (myInput));
01222
01223 size0 = dataOprInp->dataSize / numThreads;
01224 size1 = dataOprInp->dataSize - size0 * (numThreads - 1);
01225 offset0 = dataOprInp->offset;
01226
01227 fillPortalTransferInp (&myInput[0], rsComm,
01228 dataOprInp->srcL3descInx, dataOprInp->destL3descInx,
01229 dataOprInp->srcRescTypeInx, dataOprInp->destRescTypeInx,
01230 0, size0, offset0, 0);
01231
01232 if (numThreads == 1) {
01233 if (getValByKey (&dataOprInp->condInput,
01234 NO_CHK_COPY_LEN_KW) != NULL) {
01235 myInput[0].flags = NO_CHK_COPY_LEN_FLAG;
01236 }
01237 sameHostPartialCopy (&myInput[0]);
01238 return (myInput[0].status);
01239 } else {
01240 #ifdef PARA_OPR
01241 rodsLong_t totalWritten = 0;
01242 rodsLong_t mySize = 0;
01243 rodsLong_t myOffset = 0;
01244
01245 for (i = 1; i < numThreads; i++) {
01246 myOffset += size0;
01247 if (i < numThreads - 1) {
01248 mySize = size0;
01249 } else {
01250 mySize = size1;
01251 }
01252
01253 out_fd = l3OpenByHost (rsComm, dataOprInp->destRescTypeInx,
01254 dataOprInp->destL3descInx, O_WRONLY);
01255 if (out_fd < 0) {
01256 retVal = out_fd;
01257 rodsLog (LOG_NOTICE,
01258 "sameHostCopy: cannot open dest file, status = %d",
01259 out_fd);
01260 continue;
01261 }
01262
01263 in_fd = l3OpenByHost (rsComm, dataOprInp->srcRescTypeInx,
01264 dataOprInp->srcL3descInx, O_RDONLY);
01265 if (in_fd < 0) {
01266 retVal = out_fd;
01267 rodsLog (LOG_NOTICE,
01268 "sameHostCopy: cannot open src file, status = %d", in_fd);
01269 continue;
01270 }
01271 fillPortalTransferInp (&myInput[i], rsComm,
01272 in_fd, out_fd,
01273 dataOprInp->srcRescTypeInx, dataOprInp->destRescTypeInx,
01274 i, mySize, myOffset, 0);
01275
01276 #ifdef USE_BOOST
01277 tid[i] = new boost::thread( sameHostPartialCopy, &myInput[i] );
01278 #else
01279 pthread_create (&tid[i], pthread_attr_default,
01280 (void *(*)(void *)) sameHostPartialCopy, (void *) &myInput[i]);
01281 #endif
01282 }
01283
01284 #ifdef USE_BOOST
01285 tid[0] = new boost::thread( sameHostPartialCopy, &myInput[0] );
01286 #else
01287 pthread_create (&tid[0], pthread_attr_default,
01288 (void *(*)(void *)) sameHostPartialCopy, (void *) &myInput[0]);
01289 #endif
01290
01291 if (retVal < 0) {
01292 return (retVal);
01293 }
01294
01295 for ( i = 0; i < numThreads; i++) {
01296 if (tid[i] != 0) {
01297 #ifdef USE_BOOST
01298 tid[i]->join();
01299 #else
01300 pthread_join (tid[i], NULL);
01301 #endif
01302 }
01303 totalWritten += myInput[i].bytesWritten;
01304 if (myInput[i].status < 0) {
01305 retVal = myInput[i].status;
01306 }
01307 }
01308 if (retVal < 0) {
01309 return (retVal);
01310 } else {
01311 if (dataSize <= 0 || totalWritten == dataSize) {
01312 return (0);
01313 } else {
01314 rodsLog (LOG_NOTICE,
01315 "sameHostCopy: totalWritten %lld dataSize %lld mismatch",
01316 totalWritten, dataSize);
01317 return (SYS_COPY_LEN_ERR);
01318 }
01319 }
01320 #else
01321 return (SYS_PARA_OPR_NO_SUPPORT);
01322 #endif
01323 }
01324 }
01325
01326 void
01327 sameHostPartialCopy (portalTransferInp_t *myInput)
01328 {
01329 int destL3descInx, srcL3descInx, destRescTypeInx, srcRescTypeInx;
01330 void *buf;
01331 rodsLong_t myOffset = 0;
01332 rodsLong_t toCopy;
01333 int bytesRead, bytesWritten;
01334
01335 if (myInput == NULL) {
01336 rodsLog (LOG_NOTICE,
01337 "onsameHostPartialCopy: NULL input");
01338 return;
01339 }
01340 #ifdef PARA_DEBUG
01341 printf ("onsameHostPartialCopy: thread %d at start\n", myInput->threadNum);
01342 #endif
01343
01344 myInput->status = 0;
01345 destL3descInx = myInput->destFd;
01346 srcL3descInx = myInput->srcFd;
01347 destRescTypeInx = myInput->destRescTypeInx;
01348 srcRescTypeInx = myInput->srcRescTypeInx;
01349 myInput->bytesWritten = 0;
01350
01351 if (myInput->offset != 0) {
01352 myOffset = _l3Lseek (myInput->rsComm, destRescTypeInx,
01353 destL3descInx, myInput->offset, SEEK_SET);
01354 if (myOffset < 0) {
01355 myInput->status = myOffset;
01356 rodsLog (LOG_NOTICE,
01357 "sameHostPartialCopy: _objSeek error, status = %d ",
01358 myInput->status);
01359 if (myInput->threadNum > 0) {
01360 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01361 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01362 }
01363 return;
01364 }
01365 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
01366 srcL3descInx, myInput->offset, SEEK_SET);
01367 if (myOffset < 0) {
01368 myInput->status = myOffset;
01369 rodsLog (LOG_NOTICE,
01370 "sameHostPartialCopy: _objSeek error, status = %d ",
01371 myInput->status);
01372 if (myInput->threadNum > 0) {
01373 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01374 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01375 }
01376 return;
01377 }
01378 }
01379
01380 buf = malloc (TRANS_BUF_SZ);
01381
01382 toCopy = myInput->size;
01383
01384 while (toCopy > 0) {
01385 int toRead;
01386
01387 if (toCopy > TRANS_BUF_SZ) {
01388 toRead = TRANS_BUF_SZ;
01389 } else {
01390 toRead = toCopy;
01391 }
01392
01393 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
01394 srcL3descInx, buf, toRead);
01395
01396 if (bytesRead <= 0) {
01397 if (bytesRead < 0) {
01398 myInput->status = bytesRead;
01399 rodsLogError (LOG_ERROR, bytesRead,
01400 "sameHostPartialCopy: copy error for %lld", bytesRead);
01401 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
01402 myInput->status = SYS_COPY_LEN_ERR - errno;
01403 rodsLog (LOG_ERROR,
01404 "sameHostPartialCopy: toCopy %lld, bytesRead %d",
01405 toCopy, bytesRead);
01406 }
01407 break;
01408 }
01409
01410 bytesWritten = _l3Write (myInput->rsComm, destRescTypeInx,
01411 destL3descInx, buf, bytesRead);
01412
01413 if (bytesWritten != bytesRead) {
01414 rodsLog (LOG_NOTICE,
01415 "sameHostPartialCopy:Bytes written %d don't match read %d",
01416 bytesWritten, bytesRead);
01417
01418 if (bytesWritten < 0) {
01419 myInput->status = bytesWritten;
01420 } else {
01421 myInput->status = SYS_COPY_LEN_ERR;
01422 }
01423 break;
01424 }
01425
01426 toCopy -= bytesWritten;
01427 myInput->bytesWritten += bytesWritten;
01428 }
01429
01430 free (buf);
01431 if (myInput->threadNum > 0) {
01432 _l3Close (myInput->rsComm, destRescTypeInx, destL3descInx);
01433 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01434 }
01435 }
01436
01437 void
01438 locToRemPartialCopy (portalTransferInp_t *myInput)
01439 {
01440 transferHeader_t myHeader;
01441 int srcL3descInx, destFd, srcRescTypeInx;
01442 void *buf;
01443 rodsLong_t curOffset = 0;
01444 rodsLong_t myOffset = 0;
01445 int toRead, bytesRead, bytesWritten;
01446
01447 if (myInput == NULL) {
01448 rodsLog (LOG_NOTICE,
01449 "locToRemPartialCopy: NULL input");
01450 return;
01451 }
01452 #ifdef PARA_DEBUG
01453 printf ("locToRemPartialCopy: thread %d at start\n", myInput->threadNum);
01454 #endif
01455
01456 myInput->status = 0;
01457 srcL3descInx = myInput->srcFd;
01458 destFd = myInput->destFd;
01459 srcRescTypeInx = myInput->srcRescTypeInx;
01460 myInput->bytesWritten = 0;
01461
01462 buf = malloc (TRANS_BUF_SZ);
01463
01464 while (myInput->status >= 0) {
01465 rodsLong_t toGet;
01466
01467 myInput->status = rcvTranHeader (destFd, &myHeader);
01468
01469 #ifdef PARA_DEBUG
01470 printf ("locToRemPartialCopy: thread %d after rcvTranHeader\n",
01471 myInput->threadNum);
01472 #endif
01473
01474 if (myInput->status < 0) {
01475 break;
01476 }
01477
01478 if (myHeader.oprType == DONE_OPR) {
01479 break;
01480 }
01481 #ifdef PARA_DEBUG
01482 printf ("locToRemPartialCopy:thread %d header offset %lld, len %lld",
01483 myInput->threadNum, myHeader.offset, myHeader.length);
01484 #endif
01485
01486 if (myHeader.offset != curOffset) {
01487 curOffset = myHeader.offset;
01488 myOffset = _l3Lseek (myInput->rsComm, srcRescTypeInx,
01489 srcL3descInx, myHeader.offset, SEEK_SET);
01490 if (myOffset < 0) {
01491 myInput->status = myOffset;
01492 rodsLog (LOG_NOTICE,
01493 "locToRemPartialCopy: _objSeek error, status = %d ",
01494 myInput->status);
01495 break;
01496 }
01497 }
01498
01499 toGet = myHeader.length;
01500 while (toGet > 0) {
01501
01502 if (toGet > TRANS_BUF_SZ) {
01503 toRead = TRANS_BUF_SZ;
01504 } else {
01505 toRead = toGet;
01506 }
01507
01508 bytesRead = _l3Read (myInput->rsComm, srcRescTypeInx,
01509 srcL3descInx, buf, toRead);
01510
01511 if (bytesRead != toRead) {
01512 if (bytesRead < 0) {
01513 myInput->status = bytesRead;
01514 rodsLogError (LOG_ERROR, bytesRead,
01515 "locToRemPartialCopy: copy error for %lld", bytesRead);
01516 } else if ((myInput->flags & NO_CHK_COPY_LEN_FLAG) == 0) {
01517 myInput->status = SYS_COPY_LEN_ERR - errno;
01518 rodsLog (LOG_ERROR,
01519 "locToRemPartialCopy: toGet %lld, bytesRead %d",
01520 toGet, bytesRead);
01521 }
01522 break;
01523 }
01524
01525 bytesWritten = myWrite (destFd, buf, bytesRead,
01526 SOCK_TYPE, NULL);
01527
01528
01529 if (bytesWritten != bytesRead) {
01530 rodsLog (LOG_NOTICE,
01531 "_partialDataPut:Bytes written %d don't match read %d",
01532 bytesWritten, bytesRead);
01533
01534 if (bytesWritten < 0) {
01535 myInput->status = bytesWritten;
01536 } else {
01537 myInput->status = SYS_COPY_LEN_ERR;
01538 }
01539 break;
01540 }
01541
01542 toGet -= bytesWritten;
01543 }
01544 curOffset += myHeader.length;
01545 myInput->bytesWritten += myHeader.length;
01546 }
01547
01548 free (buf);
01549 if (myInput->threadNum > 0)
01550 _l3Close (myInput->rsComm, srcRescTypeInx, srcL3descInx);
01551 CLOSE_SOCK (destFd);
01552 }
01553
01554
01555
01556
01557
01558
01559
01560 void
01561 getZoneServerId(char *zoneName, char *zoneSID) {
01562 zoneInfo_t *tmpZoneInfo;
01563 rodsServerHost_t *tmpRodsServerHost;
01564 int i;
01565 int zoneNameLen=0;
01566 char *localZoneName=NULL;
01567 char matchStr[MAX_NAME_LEN+2];
01568
01569 if (zoneName!=NULL) zoneNameLen=strlen(zoneName);
01570 if (zoneNameLen==0) {
01571 strncpy(zoneSID, localSID, MAX_PASSWORD_LEN);
01572 return;
01573 }
01574
01575
01576 tmpZoneInfo = ZoneInfoHead;
01577 while (tmpZoneInfo != NULL) {
01578 tmpRodsServerHost = (rodsServerHost_t *) tmpZoneInfo->masterServerHost;
01579 if (tmpRodsServerHost->rcatEnabled == LOCAL_ICAT) {
01580 localZoneName = tmpZoneInfo->zoneName;
01581 }
01582 tmpZoneInfo = tmpZoneInfo->next;
01583 }
01584
01585
01586 if (localZoneName!=NULL) {
01587 if (strncmp(localZoneName, zoneName, MAX_NAME_LEN)==0) {
01588 strncpy(zoneSID, localSID, MAX_PASSWORD_LEN);
01589 return;
01590 }
01591 }
01592
01593
01594 strncpy(matchStr, zoneName, MAX_NAME_LEN);
01595 strncat(matchStr, "-", MAX_NAME_LEN);
01596 for (i=0;i<MAX_FED_RSIDS;i++) {
01597 if (strncmp(matchStr, remoteSID[i], zoneNameLen+1)==0) {
01598 strncpy(zoneSID, (char*)&remoteSID[i][zoneNameLen+1],
01599 MAX_PASSWORD_LEN);
01600 return;
01601 }
01602 }
01603
01604 zoneSID[0]='\0';
01605 return;
01606 }
01607
01608 int
01609 isUserPrivileged(rsComm_t *rsComm)
01610 {
01611
01612 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
01613 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
01614 }
01615 if (rsComm->proxyUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
01616 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
01617 }
01618
01619 return(0);
01620 }
01621
#if !defined(solaris_platform)
/*
 * regcmp - stub used on platforms other than Solaris.
 * Ignores its arguments and always returns NULL.
 */
char *regcmp (char *pat, char *end)
{
    (void) pat;
    (void) end;
    return NULL;
}

/*
 * regex - stub used on platforms other than Solaris.
 * Ignores its arguments and always returns NULL.
 */
char *regex (char *rec, char *text, ...)
{
    (void) rec;
    (void) text;
    return NULL;
}
#endif
01633
01634
01635
01636 int
01637 #ifdef __cplusplus
01638 intNoSupport( ... )
01639 #else
01640 intNoSupport()
01641 #endif
01642 {
01643 return SYS_NOT_SUPPORTED;
01644 }
01645
01646 rodsLong_t
01647 #ifdef __cplusplus
01648 longNoSupport( ... )
01649 #else
01650 longNoSupport()
01651 #endif
01652 {
01653 return (rodsLong_t) SYS_NOT_SUPPORTED;
01654 }
01655
01656 #ifdef RBUDP_TRANSFER
01657 int
01658 svrPortalPutGetRbudp (rsComm_t *rsComm)
01659 {
01660 portalOpr_t *myPortalOpr;
01661 portList_t *thisPortList;
01662 int lsock;
01663 int tcpSock, udpSockfd;
01664 int udpPortBuf;
01665 int status;
01666 #if defined(aix_platform)
01667 socklen_t laddrlen = sizeof(struct sockaddr);
01668 #elif defined(windows_platform)
01669 int laddrlen = sizeof(struct sockaddr);
01670 #else
01671 uint laddrlen = sizeof(struct sockaddr);
01672 #endif
01673 int packetSize;
01674 char *tmpStr;
01675 int verbose;
01676
01677 myPortalOpr = rsComm->portalOpr;
01678
01679 if (myPortalOpr == NULL) {
01680 rodsLog (LOG_NOTICE, "svrPortalPutGetRbudp: NULL myPortalOpr");
01681 return (SYS_INTERNAL_NULL_INPUT_ERR);
01682 }
01683
01684 thisPortList = &myPortalOpr->portList;
01685 if (thisPortList == NULL) {
01686 rodsLog (LOG_NOTICE, "svrPortalPutGetRbudp: NULL portList");
01687 return (SYS_INTERNAL_NULL_INPUT_ERR);
01688 }
01689
01690 lsock = getTcpSockFromPortList (thisPortList);
01691
01692 tcpSock = acceptSrvPortal (rsComm, thisPortList);
01693 if (tcpSock < 0) {
01694 rodsLog (LOG_NOTICE,
01695 "svrPortalPutGetRbudp: acceptSrvPortal error. errno = %d",
01696 errno);
01697 CLOSE_SOCK (lsock);
01698 return (tcpSock);
01699 } else {
01700 CLOSE_SOCK (lsock);
01701 }
01702 status = readn (tcpSock, (char *) &udpPortBuf, sizeof (udpPortBuf));
01703 if (status != sizeof (udpPortBuf)) {
01704 rodsLog (LOG_ERROR,
01705 "svrPortalPutGetRbudp: readn error. toread %d, bytes read %d ",
01706 sizeof (udpPortBuf), status);
01707 return (SYS_UDP_CONNECT_ERR);
01708 }
01709
01710 if ((tmpStr = getValByKey (&myPortalOpr->dataOprInp.condInput,
01711 RBUDP_PACK_SIZE_KW)) != NULL) {
01712 packetSize = atoi (tmpStr);
01713 } else {
01714 packetSize = DEF_UDP_PACKET_SIZE;
01715 }
01716
01717 if (getValByKey (&myPortalOpr->dataOprInp.condInput, VERY_VERBOSE_KW) !=
01718 NULL)
01719 verbose = 2;
01720 else
01721 verbose = 0;
01722
01723 udpSockfd = getUdpSockFromPortList (thisPortList);
01724
01725 checkbuf (udpSockfd, UDPSOCKBUF, verbose);
01726 if (myPortalOpr->oprType == PUT_OPR) {
01727 rbudpReceiver_t rbudpReceiver;
01728 bzero (&rbudpReceiver, sizeof (rbudpReceiver));
01729 int destL3descInx = myPortalOpr->dataOprInp.destL3descInx;
01730
01731 rbudpReceiver.rbudpBase.verbose = verbose;
01732 rbudpReceiver.rbudpBase.udpSockBufSize = UDPSOCKBUF;
01733 rbudpReceiver.rbudpBase.tcpPort = getTcpPortFromPortList (thisPortList);
01734 rbudpReceiver.rbudpBase.tcpSockfd = tcpSock;
01735 rbudpReceiver.rbudpBase.udpSockfd = udpSockfd;
01736 rbudpReceiver.rbudpBase.hasTcpSock = 0;
01737 rbudpReceiver.rbudpBase.udpRemotePort = ntohl (udpPortBuf);
01738
01739 if (getpeername (tcpSock,
01740 (struct sockaddr *) &rbudpReceiver.rbudpBase.udpServerAddr,
01741 &laddrlen) < 0) {
01742 rodsLog (LOG_NOTICE,
01743 "svrPortalPutGetRbudp() - getpeername() failed: errno=%d",
01744 errno);
01745 recvClose (&rbudpReceiver);
01746 return (USER_RODS_HOSTNAME_ERR);
01747 }
01748 rbudpReceiver.rbudpBase.udpServerAddr.sin_port =
01749 htons (rbudpReceiver.rbudpBase.udpRemotePort);
01750 status = getfileByFd (&rbudpReceiver, FileDesc[destL3descInx].fd,
01751 packetSize);
01752 if (status < 0) {
01753 rodsLog (LOG_ERROR,
01754 "svrPortalPutGetRbudp: getfileByFd error for %s",
01755 FileDesc[destL3descInx].fileName);
01756 status += SYS_UDP_TRANSFER_ERR;
01757 }
01758 recvClose (&rbudpReceiver);
01759 } else if (myPortalOpr->oprType == GET_OPR) {
01760 int sendRate;
01761 rbudpSender_t rbudpSender;
01762 int srcL3descInx = myPortalOpr->dataOprInp.srcL3descInx;
01763
01764 bzero (&rbudpSender, sizeof (rbudpSender));
01765 rbudpSender.rbudpBase.verbose = verbose;
01766 rbudpSender.rbudpBase.udpSockBufSize = UDPSOCKBUF;
01767 rbudpSender.rbudpBase.tcpPort = getTcpPortFromPortList (thisPortList);
01768 rbudpSender.rbudpBase.tcpSockfd = tcpSock;
01769 rbudpSender.rbudpBase.udpSockfd = udpSockfd;
01770 rbudpSender.rbudpBase.hasTcpSock = 0;
01771 rbudpSender.rbudpBase.udpRemotePort = ntohl (udpPortBuf);
01772
01773 if (getpeername (tcpSock,
01774 (struct sockaddr *) &rbudpSender.rbudpBase.udpServerAddr,
01775 &laddrlen) < 0) {
01776 rodsLog (LOG_NOTICE,
01777 "svrPortalPutGetRbudp() - getpeername() failed: errno=%d",
01778 errno);
01779 sendClose (&rbudpSender);
01780 return (USER_RODS_HOSTNAME_ERR);
01781 }
01782 rbudpSender.rbudpBase.udpServerAddr.sin_port =
01783 htons (rbudpSender.rbudpBase.udpRemotePort);
01784 if ((tmpStr = getValByKey (&myPortalOpr->dataOprInp.condInput,
01785 RBUDP_SEND_RATE_KW)) != NULL) {
01786 sendRate = atoi (tmpStr);
01787 } else {
01788 sendRate = DEF_UDP_SEND_RATE;
01789 }
01790
01791 status = sendfileByFd (&rbudpSender, sendRate, packetSize,
01792 FileDesc[srcL3descInx].fd);
01793
01794 if (status < 0) {
01795 rodsLog (LOG_ERROR,
01796 "svrPortalPutGetRbudp: sendfile error for %s",
01797 FileDesc[srcL3descInx].fileName);
01798 status += SYS_UDP_TRANSFER_ERR;
01799 }
01800 sendClose (&rbudpSender);
01801 }
01802
01803 return (status);
01804 }
01805 #endif
01806 #ifndef windows_platform
01807 void
01808 reconnManager (rsComm_t *rsComm)
01809 {
01810 fd_set basemask;
01811 int nSockets, nSelected;
01812 struct sockaddr_in remoteAddr;
01813 socklen_t len;
01814 int newSock, status;
01815 reconnMsg_t *reconnMsg;
01816 int acceptFailCnt = 0;
01817
01818 if (rsComm == NULL || rsComm->reconnSock <= 0) {
01819 return;
01820 }
01821
01822 listen (rsComm->reconnSock, 1);
01823
01824 nSockets = rsComm->reconnSock + 1;
01825 FD_ZERO(&basemask);
01826 FD_SET(rsComm->reconnSock, &basemask);
01827
01828 while (1) {
01829 while ((nSelected = select (nSockets, &basemask,
01830 (fd_set *) NULL, (fd_set *) NULL, NULL)) < 0) {
01831 if (errno == EINTR) {
01832 rodsLog (LOG_NOTICE, "reconnManager: select interrupted\n");
01833 continue;
01834 } else {
01835 rodsLog (LOG_ERROR, "reconnManager: select failed, errno = %d",
01836 errno);
01837 #ifdef USE_BOOST
01838 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
01839 #else
01840 pthread_mutex_lock (&rsComm->lock);
01841 #endif
01842 close (rsComm->reconnSock);
01843 rsComm->reconnSock = 0;
01844 #ifdef USE_BOOST
01845 boost_lock.unlock();
01846 #else
01847 pthread_mutex_unlock (&rsComm->lock);
01848 #endif
01849 return;
01850 }
01851 }
01852
01853 len = sizeof (remoteAddr);
01854 bzero (&remoteAddr, sizeof (remoteAddr));
01855 newSock = accept (rsComm->reconnSock, (struct sockaddr *) &remoteAddr,
01856 &len);
01857 if (newSock < 0) {
01858 acceptFailCnt++;
01859 rodsLog (LOG_ERROR,
01860 "reconnManager: accept for sock %d failed, errno = %d",
01861 rsComm->reconnSock, errno);
01862 if (acceptFailCnt > MAX_RECON_ERROR_CNT) {
01863 rodsLog (LOG_ERROR,
01864 "reconnManager: accept failed cnt > 10, reconnManager exit");
01865 close (rsComm->reconnSock);
01866 rsComm->reconnSock = -1;
01867 rsComm->reconnPort = 0;
01868 return;
01869 } else {
01870 continue;
01871 }
01872 }
01873 if ((status = readReconMsg (newSock, &reconnMsg)) < 0) {
01874 rodsLog (LOG_ERROR,
01875 "reconnManager: readReconMsg error, status = %d", status);
01876 close (newSock);
01877 continue;
01878 } else if (reconnMsg->cookie != rsComm->cookie) {
01879 rodsLog (LOG_ERROR,
01880 "reconnManager: cookie mistach, got = %d vs %d",
01881 reconnMsg->cookie, rsComm->cookie);
01882 close (newSock);
01883 free (reconnMsg);
01884 continue;
01885 }
01886
01887 #ifdef USE_BOOST
01888 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01889 #else
01890 pthread_mutex_lock (&rsComm->lock);
01891 #endif
01892 rsComm->clientState = reconnMsg->procState;
01893 rsComm->reconnectedSock = newSock;
01894
01895 while (rsComm->agentState == SENDING_STATE) {
01896
01897 rsComm->reconnThrState = CONN_WAIT_STATE;
01898 #ifdef USE_BOOST
01899 rsComm->cond->wait( boost_lock );
01900 #else
01901 pthread_cond_wait (&rsComm->cond, &rsComm->lock);
01902 #endif
01903 }
01904
01905 rsComm->reconnThrState = PROCESSING_STATE;
01906 bzero (reconnMsg, sizeof (procState_t));
01907 reconnMsg->procState = rsComm->agentState;
01908 status = sendReconnMsg (newSock, reconnMsg);
01909 free (reconnMsg);
01910 if (status < 0) {
01911 rodsLog (LOG_ERROR,
01912 "reconnManager: sendReconnMsg error. status = %d",
01913 status);
01914 close (newSock);
01915 rsComm->reconnectedSock = 0;
01916 #ifdef USE_BOOST
01917 boost_lock.unlock();
01918 #else
01919 pthread_mutex_unlock (&rsComm->lock);
01920 #endif
01921 continue;
01922 }
01923 if (rsComm->agentState == PROCESSING_STATE) {
01924 rodsLog (LOG_NOTICE,
01925 "reconnManager: svrSwitchConnect. cliState = %d,agState=%d",
01926 rsComm->clientState, rsComm->agentState);
01927 svrSwitchConnect (rsComm);
01928 }
01929 #ifdef USE_BOOST
01930 boost_lock.unlock();
01931 #else
01932 pthread_mutex_unlock (&rsComm->lock);
01933 #endif
01934 }
01935 }
01936
01937 int
01938 svrChkReconnAtSendStart (rsComm_t *rsComm)
01939 {
01940 if (rsComm->reconnSock > 0) {
01941
01942 #ifdef USE_BOOST
01943 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01944 #else
01945 pthread_mutex_lock (&rsComm->lock);
01946 #endif
01947 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
01948
01949 rodsLog (LOG_NOTICE,
01950 "svrChkReconnAtSendStart: ThrState = CONN_WAIT_STATE, agentState=%d",
01951 rsComm->agentState);
01952 rsComm->agentState = PROCESSING_STATE;
01953 #ifdef USE_BOOST
01954 rsComm->cond->notify_all();
01955 #else
01956 pthread_cond_signal (&rsComm->cond);
01957 #endif
01958 }
01959 svrSwitchConnect (rsComm);
01960 rsComm->agentState = SENDING_STATE;
01961 #ifdef USE_BOOST
01962 boost_lock.unlock();
01963 #else
01964 pthread_mutex_unlock (&rsComm->lock);
01965 #endif
01966 }
01967 return 0;
01968 }
01969
01970 int
01971 svrChkReconnAtSendEnd (rsComm_t *rsComm)
01972 {
01973 if (rsComm->reconnSock > 0) {
01974
01975 #ifdef USE_BOOST
01976 boost::unique_lock<boost::mutex> boost_lock( *rsComm->lock );
01977 #else
01978 pthread_mutex_lock (&rsComm->lock);
01979 #endif
01980 rsComm->agentState = PROCESSING_STATE;
01981 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
01982
01983 #ifdef USE_BOOST
01984 rsComm->cond->wait( boost_lock );
01985 #else
01986 pthread_cond_signal (&rsComm->cond);
01987 #endif
01988 }
01989 #ifdef USE_BOOST
01990 boost_lock.unlock();
01991 #else
01992 pthread_mutex_unlock (&rsComm->lock);
01993 #endif
01994 }
01995 return 0;
01996 }
01997
01998 int
01999 svrChkReconnAtReadStart (rsComm_t *rsComm)
02000 {
02001 if (rsComm->reconnSock > 0) {
02002
02003 #ifdef USE_BOOST
02004 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
02005 #else
02006 pthread_mutex_lock (&rsComm->lock);
02007 #endif
02008 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02009
02010 rodsLog (LOG_NOTICE,
02011 "svrChkReconnAtReadStart: ThrState = CONN_WAIT_STATE, agentState=%d",
02012 rsComm->agentState);
02013 rsComm->agentState = PROCESSING_STATE;
02014 #ifdef USE_BOOST
02015 rsComm->cond->wait( boost_lock );
02016 #else
02017 pthread_cond_signal (&rsComm->cond);
02018 #endif
02019 }
02020 svrSwitchConnect (rsComm);
02021 rsComm->agentState = RECEIVING_STATE;
02022 #ifdef USE_BOOST
02023 boost_lock.unlock();
02024 #else
02025 pthread_mutex_unlock (&rsComm->lock);
02026 #endif
02027 }
02028 return 0;
02029 }
02030
02031 int
02032 svrChkReconnAtReadEnd (rsComm_t *rsComm)
02033 {
02034 if (rsComm->reconnSock > 0) {
02035
02036 #ifdef USE_BOOST
02037 boost::unique_lock< boost::mutex > boost_lock( *rsComm->lock );
02038 rsComm->agentState = PROCESSING_STATE;
02039 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02040 rsComm->cond->notify_all();
02041 }
02042 boost_lock.unlock();
02043 #else
02044 pthread_mutex_lock (&rsComm->lock);
02045 rsComm->agentState = PROCESSING_STATE;
02046 if (rsComm->reconnThrState == CONN_WAIT_STATE) {
02047 pthread_cond_signal (&rsComm->cond);
02048 }
02049 pthread_mutex_unlock (&rsComm->lock);
02050 #endif
02051 }
02052 return 0;
02053 }
02054
02055 #endif
02056
02057 int
02058 svrSockOpenForInConn (rsComm_t *rsComm, int *portNum, char **addr, int proto)
02059 {
02060 int status;
02061
02062 status = sockOpenForInConn (rsComm, portNum, addr, proto);
02063 if (status < 0) return status;
02064
02065 if (addr != NULL && *addr != NULL &&
02066 (strcmp (*addr, "127.0.0.1") == 0 || strcmp (*addr, "0.0.0.0") == 0 ||
02067 strcmp (*addr, "localhost") == 0)) {
02068
02069 char *myaddr;
02070
02071 myaddr = getLocalSvrAddr ();
02072 if (myaddr != NULL) {
02073 free (*addr);
02074 *addr = strdup (myaddr);
02075 } else {
02076 rodsLog (LOG_NOTICE,
02077 "svrSockOpenForInConn: problem resolving local host addr %s",
02078 *addr);
02079 }
02080 }
02081 return status;
02082 }
02083
02084 char *
02085 getLocalSvrAddr ()
02086 {
02087 char *myHost;
02088 myHost = _getSvrAddr (LocalServerHost);
02089 return myHost;
02090 }
02091
02092 char *
02093 _getSvrAddr (rodsServerHost_t *rodsServerHost)
02094 {
02095 hostName_t *tmpHostName;
02096
02097 if (rodsServerHost == NULL) return NULL;
02098
02099 tmpHostName = rodsServerHost->hostName;
02100 while (tmpHostName != NULL) {
02101 if (strcmp (tmpHostName->name, "localhost") != 0 &&
02102 strcmp (tmpHostName->name, "127.0.0.1") != 0 &&
02103 strcmp (tmpHostName->name, "0.0.0.0") != 0 &&
02104 strchr (tmpHostName->name, '.') != NULL) {
02105 return (tmpHostName->name);
02106 }
02107 tmpHostName = tmpHostName->next;
02108 }
02109 return (NULL);
02110 }
02111
02112 char *
02113 getSvrAddr (rodsServerHost_t *rodsServerHost)
02114 {
02115 char *myHost;
02116
02117 myHost = _getSvrAddr (rodsServerHost);
02118 if (myHost == NULL) {
02119
02120 myHost = rodsServerHost->hostName->name;
02121 }
02122 return myHost;
02123 }
02124
02125 int
02126 setLocalSrvAddr (char *outLocalAddr)
02127 {
02128 char *myHost;
02129
02130 if (outLocalAddr == NULL) return USER__NULL_INPUT_ERR;
02131
02132 myHost = getSvrAddr (LocalServerHost);
02133
02134 if (myHost != NULL) {
02135 rstrcpy (outLocalAddr, myHost, NAME_LEN);
02136 return 0;
02137 } else {
02138 return SYS_INVALID_SERVER_HOST;
02139 }
02140 }
02141
02142 int
02143 forkAndExec (char *av[])
02144 {
02145 int childPid = 0;
02146 int status = -1;
02147 int childStatus = 0;
02148
02149
02150 #ifndef windows_platform
02151 childPid = RODS_FORK ();
02152
02153 if (childPid == 0) {
02154
02155 execv(av[0], av);
02156
02157 exit(1);
02158 } else if (childPid < 0) {
02159 rodsLog (LOG_ERROR,
02160 "exectar: RODS_FORK failed. errno = %d", errno);
02161 return (SYS_FORK_ERROR);
02162 }
02163
02164
02165
02166 status = waitpid (childPid, &childStatus, 0);
02167 if (status >= 0 && childStatus != 0) {
02168 rodsLog (LOG_ERROR,
02169 "forkAndExec: waitpid status = %d, childStatus = %d",
02170 status, childStatus);
02171 status = EXEC_CMD_ERROR;
02172 }
02173 #else
02174 rodsLog (LOG_ERROR,
02175 "forkAndExec: fork and exec not supported");
02176
02177 status = SYS_NOT_SUPPORTED;
02178 #endif
02179 return status;
02180 }
02181
02182 int
02183 singleRemLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02184 {
02185 dataOprInp_t *dataOprInp;
02186 int status = 0;
02187 int oprType;
02188
02189 if (dataCopyInp == NULL) {
02190 rodsLog (LOG_NOTICE,
02191 "remLocCopy: NULL dataCopyInp input");
02192 return (SYS_INTERNAL_NULL_INPUT_ERR);
02193 }
02194 dataOprInp = &dataCopyInp->dataOprInp;
02195 oprType = dataOprInp->oprType;
02196
02197 if (oprType == COPY_TO_LOCAL_OPR) {
02198 status = singleRemToLocCopy (rsComm, dataCopyInp);
02199 } else {
02200 status = singleLocToRemCopy (rsComm, dataCopyInp);
02201 }
02202 return status;
02203 }
02204
02205 int
02206 singleRemToLocCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02207 {
02208 dataOprInp_t *dataOprInp;
02209 rodsLong_t dataSize;
02210 int l1descInx;
02211 int destL3descInx;
02212 int destRescTypeInx;
02213 bytesBuf_t dataObjReadInpBBuf;
02214 openedDataObjInp_t dataObjReadInp;
02215 int bytesWritten, bytesRead;
02216 rodsLong_t totalWritten = 0;
02217
02218
02219 if (dataCopyInp == NULL) {
02220 rodsLog (LOG_NOTICE,
02221 "singleRemToLocCopy: NULL dataCopyInp input");
02222 return (SYS_INTERNAL_NULL_INPUT_ERR);
02223 }
02224 dataOprInp = &dataCopyInp->dataOprInp;
02225 l1descInx = dataCopyInp->portalOprOut.l1descInx;
02226 destL3descInx = dataOprInp->destL3descInx;
02227 destRescTypeInx = dataOprInp->destRescTypeInx;
02228 dataSize = dataOprInp->dataSize;
02229
02230 bzero (&dataObjReadInp, sizeof (dataObjReadInp));
02231 dataObjReadInpBBuf.buf = malloc (TRANS_BUF_SZ);
02232 dataObjReadInpBBuf.len = dataObjReadInp.len = TRANS_BUF_SZ;
02233 dataObjReadInp.l1descInx = l1descInx;
02234 while ((bytesRead = rsDataObjRead (rsComm, &dataObjReadInp,
02235 &dataObjReadInpBBuf)) > 0) {
02236 bytesWritten = _l3Write (rsComm, destRescTypeInx,
02237 destL3descInx, dataObjReadInpBBuf.buf, bytesRead);
02238
02239 if (bytesWritten != bytesRead) {
02240 rodsLog (LOG_ERROR,
02241 "singleRemToLocCopy: Read %d bytes, Wrote %d bytes.\n ",
02242 bytesRead, bytesWritten);
02243 free (dataObjReadInpBBuf.buf);
02244 return (SYS_COPY_LEN_ERR);
02245 } else {
02246 totalWritten += bytesWritten;
02247 }
02248 }
02249 free (dataObjReadInpBBuf.buf);
02250 if (dataSize <= 0 || totalWritten == dataSize ||
02251 getValByKey (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW) != NULL) {
02252 return (0);
02253 } else {
02254 rodsLog (LOG_ERROR,
02255 "singleRemToLocCopy: totalWritten %lld dataSize %lld mismatch",
02256 totalWritten, dataSize);
02257 return (SYS_COPY_LEN_ERR);
02258 }
02259 }
02260
02261 int
02262 singleLocToRemCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
02263 {
02264 dataOprInp_t *dataOprInp;
02265 rodsLong_t dataSize;
02266 int l1descInx;
02267 int srcL3descInx;
02268 int srcRescTypeInx;
02269 bytesBuf_t dataObjWriteInpBBuf;
02270 openedDataObjInp_t dataObjWriteInp;
02271 int bytesWritten, bytesRead;
02272 rodsLong_t totalWritten = 0;
02273
02274
02275 if (dataCopyInp == NULL) {
02276 rodsLog (LOG_NOTICE,
02277 "singleRemToLocCopy: NULL dataCopyInp input");
02278 return (SYS_INTERNAL_NULL_INPUT_ERR);
02279 }
02280 dataOprInp = &dataCopyInp->dataOprInp;
02281 l1descInx = dataCopyInp->portalOprOut.l1descInx;
02282 srcL3descInx = dataOprInp->srcL3descInx;
02283 srcRescTypeInx = dataOprInp->srcRescTypeInx;
02284 dataSize = dataOprInp->dataSize;
02285
02286 bzero (&dataObjWriteInp, sizeof (dataObjWriteInp));
02287 dataObjWriteInpBBuf.buf = malloc (TRANS_BUF_SZ);
02288 dataObjWriteInpBBuf.len = 0;
02289 dataObjWriteInp.l1descInx = l1descInx;
02290
02291 while ((bytesRead = _l3Read (rsComm, srcRescTypeInx,
02292 srcL3descInx, dataObjWriteInpBBuf.buf, TRANS_BUF_SZ)) > 0) {
02293 dataObjWriteInp.len = dataObjWriteInpBBuf.len = bytesRead;
02294 bytesWritten = rsDataObjWrite (rsComm, &dataObjWriteInp,
02295 &dataObjWriteInpBBuf);
02296 if (bytesWritten != bytesRead) {
02297 rodsLog (LOG_ERROR,
02298 "singleLocToRemCopy: Read %d bytes, Wrote %d bytes.\n ",
02299 bytesRead, bytesWritten);
02300 free (dataObjWriteInpBBuf.buf);
02301 return (SYS_COPY_LEN_ERR);
02302 } else {
02303 totalWritten += bytesWritten;
02304 }
02305 }
02306 free (dataObjWriteInpBBuf.buf);
02307 if (dataSize <= 0 || totalWritten == dataSize ||
02308 getValByKey (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW) != NULL) {
02309 return (0);
02310 } else {
02311 rodsLog (LOG_ERROR,
02312 "singleLocToRemCopy: totalWritten %lld dataSize %lld mismatch",
02313 totalWritten, dataSize);
02314 return (SYS_COPY_LEN_ERR);
02315 }
02316 }
02317
02318
02319
02320
02321
02322 int
02323 readStartupPack (int sock, startupPack_t **startupPack, struct timeval *tv)
02324 {
02325 int status;
02326 msgHeader_t myHeader;
02327 bytesBuf_t inputStructBBuf, bsBBuf, errorBBuf;
02328
02329 status = readMsgHeader (sock, &myHeader, tv);
02330
02331 if (status < 0) {
02332 rodsLogError (LOG_NOTICE, status,
02333 "readStartupPack: readMsgHeader error. status = %d", status);
02334 return (status);
02335 }
02336
02337 if (myHeader.msgLen > (int) sizeof (startupPack_t) * 2 ||
02338 myHeader.msgLen <= 0) {
02339 rodsLog (LOG_NOTICE,
02340 "readStartupPack: problem with myHeader.msgLen = %d",
02341 myHeader.msgLen);
02342 return (SYS_HEADER_READ_LEN_ERR);
02343 }
02344
02345 memset (&bsBBuf, 0, sizeof (bytesBuf_t));
02346 status = readMsgBody (sock, &myHeader, &inputStructBBuf, &bsBBuf,
02347 &errorBBuf, XML_PROT, tv);
02348 if (status < 0) {
02349 rodsLogError (LOG_NOTICE, status,
02350 "readStartupPack: readMsgBody error. status = %d", status);
02351 return (status);
02352 }
02353
02354
02355
02356 if (strcmp (myHeader.type, RODS_CONNECT_T) != 0) {
02357 if (inputStructBBuf.buf != NULL)
02358 free (inputStructBBuf.buf);
02359 if (bsBBuf.buf != NULL)
02360 free (inputStructBBuf.buf);
02361 if (errorBBuf.buf != NULL)
02362 free (inputStructBBuf.buf);
02363 rodsLog (LOG_NOTICE,
02364 "readStartupPack: wrong mag type - %s, expect %s",
02365 myHeader.type, RODS_CONNECT_T);
02366 return (SYS_HEADER_TPYE_LEN_ERR);
02367 }
02368
02369 if (myHeader.bsLen != 0) {
02370 if (bsBBuf.buf != NULL)
02371 free (inputStructBBuf.buf);
02372 rodsLog (LOG_NOTICE, "readStartupPack: myHeader.bsLen = %d is not 0",
02373 myHeader.bsLen);
02374 }
02375
02376 if (myHeader.errorLen != 0) {
02377 if (errorBBuf.buf != NULL)
02378 free (inputStructBBuf.buf);
02379 rodsLog (LOG_NOTICE,
02380 "readStartupPack: myHeader.errorLen = %d is not 0",
02381 myHeader.errorLen);
02382 }
02383
02384
02385 status = unpackStruct (inputStructBBuf.buf, (void **) startupPack,
02386 "StartupPack_PI", RodsPackTable, XML_PROT);
02387
02388 clearBBuf (&inputStructBBuf);
02389
02390 if (status >= 0) {
02391 if ((*startupPack)->clientUser[0] != '\0' &&
02392 (*startupPack)->clientRodsZone[0] == '\0') {
02393 char *zoneName;
02394
02395 if ((zoneName = getLocalZoneName ()) != NULL) {
02396 rstrcpy ((*startupPack)->clientRodsZone, zoneName, NAME_LEN);
02397 }
02398 }
02399 if ((*startupPack)->proxyUser[0] != '\0' &&
02400 (*startupPack)->proxyRodsZone[0] == '\0') {
02401 char *zoneName;
02402
02403 if ((zoneName = getLocalZoneName ()) != NULL) {
02404 rstrcpy ((*startupPack)->proxyRodsZone, zoneName, NAME_LEN);
02405 }
02406 }
02407 } else {
02408 rodsLogError (LOG_NOTICE, status,
02409 "readStartupPack:unpackStruct error. status = %d",
02410 status);
02411 }
02412
02413 return (status);
02414 }
02415
02416
02417 #ifdef RUN_SERVER_AS_ROOT
02418
02419
02420
02421
02422
02423
02424 int
02425 initServiceUser()
02426 {
02427 #ifndef windows_platform
02428 char *serviceUser;
02429 struct passwd *pwent;
02430
02431 serviceUser = getenv("irodsServiceUser");
02432 if (serviceUser == NULL || getuid() != 0) {
02433
02434
02435 return (0);
02436 }
02437
02438
02439
02440 errno = 0;
02441 pwent = getpwnam(serviceUser);
02442 if (pwent) {
02443 ServiceUid = pwent->pw_uid;
02444 return (changeToServiceUser());
02445 }
02446
02447 if (errno) {
02448 rodsLogError(LOG_ERROR, SYS_USER_RETRIEVE_ERR,
02449 "setServiceUser: error in getpwnam %s, errno = %d",
02450 serviceUser, errno);
02451 return (SYS_USER_RETRIEVE_ERR - errno);
02452 }
02453 else {
02454 rodsLogError(LOG_ERROR, SYS_USER_RETRIEVE_ERR,
02455 "setServiceUser: user %s doesn't exist", serviceUser);
02456 return (SYS_USER_RETRIEVE_ERR);
02457 }
02458 #else
02459 return (0);
02460 #endif
02461 }
02462
02463
02464
02465 int
02466 isServiceUserSet()
02467 {
02468 if (ServiceUid) {
02469 return 1;
02470 }
02471 else {
02472 return 0;
02473 }
02474 }
02475
02476
02477
02478
02479 int
02480 changeToRootUser()
02481 {
02482 int prev_errno, my_errno;
02483
02484 if (!isServiceUserSet()) {
02485
02486 return (0);
02487 }
02488
02489 #ifndef windows_platform
02490
02491
02492
02493 prev_errno = errno;
02494 if (seteuid(0) == -1) {
02495 my_errno = errno;
02496 errno = prev_errno;
02497 rodsLogError(LOG_ERROR, SYS_USER_NO_PERMISSION - my_errno,
02498 "changeToRootUser: can't change to root user id");
02499 return (SYS_USER_NO_PERMISSION - my_errno);
02500 }
02501 #endif
02502
02503 return (0);
02504 }
02505
02506
02507
02508
02509
02510 int
02511 changeToServiceUser()
02512 {
02513 int prev_errno, my_errno;
02514
02515 if (!isServiceUserSet()) {
02516
02517 return (0);
02518 }
02519
02520 #ifndef windows_platform
02521 prev_errno = errno;
02522 if (seteuid(ServiceUid) == -1) {
02523 my_errno = errno;
02524 errno = prev_errno;
02525 rodsLogError(LOG_ERROR, SYS_USER_NO_PERMISSION - my_errno,
02526 "changeToServiceUser: can't change to service user id");
02527 return (SYS_USER_NO_PERMISSION - my_errno);
02528 }
02529 #endif
02530
02531 return (0);
02532 }
02533
02534 #endif
02535
02536