00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "dataObjRepl.h"
00011 #include "dataObjOpr.h"
00012 #include "dataObjCreate.h"
00013 #include "dataObjOpen.h"
00014 #include "dataObjPut.h"
00015 #include "dataObjGet.h"
00016 #include "rodsLog.h"
00017 #include "objMetaOpr.h"
00018 #include "physPath.h"
00019 #include "specColl.h"
00020 #include "resource.h"
00021 #include "reGlobalsExtern.h"
00022 #include "reDefines.h"
00023 #include "reSysDataObjOpr.h"
00024 #include "getRemoteZoneResc.h"
00025 #include "l3FileGetSingleBuf.h"
00026 #include "l3FilePutSingleBuf.h"
00027 #include "fileSyncToArch.h"
00028 #include "fileStageToCache.h"
00029 #include "unbunAndRegPhyBunfile.h"
00030 #include "dataObjTrim.h"
00031 #include "dataObjLock.h"
00032
00033
00034
00035 #include "eirods_resource_backport.h"
00036 #include "eirods_resource_redirect.h"
00037 #include "eirods_log.h"
00038
00039
00040 int
00041 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00042 transStat_t **transStat)
00043 {
00044 int status;
00045 transferStat_t *transferStat = NULL;
00046
00047 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00048
00049 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00050 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00051 (*transStat)->numThreads = transferStat->numThreads;
00052 (*transStat)->bytesWritten = transferStat->bytesWritten;
00053 free (transferStat);
00054 }
00055 return status;
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066 int
00067 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00068 transferStat_t **transStat)
00069 {
00070
00071 int status;
00072 int remoteFlag;
00073 rodsServerHost_t *rodsServerHost;
00074 dataObjInfo_t *dataObjInfo = NULL;
00075 char* lockType = NULL;
00076 int lockFd = -1;
00077
00078 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00079 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00080
00081 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00082
00083 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00084 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00085 }
00086 }
00087
00088 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00089 READ_COLL_PERM, 0, &dataObjInfo);
00090
00091 if (status == DATA_OBJ_T) {
00092 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00093 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00094 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00095 MAX_NAME_LEN);
00096 freeAllDataObjInfo (dataObjInfo);
00097 } else {
00098 freeAllDataObjInfo (dataObjInfo);
00099 return (SYS_REG_OBJ_IN_SPEC_COLL);
00100 }
00101 }
00102 }
00103
00104 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00105 REMOTE_OPEN);
00106
00107 if (remoteFlag < 0) {
00108 return (remoteFlag);
00109 } else if (remoteFlag == REMOTE_HOST) {
00110 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00111 transStat);
00112 return status;
00113 }
00114
00115
00116
00117 std::string hier;
00118 char* tmp_hier = getValByKey( &dataObjInp->condInput, RESC_HIER_STR_KW );
00119 if( 0 == tmp_hier ) {
00120
00121 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00122
00123 eirods::error ret = eirods::resolve_resource_hierarchy( eirods::EIRODS_OPEN_OPERATION,
00124 rsComm, dataObjInp, hier );
00125 if( !ret.ok() ) {
00126 std::stringstream msg;
00127 msg << "failed in eirods::resolve_resource_hierarchy for [";
00128 msg << dataObjInp->objPath << "]";
00129 eirods::log( PASSMSG( msg.str(), ret ) );
00130 return ret.code();
00131 }
00132
00133 addKeyVal( &dataObjInp->condInput, RESC_HIER_STR_KW, hier.c_str() );
00134 } else {
00135 hier = tmp_hier;
00136 }
00137
00138
00139
00140 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00141 memset (*transStat, 0, sizeof (transferStat_t));
00142
00143
00144 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00145 if (lockType != NULL) {
00146 lockFd = rsDataObjLock (rsComm, dataObjInp);
00147 if (lockFd >= 0) {
00148
00149 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00150 } else {
00151 rodsLogError (LOG_ERROR, lockFd,
00152 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00153 dataObjInp->objPath, lockType);
00154 return lockFd;
00155 }
00156 }
00157
00158
00159 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00160 if(status < 0) {
00161 rodsLog(LOG_NOTICE, "%s - Failed to replicate data object.", __FUNCTION__);
00162 }
00163
00164 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00165
00166 return (status);
00167 }
00168
00169 int
00170 _rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00171 transferStat_t *transStat, dataObjInfo_t *outDataObjInfo)
00172 {
00173 int status;
00174 dataObjInfo_t *dataObjInfoHead = NULL;
00175 dataObjInfo_t *oldDataObjInfoHead = NULL;
00176 dataObjInfo_t *destDataObjInfo = NULL;
00177 rescGrpInfo_t *myRescGrpInfo = NULL;
00178 ruleExecInfo_t rei;
00179 int multiCopyFlag;
00180 char *accessPerm;
00181 int backupFlag;
00182 int allFlag;
00183 int savedStatus = 0;
00184 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00185 accessPerm = NULL;
00186 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00187 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00188 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00189 }
00190 accessPerm = NULL;
00191 } else {
00192 accessPerm = ACCESS_READ_OBJECT;
00193 }
00194
00195
00196 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00197 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00198 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00199 multiCopyFlag = 1;
00200 } else {
00201 multiCopyFlag = 0;
00202 }
00203
00204
00205 if (multiCopyFlag) {
00206 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00207 accessPerm, 0);
00208 } else {
00209
00210
00211 status = getDataObjInfo( rsComm, dataObjInp, &dataObjInfoHead, accessPerm, 1 );
00212 }
00213
00214 if (status < 0) {
00215 rodsLog (LOG_NOTICE,
00216 "%s: getDataObjInfo for %s failed", __FUNCTION__, dataObjInp->objPath);
00217 return (status);
00218 }
00219
00220 if (getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00221 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, 0 );
00222 if (status < 0) {
00223 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication update.", __FUNCTION__);
00224 return status;
00225 }
00226
00227
00228 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead, oldDataObjInfoHead, transStat, NULL);
00229
00230 if (status >= 0 && outDataObjInfo != NULL) {
00231 *outDataObjInfo = *oldDataObjInfoHead;
00232 outDataObjInfo->next = NULL;
00233 } else {
00234 if(status < 0) {
00235 rodsLog(LOG_NOTICE, "%s - Failed to update replica.", __FUNCTION__);
00236 }
00237 }
00238
00239 freeAllDataObjInfo (dataObjInfoHead);
00240 freeAllDataObjInfo (oldDataObjInfoHead);
00241
00242 return status;
00243 }
00244
00245
00246 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, multiCopyFlag);
00247
00248 if (status < 0) {
00249 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00250 return status;
00251 }
00252
00253 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00254
00255 backupFlag = 1;
00256 multiCopyFlag = 0;
00257 } else {
00258 backupFlag = 0;
00259 }
00260
00261 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00262 allFlag = 1;
00263 } else {
00264 allFlag = 0;
00265 }
00266
00267 if( backupFlag == 0 && allFlag == 1 &&
00268 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00269 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00270
00271 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, dataObjInfoHead->rescGroupName );
00272 }
00273
00274
00275 dataObjInp->oprType = REPLICATE_OPR;
00276 status = getRescGrpForCreate( rsComm, dataObjInp, &myRescGrpInfo );
00277 if (status < 0) {
00278 rodsLog(LOG_NOTICE, "%s - Failed to get a resource group for create.", __FUNCTION__);
00279 return status;
00280 }
00281
00282 if (multiCopyFlag == 0 ) {
00283
00284
00285
00286
00287
00288
00289
00290 status = resolveSingleReplCopy( &dataObjInfoHead, &oldDataObjInfoHead,
00291 &myRescGrpInfo, &destDataObjInfo,
00292 &dataObjInp->condInput );
00293
00294 if (status == HAVE_GOOD_COPY) {
00295
00296
00297 #if 0 // JMC - legacy resource :: we no longer deal with the cache here
00298 dataObjInfo_t *cacheDataObjInfo = NULL;
00299 dataObjInfo_t *compDataObjInfo = NULL;
00300
00301
00302 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL &&
00303
00304
00305 #if 0
00306 getDataObjByClass (dataObjInfoHead, CACHE_CL, &cacheDataObjInfo)
00307 >= 0 && cacheDataObjInfo != destDataObjInfo) [[
00308 #else
00309 getDataObjByClass( dataObjInfoHead, COMPOUND_CL, &compDataObjInfo ) >= 0 &&
00310 strlen (compDataObjInfo->rescGroupName) > 0 &&
00311 getCacheDataInfoForRepl (rsComm, dataObjInfoHead, NULL,compDataObjInfo, &cacheDataObjInfo) >= 0 ) [[
00312 #endif
00313
00314
00315 int status1 = trimDataObjInfo (rsComm, cacheDataObjInfo);
00316 if (status1 < 0) [[
00317 rodsLog (LOG_NOTICE,
00318 "_rsDataObjRepl: trimDataObjInfo for %s",
00319 dataObjInp->objPath);
00320 ]]
00321 ]]
00322 #endif // JMC - legacy resource
00323
00324 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00325
00326 *outDataObjInfo = *destDataObjInfo;
00327 outDataObjInfo->next = NULL;
00328 }
00329
00330
00331
00332 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00333 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00334 ( myRescGrpInfo->status < 0 ) ) {
00335 status = myRescGrpInfo->status;
00336
00337 } else {
00338 status = 0;
00339 }
00340
00341
00342
00343 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00344 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00345 ( myRescGrpInfo->status < 0 ) ) {
00346 status = myRescGrpInfo->status;
00347
00348 } else {
00349 status = 0;
00350 }
00351
00352 freeAllDataObjInfo (dataObjInfoHead);
00353 freeAllDataObjInfo (oldDataObjInfoHead);
00354 freeAllDataObjInfo (destDataObjInfo);
00355 freeAllRescGrpInfo (myRescGrpInfo);
00356 return status;
00357 } else if (status < 0) {
00358 freeAllDataObjInfo (dataObjInfoHead);
00359 freeAllDataObjInfo (oldDataObjInfoHead);
00360 freeAllDataObjInfo (destDataObjInfo);
00361 freeAllRescGrpInfo (myRescGrpInfo);
00362 rodsLog(LOG_NOTICE, "%s - Failed to resolve a single replication copy.", __FUNCTION__);
00363 return status;
00364 }
00365
00366
00367 }
00368
00369
00370 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00371 if (status < 0) return status;
00372
00373
00374
00375 if (destDataObjInfo != NULL) {
00376 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00377 destDataObjInfo, transStat, oldDataObjInfoHead);
00378 if (status >= 0) {
00379 if (outDataObjInfo != NULL) {
00380 *outDataObjInfo = *destDataObjInfo;
00381 outDataObjInfo->next = NULL;
00382 }
00383 if (allFlag == 0) {
00384 freeAllDataObjInfo (dataObjInfoHead);
00385 freeAllDataObjInfo (oldDataObjInfoHead);
00386 freeAllDataObjInfo (destDataObjInfo);
00387 freeAllRescGrpInfo (myRescGrpInfo);
00388 return 0;
00389 } else {
00390
00391
00392 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00393 destDataObjInfo = NULL;
00394 }
00395 } else {
00396 savedStatus = status;
00397 }
00398 }
00399
00400 if (myRescGrpInfo != NULL) {
00401
00402 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00403 myRescGrpInfo, transStat, oldDataObjInfoHead,
00404 outDataObjInfo);
00405 if (status < 0) savedStatus = status;
00406 }
00407
00408 freeAllDataObjInfo (dataObjInfoHead);
00409 freeAllDataObjInfo (oldDataObjInfoHead);
00410 freeAllRescGrpInfo (myRescGrpInfo);
00411
00412 return (savedStatus);
00413 }
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 int
00428 _rsDataObjReplUpdate( rsComm_t* rsComm, dataObjInp_t* dataObjInp,
00429 dataObjInfo_t* srcDataObjInfoHead, dataObjInfo_t* destDataObjInfoHead,
00430 transferStat_t* transStat, dataObjInfo_t* oldDataObjInfo ) {
00431 dataObjInfo_t *destDataObjInfo = 0;
00432 dataObjInfo_t *srcDataObjInfo = 0;
00433 int status;
00434 int allFlag;
00435 int savedStatus = 0;
00436 int replCnt = 0;
00437
00438 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00439 allFlag = 1;
00440 } else {
00441 allFlag = 0;
00442 }
00443
00444 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00445 destDataObjInfo = destDataObjInfoHead;
00446 while (destDataObjInfo != NULL) {
00447 if (destDataObjInfo->dataId == 0) {
00448 destDataObjInfo = destDataObjInfo->next;
00449 continue;
00450 }
00451
00452 #if 0 // JMC - legacy resource
00453
00454 if (getRescClass (destDataObjInfo->rescInfo) == COMPOUND_CL) {
00455
00456 if ((status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
00457 srcDataObjInfoHead, destDataObjInfoHead, destDataObjInfo,
00458 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00459 return status;
00460 }
00461 status = _rsDataObjReplS (rsComm, dataObjInp,
00462 srcDataObjInfo, NULL, "", destDataObjInfo, 1);
00463 } else
00464 #endif // JMC - legacy resource
00465 {
00466 srcDataObjInfo = srcDataObjInfoHead;
00467 while (srcDataObjInfo != NULL) {
00468
00469 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo, NULL, "", destDataObjInfo, 1 );
00470
00471 if (status >= 0) {
00472 break;
00473 }
00474 srcDataObjInfo = srcDataObjInfo->next;
00475 }
00476 }
00477 if (status >= 0) {
00478 transStat->numThreads = dataObjInp->numThreads;
00479 if (allFlag == 0) {
00480 return 0;
00481 }
00482 } else {
00483 savedStatus = status;
00484 replCnt ++;
00485 }
00486 destDataObjInfo = destDataObjInfo->next;
00487 }
00488
00489 return savedStatus;
00490 }
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 int
00505 _rsDataObjReplNewCopy (rsComm_t *rsComm,
00506 dataObjInp_t *dataObjInp,
00507 dataObjInfo_t *srcDataObjInfoHead,
00508 rescGrpInfo_t *destRescGrpInfo,
00509 transferStat_t *transStat,
00510 dataObjInfo_t *oldDataObjInfo,
00511 dataObjInfo_t *outDataObjInfo)
00512 {
00513 dataObjInfo_t *srcDataObjInfo;
00514 rescGrpInfo_t *tmpRescGrpInfo;
00515 rescInfo_t *tmpRescInfo;
00516 int status;
00517 int allFlag;
00518 int savedStatus = 0;
00519
00520 #if 0
00521 rescInfo_t *compRescInfo = NULL;
00522 rescInfo_t *cacheRescInfo = NULL;
00523 #endif
00524
00525 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00526 allFlag = 1;
00527 } else {
00528 allFlag = 0;
00529 }
00530 #if 0 // JMC - legacy resource
00531
00532
00533
00534
00535 if( allFlag == 1 && destRescGrpInfo != NULL &&
00536 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00537 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00538 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00539 }
00540 #endif // JMC - legacy resource
00541
00542
00543 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00544 tmpRescGrpInfo = destRescGrpInfo;
00545 while (tmpRescGrpInfo != NULL) {
00546 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00547 #if 0 // JMC - legacy resource
00548
00549
00550 if (tmpRescInfo == cacheRescInfo) {
00551
00552
00553 tmpRescGrpInfo = tmpRescGrpInfo->next;
00554 continue;
00555 }
00556
00557 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00558
00559 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00560 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00561 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00562 return status;
00563 }
00564
00565 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00566 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00567 } else {
00568 #endif // JMC - legacy resource
00569
00570
00571 {
00572 srcDataObjInfo = srcDataObjInfoHead;
00573 while (srcDataObjInfo != NULL) {
00574 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00575 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00576 outDataObjInfo, 0);
00577 if (status >= 0) {
00578 break;
00579 } else {
00580 savedStatus = status;
00581 }
00582 srcDataObjInfo = srcDataObjInfo->next;
00583 }
00584 }
00585 if (status >= 0) {
00586 transStat->numThreads = dataObjInp->numThreads;
00587 if (allFlag == 0) {
00588 return 0;
00589 }
00590 } else {
00591 savedStatus = status;
00592 }
00593 tmpRescGrpInfo = tmpRescGrpInfo->next;
00594 }
00595
00596 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00597
00598 return destRescGrpInfo->status;
00599 } else {
00600 return (savedStatus);
00601 }
00602 }
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617 int
00618 _rsDataObjReplS (rsComm_t *rsComm,
00619 dataObjInp_t *dataObjInp,
00620 dataObjInfo_t *srcDataObjInfo,
00621 rescInfo_t *destRescInfo,
00622 char *rescGroupName,
00623 dataObjInfo_t *destDataObjInfo,
00624 int updateFlag)
00625 {
00626 int status, status1;
00627 int l1descInx;
00628 openedDataObjInp_t dataObjCloseInp;
00629 dataObjInfo_t *myDestDataObjInfo;
00630 l1descInx = dataObjOpenForRepl( rsComm, dataObjInp, srcDataObjInfo, destRescInfo,
00631 rescGroupName, destDataObjInfo, updateFlag );
00632 if (l1descInx < 0) {
00633 return (l1descInx);
00634 }
00635
00636 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00637 status = l3DataStageSync (rsComm, l1descInx);
00638 } else if( L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00639 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF ) {
00640 status = l3DataCopySingleBuf (rsComm, l1descInx);
00641 } else {
00642 status = dataObjCopy( rsComm, l1descInx );
00643 }
00644
00645 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00646
00647 dataObjCloseInp.l1descInx = l1descInx;
00648
00649 L1desc[l1descInx].oprStatus = status;
00650 if (status >= 0) {
00651 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00652 }
00653
00654 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00655
00656 if (destDataObjInfo != NULL) {
00657 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00658 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00659 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00660 } else {
00661
00662 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00663 }
00664 }
00665
00666 freeDataObjInfo (myDestDataObjInfo);
00667
00668 if (status < 0) {
00669 return status;
00670 } else if (status1 < 0) {
00671 return status1;
00672 } else {
00673 return (status);
00674 }
00675 }
00676
00677
00678
00679
00680 int
00681 dataObjOpenForRepl (rsComm_t *rsComm,
00682 dataObjInp_t *dataObjInp,
00683 dataObjInfo_t *inpSrcDataObjInfo,
00684 rescInfo_t *destRescInfo,
00685 char *rescGroupName,
00686 dataObjInfo_t *inpDestDataObjInfo,
00687 int updateFlag)
00688 {
00689 dataObjInfo_t *myDestDataObjInfo = 0, *srcDataObjInfo = NULL;
00690 rescInfo_t *myDestRescInfo = 0;
00691 int destL1descInx = 0;
00692 int srcL1descInx = 0;
00693 int status = 0;
00694 int replStatus = 0;
00695
00696
00697
00698 dataObjInfo_t *cacheDataObjInfo = NULL;
00699 dataObjInp_t dest_inp, myDataObjInp, *l1DataObjInp = 0;
00700 if (destRescInfo == NULL) {
00701 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00702 } else {
00703 myDestRescInfo = destRescInfo;
00704 }
00705
00706 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00707 return SYS_RESC_IS_DOWN;
00708 }
00709
00710 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00711 return SYS_RESC_IS_DOWN;
00712 }
00713
00714 #if 0 // JMC - legacy resource
00715 destRescClass = getRescClass (myDestRescInfo);
00716 #endif // JMC - legacy resource
00717
00718
00719
00720 #if 0 // JMC - legacy resource
00721 if (srcRescClass == COMPOUND_CL) {
00722 rescGrpInfo_t *myRescGrpInfo;
00723 if (destRescClass == CACHE_CL &&
00724 isRescsInSameGrp (rsComm, myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00725 &myRescGrpInfo)) {
00726
00727 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00728 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00729 myRescGrpInfo->rescGroupName, NAME_LEN);
00730 }
00731 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00732 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00733 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00734 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00735 cacheDataObjInfo);
00736 if (status < 0) { free( cacheDataObjInfo ); return status; }
00737
00738 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00739 }
00740 }
00741
00742 #endif // JMC - legacy resource
00743 if (cacheDataObjInfo == NULL) {
00744 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00745 *srcDataObjInfo = *inpSrcDataObjInfo;
00746 } else {
00747 srcDataObjInfo = cacheDataObjInfo;
00748 }
00749
00750 if( NULL == srcDataObjInfo ) {
00751 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00752 return -1;
00753 }
00754
00755 myDataObjInp = *dataObjInp;
00756 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00757
00758 destL1descInx = allocL1desc ();
00759
00760 if (destL1descInx < 0) return destL1descInx;
00761
00762
00763
00764
00765 std::string op_name;
00766 memset (&dest_inp, 0, sizeof (dest_inp));
00767 memset (&dest_inp.condInput, 0, sizeof (dest_inp.condInput));
00768
00769 strncpy( dest_inp.objPath, dataObjInp->objPath, MAX_NAME_LEN );
00770 addKeyVal( &(dest_inp.condInput), RESC_NAME_KW, myDestRescInfo->rescName );
00771
00772 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00773 if (updateFlag > 0) {
00774
00775
00776 op_name = eirods::EIRODS_OPEN_OPERATION;
00777
00778
00779 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00780 rodsLog( LOG_ERROR, "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00781 srcDataObjInfo->objPath);
00782 return (SYS_UPDATE_REPL_INFO_ERR);
00783 }
00784
00785 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00786 *myDestDataObjInfo = *inpDestDataObjInfo;
00787 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00788 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00789 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00790 } else {
00791
00792
00793 op_name = eirods::EIRODS_CREATE_OPERATION;
00794
00795 initDataObjInfoForRepl( rsComm, myDestDataObjInfo, srcDataObjInfo,
00796 destRescInfo, rescGroupName);
00797 replStatus = srcDataObjInfo->replStatus;
00798 }
00799
00800
00801
00802 std::string hier;
00803 char* dst_hier_str = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00804 if( 0 == dst_hier_str ) {
00805
00806 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00807
00808 eirods::error ret = eirods::resolve_resource_hierarchy( op_name, rsComm, &dest_inp, hier );
00809 if( !ret.ok() ) {
00810 std::stringstream msg;
00811 msg << "failed in eirods::resolve_resource_hierarchy for [";
00812 msg << dest_inp.objPath << "]";
00813 eirods::log( PASSMSG( msg.str(), ret ) );
00814 return ret.code();
00815 }
00816
00817 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, hier.c_str() );
00818
00819 } else {
00820 hier = dst_hier_str;
00821 }
00822
00823
00824
00825
00826 rstrcpy(myDestDataObjInfo->rescHier, hier.c_str(), MAX_NAME_LEN);
00827 addKeyVal( &(myDataObjInp.condInput), RESC_HIER_STR_KW, hier.c_str() );
00828 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo, replStatus, srcDataObjInfo->dataSize);
00829 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00830 if (l1DataObjInp->oprType == PHYMV_OPR) {
00831 L1desc[destL1descInx].oprType = PHYMV_DEST;
00832 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00833 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00834 } else {
00835 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00836 }
00837
00838 #if 0 // JMC - legacy resource
00839 if (destRescClass == COMPOUND_CL) {
00840 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00841 }
00842 else if (srcRescClass == COMPOUND_CL) {
00843 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00844 }
00845 #else // JMC - legacy resource
00846
00847
00848
00849
00850 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00851 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00852 if( stage_kw ) {
00853 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00854 } else if( sync_kw ) {
00855 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00856 } else {
00857
00858
00859 }
00860
00861 #endif // JMC - legacy resource
00862
00863
00864
00865 char* src_hier_str = 0;
00866 if (srcDataObjInfo != NULL && srcDataObjInfo->rescHier != NULL) {
00867 src_hier_str = srcDataObjInfo->rescHier;
00868 }
00869
00870 l1DataObjInp->numThreads = dataObjInp->numThreads =
00871 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00872
00873 &dataObjInp->condInput, dst_hier_str, src_hier_str );
00874
00875 if( l1DataObjInp->numThreads > 0 &&
00876 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00877 if (updateFlag > 0) {
00878 status = dataOpen (rsComm, destL1descInx);
00879 } else {
00880 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00881 if (status >= 0)
00882 status = dataCreate (rsComm, destL1descInx);
00883 }
00884
00885 if (status < 0) {
00886 freeL1desc (destL1descInx);
00887 return (status);
00888 }
00889 } else {
00890 if (updateFlag == 0) {
00891 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00892 if (status < 0) {
00893 freeL1desc (destL1descInx);
00894 return (status);
00895 }
00896 }
00897 }
00898
00899 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00900
00901 *inpDestDataObjInfo = *myDestDataObjInfo;
00902 inpDestDataObjInfo->next = NULL;
00903 }
00904
00905
00906 rstrcpy(srcDataObjInfo->rescHier, inpSrcDataObjInfo->rescHier, MAX_NAME_LEN);
00907 srcL1descInx = allocL1desc ();
00908 if (srcL1descInx < 0) return srcL1descInx;
00909 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo, srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00910 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00911 l1DataObjInp->numThreads = dataObjInp->numThreads;
00912 if (l1DataObjInp->oprType == PHYMV_OPR) {
00913 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00914 } else {
00915 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00916 }
00917
00918 if( L1desc[destL1descInx].stageFlag == SYNC_DEST &&
00919 getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL) {
00920 L1desc[srcL1descInx].purgeCacheFlag = 1;
00921 }
00922
00923
00924 if( l1DataObjInp->numThreads > 0 &&
00925 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00926 openedDataObjInp_t dataObjCloseInp;
00927
00928 l1DataObjInp->openFlags = O_RDONLY;
00929 status = dataOpen (rsComm, srcL1descInx);
00930 if (status < 0) {
00931 freeL1desc (srcL1descInx);
00932 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00933 dataObjCloseInp.l1descInx = destL1descInx;
00934 rsDataObjClose (rsComm, &dataObjCloseInp);
00935 return (status);
00936 }
00937 }
00938
00939 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00940
00941 return (destL1descInx);
00942 }
00943
00944 int
00945 dataObjCopy (rsComm_t *rsComm, int l1descInx)
00946 {
00947 int srcL1descInx, destL1descInx;
00948 int srcL3descInx, destL3descInx;
00949 int status;
00950 portalOprOut_t *portalOprOut = NULL;
00951 dataCopyInp_t dataCopyInp;
00952 dataOprInp_t *dataOprInp;
00953 int srcRemoteFlag, destRemoteFlag;
00954
00955 bzero (&dataCopyInp, sizeof (dataCopyInp));
00956 dataOprInp = &dataCopyInp.dataOprInp;
00957 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00958 destL1descInx = l1descInx;
00959
00960 srcL3descInx = L1desc[srcL1descInx].l3descInx;
00961 destL3descInx = L1desc[destL1descInx].l3descInx;
00962
00963 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
00964 srcRemoteFlag = REMOTE_ZONE_HOST;
00965 } else {
00966 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
00967 }
00968
00969 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
00970 destRemoteFlag = REMOTE_ZONE_HOST;
00971 } else {
00972 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
00973 }
00974
00975 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
00976 destRemoteFlag != REMOTE_ZONE_HOST &&
00977 FileDesc[srcL3descInx].rodsServerHost ==
00978 FileDesc[destL3descInx].rodsServerHost) {
00979
00980 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
00981
00982 dataCopyInp.portalOprOut.numThreads =
00983 dataCopyInp.dataOprInp.numThreads;
00984 if (srcRemoteFlag == LOCAL_HOST) {
00985 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00986 }
00987
00988 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
00989 destRemoteFlag == REMOTE_ZONE_HOST) {
00990 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
00991
00992 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00993
00994
00995 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
00996 if (status < 0 || NULL == portalOprOut ) {
00997 rodsLog (LOG_NOTICE,
00998 "dataObjCopy: preProcParaPut error for %s",
00999 L1desc[srcL1descInx].dataObjInfo->objPath);
01000 return (status);
01001 }
01002 dataCopyInp.portalOprOut = *portalOprOut;
01003 } else {
01004 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
01005 }
01006 if (srcRemoteFlag == LOCAL_HOST)
01007 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01008 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
01009 srcRemoteFlag == REMOTE_ZONE_HOST) {
01010
01011 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01012
01013 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01014
01015 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01016 if (status < 0 || NULL == portalOprOut ) {
01017 rodsLog (LOG_NOTICE,
01018 "dataObjCopy: preProcParaGet error for %s",
01019 L1desc[srcL1descInx].dataObjInfo->objPath);
01020 return (status);
01021 }
01022 dataCopyInp.portalOprOut = *portalOprOut;
01023 } else {
01024 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01025 }
01026 if (destRemoteFlag == LOCAL_HOST)
01027 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01028 } else {
01029
01030 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01031
01032
01033 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01034 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01035
01036 if (status < 0 || NULL == portalOprOut ) {
01037 rodsLog (LOG_NOTICE,
01038 "dataObjCopy: preProcParaGet error for %s",
01039 L1desc[srcL1descInx].dataObjInfo->objPath);
01040 return (status);
01041 }
01042 dataCopyInp.portalOprOut = *portalOprOut;
01043 } else {
01044 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01045 }
01046 }
01047
01048 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01049 NO_CHK_COPY_LEN_KW) != NULL) {
01050
01051 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
01052 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
01053 L1desc[l1descInx].dataObjInp->numThreads =
01054 L1desc[srcL1descInx].dataObjInp->numThreads =
01055 dataCopyInp.portalOprOut.numThreads = 1;
01056 }
01057 }
01058 status = rsDataCopy (rsComm, &dataCopyInp);
01059
01060 if (status >= 0 && portalOprOut != NULL &&
01061 L1desc[l1descInx].dataObjInp != NULL) {
01062
01063 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
01064 }
01065 if (portalOprOut != NULL) free (portalOprOut);
01066 clearKeyVal (&dataOprInp->condInput);
01067
01068 return (status);
01069 }
01070
01071 int
01072 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
01073 {
01074 bytesBuf_t dataBBuf;
01075 int bytesRead, bytesWritten;
01076 int srcL1descInx;
01077
01078 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01079 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01080
01081 if (L1desc[srcL1descInx].dataSize < 0) {
01082 rodsLog (LOG_ERROR,
01083 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
01084 L1desc[srcL1descInx].dataSize,
01085 L1desc[srcL1descInx].dataObjInfo->objPath);
01086 return (SYS_COPY_LEN_ERR);
01087 } else if (L1desc[srcL1descInx].dataSize == 0) {
01088 bytesRead = 0;
01089 } else {
01090 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
01091 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
01092 }
01093
01094 if (bytesRead < 0) {
01095 free( dataBBuf.buf );
01096 return (bytesRead);
01097 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01098 NO_CHK_COPY_LEN_KW) != NULL) {
01099
01100 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
01101 = bytesRead;
01102 }
01103
01104 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
01105
01106 L1desc[l1descInx].bytesWritten = bytesWritten;
01107
01108 if (dataBBuf.buf != NULL) {
01109 free (dataBBuf.buf);
01110 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01111 }
01112
01113 if (bytesWritten != bytesRead) {
01114 if (bytesWritten >= 0) {
01115 rodsLog (LOG_NOTICE,
01116 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
01117 bytesRead, bytesWritten);
01118 return (SYS_COPY_LEN_ERR);
01119 } else {
01120 return (bytesWritten);
01121 }
01122 }
01123
01124
01125 return (0);
01126 }
01127
01128 int
01129 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
01130 {
01131 bytesBuf_t dataBBuf;
01132 int srcL1descInx;
01133 int status = 0;
01134
01135 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01136
01137 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01138 if (L1desc[srcL1descInx].dataSize < 0 &&
01139 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
01140 rodsLog (LOG_ERROR,
01141 "l3DataStageSync: dataSize %lld for %s is negative",
01142 L1desc[srcL1descInx].dataSize,
01143 L1desc[srcL1descInx].dataObjInfo->objPath);
01144 return (SYS_COPY_LEN_ERR);
01145 } else if (L1desc[srcL1descInx].dataSize > 0 ||
01146 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
01147 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01148
01149 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01150 } else {
01151
01152 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01153 }
01154 }
01155
01156 if (status < 0) {
01157 L1desc[l1descInx].bytesWritten = -1;
01158 } else {
01159 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01160 L1desc[srcL1descInx].dataSize;
01161 }
01162
01163 return (status);
01164 }
01165
01166 int
01167 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01168 {
01169 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01170
01171 fileStageSyncInp_t fileSyncToArchInp;
01172 dataObjInp_t *dataObjInp;
01173 int status;
01174 char *outFileName = NULL;
01175 dataObjInfo_t tmpDataObjInfo;
01176
01177 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01178 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01179
01180 #if 0 // JMC legacy resource
01181 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01182 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01183
01184 switch (RescTypeDef[rescTypeInx].rescCat) {
01185 case FILE_CAT:
01186
01187 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01188 #endif // JMC legacy resource
01189
01190 int dst_create_path = 0;
01191 eirods::error err = eirods::get_resource_property< int >( destDataObjInfo->rescInfo->rescName,
01192 "create_path", dst_create_path );
01193 if( !err.ok() ) {
01194 eirods::log( PASS( err ) );
01195 }
01196
01197 if( CREATE_PATH == dst_create_path ) {
01198
01199 status = chkOrphanFile ( rsComm, destDataObjInfo->filePath, destDataObjInfo->rescName, &tmpDataObjInfo );
01200 if (status == 0 && tmpDataObjInfo.dataId != destDataObjInfo->dataId) {
01201
01202 char tmp_str[ MAX_NAME_LEN ];
01203 snprintf( tmp_str, MAX_NAME_LEN, "%s.%-d", destDataObjInfo->filePath, (int) random());
01204 strncpy( destDataObjInfo->filePath, tmp_str, MAX_NAME_LEN );
01205 }
01206 }
01207
01208 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01209 dataObjInp = L1desc[destL1descInx].dataObjInp;
01210 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01211 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01212 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01213
01214
01215
01216 std::string location;
01217 eirods::error ret = eirods::get_loc_for_hier_string( srcDataObjInfo->rescHier, location );
01218 if( !ret.ok() ) {
01219 eirods::log( PASSMSG( "failed in get_loc_for_hier_String", ret ) );
01220 return -1;
01221 }
01222
01223 rstrcpy( fileSyncToArchInp.addr.hostAddr, location.c_str(), NAME_LEN );
01224
01225
01226 rstrcpy( fileSyncToArchInp.filename, destDataObjInfo->filePath, MAX_NAME_LEN );
01227 rstrcpy( fileSyncToArchInp.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01228 rstrcpy( fileSyncToArchInp.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01229 rstrcpy( fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01230
01231 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01232 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp);
01233
01234 if (status >= 0 &&
01235
01236 NO_CREATE_PATH == dst_create_path &&
01237 outFileName != NULL) {
01238
01239 rstrcpy (destDataObjInfo->filePath, fileSyncToArchInp.filename, MAX_NAME_LEN);
01240 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01241 }
01242 #if 0 // JMC legacy resource
01243 break;
01244 default:
01245 rodsLog (LOG_ERROR,
01246 "l3FileSync: rescCat type %d is not recognized",
01247 RescTypeDef[rescTypeInx].rescCat);
01248 status = SYS_INVALID_RESC_TYPE;
01249 break;
01250 }
01251 #endif // JMC legacy resource
01252 return (status);
01253 }
01254
01255 int
01256 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01257 {
01258 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01259 int mode, status;
01260
01261 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01262 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01263
01264 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01265
01266 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01267
01268 return status;
01269 }
01270
01271 int
01272 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01273 dataObjInfo_t *destDataObjInfo, int mode)
01274 {
01275
01276 fileStageSyncInp_t file_stage;
01277 int status;
01278
01279 #if 0 // JMC legacy resource
01280 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01281 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01282
01283
01284 switch (RescTypeDef[rescTypeInx].rescCat) {
01285 case FILE_CAT:
01286 #endif // JMC legacy resource
01287 memset (&file_stage, 0, sizeof (file_stage));
01288 file_stage.dataSize = srcDataObjInfo->dataSize;
01289 file_stage.fileType = static_cast< fileDriverType_t >( -1 );
01290 file_stage.cacheFileType = static_cast< fileDriverType_t >( -1 );
01291
01292 rstrcpy( file_stage.addr.hostAddr,
01293 destDataObjInfo->rescInfo->rescLoc, NAME_LEN );
01294
01295 rstrcpy( file_stage.cacheFilename, destDataObjInfo->filePath, MAX_NAME_LEN );
01296 rstrcpy( file_stage.filename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01297 rstrcpy( file_stage.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01298 rstrcpy( file_stage.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01299 file_stage.mode = mode;
01300 status = rsFileStageToCache( rsComm, &file_stage );
01301 #if 0 // JMC legacy resource
01302 break;
01303 default:
01304 rodsLog (LOG_ERROR,
01305 "l3FileStage: rescCat type %d is not recognized",
01306 RescTypeDef[rescTypeInx].rescCat);
01307 status = SYS_INVALID_RESC_TYPE;
01308 break;
01309 }
01310 #endif // JMC legacy resource
01311 return (status);
01312 }
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322 int
01323 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01324 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01325 {
01326 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01327 transferStat_t transStat;
01328 dataObjInp_t dataObjInp;
01329 char tmpStr[NAME_LEN];
01330 int status;
01331
01332 dataObjInfoHead = *srcDataObjInfoHead;
01333 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01334 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01335 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01336 memset (&transStat, 0, sizeof (transStat));
01337
01338 if (flagStr != NULL) {
01339 if (strstr (flagStr, ALL_KW) != NULL) {
01340 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01341 }
01342 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01343 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01344 }
01345 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01346 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01347 }
01348 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01349 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01350 }
01351 }
01352
01353 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01354 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01355 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01356 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01357
01358 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01359 myDataObjInfo);
01360
01361
01362 clearKeyVal (&dataObjInp.condInput);
01363 if (status >= 0) {
01364 status = 1;
01365
01366 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01367 } else {
01368 freeAllDataObjInfo (myDataObjInfo);
01369 }
01370
01371 return status;
01372 }
01373
01374 #if 0 // JMC - legacy resource
01375 int
01376 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01377 dataObjInfo_t *outCacheObjInfo)
01378 {
01379 int status;
01380 rescInfo_t *cacheResc;
01381 transferStat_t transStat;
01382 dataObjInp_t dataObjInp;
01383 char tmpStr[NAME_LEN];
01384
01385 #if 0 // JMC - legacy resource
01386 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01387 #endif // JMC - legacy resource
01388
01389 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,compObjInfo->rescInfo, &cacheResc);
01390 if (status < 0) {
01391 rodsLog (LOG_ERROR,
01392 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01393 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01394 return status;
01395 }
01396 if (outCacheObjInfo != NULL)
01397 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01398 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01399 memset (&transStat, 0, sizeof (transStat));
01400
01401 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01402 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01403 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01404 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01405
01406 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01407 outCacheObjInfo);
01408
01409 clearKeyVal (&dataObjInp.condInput);
01410 return status;
01411 }
01412
01413
01414
01415
01416
01417
01418 int
01419 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01420 {
01421 int status;
01422 dataObjInfo_t *outCacheObjInfo;
01423
01424 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01425 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01426 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01427 outCacheObjInfo);
01428
01429 if (status < 0) {
01430
01431 if (outCacheObjInfo->dataId > 0) {
01432
01433 if( requeDataObjInfoByReplNum (compObjInfoHead,
01434 outCacheObjInfo->replNum) == 0) {
01435
01436 status = 0;
01437 }
01438 }
01439 free (outCacheObjInfo);
01440 } else {
01441 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01442 }
01443 return status;
01444 }
01445
01446 int
01447 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01448 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01449 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01450 {
01451 int status = 0;
01452 rescInfo_t *cacheResc;
01453 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01454 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01455 int updateFlag;
01456
01457 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01458 cacheResc = oldDataObjInfo->rescInfo;
01459 updateFlag = 1;
01460 } else {
01461
01462 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01463 compObjInfo->rescInfo, &cacheResc);
01464 if (status < 0) {
01465 rodsLog (LOG_ERROR,
01466 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01467 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01468 return status;
01469 }
01470 updateFlag = 0;
01471 }
01472
01473 if (outDestDataObjInfo == NULL) {
01474 destDataObjInfo = tmpDestDataObjInfo;
01475 } else {
01476 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01477 if (tmpDestDataObjInfo == NULL) {
01478 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01479 } else {
01480 *destDataObjInfo = *tmpDestDataObjInfo;
01481 }
01482 }
01483 srcDataObjInfo = srcDataObjInfoHead;
01484 while (srcDataObjInfo != NULL) {
01485 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01486 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01487 if (status >= 0) {
01488 if (outDestDataObjInfo != NULL)
01489 *outDestDataObjInfo = destDataObjInfo;
01490 break;
01491 }
01492 srcDataObjInfo = srcDataObjInfo->next;
01493 }
01494
01495 return status;
01496 }
01497 #endif // JMC - legacy resource
01498 int
01499 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01500 {
01501 int status;
01502 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01503 rescInfo_t *cacheResc;
01504 dataObjInp_t dataObjInp;
01505 dataObjInfo_t *cacheObjInfo;
01506
01507 #if 0 // JMC - legacy resource
01508 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01509 #endif // JMC - legacy resource
01510
01511 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01512 &cacheResc);
01513
01514 if (status < 0) return status;
01515
01516
01517 bzero (&dataObjInp, sizeof (dataObjInp));
01518 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01519 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01520 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01521 clearKeyVal (&dataObjInp.condInput);
01522 if (status < 0) {
01523 rodsLog (LOG_ERROR,
01524 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01525 dataObjInp.objPath, status);
01526 return status;
01527 }
01528
01529 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01530
01531
01532 return status;
01533 }
01534
01535 int
01536 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01537 rescInfo_t **outCacheResc)
01538 {
01539 dataObjInfo_t *bunfileObjInfoHead;
01540 dataObjInp_t dataObjInp;
01541 int status;
01542
01543
01544 bzero (&dataObjInp, sizeof (dataObjInp));
01545 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01546
01547 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01548 if (status < 0) {
01549 rodsLog (LOG_ERROR,
01550 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01551 dataObjInp.objPath, status);
01552 return status;
01553 }
01554 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead,
01555 outCacheResc, 0);
01556
01557 freeAllDataObjInfo (bunfileObjInfoHead);
01558
01559 return status;
01560 }
01561
01562 int
01563 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead,
01564 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01565 {
01566 int status;
01567
01568 dataObjInp_t dataObjInp;
01569
01570 bzero (&dataObjInp, sizeof (dataObjInp));
01571 bzero (&dataObjInp.condInput, sizeof (dataObjInp.condInput));
01572 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01573 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, NULL, 0);
01574
01575 addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, (*bunfileObjInfoHead)->rescHier );
01576 if (status < 0) return status;
01577
01578 #if 0 // JMC - legacy resource
01579 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01580
01581 status = getCacheRescInGrp (rsComm,
01582 (*bunfileObjInfoHead)->rescGroupName,
01583 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01584 if (status < 0) {
01585 rodsLog (LOG_ERROR,
01586 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01587 (*bunfileObjInfoHead)->rescGroupName,
01588 (*bunfileObjInfoHead)->objPath, status);
01589 return status;
01590 }
01591 if (outCacheResc != NULL)
01592 *outCacheResc = cacheResc;
01593
01594
01595 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01596 cacheResc->rescName, SU_CLIENT_USER_KW);
01597 if (status < 0) {
01598 rodsLog (LOG_ERROR,
01599 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01600 (*bunfileObjInfoHead)->objPath, status);
01601 return status;
01602 }
01603 } else
01604 #endif // JMC - legacy resource
01605
01606 if (outCacheResc != NULL)
01607 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01608
01609 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01610 (*bunfileObjInfoHead)->filePath);
01611 if (rmBunCopyFlag > 0) {
01612 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01613 }
01614 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01615 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01616 (*bunfileObjInfoHead)->dataType);
01617 }
01618 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01619 (*bunfileObjInfoHead)->rescInfo);
01620
01621 return status;
01622 }
01623
01624 #if 0 // JMC - legacy resource
01625
01626
01627
01628
01629
01630
01631
01632
01633
01634
01635
01636
01637
01638
01639
01640 int
01641 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01642 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01643 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01644 dataObjInfo_t **outDataObjInfo)
01645 {
01646 int status;
01647
01648 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01649 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01650
01651 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01652 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01653 outDataObjInfo);
01654 if (status < 0) {
01655 rodsLog (LOG_ERROR,
01656 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01657 srcDataObjInfoHead->objPath, status);
01658 return status;
01659 }
01660
01661
01662 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01663 }
01664 return status;
01665 }
01666
01667 int
01668 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01669 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01670 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01671 dataObjInfo_t **outDataObjInfo)
01672 {
01673 int status;
01674 dataObjInfo_t compDataObjInfo;
01675
01676
01677 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01678 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01679 NAME_LEN);
01680 compDataObjInfo.rescInfo = compRescGrpInfo->rescInfo;
01681 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01682 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01683 oldDataObjInfo, outDataObjInfo);
01684
01685 return status;
01686 }
01687
01688 #endif // JMC - legacy resource