00001
00002
00003
00004
00005
00006 #include "dataObjRepl.h"
00007 #include "dataObjOpr.h"
00008 #include "dataObjCreate.h"
00009 #include "dataObjOpen.h"
00010 #include "dataObjPut.h"
00011 #include "dataObjGet.h"
00012 #include "rodsLog.h"
00013 #include "objMetaOpr.h"
00014 #include "physPath.h"
00015 #include "specColl.h"
00016 #include "resource.h"
00017 #include "reGlobalsExtern.h"
00018 #include "reDefines.h"
00019 #include "reSysDataObjOpr.h"
00020 #include "getRemoteZoneResc.h"
00021 #include "l3FileGetSingleBuf.h"
00022 #include "l3FilePutSingleBuf.h"
00023 #include "fileSyncToArch.h"
00024 #include "fileStageToCache.h"
00025 #include "unbunAndRegPhyBunfile.h"
00026 #include "dataObjTrim.h"
00027 #include "dataObjLock.h"
00028
00029 int
00030 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00031 transStat_t **transStat)
00032 {
00033 int status;
00034 transferStat_t *transferStat = NULL;
00035
00036 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00037
00038 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00039 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00040 (*transStat)->numThreads = transferStat->numThreads;
00041 (*transStat)->bytesWritten = transferStat->bytesWritten;
00042 free (transferStat);
00043 }
00044 return status;
00045 }
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 int
00056 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00057 transferStat_t **transStat)
00058 {
00059 int status;
00060 int remoteFlag;
00061 rodsServerHost_t *rodsServerHost;
00062 dataObjInfo_t *dataObjInfo = NULL;
00063 char* lockType = NULL;
00064 int lockFd = -1;
00065
00066 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00067
00068 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00069 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00070 }
00071 }
00072
00073 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00074 READ_COLL_PERM, 0, &dataObjInfo);
00075
00076 if (status == DATA_OBJ_T) {
00077 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00078 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00079 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00080 MAX_NAME_LEN);
00081 freeAllDataObjInfo (dataObjInfo);
00082 } else {
00083 freeAllDataObjInfo (dataObjInfo);
00084 return (SYS_REG_OBJ_IN_SPEC_COLL);
00085 }
00086 }
00087 }
00088
00089 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00090 REMOTE_OPEN);
00091
00092 if (remoteFlag < 0) {
00093 return (remoteFlag);
00094 } else if (remoteFlag == REMOTE_HOST) {
00095 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00096 transStat);
00097 return status;
00098 }
00099
00100 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00101 memset (*transStat, 0, sizeof (transferStat_t));
00102
00103
00104 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00105 if (lockType != NULL) {
00106 lockFd = rsDataObjLock (rsComm, dataObjInp);
00107 if (lockFd >= 0) {
00108
00109 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00110 } else {
00111 rodsLogError (LOG_ERROR, lockFd,
00112 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00113 dataObjInp->objPath, lockType);
00114 return lockFd;
00115 }
00116 }
00117
00118 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00119
00120 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00121
00122 return (status);
00123 }
00124
00125 int
00126 _rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00127 transferStat_t *transStat, dataObjInfo_t *outDataObjInfo)
00128 {
00129 int status;
00130 dataObjInfo_t *dataObjInfoHead = NULL;
00131 dataObjInfo_t *oldDataObjInfoHead = NULL;
00132 dataObjInfo_t *destDataObjInfo = NULL;
00133 rescGrpInfo_t *myRescGrpInfo = NULL;
00134 ruleExecInfo_t rei;
00135 int multiCopyFlag;
00136 char *accessPerm;
00137 int backupFlag;
00138 int allFlag;
00139 int savedStatus = 0;
00140
00141 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00142 accessPerm = NULL;
00143 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00144 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00145 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00146 }
00147 accessPerm = NULL;
00148 } else {
00149 accessPerm = ACCESS_READ_OBJECT;
00150 }
00151
00152 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00153 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00154 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00155 multiCopyFlag = 1;
00156 } else {
00157 multiCopyFlag = 0;
00158 }
00159
00160
00161
00162 if (multiCopyFlag) {
00163 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00164 accessPerm, 0);
00165 } else {
00166
00167
00168 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00169 accessPerm, 1);
00170 }
00171
00172 if (status < 0) {
00173 rodsLog (LOG_NOTICE,
00174 "rsDataObjRepl: getDataObjInfo for %s", dataObjInp->objPath);
00175 return (status);
00176 }
00177
00178 if (getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00179 status = sortObjInfoForRepl (&dataObjInfoHead, &oldDataObjInfoHead, 0);
00180 if (status < 0) return status;
00181
00182
00183 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead,oldDataObjInfoHead, transStat, NULL);
00184
00185 if (status >= 0 && outDataObjInfo != NULL) {
00186 *outDataObjInfo = *oldDataObjInfoHead;
00187 outDataObjInfo->next = NULL;
00188 }
00189 freeAllDataObjInfo (dataObjInfoHead);
00190 freeAllDataObjInfo (oldDataObjInfoHead);
00191
00192 return status;
00193 }
00194
00195 status = sortObjInfoForRepl (&dataObjInfoHead, &oldDataObjInfoHead,multiCopyFlag);
00196
00197 if (status < 0)
00198 return status;
00199
00200 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00201
00202 backupFlag = 1;
00203 multiCopyFlag = 0;
00204 } else {
00205 backupFlag = 0;
00206 }
00207 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00208 allFlag = 1;
00209 } else {
00210 allFlag = 0;
00211 }
00212
00213 if( backupFlag == 0 && allFlag == 1 &&
00214 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00215 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00216
00217 addKeyVal (&dataObjInp->condInput, DEST_RESC_NAME_KW,
00218 dataObjInfoHead->rescGroupName);
00219 }
00220
00221
00222 dataObjInp->oprType = REPLICATE_OPR;
00223 status = getRescGrpForCreate (rsComm, dataObjInp, &myRescGrpInfo);
00224 if (status < 0) return status;
00225
00226
00227 if (multiCopyFlag == 0 ) {
00228
00229
00230
00231
00232
00233
00234 status = resolveSingleReplCopy (&dataObjInfoHead, &oldDataObjInfoHead,
00235 &myRescGrpInfo, &destDataObjInfo, &dataObjInp->condInput);
00236 if (status == HAVE_GOOD_COPY) {
00237
00238
00239 dataObjInfo_t *cacheDataObjInfo = NULL;
00240 dataObjInfo_t *compDataObjInfo = NULL;
00241 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL &&
00242
00243
00244
00245 #if 0
00246 getDataObjByClass (dataObjInfoHead, CACHE_CL, &cacheDataObjInfo)
00247 >= 0 && cacheDataObjInfo != destDataObjInfo) {
00248 #else
00249 getDataObjByClass( dataObjInfoHead, COMPOUND_CL, &compDataObjInfo ) >= 0 &&
00250 strlen (compDataObjInfo->rescGroupName) > 0 &&
00251 getCacheDataInfoForRepl (rsComm, dataObjInfoHead, NULL,compDataObjInfo, &cacheDataObjInfo) >= 0 ) {
00252 #endif
00253
00254
00255 int status1 = trimDataObjInfo (rsComm, cacheDataObjInfo);
00256 if (status1 < 0) {
00257 rodsLog (LOG_NOTICE,
00258 "_rsDataObjRepl: trimDataObjInfo for %s",
00259 dataObjInp->objPath);
00260 }
00261 }
00262
00263 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00264
00265 *outDataObjInfo = *destDataObjInfo;
00266 outDataObjInfo->next = NULL;
00267 }
00268
00269
00270 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00271 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00272 ( myRescGrpInfo->status < 0 ) ) {
00273 status = myRescGrpInfo->status;
00274
00275 } else {
00276 status = 0;
00277 }
00278 freeAllDataObjInfo (dataObjInfoHead);
00279 freeAllDataObjInfo (oldDataObjInfoHead);
00280 freeAllDataObjInfo (destDataObjInfo);
00281 freeAllRescGrpInfo (myRescGrpInfo);
00282 return status;
00283 } else if (status < 0) {
00284 freeAllDataObjInfo (dataObjInfoHead);
00285 freeAllDataObjInfo (oldDataObjInfoHead);
00286 freeAllDataObjInfo (destDataObjInfo);
00287 freeAllRescGrpInfo (myRescGrpInfo);
00288 return status;
00289 }
00290
00291 }
00292
00293 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00294 if (status < 0) return status;
00295
00296
00297
00298
00299 if (destDataObjInfo != NULL) {
00300 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00301 destDataObjInfo, transStat, oldDataObjInfoHead);
00302 if (status >= 0) {
00303 if (outDataObjInfo != NULL) {
00304 *outDataObjInfo = *destDataObjInfo;
00305 outDataObjInfo->next = NULL;
00306 }
00307 if (allFlag == 0) {
00308 freeAllDataObjInfo (dataObjInfoHead);
00309 freeAllDataObjInfo (oldDataObjInfoHead);
00310 freeAllDataObjInfo (destDataObjInfo);
00311 freeAllRescGrpInfo (myRescGrpInfo);
00312 return 0;
00313 } else {
00314
00315
00316 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00317 destDataObjInfo = NULL;
00318 }
00319 } else {
00320 savedStatus = status;
00321 }
00322 }
00323
00324 if (myRescGrpInfo != NULL) {
00325
00326 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00327 myRescGrpInfo, transStat, oldDataObjInfoHead,
00328 outDataObjInfo);
00329 if (status < 0) savedStatus = status;
00330 }
00331
00332 freeAllDataObjInfo (dataObjInfoHead);
00333 freeAllDataObjInfo (oldDataObjInfoHead);
00334 freeAllRescGrpInfo (myRescGrpInfo);
00335
00336 return (savedStatus);
00337 }
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351 int
00352 _rsDataObjReplUpdate (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00353 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
00354 transferStat_t *transStat, dataObjInfo_t *oldDataObjInfo)
00355 {
00356 dataObjInfo_t *destDataObjInfo;
00357 dataObjInfo_t *srcDataObjInfo;
00358 int status;
00359 int allFlag;
00360 int savedStatus = 0;
00361 int replCnt = 0;
00362
00363 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00364 allFlag = 1;
00365 } else {
00366 allFlag = 0;
00367 }
00368
00369 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00370 destDataObjInfo = destDataObjInfoHead;
00371 while (destDataObjInfo != NULL) {
00372 if (destDataObjInfo->dataId == 0) {
00373 destDataObjInfo = destDataObjInfo->next;
00374 continue;
00375 }
00376
00377
00378 if (getRescClass (destDataObjInfo->rescInfo) == COMPOUND_CL) {
00379
00380 if ((status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
00381 srcDataObjInfoHead, destDataObjInfoHead, destDataObjInfo,
00382 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00383 return status;
00384 }
00385 status = _rsDataObjReplS (rsComm, dataObjInp,
00386 srcDataObjInfo, NULL, "", destDataObjInfo, 1);
00387 } else {
00388 srcDataObjInfo = srcDataObjInfoHead;
00389 while (srcDataObjInfo != NULL) {
00390
00391 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00392 NULL, "", destDataObjInfo, 1);
00393 if (status >= 0) {
00394 break;
00395 }
00396 srcDataObjInfo = srcDataObjInfo->next;
00397 }
00398 }
00399 if (status >= 0) {
00400 transStat->numThreads = dataObjInp->numThreads;
00401 if (allFlag == 0) {
00402 return 0;
00403 }
00404 } else {
00405 savedStatus = status;
00406 replCnt ++;
00407 }
00408 destDataObjInfo = destDataObjInfo->next;
00409 }
00410
00411 return savedStatus;
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426 int
00427 _rsDataObjReplNewCopy (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00428 dataObjInfo_t *srcDataObjInfoHead, rescGrpInfo_t *destRescGrpInfo,
00429 transferStat_t *transStat, dataObjInfo_t *oldDataObjInfo,
00430 dataObjInfo_t *outDataObjInfo)
00431 {
00432 dataObjInfo_t *srcDataObjInfo;
00433 rescGrpInfo_t *tmpRescGrpInfo;
00434 rescInfo_t *tmpRescInfo;
00435 int status;
00436 int allFlag;
00437 int savedStatus = 0;
00438 rescInfo_t *compRescInfo = NULL;
00439 rescInfo_t *cacheRescInfo = NULL;
00440
00441 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00442 allFlag = 1;
00443 } else {
00444 allFlag = 0;
00445 }
00446
00447
00448
00449
00450 if( allFlag == 1 && destRescGrpInfo != NULL &&
00451 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00452 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00453 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00454 }
00455
00456 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00457 tmpRescGrpInfo = destRescGrpInfo;
00458 while (tmpRescGrpInfo != NULL) {
00459 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00460
00461
00462 if (tmpRescInfo == cacheRescInfo) {
00463
00464
00465 tmpRescGrpInfo = tmpRescGrpInfo->next;
00466 continue;
00467 }
00468
00469 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00470
00471 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00472 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00473 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00474 return status;
00475 }
00476
00477 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00478 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00479 } else {
00480 srcDataObjInfo = srcDataObjInfoHead;
00481 while (srcDataObjInfo != NULL) {
00482 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00483 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00484 outDataObjInfo, 0);
00485 if (status >= 0) {
00486 break;
00487 } else {
00488 savedStatus = status;
00489 }
00490 srcDataObjInfo = srcDataObjInfo->next;
00491 }
00492 }
00493 if (status >= 0) {
00494 transStat->numThreads = dataObjInp->numThreads;
00495 if (allFlag == 0) {
00496 return 0;
00497 }
00498 } else {
00499 savedStatus = status;
00500 }
00501 tmpRescGrpInfo = tmpRescGrpInfo->next;
00502 }
00503
00504 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00505
00506 return destRescGrpInfo->status;
00507 } else {
00508 return (savedStatus);
00509 }
00510 }
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525 int
00526 _rsDataObjReplS (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00527 dataObjInfo_t *srcDataObjInfo, rescInfo_t *destRescInfo,
00528 char *rescGroupName, dataObjInfo_t *destDataObjInfo, int updateFlag)
00529 {
00530 int status, status1;
00531 int l1descInx;
00532 openedDataObjInp_t dataObjCloseInp;
00533 dataObjInfo_t *myDestDataObjInfo;
00534
00535 l1descInx = dataObjOpenForRepl (rsComm, dataObjInp, srcDataObjInfo,
00536 destRescInfo, rescGroupName, destDataObjInfo, updateFlag);
00537
00538 if (l1descInx < 0) {
00539 return (l1descInx);
00540 }
00541
00542 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00543 status = l3DataStageSync (rsComm, l1descInx);
00544 } else if (L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00545 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF) {
00546 status = l3DataCopySingleBuf (rsComm, l1descInx);
00547 } else {
00548 status = dataObjCopy (rsComm, l1descInx);
00549 }
00550
00551 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00552
00553 dataObjCloseInp.l1descInx = l1descInx;
00554
00555 L1desc[l1descInx].oprStatus = status;
00556 if (status >= 0) {
00557 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00558 }
00559
00560 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00561
00562 if (destDataObjInfo != NULL) {
00563 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00564 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00565 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00566 } else {
00567
00568 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00569 }
00570 }
00571 freeDataObjInfo (myDestDataObjInfo);
00572
00573 if (status < 0) {
00574 return status;
00575 } else if (status1 < 0) {
00576 return status1;
00577 } else {
00578 return (status);
00579 }
00580 }
00581
00582
00583
00584
00585 int
00586 dataObjOpenForRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00587 dataObjInfo_t *inpSrcDataObjInfo, rescInfo_t *destRescInfo,
00588 char *rescGroupName, dataObjInfo_t *inpDestDataObjInfo, int updateFlag)
00589 {
00590 dataObjInfo_t *myDestDataObjInfo, *srcDataObjInfo = NULL;
00591 rescInfo_t *myDestRescInfo;
00592 int destL1descInx;
00593 int srcL1descInx;
00594 int status;
00595 int replStatus;
00596 int destRescClass;
00597 char *destRescName, *srcRescName;
00598 int srcRescClass = getRescClass (inpSrcDataObjInfo->rescInfo);
00599 dataObjInfo_t *cacheDataObjInfo = NULL;
00600 dataObjInp_t myDataObjInp, *l1DataObjInp;
00601
00602 if (destRescInfo == NULL) {
00603 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00604 } else {
00605 myDestRescInfo = destRescInfo;
00606 }
00607
00608 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00609 return SYS_RESC_IS_DOWN;
00610 }
00611
00612 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00613 return SYS_RESC_IS_DOWN;
00614 }
00615
00616 destRescClass = getRescClass (myDestRescInfo);
00617
00618 #if 0
00619 if (destRescClass == COMPOUND_CL && srcRescClass == COMPOUND_CL) {
00620 return SYS_SRC_DEST_RESC_COMPOUND_TYPE;
00621 } else if (destRescClass == COMPOUND_CL) {
00622 if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00623 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00624
00625 return SYS_UNMATCHED_RESC_IN_RESC_GRP;
00626 }
00627 #endif
00628
00629
00630 if (srcRescClass == COMPOUND_CL) {
00631 rescGrpInfo_t *myRescGrpInfo;
00632 if (destRescClass == CACHE_CL && isRescsInSameGrp (rsComm,
00633 myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00634 &myRescGrpInfo)) {
00635
00636 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00637 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00638 myRescGrpInfo->rescGroupName, NAME_LEN);
00639 }
00640 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00641 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00642 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00643 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00644 cacheDataObjInfo);
00645 if (status < 0) { free( cacheDataObjInfo ); return status; }
00646
00647 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00648 }
00649 }
00650
00651 if (cacheDataObjInfo == NULL) {
00652 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00653 *srcDataObjInfo = *inpSrcDataObjInfo;
00654 } else {
00655 srcDataObjInfo = cacheDataObjInfo;
00656 }
00657
00658 if( NULL == srcDataObjInfo ) {
00659 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00660 return -1;
00661 }
00662
00663 #if 0
00664 dataObjInp->dataSize = inpSrcDataObjInfo->dataSize;
00665 #else
00666 myDataObjInp = *dataObjInp;
00667 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00668 #endif
00669
00670 destL1descInx = allocL1desc ();
00671
00672 if (destL1descInx < 0) return destL1descInx;
00673
00674 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00675 if (updateFlag > 0) {
00676
00677 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00678 rodsLog (LOG_ERROR,
00679 "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00680 srcDataObjInfo->objPath);
00681 return (SYS_UPDATE_REPL_INFO_ERR);
00682 }
00683
00684 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00685 *myDestDataObjInfo = *inpDestDataObjInfo;
00686 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00687 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00688 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00689 } else {
00690 initDataObjInfoForRepl (rsComm, myDestDataObjInfo, srcDataObjInfo,
00691 destRescInfo, rescGroupName);
00692 replStatus = srcDataObjInfo->replStatus;
00693 }
00694
00695 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo,
00696 replStatus, srcDataObjInfo->dataSize);
00697 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00698 if (l1DataObjInp->oprType == PHYMV_OPR) {
00699 L1desc[destL1descInx].oprType = PHYMV_DEST;
00700 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00701 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00702 } else {
00703 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00704 }
00705
00706 if (destRescClass == COMPOUND_CL) {
00707 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00708 } else if (srcRescClass == COMPOUND_CL) {
00709 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00710 }
00711
00712 if (destRescInfo != NULL)
00713 destRescName = destRescInfo->rescName;
00714 else
00715 destRescName = NULL;
00716
00717 if (srcDataObjInfo != NULL && srcDataObjInfo->rescInfo != NULL)
00718 srcRescName = srcDataObjInfo->rescInfo->rescName;
00719 else
00720 srcRescName = NULL;
00721
00722 l1DataObjInp->numThreads = dataObjInp->numThreads =
00723 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00724 &dataObjInp->condInput, destRescName, srcRescName);
00725
00726 if (l1DataObjInp->numThreads > 0 &&
00727 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00728 if (updateFlag > 0) {
00729 status = dataOpen (rsComm, destL1descInx);
00730 } else {
00731 status = getFilePathName (rsComm, myDestDataObjInfo,
00732 L1desc[destL1descInx].dataObjInp);
00733 if (status >= 0)
00734 status = dataCreate (rsComm, destL1descInx);
00735 }
00736
00737 if (status < 0) {
00738 freeL1desc (destL1descInx);
00739 return (status);
00740 }
00741 } else {
00742 if (updateFlag == 0) {
00743 status = getFilePathName (rsComm, myDestDataObjInfo,
00744 L1desc[destL1descInx].dataObjInp);
00745 if (status < 0) {
00746 freeL1desc (destL1descInx);
00747 return (status);
00748 }
00749 }
00750 }
00751
00752 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00753
00754 *inpDestDataObjInfo = *myDestDataObjInfo;
00755 inpDestDataObjInfo->next = NULL;
00756 }
00757
00758
00759
00760 srcL1descInx = allocL1desc ();
00761 if (srcL1descInx < 0) return srcL1descInx;
00762 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo,
00763 srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00764 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00765 l1DataObjInp->numThreads = dataObjInp->numThreads;
00766 if (l1DataObjInp->oprType == PHYMV_OPR) {
00767 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00768 } else {
00769 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00770 }
00771
00772 if( L1desc[destL1descInx].stageFlag == SYNC_DEST &&
00773 getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL) {
00774 L1desc[srcL1descInx].purgeCacheFlag = 1;
00775 }
00776
00777
00778 if (l1DataObjInp->numThreads > 0 &&
00779 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00780 openedDataObjInp_t dataObjCloseInp;
00781
00782 l1DataObjInp->openFlags = O_RDONLY;
00783 status = dataOpen (rsComm, srcL1descInx);
00784 if (status < 0) {
00785 freeL1desc (srcL1descInx);
00786 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00787 dataObjCloseInp.l1descInx = destL1descInx;
00788 rsDataObjClose (rsComm, &dataObjCloseInp);
00789 return (status);
00790 }
00791 }
00792 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00793
00794 return (destL1descInx);
00795 }
00796
00797 int
00798 dataObjCopy (rsComm_t *rsComm, int l1descInx)
00799 {
00800 int srcL1descInx, destL1descInx;
00801 int srcL3descInx, destL3descInx;
00802 int status;
00803 portalOprOut_t *portalOprOut = NULL;
00804 dataCopyInp_t dataCopyInp;
00805 dataOprInp_t *dataOprInp;
00806 int srcRemoteFlag, destRemoteFlag;
00807
00808 bzero (&dataCopyInp, sizeof (dataCopyInp));
00809 dataOprInp = &dataCopyInp.dataOprInp;
00810 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00811 destL1descInx = l1descInx;
00812
00813 srcL3descInx = L1desc[srcL1descInx].l3descInx;
00814 destL3descInx = L1desc[destL1descInx].l3descInx;
00815
00816 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
00817 srcRemoteFlag = REMOTE_ZONE_HOST;
00818 } else {
00819 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
00820 }
00821
00822 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
00823 destRemoteFlag = REMOTE_ZONE_HOST;
00824 } else {
00825 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
00826 }
00827
00828 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
00829 destRemoteFlag != REMOTE_ZONE_HOST &&
00830 FileDesc[srcL3descInx].rodsServerHost ==
00831 FileDesc[destL3descInx].rodsServerHost) {
00832
00833 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
00834
00835 dataCopyInp.portalOprOut.numThreads =
00836 dataCopyInp.dataOprInp.numThreads;
00837 if (srcRemoteFlag == LOCAL_HOST) {
00838 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00839 }
00840
00841 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
00842 destRemoteFlag == REMOTE_ZONE_HOST) {
00843 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
00844
00845 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00846
00847
00848 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
00849 if (status < 0 || NULL == portalOprOut ) {
00850 rodsLog (LOG_NOTICE,
00851 "dataObjCopy: preProcParaPut error for %s",
00852 L1desc[srcL1descInx].dataObjInfo->objPath);
00853 return (status);
00854 }
00855 dataCopyInp.portalOprOut = *portalOprOut;
00856 } else {
00857 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
00858 }
00859 if (srcRemoteFlag == LOCAL_HOST)
00860 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00861 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
00862 srcRemoteFlag == REMOTE_ZONE_HOST) {
00863
00864 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
00865
00866 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00867
00868 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
00869 if (status < 0 || NULL == portalOprOut ) {
00870 rodsLog (LOG_NOTICE,
00871 "dataObjCopy: preProcParaGet error for %s",
00872 L1desc[srcL1descInx].dataObjInfo->objPath);
00873 return (status);
00874 }
00875 dataCopyInp.portalOprOut = *portalOprOut;
00876 } else {
00877 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
00878 }
00879 if (destRemoteFlag == LOCAL_HOST)
00880 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00881 } else {
00882
00883 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
00884
00885
00886 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00887 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
00888
00889 if (status < 0 || NULL == portalOprOut ) {
00890 rodsLog (LOG_NOTICE,
00891 "dataObjCopy: preProcParaGet error for %s",
00892 L1desc[srcL1descInx].dataObjInfo->objPath);
00893 return (status);
00894 }
00895 dataCopyInp.portalOprOut = *portalOprOut;
00896 } else {
00897 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
00898 }
00899 }
00900
00901 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
00902 NO_CHK_COPY_LEN_KW) != NULL) {
00903
00904 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
00905 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
00906 L1desc[l1descInx].dataObjInp->numThreads =
00907 L1desc[srcL1descInx].dataObjInp->numThreads =
00908 dataCopyInp.portalOprOut.numThreads = 1;
00909 }
00910 }
00911 status = rsDataCopy (rsComm, &dataCopyInp);
00912
00913 if (status >= 0 && portalOprOut != NULL &&
00914 L1desc[l1descInx].dataObjInp != NULL) {
00915
00916 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
00917 }
00918 if (portalOprOut != NULL) free (portalOprOut);
00919 clearKeyVal (&dataOprInp->condInput);
00920
00921 return (status);
00922 }
00923
00924 int
00925 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
00926 {
00927 bytesBuf_t dataBBuf;
00928 int bytesRead, bytesWritten;
00929 int srcL1descInx;
00930
00931 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
00932
00933 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00934 if (L1desc[srcL1descInx].dataSize < 0) {
00935 rodsLog (LOG_ERROR,
00936 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
00937 L1desc[srcL1descInx].dataSize,
00938 L1desc[srcL1descInx].dataObjInfo->objPath);
00939 return (SYS_COPY_LEN_ERR);
00940 } else if (L1desc[srcL1descInx].dataSize == 0) {
00941 bytesRead = 0;
00942 } else {
00943 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
00944 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
00945 }
00946
00947 if (bytesRead < 0) {
00948 free( dataBBuf.buf );
00949 return (bytesRead);
00950 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
00951 NO_CHK_COPY_LEN_KW) != NULL) {
00952
00953 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
00954 = bytesRead;
00955 }
00956 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
00957
00958 L1desc[l1descInx].bytesWritten = bytesWritten;
00959
00960 if (dataBBuf.buf != NULL) {
00961 free (dataBBuf.buf);
00962 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
00963 }
00964
00965 if (bytesWritten != bytesRead) {
00966 if (bytesWritten >= 0) {
00967 rodsLog (LOG_NOTICE,
00968 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
00969 bytesRead, bytesWritten);
00970 return (SYS_COPY_LEN_ERR);
00971 } else {
00972 return (bytesWritten);
00973 }
00974 }
00975
00976
00977 return (0);
00978 }
00979
00980 int
00981 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
00982 {
00983 bytesBuf_t dataBBuf;
00984 int srcL1descInx;
00985 int status = 0;
00986
00987 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
00988
00989 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00990 if (L1desc[srcL1descInx].dataSize < 0 &&
00991 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
00992 rodsLog (LOG_ERROR,
00993 "l3DataStageSync: dataSize %lld for %s is negative",
00994 L1desc[srcL1descInx].dataSize,
00995 L1desc[srcL1descInx].dataObjInfo->objPath);
00996 return (SYS_COPY_LEN_ERR);
00997 } else if (L1desc[srcL1descInx].dataSize > 0 ||
00998 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
00999 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01000
01001 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01002 } else {
01003
01004 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01005 }
01006 }
01007
01008 if (status < 0) {
01009 L1desc[l1descInx].bytesWritten = -1;
01010 } else {
01011 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01012 L1desc[srcL1descInx].dataSize;
01013 }
01014
01015 return (status);
01016 }
01017
01018 int
01019 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01020 {
01021 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01022 int rescTypeInx, cacheRescTypeInx;
01023 fileStageSyncInp_t fileSyncToArchInp;
01024 dataObjInp_t *dataObjInp;
01025 int status;
01026 char *outFileName = NULL;
01027
01028 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01029 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01030
01031 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01032 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01033
01034 switch (RescTypeDef[rescTypeInx].rescCat) {
01035 dataObjInfo_t tmpDataObjInfo;
01036 case FILE_CAT:
01037
01038 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01039 status = chkOrphanFile (rsComm, destDataObjInfo->filePath,
01040 destDataObjInfo->rescName, &tmpDataObjInfo);
01041 if (status == 0 && tmpDataObjInfo.dataId !=
01042 destDataObjInfo->dataId) {
01043
01044 snprintf (destDataObjInfo->filePath, MAX_NAME_LEN,
01045 "%s.%-d", destDataObjInfo->filePath, (int) random());
01046 }
01047 }
01048 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01049 dataObjInp = L1desc[destL1descInx].dataObjInp;
01050 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01051 fileSyncToArchInp.fileType = (fileDriverType_t)RescTypeDef[rescTypeInx].driverType;
01052 fileSyncToArchInp.cacheFileType =
01053 (fileDriverType_t)RescTypeDef[cacheRescTypeInx].driverType;
01054 rstrcpy (fileSyncToArchInp.addr.hostAddr,
01055 srcDataObjInfo->rescInfo->rescLoc, NAME_LEN);
01056
01057 rstrcpy (fileSyncToArchInp.filename, destDataObjInfo->filePath,
01058 MAX_NAME_LEN);
01059 rstrcpy (fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath,
01060 MAX_NAME_LEN);
01061 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01062 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp, &outFileName);
01063 if (status >= 0 &&
01064 RescTypeDef[rescTypeInx].createPathFlag == NO_CREATE_PATH &&
01065 outFileName != NULL) {
01066
01067 rstrcpy (destDataObjInfo->filePath, outFileName, MAX_NAME_LEN);
01068 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01069 free (outFileName);
01070 }
01071 break;
01072 default:
01073 rodsLog (LOG_ERROR,
01074 "l3FileSync: rescCat type %d is not recognized",
01075 RescTypeDef[rescTypeInx].rescCat);
01076 status = SYS_INVALID_RESC_TYPE;
01077 break;
01078 }
01079 return (status);
01080 }
01081
01082 int
01083 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01084 {
01085 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01086 int mode, status;
01087
01088 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01089 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01090
01091 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01092
01093 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01094
01095 return status;
01096 }
01097
01098 int
01099 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01100 dataObjInfo_t *destDataObjInfo, int mode)
01101 {
01102 int rescTypeInx, cacheRescTypeInx;
01103 fileStageSyncInp_t fileSyncToArchInp;
01104 int status;
01105
01106 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01107 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01108
01109
01110 switch (RescTypeDef[rescTypeInx].rescCat) {
01111 case FILE_CAT:
01112 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01113 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01114 fileSyncToArchInp.fileType = (fileDriverType_t)RescTypeDef[rescTypeInx].driverType;
01115 fileSyncToArchInp.cacheFileType =
01116 (fileDriverType_t)RescTypeDef[cacheRescTypeInx].driverType;
01117 rstrcpy (fileSyncToArchInp.addr.hostAddr,
01118 destDataObjInfo->rescInfo->rescLoc, NAME_LEN);
01119
01120 rstrcpy( fileSyncToArchInp.cacheFilename, destDataObjInfo->filePath,
01121 MAX_NAME_LEN);
01122 rstrcpy( fileSyncToArchInp.filename, srcDataObjInfo->filePath,
01123 MAX_NAME_LEN);
01124 fileSyncToArchInp.mode = mode;
01125 status = rsFileStageToCache (rsComm, &fileSyncToArchInp);
01126 break;
01127 default:
01128 rodsLog (LOG_ERROR,
01129 "l3FileStage: rescCat type %d is not recognized",
01130 RescTypeDef[rescTypeInx].rescCat);
01131 status = SYS_INVALID_RESC_TYPE;
01132 break;
01133 }
01134 return (status);
01135 }
01136
01137
01138
01139
01140
01141
01142
01143
01144
01145 int
01146 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01147 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01148 {
01149 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01150 transferStat_t transStat;
01151 dataObjInp_t dataObjInp;
01152 char tmpStr[NAME_LEN];
01153 int status;
01154
01155 dataObjInfoHead = *srcDataObjInfoHead;
01156 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01157 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01158 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01159 memset (&transStat, 0, sizeof (transStat));
01160
01161 if (flagStr != NULL) {
01162 if (strstr (flagStr, ALL_KW) != NULL) {
01163 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01164 }
01165 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01166 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01167 }
01168 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01169 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01170 }
01171 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01172 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01173 }
01174 }
01175
01176 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01177 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01178 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01179 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01180
01181 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01182 myDataObjInfo);
01183
01184
01185 clearKeyVal (&dataObjInp.condInput);
01186 if (status >= 0) {
01187 status = 1;
01188
01189 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01190 } else {
01191 freeAllDataObjInfo (myDataObjInfo);
01192 }
01193
01194 return status;
01195 }
01196
01197 int
01198 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01199 dataObjInfo_t *outCacheObjInfo)
01200 {
01201 int status;
01202 rescInfo_t *cacheResc;
01203 transferStat_t transStat;
01204 dataObjInp_t dataObjInp;
01205 char tmpStr[NAME_LEN];
01206
01207 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01208
01209 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01210 compObjInfo->rescInfo, &cacheResc);
01211 if (status < 0) {
01212 rodsLog (LOG_ERROR,
01213 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01214 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01215 return status;
01216 }
01217 if (outCacheObjInfo != NULL)
01218 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01219 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01220 memset (&transStat, 0, sizeof (transStat));
01221
01222 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01223 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01224 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01225 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01226
01227 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01228 outCacheObjInfo);
01229
01230 clearKeyVal (&dataObjInp.condInput);
01231 return status;
01232 }
01233
01234
01235
01236
01237
01238
01239 int
01240 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01241 {
01242 int status;
01243 dataObjInfo_t *outCacheObjInfo;
01244
01245 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01246 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01247 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01248 outCacheObjInfo);
01249
01250 if (status < 0) {
01251
01252 if (outCacheObjInfo->dataId > 0) {
01253
01254 if( requeDataObjInfoByReplNum (compObjInfoHead,
01255 outCacheObjInfo->replNum) == 0) {
01256
01257 status = 0;
01258 }
01259 }
01260 free (outCacheObjInfo);
01261 } else {
01262 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01263 }
01264 return status;
01265 }
01266
01267 int
01268 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01269 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01270 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01271 {
01272 int status = 0;
01273 rescInfo_t *cacheResc;
01274 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01275 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01276 int updateFlag;
01277
01278 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01279 cacheResc = oldDataObjInfo->rescInfo;
01280 updateFlag = 1;
01281 } else {
01282
01283 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01284 compObjInfo->rescInfo, &cacheResc);
01285 if (status < 0) {
01286 rodsLog (LOG_ERROR,
01287 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01288 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01289 return status;
01290 }
01291 updateFlag = 0;
01292 }
01293
01294 if (outDestDataObjInfo == NULL) {
01295 destDataObjInfo = tmpDestDataObjInfo;
01296 } else {
01297 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01298 if (tmpDestDataObjInfo == NULL) {
01299 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01300 } else {
01301 *destDataObjInfo = *tmpDestDataObjInfo;
01302 }
01303 }
01304 srcDataObjInfo = srcDataObjInfoHead;
01305 while (srcDataObjInfo != NULL) {
01306 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01307 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01308 if (status >= 0) {
01309 if (outDestDataObjInfo != NULL)
01310 *outDestDataObjInfo = destDataObjInfo;
01311 break;
01312 }
01313 srcDataObjInfo = srcDataObjInfo->next;
01314 }
01315
01316 return status;
01317 }
01318
01319 int
01320 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01321 {
01322 int status;
01323 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01324 rescInfo_t *cacheResc;
01325 dataObjInp_t dataObjInp;
01326 dataObjInfo_t *cacheObjInfo;
01327
01328 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01329
01330 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01331 &cacheResc);
01332
01333 if (status < 0) return status;
01334
01335
01336 bzero (&dataObjInp, sizeof (dataObjInp));
01337 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01338 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01339 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01340 clearKeyVal (&dataObjInp.condInput);
01341 if (status < 0) {
01342 rodsLog (LOG_ERROR,
01343 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01344 dataObjInp.objPath, status);
01345 return status;
01346 }
01347
01348 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01349
01350
01351 return status;
01352 }
01353
01354 int
01355 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01356 rescInfo_t **outCacheResc)
01357 {
01358 dataObjInfo_t *bunfileObjInfoHead;
01359 dataObjInp_t dataObjInp;
01360 int status;
01361
01362
01363 bzero (&dataObjInp, sizeof (dataObjInp));
01364 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01365
01366 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01367 if (status < 0) {
01368 rodsLog (LOG_ERROR,
01369 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01370 dataObjInp.objPath, status);
01371 return status;
01372 }
01373 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead,
01374 outCacheResc, 0);
01375
01376 freeAllDataObjInfo (bunfileObjInfoHead);
01377
01378 return status;
01379 }
01380
01381 int
01382 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead,
01383 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01384 {
01385 int status;
01386 rescInfo_t *cacheResc;
01387 dataObjInp_t dataObjInp;
01388
01389 bzero (&dataObjInp, sizeof (dataObjInp));
01390 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01391 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, NULL, 0);
01392 if (status < 0) return status;
01393
01394 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01395
01396 status = getCacheRescInGrp (rsComm,
01397 (*bunfileObjInfoHead)->rescGroupName,
01398 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01399 if (status < 0) {
01400 rodsLog (LOG_ERROR,
01401 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01402 (*bunfileObjInfoHead)->rescGroupName,
01403 (*bunfileObjInfoHead)->objPath, status);
01404 return status;
01405 }
01406 if (outCacheResc != NULL)
01407 *outCacheResc = cacheResc;
01408
01409
01410 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01411 cacheResc->rescName, SU_CLIENT_USER_KW);
01412 if (status < 0) {
01413 rodsLog (LOG_ERROR,
01414 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01415 (*bunfileObjInfoHead)->objPath, status);
01416 return status;
01417 }
01418 } else {
01419 if (outCacheResc != NULL)
01420 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01421 }
01422 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01423 (*bunfileObjInfoHead)->filePath);
01424 if (rmBunCopyFlag > 0) {
01425 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01426 }
01427 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01428 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01429 (*bunfileObjInfoHead)->dataType);
01430 }
01431 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01432 (*bunfileObjInfoHead)->rescInfo);
01433
01434 return status;
01435 }
01436
01437
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452 int
01453 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01454 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01455 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01456 dataObjInfo_t **outDataObjInfo)
01457 {
01458 int status;
01459
01460 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01461 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01462
01463 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01464 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01465 outDataObjInfo);
01466 if (status < 0) {
01467 rodsLog (LOG_ERROR,
01468 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01469 srcDataObjInfoHead->objPath, status);
01470 return status;
01471 }
01472
01473
01474 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01475 }
01476 return status;
01477 }
01478
01479 int
01480 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01481 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01482 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01483 dataObjInfo_t **outDataObjInfo)
01484 {
01485 int status;
01486 dataObjInfo_t compDataObjInfo;
01487
01488
01489 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01490 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01491 NAME_LEN);
01492 compDataObjInfo.rescInfo = compRescGrpInfo->rescInfo;
01493 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01494 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01495 oldDataObjInfo, outDataObjInfo);
01496
01497 return status;
01498 }
01499