00001
00002
00003
00004
00005
00006
00007
00008 #include "dataObjRepl.h"
00009 #include "dataObjOpr.h"
00010 #include "dataObjCreate.h"
00011 #include "dataObjOpen.h"
00012 #include "dataObjPut.h"
00013 #include "dataObjGet.h"
00014 #include "rodsLog.h"
00015 #include "objMetaOpr.h"
00016 #include "physPath.h"
00017 #include "specColl.h"
00018 #include "resource.h"
00019 #include "reGlobalsExtern.h"
00020 #include "reDefines.h"
00021 #include "reSysDataObjOpr.h"
00022 #include "getRemoteZoneResc.h"
00023 #include "l3FileGetSingleBuf.h"
00024 #include "l3FilePutSingleBuf.h"
00025 #include "fileSyncToArch.h"
00026 #include "fileStageToCache.h"
00027 #include "unbunAndRegPhyBunfile.h"
00028 #include "dataObjTrim.h"
00029 #include "dataObjLock.h"
00030
00031
00032
00033 #include "eirods_resource_backport.h"
00034 #include "eirods_resource_redirect.h"
00035 #include "eirods_log.h"
00036 #include "eirods_stacktrace.h"
00037
00038 int
00039 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00040 transStat_t **transStat)
00041 {
00042 int status;
00043 transferStat_t *transferStat = NULL;
00044
00045 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00046
00047 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00048 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00049 (*transStat)->numThreads = transferStat->numThreads;
00050 (*transStat)->bytesWritten = transferStat->bytesWritten;
00051 free (transferStat);
00052 }
00053 return status;
00054 }
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 int
00065 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00066 transferStat_t **transStat)
00067 {
00068
00069 int status;
00070 int remoteFlag;
00071 rodsServerHost_t *rodsServerHost;
00072 dataObjInfo_t *dataObjInfo = NULL;
00073 char* lockType = NULL;
00074 int lockFd = -1;
00075
00076 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00077
00078 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00079 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00080 }
00081 }
00082
00083 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00084 READ_COLL_PERM, 0, &dataObjInfo);
00085
00086 if (status == DATA_OBJ_T) {
00087 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00088 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00089 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00090 MAX_NAME_LEN);
00091 freeAllDataObjInfo (dataObjInfo);
00092 } else {
00093 freeAllDataObjInfo (dataObjInfo);
00094 return (SYS_REG_OBJ_IN_SPEC_COLL);
00095 }
00096 }
00097 }
00098
00099 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00100 REMOTE_OPEN);
00101
00102 if (remoteFlag < 0) {
00103 return (remoteFlag);
00104 } else if (remoteFlag == REMOTE_HOST) {
00105 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00106 transStat);
00107 return status;
00108 }
00109
00110
00111
00112
00113
00114
00115 char* dest_resc_ptr = getValByKey( &dataObjInp->condInput, DEST_RESC_NAME_KW );
00116 std::string tmp_dest_resc;
00117 if( dest_resc_ptr ) {
00118 tmp_dest_resc = dest_resc_ptr;
00119 rmKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW );
00120 }
00121
00122
00123
00124 std::string hier;
00125 char* tmp_hier = getValByKey( &dataObjInp->condInput, RESC_HIER_STR_KW );
00126
00127 if( 0 == tmp_hier ) {
00128
00129 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00130 eirods::error ret = eirods::resolve_resource_hierarchy( eirods::EIRODS_OPEN_OPERATION,
00131 rsComm, dataObjInp, hier );
00132 if( !ret.ok() ) {
00133 std::stringstream msg;
00134 msg << "failed in eirods::resolve_resource_hierarchy for [";
00135 msg << dataObjInp->objPath << "]";
00136 eirods::log( PASSMSG( msg.str(), ret ) );
00137 return ret.code();
00138 }
00139
00140 addKeyVal( &dataObjInp->condInput, RESC_HIER_STR_KW, hier.c_str() );
00141
00142 } else {
00143 hier = tmp_hier;
00144 }
00145
00146
00147
00148 if( !tmp_dest_resc.empty() ) {
00149 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, tmp_dest_resc.c_str() );
00150 }
00151
00152
00153
00154 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00155 memset (*transStat, 0, sizeof (transferStat_t));
00156
00157
00158 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00159 if (lockType != NULL) {
00160 lockFd = rsDataObjLock (rsComm, dataObjInp);
00161 if (lockFd >= 0) {
00162
00163 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00164 } else {
00165 rodsLogError (LOG_ERROR, lockFd,
00166 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00167 dataObjInp->objPath, lockType);
00168 return lockFd;
00169 }
00170 }
00171
00172
00173 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00174 if( status < 0 && status != EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00175 rodsLog(LOG_NOTICE, "%s - Failed to replicate data object.", __FUNCTION__);
00176 }
00177
00178 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00179
00180
00181
00182
00183 if( status == EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00184 return 0;
00185 } else {
00186 return (status);
00187 }
00188 }
00189
00190 int
00191 _rsDataObjRepl (
00192 rsComm_t *rsComm,
00193 dataObjInp_t *dataObjInp,
00194 transferStat_t *transStat,
00195 dataObjInfo_t *outDataObjInfo)
00196 {
00197 int status;
00198 dataObjInfo_t *dataObjInfoHead = NULL;
00199 dataObjInfo_t *oldDataObjInfoHead = NULL;
00200 dataObjInfo_t *destDataObjInfo = NULL;
00201 rescGrpInfo_t *myRescGrpInfo = NULL;
00202 ruleExecInfo_t rei;
00203 int multiCopyFlag;
00204 char *accessPerm;
00205 int backupFlag;
00206 int allFlag;
00207 int savedStatus = 0;
00208 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00209 accessPerm = NULL;
00210 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00211 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00212 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00213 }
00214 accessPerm = NULL;
00215 } else {
00216 accessPerm = ACCESS_READ_OBJECT;
00217 }
00218
00219
00220 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00221 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00222 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00223 multiCopyFlag = 1;
00224 } else {
00225 multiCopyFlag = 0;
00226 }
00227
00228
00229 if (multiCopyFlag) {
00230 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00231 accessPerm, 0);
00232 } else {
00233
00234
00235 status = getDataObjInfo( rsComm, dataObjInp, &dataObjInfoHead, accessPerm, 1 );
00236 }
00237
00238 if (status < 0) {
00239 rodsLog (LOG_NOTICE,
00240 "%s: getDataObjInfo for [%s] failed", __FUNCTION__, dataObjInp->objPath );
00241 return (status);
00242 }
00243
00244 char* resc_hier = getValByKey(&dataObjInp->condInput, RESC_HIER_STR_KW);
00245 char* dest_hier = getValByKey(&dataObjInp->condInput, DEST_RESC_HIER_STR_KW);
00246
00247 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, 0, resc_hier, dest_hier );
00248 if (status < 0) {
00249 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00250 return status;
00251 }
00252
00253
00254
00255
00256 if( ( !multiCopyFlag && oldDataObjInfoHead ) || getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00257 if (status < 0) {
00258 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication update.", __FUNCTION__);
00259 return status;
00260 }
00261
00262
00263 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead, oldDataObjInfoHead, transStat, NULL);
00264
00265 if (status >= 0 && outDataObjInfo != NULL) {
00266 *outDataObjInfo = *oldDataObjInfoHead;
00267 outDataObjInfo->next = NULL;
00268 } else {
00269 if( status < 0 && status != EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00270 rodsLog(LOG_NOTICE, "%s - Failed to update replica.", __FUNCTION__);
00271 }
00272 }
00273
00274 freeAllDataObjInfo (dataObjInfoHead);
00275 freeAllDataObjInfo (oldDataObjInfoHead);
00276
00277 return status;
00278
00279 }
00280
00281
00282 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, multiCopyFlag, resc_hier, dest_hier );
00283 if (status < 0) {
00284 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00285 return status;
00286 }
00287
00288 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00289
00290 backupFlag = 1;
00291 multiCopyFlag = 0;
00292 } else {
00293 backupFlag = 0;
00294 }
00295
00296 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00297 allFlag = 1;
00298 } else {
00299 allFlag = 0;
00300 }
00301
00302 if( backupFlag == 0 && allFlag == 1 &&
00303 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00304 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00305
00306 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, dataObjInfoHead->rescGroupName );
00307 }
00308
00309
00310 dataObjInp->oprType = REPLICATE_OPR;
00311 status = getRescGrpForCreate( rsComm, dataObjInp, &myRescGrpInfo );
00312 if (status < 0) {
00313 rodsLog(LOG_NOTICE, "%s - Failed to get a resource group for create.", __FUNCTION__);
00314 return status;
00315 }
00316
00317 if (multiCopyFlag == 0 ) {
00318
00319
00320
00321
00322
00323
00324
00325 status = resolveSingleReplCopy( &dataObjInfoHead, &oldDataObjInfoHead,
00326 &myRescGrpInfo, &destDataObjInfo,
00327 &dataObjInp->condInput );
00328
00329 if (status == HAVE_GOOD_COPY) {
00330
00331
00332
00333 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00334
00335 *outDataObjInfo = *destDataObjInfo;
00336 outDataObjInfo->next = NULL;
00337 }
00338
00339
00340
00341 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00342 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00343 ( myRescGrpInfo->status < 0 ) ) {
00344 status = myRescGrpInfo->status;
00345
00346 } else {
00347 status = 0;
00348 }
00349
00350
00351
00352 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00353 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00354 ( myRescGrpInfo->status < 0 ) ) {
00355 status = myRescGrpInfo->status;
00356
00357 } else {
00358 status = 0;
00359 }
00360
00361 freeAllDataObjInfo (dataObjInfoHead);
00362 freeAllDataObjInfo (oldDataObjInfoHead);
00363 freeAllDataObjInfo (destDataObjInfo);
00364 freeAllRescGrpInfo (myRescGrpInfo);
00365
00366 return status;
00367 } else if (status < 0) {
00368 freeAllDataObjInfo (dataObjInfoHead);
00369 freeAllDataObjInfo (oldDataObjInfoHead);
00370 freeAllDataObjInfo (destDataObjInfo);
00371 freeAllRescGrpInfo (myRescGrpInfo);
00372 rodsLog(LOG_NOTICE, "%s - Failed to resolve a single replication copy.", __FUNCTION__);
00373 return status;
00374 }
00375
00376
00377 }
00378
00379
00380 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00381 if (status < 0) return status;
00382
00383
00384
00385 if (destDataObjInfo != NULL) {
00386
00387 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00388 destDataObjInfo, transStat, oldDataObjInfoHead);
00389 if (status >= 0) {
00390 if (outDataObjInfo != NULL) {
00391 *outDataObjInfo = *destDataObjInfo;
00392 outDataObjInfo->next = NULL;
00393 }
00394 if (allFlag == 0) {
00395 freeAllDataObjInfo (dataObjInfoHead);
00396 freeAllDataObjInfo (oldDataObjInfoHead);
00397 freeAllDataObjInfo (destDataObjInfo);
00398 freeAllRescGrpInfo (myRescGrpInfo);
00399 return 0;
00400 } else {
00401
00402
00403 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00404 destDataObjInfo = NULL;
00405 }
00406 } else {
00407 savedStatus = status;
00408 }
00409 }
00410
00411 if (myRescGrpInfo != NULL) {
00412
00413 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00414 myRescGrpInfo, transStat, oldDataObjInfoHead,
00415 outDataObjInfo);
00416 if (status < 0) savedStatus = status;
00417 }
00418
00419 freeAllDataObjInfo (dataObjInfoHead);
00420 freeAllDataObjInfo (oldDataObjInfoHead);
00421 freeAllRescGrpInfo (myRescGrpInfo);
00422
00423 return (savedStatus);
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437 int _rsDataObjReplUpdate(
00438 rsComm_t* rsComm,
00439 dataObjInp_t* dataObjInp,
00440 dataObjInfo_t* srcDataObjInfoHead,
00441 dataObjInfo_t* destDataObjInfoHead,
00442 transferStat_t* transStat,
00443 dataObjInfo_t* oldDataObjInfo ) {
00444
00445
00446 dataObjInfo_t *destDataObjInfo = 0;
00447 dataObjInfo_t *srcDataObjInfo = 0;
00448 int status = 0;
00449 int allFlag = 0;
00450 int savedStatus = 0;
00451 int replCnt = 0;
00452
00453
00454
00455 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00456 allFlag = 1;
00457 } else {
00458 allFlag = 0;
00459 }
00460
00461
00462
00463 std::string dst_resc_hier;
00464 char* dst_resc_hier_ptr = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00465 if( dst_resc_hier_ptr ) {
00466 dst_resc_hier = dst_resc_hier_ptr;
00467
00468 }
00469
00470
00471
00472 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00473 destDataObjInfo = destDataObjInfoHead;
00474 while (destDataObjInfo != NULL) {
00475
00476
00477 if (destDataObjInfo->dataId == 0) {
00478 destDataObjInfo = destDataObjInfo->next;
00479 continue;
00480 }
00481
00482
00483
00484 srcDataObjInfo = srcDataObjInfoHead;
00485 while (srcDataObjInfo != NULL) {
00486
00487
00488
00489 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, destDataObjInfo->rescHier );
00490 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo, NULL, "", destDataObjInfo, 1 );
00491
00492 if (status >= 0) {
00493 break;
00494 }
00495 srcDataObjInfo = srcDataObjInfo->next;
00496 }
00497
00498 if (status >= 0) {
00499 transStat->numThreads = dataObjInp->numThreads;
00500 if (allFlag == 0) {
00501 return 0;
00502 }
00503 } else {
00504 savedStatus = status;
00505 replCnt ++;
00506 }
00507 destDataObjInfo = destDataObjInfo->next;
00508 }
00509
00510
00511
00512 if( !dst_resc_hier.empty() ) {
00513 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, dst_resc_hier.c_str() );
00514
00515 }
00516
00517 return savedStatus;
00518
00519 }
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533 int
00534 _rsDataObjReplNewCopy (
00535 rsComm_t *rsComm,
00536 dataObjInp_t *dataObjInp,
00537 dataObjInfo_t *srcDataObjInfoHead,
00538 rescGrpInfo_t *destRescGrpInfo,
00539 transferStat_t *transStat,
00540 dataObjInfo_t *oldDataObjInfo,
00541 dataObjInfo_t *outDataObjInfo)
00542 {
00543 dataObjInfo_t *srcDataObjInfo;
00544 rescGrpInfo_t *tmpRescGrpInfo;
00545 rescInfo_t *tmpRescInfo;
00546 int status;
00547 int allFlag;
00548 int savedStatus = 0;
00549
00550 #if 0
00551 rescInfo_t *compRescInfo = NULL;
00552 rescInfo_t *cacheRescInfo = NULL;
00553 #endif
00554
00555 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00556 allFlag = 1;
00557 } else {
00558 allFlag = 0;
00559 }
00560 #if 0 // JMC - legacy resource
00561
00562
00563
00564
00565 if( allFlag == 1 && destRescGrpInfo != NULL &&
00566 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00567 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00568 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00569 }
00570 #endif // JMC - legacy resource
00571
00572
00573 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00574 tmpRescGrpInfo = destRescGrpInfo;
00575 while (tmpRescGrpInfo != NULL) {
00576 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00577 #if 0 // JMC - legacy resource
00578
00579
00580 if (tmpRescInfo == cacheRescInfo) {
00581
00582
00583 tmpRescGrpInfo = tmpRescGrpInfo->next;
00584 continue;
00585 }
00586
00587 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00588
00589 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00590 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00591 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00592 return status;
00593 }
00594
00595 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00596 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00597 } else {
00598 #endif // JMC - legacy resource
00599
00600
00601 {
00602 srcDataObjInfo = srcDataObjInfoHead;
00603 while (srcDataObjInfo != NULL) {
00604 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00605 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00606 outDataObjInfo, 0);
00607 if (status >= 0) {
00608 break;
00609 } else {
00610 savedStatus = status;
00611 }
00612 srcDataObjInfo = srcDataObjInfo->next;
00613 }
00614 }
00615 if (status >= 0) {
00616 transStat->numThreads = dataObjInp->numThreads;
00617 if (allFlag == 0) {
00618 return 0;
00619 }
00620 } else {
00621 savedStatus = status;
00622 }
00623 tmpRescGrpInfo = tmpRescGrpInfo->next;
00624 }
00625
00626 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00627
00628 return destRescGrpInfo->status;
00629 } else {
00630 return (savedStatus);
00631 }
00632 }
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 int
00648 _rsDataObjReplS (
00649 rsComm_t *rsComm,
00650 dataObjInp_t *dataObjInp,
00651 dataObjInfo_t *srcDataObjInfo,
00652 rescInfo_t *destRescInfo,
00653 char *rescGroupName,
00654 dataObjInfo_t *destDataObjInfo,
00655 int updateFlag)
00656 {
00657 int status, status1;
00658 int l1descInx;
00659 openedDataObjInp_t dataObjCloseInp;
00660 dataObjInfo_t *myDestDataObjInfo;
00661
00662 l1descInx = dataObjOpenForRepl( rsComm, dataObjInp, srcDataObjInfo, destRescInfo,
00663 rescGroupName, destDataObjInfo, updateFlag );
00664
00665 if (l1descInx < 0) {
00666 return (l1descInx);
00667 }
00668
00669 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00670 status = l3DataStageSync (rsComm, l1descInx);
00671 } else if( L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00672 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF ) {
00673 status = l3DataCopySingleBuf (rsComm, l1descInx);
00674 } else {
00675 status = dataObjCopy( rsComm, l1descInx );
00676 }
00677
00678 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00679
00680 dataObjCloseInp.l1descInx = l1descInx;
00681
00682 L1desc[l1descInx].oprStatus = status;
00683 if (status >= 0) {
00684 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00685 }
00686
00687
00688 char* pdmo_kw = getValByKey(&dataObjInp->condInput, IN_PDMO_KW);
00689 if(pdmo_kw != NULL) {
00690 addKeyVal(&dataObjCloseInp.condInput, IN_PDMO_KW, pdmo_kw);
00691 }
00692
00693 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00694
00695 if (destDataObjInfo != NULL) {
00696 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00697 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00698 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00699 } else {
00700
00701 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00702 }
00703 }
00704
00705 freeDataObjInfo (myDestDataObjInfo);
00706
00707 if (status < 0) {
00708 return status;
00709 } else if (status1 < 0) {
00710 return status1;
00711 } else {
00712 return (status);
00713 }
00714 }
00715
00716
00717
00718
00719 int
00720 dataObjOpenForRepl (
00721 rsComm_t *rsComm,
00722 dataObjInp_t *dataObjInp,
00723 dataObjInfo_t *inpSrcDataObjInfo,
00724 rescInfo_t *destRescInfo,
00725 char *rescGroupName,
00726 dataObjInfo_t *inpDestDataObjInfo,
00727 int updateFlag)
00728 {
00729 dataObjInfo_t *myDestDataObjInfo = 0, *srcDataObjInfo = NULL;
00730 rescInfo_t *myDestRescInfo = 0;
00731 int destL1descInx = 0;
00732 int srcL1descInx = 0;
00733 int status = 0;
00734 int replStatus = 0;
00735
00736
00737
00738 dataObjInfo_t *cacheDataObjInfo = NULL;
00739 dataObjInp_t dest_inp, myDataObjInp, *l1DataObjInp = 0;
00740 if (destRescInfo == NULL) {
00741 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00742 } else {
00743 myDestRescInfo = destRescInfo;
00744 }
00745
00746 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00747 return SYS_RESC_IS_DOWN;
00748 }
00749
00750 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00751 return SYS_RESC_IS_DOWN;
00752 }
00753
00754 #if 0 // JMC - legacy resource
00755 destRescClass = getRescClass (myDestRescInfo);
00756 #endif // JMC - legacy resource
00757
00758
00759
00760 #if 0 // JMC - legacy resource
00761 if (srcRescClass == COMPOUND_CL) {
00762 rescGrpInfo_t *myRescGrpInfo;
00763 if (destRescClass == CACHE_CL &&
00764 isRescsInSameGrp (rsComm, myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00765 &myRescGrpInfo)) {
00766
00767 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00768 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00769 myRescGrpInfo->rescGroupName, NAME_LEN);
00770 }
00771 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00772 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00773 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00774 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00775 cacheDataObjInfo);
00776 if (status < 0) { free( cacheDataObjInfo ); return status; }
00777
00778 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00779 }
00780 }
00781
00782 #endif // JMC - legacy resource
00783 if (cacheDataObjInfo == NULL) {
00784 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00785 *srcDataObjInfo = *inpSrcDataObjInfo;
00786 srcDataObjInfo->rescInfo = new rescInfo_t;
00787 memcpy( srcDataObjInfo->rescInfo, inpSrcDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00788
00789 } else {
00790 srcDataObjInfo = cacheDataObjInfo;
00791 }
00792
00793 if( NULL == srcDataObjInfo ) {
00794 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00795 return -1;
00796 }
00797
00798 myDataObjInp = *dataObjInp;
00799 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00800
00801 destL1descInx = allocL1desc ();
00802
00803 if (destL1descInx < 0) return destL1descInx;
00804
00805
00806
00807
00808 std::string op_name;
00809 memset (&dest_inp, 0, sizeof (dest_inp));
00810 memset (&dest_inp.condInput, 0, sizeof (dest_inp.condInput));
00811 strncpy( dest_inp.objPath, dataObjInp->objPath, MAX_NAME_LEN );
00812 addKeyVal( &(dest_inp.condInput), RESC_NAME_KW, myDestRescInfo->rescName );
00813
00814 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00815 if (updateFlag > 0) {
00816
00817
00818 op_name = eirods::EIRODS_WRITE_OPERATION;
00819
00820
00821 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00822 rodsLog( LOG_ERROR, "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00823 srcDataObjInfo->objPath);
00824 return (SYS_UPDATE_REPL_INFO_ERR);
00825 }
00826
00827 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00828 *myDestDataObjInfo = *inpDestDataObjInfo;
00829 myDestDataObjInfo->rescInfo = new rescInfo_t;
00830 memcpy( myDestDataObjInfo->rescInfo, inpDestDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00831 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00832 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00833 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00834 } else {
00835
00836
00837 op_name = eirods::EIRODS_CREATE_OPERATION;
00838
00839 initDataObjInfoForRepl( rsComm, myDestDataObjInfo, srcDataObjInfo,
00840 destRescInfo, rescGroupName);
00841 replStatus = srcDataObjInfo->replStatus;
00842 }
00843
00844
00845
00846 std::string hier;
00847 char* dst_hier_str = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00848 if( 0 == dst_hier_str ) {
00849
00850 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00851
00852 eirods::error ret = eirods::resolve_resource_hierarchy( op_name, rsComm, &dest_inp, hier );
00853 if( !ret.ok() ) {
00854 std::stringstream msg;
00855 msg << "failed in eirods::resolve_resource_hierarchy for [";
00856 msg << dest_inp.objPath << "]";
00857 eirods::log( PASSMSG( msg.str(), ret ) );
00858 return ret.code();
00859 }
00860
00861 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, hier.c_str() );
00862
00863 } else {
00864 hier = dst_hier_str;
00865
00866 }
00867
00868
00869
00870 rstrcpy(myDestDataObjInfo->rescHier, hier.c_str(), MAX_NAME_LEN);
00871
00872
00873
00874
00875 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo, replStatus, srcDataObjInfo->dataSize);
00876
00877 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00878 if (l1DataObjInp->oprType == PHYMV_OPR) {
00879 L1desc[destL1descInx].oprType = PHYMV_DEST;
00880 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00881 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00882 } else {
00883 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00884 }
00885
00886
00887
00888
00889 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00890 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00891 if( stage_kw ) {
00892 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00893 } else if( sync_kw ) {
00894 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00895 } else {
00896
00897
00898 }
00899
00900 char* src_hier_str = 0;
00901 if (srcDataObjInfo != NULL && srcDataObjInfo->rescHier != NULL) {
00902 src_hier_str = srcDataObjInfo->rescHier;
00903 }
00904
00905 l1DataObjInp->numThreads = dataObjInp->numThreads =
00906 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00907
00908 &dataObjInp->condInput, dst_hier_str, src_hier_str );
00909
00910 if( l1DataObjInp->numThreads > 0 &&
00911 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00912 if (updateFlag > 0) {
00913 status = dataOpen (rsComm, destL1descInx);
00914 } else {
00915 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00916 if (status >= 0)
00917 status = dataCreate (rsComm, destL1descInx);
00918 }
00919
00920 if (status < 0) {
00921 freeL1desc (destL1descInx);
00922 return (status);
00923 }
00924 } else {
00925 if (updateFlag == 0) {
00926 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00927 if (status < 0) {
00928 freeL1desc (destL1descInx);
00929 return (status);
00930 }
00931 }
00932 }
00933
00934 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00935
00936 *inpDestDataObjInfo = *myDestDataObjInfo;
00937 inpDestDataObjInfo->next = NULL;
00938 }
00939
00940
00941
00942
00943 eirods::file_object_ptr file_obj(
00944 new eirods::file_object(
00945 rsComm,
00946 myDestDataObjInfo ) );
00947 eirods::error ret = fileNotify(
00948 rsComm,
00949 file_obj,
00950 eirods::EIRODS_WRITE_OPERATION );
00951 if(!ret.ok()) {
00952 std::stringstream msg;
00953 msg << "Failed to signal the resource that the data object \"";
00954 msg << myDestDataObjInfo->objPath;
00955 msg << "\" was modified.";
00956 ret = PASSMSG( msg.str(), ret );
00957 eirods::log( ret );
00958 return ret.code();
00959 }
00960
00961
00962 rstrcpy(srcDataObjInfo->rescHier, inpSrcDataObjInfo->rescHier, MAX_NAME_LEN);
00963
00964 srcL1descInx = allocL1desc ();
00965 if (srcL1descInx < 0) return srcL1descInx;
00966 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo, srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00967 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00968 l1DataObjInp->numThreads = dataObjInp->numThreads;
00969 if (l1DataObjInp->oprType == PHYMV_OPR) {
00970 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00971 } else {
00972 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00973 }
00974
00975 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL ) {
00976 L1desc[srcL1descInx].purgeCacheFlag = 1;
00977 }
00978
00979 if( l1DataObjInp->numThreads > 0 &&
00980 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00981 openedDataObjInp_t dataObjCloseInp;
00982
00983 l1DataObjInp->openFlags = O_RDONLY;
00984 status = dataOpen (rsComm, srcL1descInx);
00985 if (status < 0) {
00986 freeL1desc (srcL1descInx);
00987 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00988 dataObjCloseInp.l1descInx = destL1descInx;
00989 rsDataObjClose (rsComm, &dataObjCloseInp);
00990 return (status);
00991 }
00992 }
00993
00994 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00995
00996 return (destL1descInx);
00997 }
00998
00999 int
01000 dataObjCopy (rsComm_t *rsComm, int l1descInx)
01001 {
01002 int srcL1descInx, destL1descInx;
01003 int srcL3descInx, destL3descInx;
01004 int status;
01005 portalOprOut_t *portalOprOut = NULL;
01006 dataCopyInp_t dataCopyInp;
01007 dataOprInp_t *dataOprInp;
01008 int srcRemoteFlag, destRemoteFlag;
01009
01010 bzero (&dataCopyInp, sizeof (dataCopyInp));
01011 dataOprInp = &dataCopyInp.dataOprInp;
01012 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01013 destL1descInx = l1descInx;
01014
01015 srcL3descInx = L1desc[srcL1descInx].l3descInx;
01016 destL3descInx = L1desc[destL1descInx].l3descInx;
01017
01018 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
01019 srcRemoteFlag = REMOTE_ZONE_HOST;
01020 } else {
01021 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
01022 }
01023
01024 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
01025 destRemoteFlag = REMOTE_ZONE_HOST;
01026 } else {
01027 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
01028 }
01029
01030 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
01031 destRemoteFlag != REMOTE_ZONE_HOST &&
01032 FileDesc[srcL3descInx].rodsServerHost ==
01033 FileDesc[destL3descInx].rodsServerHost) {
01034
01035 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
01036
01037 dataCopyInp.portalOprOut.numThreads =
01038 dataCopyInp.dataOprInp.numThreads;
01039 if (srcRemoteFlag == LOCAL_HOST) {
01040 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01041 }
01042
01043 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
01044 destRemoteFlag == REMOTE_ZONE_HOST) {
01045 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
01046
01047 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01048
01049
01050 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
01051 if (status < 0 || NULL == portalOprOut ) {
01052 rodsLog (LOG_NOTICE,
01053 "dataObjCopy: preProcParaPut error for %s",
01054 L1desc[srcL1descInx].dataObjInfo->objPath);
01055 return (status);
01056 }
01057 dataCopyInp.portalOprOut = *portalOprOut;
01058 } else {
01059 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
01060 }
01061 if (srcRemoteFlag == LOCAL_HOST)
01062 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01063 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
01064 srcRemoteFlag == REMOTE_ZONE_HOST) {
01065
01066 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01067
01068 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01069
01070 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01071 if (status < 0 || NULL == portalOprOut ) {
01072 rodsLog (LOG_NOTICE,
01073 "dataObjCopy: preProcParaGet error for %s",
01074 L1desc[srcL1descInx].dataObjInfo->objPath);
01075 return (status);
01076 }
01077 dataCopyInp.portalOprOut = *portalOprOut;
01078 } else {
01079 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01080 }
01081 if (destRemoteFlag == LOCAL_HOST)
01082 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01083 } else {
01084
01085 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01086
01087
01088 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01089 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01090
01091 if (status < 0 || NULL == portalOprOut ) {
01092 rodsLog (LOG_NOTICE,
01093 "dataObjCopy: preProcParaGet error for %s",
01094 L1desc[srcL1descInx].dataObjInfo->objPath);
01095 return (status);
01096 }
01097 dataCopyInp.portalOprOut = *portalOprOut;
01098 } else {
01099 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01100 }
01101 }
01102
01103 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01104 NO_CHK_COPY_LEN_KW) != NULL) {
01105
01106 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
01107 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
01108 L1desc[l1descInx].dataObjInp->numThreads =
01109 L1desc[srcL1descInx].dataObjInp->numThreads =
01110 dataCopyInp.portalOprOut.numThreads = 1;
01111 }
01112 }
01113 status = rsDataCopy (rsComm, &dataCopyInp);
01114
01115 if (status >= 0 && portalOprOut != NULL &&
01116 L1desc[l1descInx].dataObjInp != NULL) {
01117
01118 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
01119 }
01120 if (portalOprOut != NULL) free (portalOprOut);
01121 clearKeyVal (&dataOprInp->condInput);
01122
01123 return (status);
01124 }
01125
01126 int
01127 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
01128 {
01129 bytesBuf_t dataBBuf;
01130 int bytesRead, bytesWritten;
01131 int srcL1descInx;
01132
01133 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01134 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01135
01136 if (L1desc[srcL1descInx].dataSize < 0) {
01137 rodsLog (LOG_ERROR,
01138 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
01139 L1desc[srcL1descInx].dataSize,
01140 L1desc[srcL1descInx].dataObjInfo->objPath);
01141 return (SYS_COPY_LEN_ERR);
01142 } else if (L1desc[srcL1descInx].dataSize == 0) {
01143 bytesRead = 0;
01144 } else {
01145 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
01146 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
01147 }
01148
01149 if (bytesRead < 0) {
01150 free( dataBBuf.buf );
01151 return (bytesRead);
01152 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01153 NO_CHK_COPY_LEN_KW) != NULL) {
01154
01155 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
01156 = bytesRead;
01157 }
01158
01159 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
01160
01161 L1desc[l1descInx].bytesWritten = bytesWritten;
01162
01163 if (dataBBuf.buf != NULL) {
01164 free (dataBBuf.buf);
01165 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01166 }
01167
01168 if (bytesWritten != bytesRead) {
01169 if (bytesWritten >= 0) {
01170 rodsLog (LOG_NOTICE,
01171 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
01172 bytesRead, bytesWritten);
01173 return (SYS_COPY_LEN_ERR);
01174 } else {
01175 return (bytesWritten);
01176 }
01177 }
01178
01179
01180 return (0);
01181 }
01182
01183 int
01184 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
01185 {
01186 bytesBuf_t dataBBuf;
01187 int srcL1descInx;
01188 int status = 0;
01189
01190 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01191
01192 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01193 if (L1desc[srcL1descInx].dataSize < 0 &&
01194 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
01195 rodsLog (LOG_ERROR,
01196 "l3DataStageSync: dataSize %lld for %s is negative",
01197 L1desc[srcL1descInx].dataSize,
01198 L1desc[srcL1descInx].dataObjInfo->objPath);
01199 return (SYS_COPY_LEN_ERR);
01200 } else if (L1desc[srcL1descInx].dataSize >= 0 ||
01201 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
01202 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01203
01204 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01205 } else {
01206
01207 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01208 }
01209 } else {
01210
01211 }
01212
01213 if (status < 0) {
01214 L1desc[l1descInx].bytesWritten = -1;
01215 } else {
01216 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01217 L1desc[srcL1descInx].dataSize;
01218 }
01219
01220 return (status);
01221 }
01222
01223 int
01224 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01225 {
01226 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01227
01228 fileStageSyncInp_t fileSyncToArchInp;
01229 dataObjInp_t *dataObjInp;
01230 int status;
01231 dataObjInfo_t tmpDataObjInfo;
01232
01233 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01234 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01235
01236 #if 0 // JMC legacy resource
01237 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01238 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01239
01240 switch (RescTypeDef[rescTypeInx].rescCat) {
01241 case FILE_CAT:
01242
01243 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01244 #endif // JMC legacy resource
01245
01246 int dst_create_path = 0;
01247 eirods::error err = eirods::get_resource_property< int >( destDataObjInfo->rescInfo->rescName,
01248 eirods::RESOURCE_CREATE_PATH, dst_create_path );
01249 if( !err.ok() ) {
01250 eirods::log( PASS( err ) );
01251 }
01252
01253 if( CREATE_PATH == dst_create_path ) {
01254
01255 status = chkOrphanFile ( rsComm, destDataObjInfo->filePath, destDataObjInfo->rescName, &tmpDataObjInfo );
01256 if (status == 0 && tmpDataObjInfo.dataId != destDataObjInfo->dataId) {
01257
01258 char tmp_str[ MAX_NAME_LEN ];
01259 snprintf( tmp_str, MAX_NAME_LEN, "%s.%-d", destDataObjInfo->filePath, (int) random());
01260 strncpy( destDataObjInfo->filePath, tmp_str, MAX_NAME_LEN );
01261 }
01262 }
01263
01264 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01265 dataObjInp = L1desc[destL1descInx].dataObjInp;
01266 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01267 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01268 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01269
01270
01271
01272 std::string location;
01273 eirods::error ret = eirods::get_loc_for_hier_string( srcDataObjInfo->rescHier, location );
01274 if( !ret.ok() ) {
01275 eirods::log( PASSMSG( "failed in get_loc_for_hier_String", ret ) );
01276 return -1;
01277 }
01278
01279 rstrcpy( fileSyncToArchInp.addr.hostAddr, location.c_str(), NAME_LEN );
01280
01281
01282 rstrcpy( fileSyncToArchInp.filename, destDataObjInfo->filePath, MAX_NAME_LEN );
01283 rstrcpy( fileSyncToArchInp.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01284 rstrcpy( fileSyncToArchInp.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01285 rstrcpy( fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01286
01287 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01288 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp);
01289
01290 if (status >= 0 &&
01291 CREATE_PATH == dst_create_path &&
01292 fileSyncToArchInp.filename != NULL) {
01293
01294
01295 rstrcpy (destDataObjInfo->filePath, fileSyncToArchInp.filename, MAX_NAME_LEN);
01296 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01297 }
01298 #if 0 // JMC legacy resource
01299 break;
01300 default:
01301 rodsLog (LOG_ERROR,
01302 "l3FileSync: rescCat type %d is not recognized",
01303 RescTypeDef[rescTypeInx].rescCat);
01304 status = SYS_INVALID_RESC_TYPE;
01305 break;
01306 }
01307 #endif // JMC legacy resource
01308 return (status);
01309 }
01310
01311 int
01312 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01313 {
01314 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01315 int mode, status;
01316
01317 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01318 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01319
01320 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01321
01322 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01323
01324 return status;
01325 }
01326
01327 int
01328 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01329 dataObjInfo_t *destDataObjInfo, int mode)
01330 {
01331
01332 fileStageSyncInp_t file_stage;
01333 int status;
01334
01335 #if 0 // JMC legacy resource
01336 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01337 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01338
01339
01340 switch (RescTypeDef[rescTypeInx].rescCat) {
01341 case FILE_CAT:
01342 #endif // JMC legacy resource
01343 memset (&file_stage, 0, sizeof (file_stage));
01344 file_stage.dataSize = srcDataObjInfo->dataSize;
01345 file_stage.fileType = static_cast< fileDriverType_t >( -1 );
01346 file_stage.cacheFileType = static_cast< fileDriverType_t >( -1 );
01347
01348 rstrcpy( file_stage.addr.hostAddr,
01349 destDataObjInfo->rescInfo->rescLoc, NAME_LEN );
01350
01351 rstrcpy( file_stage.cacheFilename, destDataObjInfo->filePath, MAX_NAME_LEN );
01352 rstrcpy( file_stage.filename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01353 rstrcpy( file_stage.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01354 rstrcpy( file_stage.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01355 file_stage.mode = mode;
01356 status = rsFileStageToCache( rsComm, &file_stage );
01357 #if 0 // JMC legacy resource
01358 break;
01359 default:
01360 rodsLog (LOG_ERROR,
01361 "l3FileStage: rescCat type %d is not recognized",
01362 RescTypeDef[rescTypeInx].rescCat);
01363 status = SYS_INVALID_RESC_TYPE;
01364 break;
01365 }
01366 #endif // JMC legacy resource
01367 return (status);
01368 }
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378 int
01379 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01380 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01381 {
01382 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01383 transferStat_t transStat;
01384 dataObjInp_t dataObjInp;
01385 char tmpStr[NAME_LEN];
01386 int status;
01387
01388 dataObjInfoHead = *srcDataObjInfoHead;
01389 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01390 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01391 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01392 memset (&transStat, 0, sizeof (transStat));
01393
01394 if (flagStr != NULL) {
01395 if (strstr (flagStr, ALL_KW) != NULL) {
01396 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01397 }
01398 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01399 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01400 }
01401 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01402 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01403 }
01404 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01405 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01406 }
01407 }
01408
01409 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01410 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01411 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01412 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01413
01414 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01415 myDataObjInfo);
01416
01417
01418 clearKeyVal (&dataObjInp.condInput);
01419 if (status >= 0) {
01420 status = 1;
01421
01422 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01423 } else {
01424 freeAllDataObjInfo (myDataObjInfo);
01425 }
01426
01427 return status;
01428 }
01429
01430 #if 0 // JMC - legacy resource
01431 int
01432 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01433 dataObjInfo_t *outCacheObjInfo)
01434 {
01435 int status;
01436 rescInfo_t *cacheResc;
01437 transferStat_t transStat;
01438 dataObjInp_t dataObjInp;
01439 char tmpStr[NAME_LEN];
01440
01441 #if 0 // JMC - legacy resource
01442 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01443 #endif // JMC - legacy resource
01444
01445 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,compObjInfo->rescInfo, &cacheResc);
01446 if (status < 0) {
01447 rodsLog (LOG_ERROR,
01448 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01449 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01450 return status;
01451 }
01452 if (outCacheObjInfo != NULL)
01453 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01454 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01455 memset (&transStat, 0, sizeof (transStat));
01456
01457 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01458 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01459 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01460 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01461
01462 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01463 outCacheObjInfo);
01464
01465 clearKeyVal (&dataObjInp.condInput);
01466 return status;
01467 }
01468
01469
01470
01471
01472
01473
01474 int
01475 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01476 {
01477 int status;
01478 dataObjInfo_t *outCacheObjInfo;
01479
01480 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01481 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01482 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01483 outCacheObjInfo);
01484
01485 if (status < 0) {
01486
01487 if (outCacheObjInfo->dataId > 0) {
01488
01489 if( requeDataObjInfoByReplNum (compObjInfoHead,
01490 outCacheObjInfo->replNum) == 0) {
01491
01492 status = 0;
01493 }
01494 }
01495 free (outCacheObjInfo);
01496 } else {
01497 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01498 }
01499 return status;
01500 }
01501
01502 int
01503 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01504 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01505 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01506 {
01507 int status = 0;
01508 rescInfo_t *cacheResc;
01509 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01510 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01511 int updateFlag;
01512
01513 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01514 cacheResc = oldDataObjInfo->rescInfo;
01515 updateFlag = 1;
01516 } else {
01517
01518 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01519 compObjInfo->rescInfo, &cacheResc);
01520 if (status < 0) {
01521 rodsLog (LOG_ERROR,
01522 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01523 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01524 return status;
01525 }
01526 updateFlag = 0;
01527 }
01528
01529 if (outDestDataObjInfo == NULL) {
01530 destDataObjInfo = tmpDestDataObjInfo;
01531 } else {
01532 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01533 if (tmpDestDataObjInfo == NULL) {
01534 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01535 } else {
01536 *destDataObjInfo = *tmpDestDataObjInfo;
01537 }
01538 }
01539 srcDataObjInfo = srcDataObjInfoHead;
01540 while (srcDataObjInfo != NULL) {
01541 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01542 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01543 if (status >= 0) {
01544 if (outDestDataObjInfo != NULL)
01545 *outDestDataObjInfo = destDataObjInfo;
01546 break;
01547 }
01548 srcDataObjInfo = srcDataObjInfo->next;
01549 }
01550
01551 return status;
01552 }
01553 #endif // JMC - legacy resource
01554 int
01555 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01556 {
01557 int status;
01558 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01559 rescInfo_t *cacheResc;
01560 dataObjInp_t dataObjInp;
01561 dataObjInfo_t *cacheObjInfo;
01562
01563 #if 0 // JMC - legacy resource
01564 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01565 #endif // JMC - legacy resource
01566
01567 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01568 &cacheResc);
01569
01570 if (status < 0) return status;
01571
01572
01573 bzero (&dataObjInp, sizeof (dataObjInp));
01574 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01575 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01576 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01577 clearKeyVal (&dataObjInp.condInput);
01578 if (status < 0) {
01579 rodsLog (LOG_ERROR,
01580 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01581 dataObjInp.objPath, status);
01582 return status;
01583 }
01584
01585 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01586
01587
01588 return status;
01589 }
01590
01591 int
01592 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01593 rescInfo_t **outCacheResc)
01594 {
01595 dataObjInfo_t *bunfileObjInfoHead;
01596 dataObjInp_t dataObjInp;
01597 int status;
01598
01599
01600 bzero (&dataObjInp, sizeof (dataObjInp));
01601 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01602
01603 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01604 if (status < 0) {
01605 rodsLog (LOG_ERROR,
01606 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01607 dataObjInp.objPath, status);
01608 return status;
01609 }
01610 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead, &dataObjInp.condInput,
01611 outCacheResc, 0);
01612
01613 freeAllDataObjInfo (bunfileObjInfoHead);
01614
01615 return status;
01616 }
01617
01618 int
01619 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead, keyValPair_t *condInput,
01620 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01621 {
01622 int status;
01623
01624 dataObjInp_t dataObjInp;
01625
01626 bzero (&dataObjInp, sizeof (dataObjInp));
01627 bzero (&dataObjInp.condInput, sizeof (dataObjInp.condInput));
01628 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01629 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, condInput, 0);
01630
01631 addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, (*bunfileObjInfoHead)->rescHier );
01632 if (status < 0) return status;
01633
01634 #if 0 // JMC - legacy resource
01635 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01636
01637 status = getCacheRescInGrp (rsComm,
01638 (*bunfileObjInfoHead)->rescGroupName,
01639 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01640 if (status < 0) {
01641 rodsLog (LOG_ERROR,
01642 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01643 (*bunfileObjInfoHead)->rescGroupName,
01644 (*bunfileObjInfoHead)->objPath, status);
01645 return status;
01646 }
01647 if (outCacheResc != NULL)
01648 *outCacheResc = cacheResc;
01649
01650
01651 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01652 cacheResc->rescName, SU_CLIENT_USER_KW);
01653 if (status < 0) {
01654 rodsLog (LOG_ERROR,
01655 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01656 (*bunfileObjInfoHead)->objPath, status);
01657 return status;
01658 }
01659 } else
01660 #endif // JMC - legacy resource
01661
01662 if (outCacheResc != NULL)
01663 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01664
01665 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01666 (*bunfileObjInfoHead)->filePath);
01667 if (rmBunCopyFlag > 0) {
01668 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01669 }
01670 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01671 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01672 (*bunfileObjInfoHead)->dataType);
01673 }
01674 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01675 (*bunfileObjInfoHead)->rescInfo);
01676
01677 return status;
01678 }
01679
01680 #if 0 // JMC - legacy resource
01681
01682
01683
01684
01685
01686
01687
01688
01689
01690
01691
01692
01693
01694
01695
01696 int
01697 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01698 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01699 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01700 dataObjInfo_t **outDataObjInfo)
01701 {
01702 int status;
01703
01704 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01705 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01706
01707 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01708 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01709 outDataObjInfo);
01710 if (status < 0) {
01711 rodsLog (LOG_ERROR,
01712 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01713 srcDataObjInfoHead->objPath, status);
01714 return status;
01715 }
01716
01717
01718 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01719 }
01720 return status;
01721 }
01722
01723 int
01724 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01725 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01726 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01727 dataObjInfo_t **outDataObjInfo)
01728 {
01729 int status;
01730 dataObjInfo_t compDataObjInfo;
01731
01732
01733 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01734 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01735 NAME_LEN);
01736 compDataObjInfo.rescInfo=compRescGrpInfo->rescInfo;
01737 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01738 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01739 oldDataObjInfo, outDataObjInfo);
01740
01741 return status;
01742 }
01743
01744 #endif // JMC - legacy resource