00001
00002
00003
00004
00005
00006
00007
00008 #include "dataObjRepl.h"
00009 #include "dataObjOpr.h"
00010 #include "dataObjCreate.h"
00011 #include "dataObjOpen.h"
00012 #include "dataObjPut.h"
00013 #include "dataObjGet.h"
00014 #include "rodsLog.h"
00015 #include "objMetaOpr.h"
00016 #include "physPath.h"
00017 #include "specColl.h"
00018 #include "resource.h"
00019 #include "reGlobalsExtern.h"
00020 #include "reDefines.h"
00021 #include "reSysDataObjOpr.h"
00022 #include "getRemoteZoneResc.h"
00023 #include "l3FileGetSingleBuf.h"
00024 #include "l3FilePutSingleBuf.h"
00025 #include "fileSyncToArch.h"
00026 #include "fileStageToCache.h"
00027 #include "unbunAndRegPhyBunfile.h"
00028 #include "dataObjTrim.h"
00029 #include "dataObjLock.h"
00030
00031
00032
00033 #include "eirods_resource_backport.h"
00034 #include "eirods_resource_redirect.h"
00035 #include "eirods_log.h"
00036 #include "eirods_stacktrace.h"
00037
00038 int
00039 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00040 transStat_t **transStat)
00041 {
00042 int status;
00043 transferStat_t *transferStat = NULL;
00044
00045 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00046
00047 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00048 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00049 (*transStat)->numThreads = transferStat->numThreads;
00050 (*transStat)->bytesWritten = transferStat->bytesWritten;
00051 free (transferStat);
00052 }
00053 return status;
00054 }
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 int
00065 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00066 transferStat_t **transStat)
00067 {
00068
00069 int status;
00070 int remoteFlag;
00071 rodsServerHost_t *rodsServerHost;
00072 dataObjInfo_t *dataObjInfo = NULL;
00073 char* lockType = NULL;
00074 int lockFd = -1;
00075
00076 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00077
00078 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00079 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00080 }
00081 }
00082
00083 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00084 READ_COLL_PERM, 0, &dataObjInfo);
00085
00086 if (status == DATA_OBJ_T) {
00087 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00088 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00089 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00090 MAX_NAME_LEN);
00091 freeAllDataObjInfo (dataObjInfo);
00092 } else {
00093 freeAllDataObjInfo (dataObjInfo);
00094 return (SYS_REG_OBJ_IN_SPEC_COLL);
00095 }
00096 }
00097 }
00098
00099 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00100 REMOTE_OPEN);
00101
00102 if (remoteFlag < 0) {
00103 return (remoteFlag);
00104 } else if (remoteFlag == REMOTE_HOST) {
00105 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00106 transStat);
00107 return status;
00108 }
00109
00110
00111
00112
00113
00114
00115 char* dest_resc_ptr = getValByKey( &dataObjInp->condInput, DEST_RESC_NAME_KW );
00116 std::string tmp_dest_resc;
00117 if( dest_resc_ptr ) {
00118 tmp_dest_resc = dest_resc_ptr;
00119 rmKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW );
00120 }
00121
00122
00123
00124 std::string hier;
00125 char* tmp_hier = getValByKey( &dataObjInp->condInput, RESC_HIER_STR_KW );
00126
00127 if( 0 == tmp_hier ) {
00128
00129 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00130 eirods::error ret = eirods::resolve_resource_hierarchy( eirods::EIRODS_OPEN_OPERATION,
00131 rsComm, dataObjInp, hier );
00132 if( !ret.ok() ) {
00133 std::stringstream msg;
00134 msg << "failed in eirods::resolve_resource_hierarchy for [";
00135 msg << dataObjInp->objPath << "]";
00136 eirods::log( PASSMSG( msg.str(), ret ) );
00137 return ret.code();
00138 }
00139
00140 addKeyVal( &dataObjInp->condInput, RESC_HIER_STR_KW, hier.c_str() );
00141
00142 } else {
00143 hier = tmp_hier;
00144 }
00145
00146
00147
00148 if( !tmp_dest_resc.empty() ) {
00149 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, tmp_dest_resc.c_str() );
00150 }
00151
00152
00153
00154 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00155 memset (*transStat, 0, sizeof (transferStat_t));
00156
00157
00158 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00159 if (lockType != NULL) {
00160 lockFd = rsDataObjLock (rsComm, dataObjInp);
00161 if (lockFd >= 0) {
00162
00163 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00164 } else {
00165 rodsLogError (LOG_ERROR, lockFd,
00166 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00167 dataObjInp->objPath, lockType);
00168 return lockFd;
00169 }
00170 }
00171
00172
00173 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00174 if( status < 0 && status != EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00175 rodsLog(LOG_NOTICE, "%s - Failed to replicate data object.", __FUNCTION__);
00176 }
00177
00178 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00179
00180
00181
00182
00183 if( status == EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00184 return 0;
00185 } else {
00186 return (status);
00187 }
00188 }
00189
00190 int
00191 _rsDataObjRepl (
00192 rsComm_t *rsComm,
00193 dataObjInp_t *dataObjInp,
00194 transferStat_t *transStat,
00195 dataObjInfo_t *outDataObjInfo)
00196 {
00197 int status;
00198 dataObjInfo_t *dataObjInfoHead = NULL;
00199 dataObjInfo_t *oldDataObjInfoHead = NULL;
00200 dataObjInfo_t *destDataObjInfo = NULL;
00201 rescGrpInfo_t *myRescGrpInfo = NULL;
00202 ruleExecInfo_t rei;
00203 int multiCopyFlag;
00204 char *accessPerm;
00205 int backupFlag;
00206 int allFlag;
00207 int savedStatus = 0;
00208 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00209 accessPerm = NULL;
00210 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00211 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00212 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00213 }
00214 accessPerm = NULL;
00215 } else {
00216 accessPerm = ACCESS_READ_OBJECT;
00217 }
00218
00219
00220 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00221 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00222 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00223 multiCopyFlag = 1;
00224 } else {
00225 multiCopyFlag = 0;
00226 }
00227
00228
00229 if (multiCopyFlag) {
00230 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00231 accessPerm, 0);
00232 } else {
00233
00234
00235 status = getDataObjInfo( rsComm, dataObjInp, &dataObjInfoHead, accessPerm, 1 );
00236 }
00237
00238 if (status < 0) {
00239 rodsLog (LOG_NOTICE,
00240 "%s: getDataObjInfo for [%s] failed", __FUNCTION__, dataObjInp->objPath );
00241 return (status);
00242 }
00243
00244 char* resc_hier = getValByKey(&dataObjInp->condInput, RESC_HIER_STR_KW);
00245 char* dest_hier = getValByKey(&dataObjInp->condInput, DEST_RESC_HIER_STR_KW);
00246
00247 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, 0, resc_hier, dest_hier );
00248 if (status < 0) {
00249 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00250 return status;
00251 }
00252
00253
00254
00255
00256 if( ( !multiCopyFlag && oldDataObjInfoHead ) || getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00257 if (status < 0) {
00258 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication update.", __FUNCTION__);
00259 return status;
00260 }
00261
00262
00263 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead, oldDataObjInfoHead, transStat, NULL);
00264
00265 if (status >= 0 && outDataObjInfo != NULL) {
00266 *outDataObjInfo = *oldDataObjInfoHead;
00267 outDataObjInfo->next = NULL;
00268 } else {
00269 if( status < 0 && status != EIRODS_DIRECT_ARCHIVE_ACCESS ) {
00270 rodsLog(LOG_NOTICE, "%s - Failed to update replica.", __FUNCTION__);
00271 }
00272 }
00273
00274 freeAllDataObjInfo (dataObjInfoHead);
00275 freeAllDataObjInfo (oldDataObjInfoHead);
00276
00277 return status;
00278
00279 }
00280
00281
00282 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, multiCopyFlag, resc_hier, dest_hier );
00283 if (status < 0) {
00284 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00285 return status;
00286 }
00287
00288 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00289
00290 backupFlag = 1;
00291 multiCopyFlag = 0;
00292 } else {
00293 backupFlag = 0;
00294 }
00295
00296 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00297 allFlag = 1;
00298 } else {
00299 allFlag = 0;
00300 }
00301
00302 if( backupFlag == 0 && allFlag == 1 &&
00303 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00304 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00305
00306 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, dataObjInfoHead->rescGroupName );
00307 }
00308
00309
00310 dataObjInp->oprType = REPLICATE_OPR;
00311 status = getRescGrpForCreate( rsComm, dataObjInp, &myRescGrpInfo );
00312 if (status < 0) {
00313 rodsLog(LOG_NOTICE, "%s - Failed to get a resource group for create.", __FUNCTION__);
00314 return status;
00315 }
00316
00317 if (multiCopyFlag == 0 ) {
00318
00319
00320
00321
00322
00323
00324
00325 status = resolveSingleReplCopy( &dataObjInfoHead, &oldDataObjInfoHead,
00326 &myRescGrpInfo, &destDataObjInfo,
00327 &dataObjInp->condInput );
00328
00329 if (status == HAVE_GOOD_COPY) {
00330
00331
00332
00333 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00334
00335 *outDataObjInfo = *destDataObjInfo;
00336 outDataObjInfo->next = NULL;
00337 }
00338
00339
00340
00341 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00342 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00343 ( myRescGrpInfo->status < 0 ) ) {
00344 status = myRescGrpInfo->status;
00345
00346 } else {
00347 status = 0;
00348 }
00349
00350
00351
00352 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00353 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00354 ( myRescGrpInfo->status < 0 ) ) {
00355 status = myRescGrpInfo->status;
00356
00357 } else {
00358 status = 0;
00359 }
00360
00361 freeAllDataObjInfo (dataObjInfoHead);
00362 freeAllDataObjInfo (oldDataObjInfoHead);
00363 freeAllDataObjInfo (destDataObjInfo);
00364 freeAllRescGrpInfo (myRescGrpInfo);
00365
00366 return status;
00367 } else if (status < 0) {
00368 freeAllDataObjInfo (dataObjInfoHead);
00369 freeAllDataObjInfo (oldDataObjInfoHead);
00370 freeAllDataObjInfo (destDataObjInfo);
00371 freeAllRescGrpInfo (myRescGrpInfo);
00372 rodsLog(LOG_NOTICE, "%s - Failed to resolve a single replication copy.", __FUNCTION__);
00373 return status;
00374 }
00375
00376
00377 }
00378
00379
00380 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00381 if (status < 0) return status;
00382
00383
00384
00385 if (destDataObjInfo != NULL) {
00386
00387 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00388 destDataObjInfo, transStat, oldDataObjInfoHead);
00389 if (status >= 0) {
00390 if (outDataObjInfo != NULL) {
00391 *outDataObjInfo = *destDataObjInfo;
00392 outDataObjInfo->next = NULL;
00393 }
00394 if (allFlag == 0) {
00395 freeAllDataObjInfo (dataObjInfoHead);
00396 freeAllDataObjInfo (oldDataObjInfoHead);
00397 freeAllDataObjInfo (destDataObjInfo);
00398 freeAllRescGrpInfo (myRescGrpInfo);
00399 return 0;
00400 } else {
00401
00402
00403 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00404 destDataObjInfo = NULL;
00405 }
00406 } else {
00407 savedStatus = status;
00408 }
00409 }
00410
00411 if (myRescGrpInfo != NULL) {
00412
00413 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00414 myRescGrpInfo, transStat, oldDataObjInfoHead,
00415 outDataObjInfo);
00416 if (status < 0) savedStatus = status;
00417 }
00418
00419 freeAllDataObjInfo (dataObjInfoHead);
00420 freeAllDataObjInfo (oldDataObjInfoHead);
00421 freeAllRescGrpInfo (myRescGrpInfo);
00422
00423 return (savedStatus);
00424 }
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437 int _rsDataObjReplUpdate(
00438 rsComm_t* rsComm,
00439 dataObjInp_t* dataObjInp,
00440 dataObjInfo_t* srcDataObjInfoHead,
00441 dataObjInfo_t* destDataObjInfoHead,
00442 transferStat_t* transStat,
00443 dataObjInfo_t* oldDataObjInfo ) {
00444
00445
00446 dataObjInfo_t *destDataObjInfo = 0;
00447 dataObjInfo_t *srcDataObjInfo = 0;
00448 int status = 0;
00449 int allFlag = 0;
00450 int savedStatus = 0;
00451 int replCnt = 0;
00452
00453
00454
00455 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00456 allFlag = 1;
00457 } else {
00458 allFlag = 0;
00459 }
00460
00461
00462
00463 std::string dst_resc_hier;
00464 char* dst_resc_hier_ptr = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00465 if( dst_resc_hier_ptr ) {
00466 dst_resc_hier = dst_resc_hier_ptr;
00467
00468 }
00469
00470
00471
00472 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00473 destDataObjInfo = destDataObjInfoHead;
00474 while (destDataObjInfo != NULL) {
00475
00476
00477 if (destDataObjInfo->dataId == 0) {
00478 destDataObjInfo = destDataObjInfo->next;
00479 continue;
00480 }
00481
00482
00483
00484 srcDataObjInfo = srcDataObjInfoHead;
00485 while (srcDataObjInfo != NULL) {
00486
00487
00488
00489 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, destDataObjInfo->rescHier );
00490
00491 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo, NULL, "", destDataObjInfo, 1 );
00492
00493 if (status >= 0) {
00494 break;
00495 }
00496 srcDataObjInfo = srcDataObjInfo->next;
00497 }
00498
00499 if (status >= 0) {
00500 transStat->numThreads = dataObjInp->numThreads;
00501 if (allFlag == 0) {
00502 return 0;
00503 }
00504 } else {
00505 savedStatus = status;
00506 replCnt ++;
00507 }
00508 destDataObjInfo = destDataObjInfo->next;
00509 }
00510
00511
00512
00513 if( !dst_resc_hier.empty() ) {
00514 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, dst_resc_hier.c_str() );
00515
00516 }
00517
00518 return savedStatus;
00519
00520 }
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534 int
00535 _rsDataObjReplNewCopy (
00536 rsComm_t *rsComm,
00537 dataObjInp_t *dataObjInp,
00538 dataObjInfo_t *srcDataObjInfoHead,
00539 rescGrpInfo_t *destRescGrpInfo,
00540 transferStat_t *transStat,
00541 dataObjInfo_t *oldDataObjInfo,
00542 dataObjInfo_t *outDataObjInfo)
00543 {
00544 dataObjInfo_t *srcDataObjInfo;
00545 rescGrpInfo_t *tmpRescGrpInfo;
00546 rescInfo_t *tmpRescInfo;
00547 int status;
00548 int allFlag;
00549 int savedStatus = 0;
00550
00551 #if 0
00552 rescInfo_t *compRescInfo = NULL;
00553 rescInfo_t *cacheRescInfo = NULL;
00554 #endif
00555
00556 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00557 allFlag = 1;
00558 } else {
00559 allFlag = 0;
00560 }
00561 #if 0 // JMC - legacy resource
00562
00563
00564
00565
00566 if( allFlag == 1 && destRescGrpInfo != NULL &&
00567 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00568 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00569 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00570 }
00571 #endif // JMC - legacy resource
00572
00573
00574 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00575 tmpRescGrpInfo = destRescGrpInfo;
00576 while (tmpRescGrpInfo != NULL) {
00577 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00578 #if 0 // JMC - legacy resource
00579
00580
00581 if (tmpRescInfo == cacheRescInfo) {
00582
00583
00584 tmpRescGrpInfo = tmpRescGrpInfo->next;
00585 continue;
00586 }
00587
00588 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00589
00590 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00591 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00592 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00593 return status;
00594 }
00595
00596 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00597 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00598 } else {
00599 #endif // JMC - legacy resource
00600
00601
00602 {
00603 srcDataObjInfo = srcDataObjInfoHead;
00604 while (srcDataObjInfo != NULL) {
00605 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00606 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00607 outDataObjInfo, 0);
00608 if (status >= 0) {
00609 break;
00610 } else {
00611 savedStatus = status;
00612 }
00613 srcDataObjInfo = srcDataObjInfo->next;
00614 }
00615 }
00616 if (status >= 0) {
00617 transStat->numThreads = dataObjInp->numThreads;
00618 if (allFlag == 0) {
00619 return 0;
00620 }
00621 } else {
00622 savedStatus = status;
00623 }
00624 tmpRescGrpInfo = tmpRescGrpInfo->next;
00625 }
00626
00627 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00628
00629 return destRescGrpInfo->status;
00630 } else {
00631 return (savedStatus);
00632 }
00633 }
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648 int
00649 _rsDataObjReplS (
00650 rsComm_t *rsComm,
00651 dataObjInp_t *dataObjInp,
00652 dataObjInfo_t *srcDataObjInfo,
00653 rescInfo_t *destRescInfo,
00654 char *rescGroupName,
00655 dataObjInfo_t *destDataObjInfo,
00656 int updateFlag)
00657 {
00658 int status, status1;
00659 int l1descInx;
00660 openedDataObjInp_t dataObjCloseInp;
00661 dataObjInfo_t *myDestDataObjInfo;
00662
00663 l1descInx = dataObjOpenForRepl( rsComm, dataObjInp, srcDataObjInfo, destRescInfo,
00664 rescGroupName, destDataObjInfo, updateFlag );
00665
00666 if (l1descInx < 0) {
00667 return (l1descInx);
00668 }
00669
00670 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00671 status = l3DataStageSync (rsComm, l1descInx);
00672 } else if( L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00673 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF ) {
00674 status = l3DataCopySingleBuf (rsComm, l1descInx);
00675 } else {
00676 status = dataObjCopy( rsComm, l1descInx );
00677 }
00678
00679 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00680
00681 dataObjCloseInp.l1descInx = l1descInx;
00682
00683 L1desc[l1descInx].oprStatus = status;
00684 if (status >= 0) {
00685 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00686 }
00687
00688
00689 char* pdmo_kw = getValByKey(&dataObjInp->condInput, IN_PDMO_KW);
00690 if(pdmo_kw != NULL) {
00691 addKeyVal(&dataObjCloseInp.condInput, IN_PDMO_KW, pdmo_kw);
00692 }
00693
00694 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00695
00696 if (destDataObjInfo != NULL) {
00697 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00698 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00699 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00700 } else {
00701
00702 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00703 }
00704 }
00705
00706 freeDataObjInfo (myDestDataObjInfo);
00707
00708 if (status < 0) {
00709 return status;
00710 } else if (status1 < 0) {
00711 return status1;
00712 } else {
00713 return (status);
00714 }
00715 }
00716
00717
00718
00719
00720 int
00721 dataObjOpenForRepl (
00722 rsComm_t *rsComm,
00723 dataObjInp_t *dataObjInp,
00724 dataObjInfo_t *inpSrcDataObjInfo,
00725 rescInfo_t *destRescInfo,
00726 char *rescGroupName,
00727 dataObjInfo_t *inpDestDataObjInfo,
00728 int updateFlag)
00729 {
00730 dataObjInfo_t *myDestDataObjInfo = 0, *srcDataObjInfo = NULL;
00731 rescInfo_t *myDestRescInfo = 0;
00732 int destL1descInx = 0;
00733 int srcL1descInx = 0;
00734 int status = 0;
00735 int replStatus = 0;
00736
00737
00738
00739 dataObjInfo_t *cacheDataObjInfo = NULL;
00740 dataObjInp_t dest_inp, myDataObjInp, *l1DataObjInp = 0;
00741 if (destRescInfo == NULL) {
00742 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00743 } else {
00744 myDestRescInfo = destRescInfo;
00745 }
00746
00747 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00748 return SYS_RESC_IS_DOWN;
00749 }
00750
00751 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00752 return SYS_RESC_IS_DOWN;
00753 }
00754
00755 #if 0 // JMC - legacy resource
00756 destRescClass = getRescClass (myDestRescInfo);
00757 #endif // JMC - legacy resource
00758
00759
00760
00761 #if 0 // JMC - legacy resource
00762 if (srcRescClass == COMPOUND_CL) {
00763 rescGrpInfo_t *myRescGrpInfo;
00764 if (destRescClass == CACHE_CL &&
00765 isRescsInSameGrp (rsComm, myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00766 &myRescGrpInfo)) {
00767
00768 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00769 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00770 myRescGrpInfo->rescGroupName, NAME_LEN);
00771 }
00772 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00773 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00774 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00775 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00776 cacheDataObjInfo);
00777 if (status < 0) { free( cacheDataObjInfo ); return status; }
00778
00779 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00780 }
00781 }
00782
00783 #endif // JMC - legacy resource
00784 if (cacheDataObjInfo == NULL) {
00785 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00786 *srcDataObjInfo = *inpSrcDataObjInfo;
00787 srcDataObjInfo->rescInfo = new rescInfo_t;
00788 memcpy( srcDataObjInfo->rescInfo, inpSrcDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00789
00790 } else {
00791 srcDataObjInfo = cacheDataObjInfo;
00792 }
00793
00794 if( NULL == srcDataObjInfo ) {
00795 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00796 return -1;
00797 }
00798
00799 myDataObjInp = *dataObjInp;
00800 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00801
00802 destL1descInx = allocL1desc ();
00803
00804 if (destL1descInx < 0) return destL1descInx;
00805
00806
00807
00808
00809 std::string op_name;
00810 memset (&dest_inp, 0, sizeof (dest_inp));
00811 memset (&dest_inp.condInput, 0, sizeof (dest_inp.condInput));
00812 strncpy( dest_inp.objPath, dataObjInp->objPath, MAX_NAME_LEN );
00813 addKeyVal( &(dest_inp.condInput), RESC_NAME_KW, myDestRescInfo->rescName );
00814
00815 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00816 if (updateFlag > 0) {
00817
00818
00819 op_name = eirods::EIRODS_OPEN_OPERATION;
00820
00821
00822 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00823 rodsLog( LOG_ERROR, "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00824 srcDataObjInfo->objPath);
00825 return (SYS_UPDATE_REPL_INFO_ERR);
00826 }
00827
00828 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00829 *myDestDataObjInfo = *inpDestDataObjInfo;
00830 myDestDataObjInfo->rescInfo = new rescInfo_t;
00831 memcpy( myDestDataObjInfo->rescInfo, inpDestDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00832 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00833 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00834 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00835 } else {
00836
00837
00838 op_name = eirods::EIRODS_CREATE_OPERATION;
00839
00840 initDataObjInfoForRepl( rsComm, myDestDataObjInfo, srcDataObjInfo,
00841 destRescInfo, rescGroupName);
00842 replStatus = srcDataObjInfo->replStatus;
00843 }
00844
00845
00846
00847 std::string hier;
00848 char* dst_hier_str = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00849 if( 0 == dst_hier_str ) {
00850
00851 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00852
00853 eirods::error ret = eirods::resolve_resource_hierarchy( op_name, rsComm, &dest_inp, hier );
00854 if( !ret.ok() ) {
00855 std::stringstream msg;
00856 msg << "failed in eirods::resolve_resource_hierarchy for [";
00857 msg << dest_inp.objPath << "]";
00858 eirods::log( PASSMSG( msg.str(), ret ) );
00859 return ret.code();
00860 }
00861
00862 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, hier.c_str() );
00863
00864 } else {
00865 hier = dst_hier_str;
00866 }
00867
00868
00869
00870
00871 rstrcpy(myDestDataObjInfo->rescHier, hier.c_str(), MAX_NAME_LEN);
00872
00873
00874
00875
00876 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo, replStatus, srcDataObjInfo->dataSize);
00877
00878 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00879 if (l1DataObjInp->oprType == PHYMV_OPR) {
00880 L1desc[destL1descInx].oprType = PHYMV_DEST;
00881 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00882 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00883 } else {
00884 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00885 }
00886
00887
00888
00889
00890 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00891 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00892 if( stage_kw ) {
00893 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00894 } else if( sync_kw ) {
00895 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00896 } else {
00897
00898
00899 }
00900
00901 char* src_hier_str = 0;
00902 if (srcDataObjInfo != NULL && srcDataObjInfo->rescHier != NULL) {
00903 src_hier_str = srcDataObjInfo->rescHier;
00904 }
00905
00906 l1DataObjInp->numThreads = dataObjInp->numThreads =
00907 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00908
00909 &dataObjInp->condInput, dst_hier_str, src_hier_str );
00910
00911 if( l1DataObjInp->numThreads > 0 &&
00912 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00913 if (updateFlag > 0) {
00914 status = dataOpen (rsComm, destL1descInx);
00915 } else {
00916 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00917 if (status >= 0)
00918 status = dataCreate (rsComm, destL1descInx);
00919 }
00920
00921 if (status < 0) {
00922 freeL1desc (destL1descInx);
00923 return (status);
00924 }
00925 } else {
00926 if (updateFlag == 0) {
00927 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00928 if (status < 0) {
00929 freeL1desc (destL1descInx);
00930 return (status);
00931 }
00932 }
00933 }
00934
00935 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00936
00937 *inpDestDataObjInfo = *myDestDataObjInfo;
00938 inpDestDataObjInfo->next = NULL;
00939 }
00940
00941
00942 rstrcpy(srcDataObjInfo->rescHier, inpSrcDataObjInfo->rescHier, MAX_NAME_LEN);
00943
00944 srcL1descInx = allocL1desc ();
00945 if (srcL1descInx < 0) return srcL1descInx;
00946 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo, srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00947 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00948 l1DataObjInp->numThreads = dataObjInp->numThreads;
00949 if (l1DataObjInp->oprType == PHYMV_OPR) {
00950 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00951 } else {
00952 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00953 }
00954
00955 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL ) {
00956 L1desc[srcL1descInx].purgeCacheFlag = 1;
00957 }
00958
00959 if( l1DataObjInp->numThreads > 0 &&
00960 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00961 openedDataObjInp_t dataObjCloseInp;
00962
00963 l1DataObjInp->openFlags = O_RDONLY;
00964 status = dataOpen (rsComm, srcL1descInx);
00965 if (status < 0) {
00966 freeL1desc (srcL1descInx);
00967 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00968 dataObjCloseInp.l1descInx = destL1descInx;
00969 rsDataObjClose (rsComm, &dataObjCloseInp);
00970 return (status);
00971 }
00972 }
00973
00974 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00975
00976 return (destL1descInx);
00977 }
00978
00979 int
00980 dataObjCopy (rsComm_t *rsComm, int l1descInx)
00981 {
00982 int srcL1descInx, destL1descInx;
00983 int srcL3descInx, destL3descInx;
00984 int status;
00985 portalOprOut_t *portalOprOut = NULL;
00986 dataCopyInp_t dataCopyInp;
00987 dataOprInp_t *dataOprInp;
00988 int srcRemoteFlag, destRemoteFlag;
00989
00990 bzero (&dataCopyInp, sizeof (dataCopyInp));
00991 dataOprInp = &dataCopyInp.dataOprInp;
00992 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00993 destL1descInx = l1descInx;
00994
00995 srcL3descInx = L1desc[srcL1descInx].l3descInx;
00996 destL3descInx = L1desc[destL1descInx].l3descInx;
00997
00998 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
00999 srcRemoteFlag = REMOTE_ZONE_HOST;
01000 } else {
01001 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
01002 }
01003
01004 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
01005 destRemoteFlag = REMOTE_ZONE_HOST;
01006 } else {
01007 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
01008 }
01009
01010 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
01011 destRemoteFlag != REMOTE_ZONE_HOST &&
01012 FileDesc[srcL3descInx].rodsServerHost ==
01013 FileDesc[destL3descInx].rodsServerHost) {
01014
01015 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
01016
01017 dataCopyInp.portalOprOut.numThreads =
01018 dataCopyInp.dataOprInp.numThreads;
01019 if (srcRemoteFlag == LOCAL_HOST) {
01020 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01021 }
01022
01023 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
01024 destRemoteFlag == REMOTE_ZONE_HOST) {
01025 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
01026
01027 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01028
01029
01030 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
01031 if (status < 0 || NULL == portalOprOut ) {
01032 rodsLog (LOG_NOTICE,
01033 "dataObjCopy: preProcParaPut error for %s",
01034 L1desc[srcL1descInx].dataObjInfo->objPath);
01035 return (status);
01036 }
01037 dataCopyInp.portalOprOut = *portalOprOut;
01038 } else {
01039 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
01040 }
01041 if (srcRemoteFlag == LOCAL_HOST)
01042 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01043 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
01044 srcRemoteFlag == REMOTE_ZONE_HOST) {
01045
01046 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01047
01048 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01049
01050 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01051 if (status < 0 || NULL == portalOprOut ) {
01052 rodsLog (LOG_NOTICE,
01053 "dataObjCopy: preProcParaGet error for %s",
01054 L1desc[srcL1descInx].dataObjInfo->objPath);
01055 return (status);
01056 }
01057 dataCopyInp.portalOprOut = *portalOprOut;
01058 } else {
01059 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01060 }
01061 if (destRemoteFlag == LOCAL_HOST)
01062 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01063 } else {
01064
01065 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01066
01067
01068 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01069 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01070
01071 if (status < 0 || NULL == portalOprOut ) {
01072 rodsLog (LOG_NOTICE,
01073 "dataObjCopy: preProcParaGet error for %s",
01074 L1desc[srcL1descInx].dataObjInfo->objPath);
01075 return (status);
01076 }
01077 dataCopyInp.portalOprOut = *portalOprOut;
01078 } else {
01079 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01080 }
01081 }
01082
01083 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01084 NO_CHK_COPY_LEN_KW) != NULL) {
01085
01086 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
01087 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
01088 L1desc[l1descInx].dataObjInp->numThreads =
01089 L1desc[srcL1descInx].dataObjInp->numThreads =
01090 dataCopyInp.portalOprOut.numThreads = 1;
01091 }
01092 }
01093 status = rsDataCopy (rsComm, &dataCopyInp);
01094
01095 if (status >= 0 && portalOprOut != NULL &&
01096 L1desc[l1descInx].dataObjInp != NULL) {
01097
01098 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
01099 }
01100 if (portalOprOut != NULL) free (portalOprOut);
01101 clearKeyVal (&dataOprInp->condInput);
01102
01103 return (status);
01104 }
01105
01106 int
01107 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
01108 {
01109 bytesBuf_t dataBBuf;
01110 int bytesRead, bytesWritten;
01111 int srcL1descInx;
01112
01113 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01114 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01115
01116 if (L1desc[srcL1descInx].dataSize < 0) {
01117 rodsLog (LOG_ERROR,
01118 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
01119 L1desc[srcL1descInx].dataSize,
01120 L1desc[srcL1descInx].dataObjInfo->objPath);
01121 return (SYS_COPY_LEN_ERR);
01122 } else if (L1desc[srcL1descInx].dataSize == 0) {
01123 bytesRead = 0;
01124 } else {
01125 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
01126 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
01127 }
01128
01129 if (bytesRead < 0) {
01130 free( dataBBuf.buf );
01131 return (bytesRead);
01132 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01133 NO_CHK_COPY_LEN_KW) != NULL) {
01134
01135 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
01136 = bytesRead;
01137 }
01138
01139 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
01140
01141 L1desc[l1descInx].bytesWritten = bytesWritten;
01142
01143 if (dataBBuf.buf != NULL) {
01144 free (dataBBuf.buf);
01145 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01146 }
01147
01148 if (bytesWritten != bytesRead) {
01149 if (bytesWritten >= 0) {
01150 rodsLog (LOG_NOTICE,
01151 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
01152 bytesRead, bytesWritten);
01153 return (SYS_COPY_LEN_ERR);
01154 } else {
01155 return (bytesWritten);
01156 }
01157 }
01158
01159
01160 return (0);
01161 }
01162
01163 int
01164 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
01165 {
01166 bytesBuf_t dataBBuf;
01167 int srcL1descInx;
01168 int status = 0;
01169
01170 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01171
01172 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01173 if (L1desc[srcL1descInx].dataSize < 0 &&
01174 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
01175 rodsLog (LOG_ERROR,
01176 "l3DataStageSync: dataSize %lld for %s is negative",
01177 L1desc[srcL1descInx].dataSize,
01178 L1desc[srcL1descInx].dataObjInfo->objPath);
01179 return (SYS_COPY_LEN_ERR);
01180 } else if (L1desc[srcL1descInx].dataSize >= 0 ||
01181 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
01182 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01183
01184 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01185 } else {
01186
01187 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01188 }
01189 } else {
01190
01191 }
01192
01193 if (status < 0) {
01194 L1desc[l1descInx].bytesWritten = -1;
01195 } else {
01196 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01197 L1desc[srcL1descInx].dataSize;
01198 }
01199
01200 return (status);
01201 }
01202
01203 int
01204 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01205 {
01206 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01207
01208 fileStageSyncInp_t fileSyncToArchInp;
01209 dataObjInp_t *dataObjInp;
01210 int status;
01211 dataObjInfo_t tmpDataObjInfo;
01212
01213 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01214 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01215
01216 #if 0 // JMC legacy resource
01217 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01218 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01219
01220 switch (RescTypeDef[rescTypeInx].rescCat) {
01221 case FILE_CAT:
01222
01223 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01224 #endif // JMC legacy resource
01225
01226 int dst_create_path = 0;
01227 eirods::error err = eirods::get_resource_property< int >( destDataObjInfo->rescInfo->rescName,
01228 eirods::RESOURCE_CREATE_PATH, dst_create_path );
01229 if( !err.ok() ) {
01230 eirods::log( PASS( err ) );
01231 }
01232
01233 if( CREATE_PATH == dst_create_path ) {
01234
01235 status = chkOrphanFile ( rsComm, destDataObjInfo->filePath, destDataObjInfo->rescName, &tmpDataObjInfo );
01236 if (status == 0 && tmpDataObjInfo.dataId != destDataObjInfo->dataId) {
01237
01238 char tmp_str[ MAX_NAME_LEN ];
01239 snprintf( tmp_str, MAX_NAME_LEN, "%s.%-d", destDataObjInfo->filePath, (int) random());
01240 strncpy( destDataObjInfo->filePath, tmp_str, MAX_NAME_LEN );
01241 }
01242 }
01243
01244 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01245 dataObjInp = L1desc[destL1descInx].dataObjInp;
01246 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01247 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01248 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01249
01250
01251
01252 std::string location;
01253 eirods::error ret = eirods::get_loc_for_hier_string( srcDataObjInfo->rescHier, location );
01254 if( !ret.ok() ) {
01255 eirods::log( PASSMSG( "failed in get_loc_for_hier_String", ret ) );
01256 return -1;
01257 }
01258
01259 rstrcpy( fileSyncToArchInp.addr.hostAddr, location.c_str(), NAME_LEN );
01260
01261
01262 rstrcpy( fileSyncToArchInp.filename, destDataObjInfo->filePath, MAX_NAME_LEN );
01263 rstrcpy( fileSyncToArchInp.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01264 rstrcpy( fileSyncToArchInp.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01265 rstrcpy( fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01266
01267 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01268 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp);
01269
01270 if (status >= 0 &&
01271 CREATE_PATH == dst_create_path &&
01272 fileSyncToArchInp.filename != NULL) {
01273
01274
01275 rstrcpy (destDataObjInfo->filePath, fileSyncToArchInp.filename, MAX_NAME_LEN);
01276 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01277 }
01278 #if 0 // JMC legacy resource
01279 break;
01280 default:
01281 rodsLog (LOG_ERROR,
01282 "l3FileSync: rescCat type %d is not recognized",
01283 RescTypeDef[rescTypeInx].rescCat);
01284 status = SYS_INVALID_RESC_TYPE;
01285 break;
01286 }
01287 #endif // JMC legacy resource
01288 return (status);
01289 }
01290
01291 int
01292 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01293 {
01294 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01295 int mode, status;
01296
01297 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01298 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01299
01300 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01301
01302 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01303
01304 return status;
01305 }
01306
01307 int
01308 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01309 dataObjInfo_t *destDataObjInfo, int mode)
01310 {
01311
01312 fileStageSyncInp_t file_stage;
01313 int status;
01314
01315 #if 0 // JMC legacy resource
01316 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01317 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01318
01319
01320 switch (RescTypeDef[rescTypeInx].rescCat) {
01321 case FILE_CAT:
01322 #endif // JMC legacy resource
01323 memset (&file_stage, 0, sizeof (file_stage));
01324 file_stage.dataSize = srcDataObjInfo->dataSize;
01325 file_stage.fileType = static_cast< fileDriverType_t >( -1 );
01326 file_stage.cacheFileType = static_cast< fileDriverType_t >( -1 );
01327
01328 rstrcpy( file_stage.addr.hostAddr,
01329 destDataObjInfo->rescInfo->rescLoc, NAME_LEN );
01330
01331 rstrcpy( file_stage.cacheFilename, destDataObjInfo->filePath, MAX_NAME_LEN );
01332 rstrcpy( file_stage.filename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01333 rstrcpy( file_stage.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01334 rstrcpy( file_stage.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01335 file_stage.mode = mode;
01336 status = rsFileStageToCache( rsComm, &file_stage );
01337 #if 0 // JMC legacy resource
01338 break;
01339 default:
01340 rodsLog (LOG_ERROR,
01341 "l3FileStage: rescCat type %d is not recognized",
01342 RescTypeDef[rescTypeInx].rescCat);
01343 status = SYS_INVALID_RESC_TYPE;
01344 break;
01345 }
01346 #endif // JMC legacy resource
01347 return (status);
01348 }
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358 int
01359 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01360 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01361 {
01362 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01363 transferStat_t transStat;
01364 dataObjInp_t dataObjInp;
01365 char tmpStr[NAME_LEN];
01366 int status;
01367
01368 dataObjInfoHead = *srcDataObjInfoHead;
01369 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01370 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01371 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01372 memset (&transStat, 0, sizeof (transStat));
01373
01374 if (flagStr != NULL) {
01375 if (strstr (flagStr, ALL_KW) != NULL) {
01376 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01377 }
01378 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01379 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01380 }
01381 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01382 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01383 }
01384 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01385 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01386 }
01387 }
01388
01389 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01390 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01391 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01392 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01393
01394 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01395 myDataObjInfo);
01396
01397
01398 clearKeyVal (&dataObjInp.condInput);
01399 if (status >= 0) {
01400 status = 1;
01401
01402 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01403 } else {
01404 freeAllDataObjInfo (myDataObjInfo);
01405 }
01406
01407 return status;
01408 }
01409
01410 #if 0 // JMC - legacy resource
01411 int
01412 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01413 dataObjInfo_t *outCacheObjInfo)
01414 {
01415 int status;
01416 rescInfo_t *cacheResc;
01417 transferStat_t transStat;
01418 dataObjInp_t dataObjInp;
01419 char tmpStr[NAME_LEN];
01420
01421 #if 0 // JMC - legacy resource
01422 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01423 #endif // JMC - legacy resource
01424
01425 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,compObjInfo->rescInfo, &cacheResc);
01426 if (status < 0) {
01427 rodsLog (LOG_ERROR,
01428 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01429 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01430 return status;
01431 }
01432 if (outCacheObjInfo != NULL)
01433 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01434 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01435 memset (&transStat, 0, sizeof (transStat));
01436
01437 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01438 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01439 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01440 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01441
01442 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01443 outCacheObjInfo);
01444
01445 clearKeyVal (&dataObjInp.condInput);
01446 return status;
01447 }
01448
01449
01450
01451
01452
01453
01454 int
01455 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01456 {
01457 int status;
01458 dataObjInfo_t *outCacheObjInfo;
01459
01460 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01461 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01462 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01463 outCacheObjInfo);
01464
01465 if (status < 0) {
01466
01467 if (outCacheObjInfo->dataId > 0) {
01468
01469 if( requeDataObjInfoByReplNum (compObjInfoHead,
01470 outCacheObjInfo->replNum) == 0) {
01471
01472 status = 0;
01473 }
01474 }
01475 free (outCacheObjInfo);
01476 } else {
01477 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01478 }
01479 return status;
01480 }
01481
01482 int
01483 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01484 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01485 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01486 {
01487 int status = 0;
01488 rescInfo_t *cacheResc;
01489 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01490 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01491 int updateFlag;
01492
01493 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01494 cacheResc = oldDataObjInfo->rescInfo;
01495 updateFlag = 1;
01496 } else {
01497
01498 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01499 compObjInfo->rescInfo, &cacheResc);
01500 if (status < 0) {
01501 rodsLog (LOG_ERROR,
01502 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01503 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01504 return status;
01505 }
01506 updateFlag = 0;
01507 }
01508
01509 if (outDestDataObjInfo == NULL) {
01510 destDataObjInfo = tmpDestDataObjInfo;
01511 } else {
01512 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01513 if (tmpDestDataObjInfo == NULL) {
01514 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01515 } else {
01516 *destDataObjInfo = *tmpDestDataObjInfo;
01517 }
01518 }
01519 srcDataObjInfo = srcDataObjInfoHead;
01520 while (srcDataObjInfo != NULL) {
01521 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01522 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01523 if (status >= 0) {
01524 if (outDestDataObjInfo != NULL)
01525 *outDestDataObjInfo = destDataObjInfo;
01526 break;
01527 }
01528 srcDataObjInfo = srcDataObjInfo->next;
01529 }
01530
01531 return status;
01532 }
01533 #endif // JMC - legacy resource
01534 int
01535 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01536 {
01537 int status;
01538 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01539 rescInfo_t *cacheResc;
01540 dataObjInp_t dataObjInp;
01541 dataObjInfo_t *cacheObjInfo;
01542
01543 #if 0 // JMC - legacy resource
01544 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01545 #endif // JMC - legacy resource
01546
01547 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01548 &cacheResc);
01549
01550 if (status < 0) return status;
01551
01552
01553 bzero (&dataObjInp, sizeof (dataObjInp));
01554 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01555 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01556 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01557 clearKeyVal (&dataObjInp.condInput);
01558 if (status < 0) {
01559 rodsLog (LOG_ERROR,
01560 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01561 dataObjInp.objPath, status);
01562 return status;
01563 }
01564
01565 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01566
01567
01568 return status;
01569 }
01570
01571 int
01572 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01573 rescInfo_t **outCacheResc)
01574 {
01575 dataObjInfo_t *bunfileObjInfoHead;
01576 dataObjInp_t dataObjInp;
01577 int status;
01578
01579
01580 bzero (&dataObjInp, sizeof (dataObjInp));
01581 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01582
01583 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01584 if (status < 0) {
01585 rodsLog (LOG_ERROR,
01586 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01587 dataObjInp.objPath, status);
01588 return status;
01589 }
01590 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead, &dataObjInp.condInput,
01591 outCacheResc, 0);
01592
01593 freeAllDataObjInfo (bunfileObjInfoHead);
01594
01595 return status;
01596 }
01597
01598 int
01599 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead, keyValPair_t *condInput,
01600 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01601 {
01602 int status;
01603
01604 dataObjInp_t dataObjInp;
01605
01606 bzero (&dataObjInp, sizeof (dataObjInp));
01607 bzero (&dataObjInp.condInput, sizeof (dataObjInp.condInput));
01608 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01609 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, condInput, 0);
01610
01611 addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, (*bunfileObjInfoHead)->rescHier );
01612 if (status < 0) return status;
01613
01614 #if 0 // JMC - legacy resource
01615 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01616
01617 status = getCacheRescInGrp (rsComm,
01618 (*bunfileObjInfoHead)->rescGroupName,
01619 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01620 if (status < 0) {
01621 rodsLog (LOG_ERROR,
01622 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01623 (*bunfileObjInfoHead)->rescGroupName,
01624 (*bunfileObjInfoHead)->objPath, status);
01625 return status;
01626 }
01627 if (outCacheResc != NULL)
01628 *outCacheResc = cacheResc;
01629
01630
01631 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01632 cacheResc->rescName, SU_CLIENT_USER_KW);
01633 if (status < 0) {
01634 rodsLog (LOG_ERROR,
01635 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01636 (*bunfileObjInfoHead)->objPath, status);
01637 return status;
01638 }
01639 } else
01640 #endif // JMC - legacy resource
01641
01642 if (outCacheResc != NULL)
01643 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01644
01645 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01646 (*bunfileObjInfoHead)->filePath);
01647 if (rmBunCopyFlag > 0) {
01648 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01649 }
01650 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01651 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01652 (*bunfileObjInfoHead)->dataType);
01653 }
01654 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01655 (*bunfileObjInfoHead)->rescInfo);
01656
01657 return status;
01658 }
01659
01660 #if 0 // JMC - legacy resource
01661
01662
01663
01664
01665
01666
01667
01668
01669
01670
01671
01672
01673
01674
01675
01676 int
01677 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01678 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01679 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01680 dataObjInfo_t **outDataObjInfo)
01681 {
01682 int status;
01683
01684 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01685 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01686
01687 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01688 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01689 outDataObjInfo);
01690 if (status < 0) {
01691 rodsLog (LOG_ERROR,
01692 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01693 srcDataObjInfoHead->objPath, status);
01694 return status;
01695 }
01696
01697
01698 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01699 }
01700 return status;
01701 }
01702
01703 int
01704 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01705 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01706 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01707 dataObjInfo_t **outDataObjInfo)
01708 {
01709 int status;
01710 dataObjInfo_t compDataObjInfo;
01711
01712
01713 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01714 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01715 NAME_LEN);
01716 compDataObjInfo.rescInfo=compRescGrpInfo->rescInfo;
01717 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01718 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01719 oldDataObjInfo, outDataObjInfo);
01720
01721 return status;
01722 }
01723
01724 #endif // JMC - legacy resource