00001
00002
00003
00004
00005
00006
00007
00008 #include "dataObjRepl.h"
00009 #include "dataObjOpr.h"
00010 #include "dataObjCreate.h"
00011 #include "dataObjOpen.h"
00012 #include "dataObjPut.h"
00013 #include "dataObjGet.h"
00014 #include "rodsLog.h"
00015 #include "objMetaOpr.h"
00016 #include "physPath.h"
00017 #include "specColl.h"
00018 #include "resource.h"
00019 #include "reGlobalsExtern.h"
00020 #include "reDefines.h"
00021 #include "reSysDataObjOpr.h"
00022 #include "getRemoteZoneResc.h"
00023 #include "l3FileGetSingleBuf.h"
00024 #include "l3FilePutSingleBuf.h"
00025 #include "fileSyncToArch.h"
00026 #include "fileStageToCache.h"
00027 #include "unbunAndRegPhyBunfile.h"
00028 #include "dataObjTrim.h"
00029 #include "dataObjLock.h"
00030
00031
00032
00033 #include "eirods_resource_backport.h"
00034 #include "eirods_resource_redirect.h"
00035 #include "eirods_log.h"
00036
00037
00038 int
00039 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00040 transStat_t **transStat)
00041 {
00042 int status;
00043 transferStat_t *transferStat = NULL;
00044
00045 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00046
00047 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00048 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00049 (*transStat)->numThreads = transferStat->numThreads;
00050 (*transStat)->bytesWritten = transferStat->bytesWritten;
00051 free (transferStat);
00052 }
00053 return status;
00054 }
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 int
00065 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00066 transferStat_t **transStat)
00067 {
00068
00069 int status;
00070 int remoteFlag;
00071 rodsServerHost_t *rodsServerHost;
00072 dataObjInfo_t *dataObjInfo = NULL;
00073 char* lockType = NULL;
00074 int lockFd = -1;
00075
00076 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00077
00078 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00079 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00080 }
00081 }
00082
00083 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00084 READ_COLL_PERM, 0, &dataObjInfo);
00085
00086 if (status == DATA_OBJ_T) {
00087 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00088 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00089 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00090 MAX_NAME_LEN);
00091 freeAllDataObjInfo (dataObjInfo);
00092 } else {
00093 freeAllDataObjInfo (dataObjInfo);
00094 return (SYS_REG_OBJ_IN_SPEC_COLL);
00095 }
00096 }
00097 }
00098
00099 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00100 REMOTE_OPEN);
00101
00102 if (remoteFlag < 0) {
00103 return (remoteFlag);
00104 } else if (remoteFlag == REMOTE_HOST) {
00105 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00106 transStat);
00107 return status;
00108 }
00109
00110
00111
00112 std::string hier;
00113 char* tmp_hier = getValByKey( &dataObjInp->condInput, RESC_HIER_STR_KW );
00114 if( 0 == tmp_hier ) {
00115 eirods::error ret = eirods::resolve_resource_hierarchy( eirods::EIRODS_OPEN_OPERATION,
00116 rsComm, dataObjInp, hier );
00117 if( !ret.ok() ) {
00118 std::stringstream msg;
00119 msg << "rsDataObjRepl :: failed in eirods::resolve_resource_hierarchy for [";
00120 msg << dataObjInp->objPath << "]";
00121 eirods::log( PASSMSG( msg.str(), ret ) );
00122 return ret.code();
00123 }
00124
00125 addKeyVal( &dataObjInp->condInput, RESC_HIER_STR_KW, hier.c_str() );
00126 } else {
00127 hier = tmp_hier;
00128 }
00129
00130
00131
00132 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00133 memset (*transStat, 0, sizeof (transferStat_t));
00134
00135
00136 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00137 if (lockType != NULL) {
00138 lockFd = rsDataObjLock (rsComm, dataObjInp);
00139 if (lockFd >= 0) {
00140
00141 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00142 } else {
00143 rodsLogError (LOG_ERROR, lockFd,
00144 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00145 dataObjInp->objPath, lockType);
00146 return lockFd;
00147 }
00148 }
00149
00150
00151 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00152 if(status < 0) {
00153 rodsLog(LOG_NOTICE, "%s - Failed to replicated data object.", __FUNCTION__);
00154 }
00155
00156 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00157
00158 return (status);
00159 }
00160
00161 int
00162 _rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00163 transferStat_t *transStat, dataObjInfo_t *outDataObjInfo)
00164 {
00165 int status;
00166 dataObjInfo_t *dataObjInfoHead = NULL;
00167 dataObjInfo_t *oldDataObjInfoHead = NULL;
00168 dataObjInfo_t *destDataObjInfo = NULL;
00169 rescGrpInfo_t *myRescGrpInfo = NULL;
00170 ruleExecInfo_t rei;
00171 int multiCopyFlag;
00172 char *accessPerm;
00173 int backupFlag;
00174 int allFlag;
00175 int savedStatus = 0;
00176 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00177 accessPerm = NULL;
00178 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00179 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00180 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00181 }
00182 accessPerm = NULL;
00183 } else {
00184 accessPerm = ACCESS_READ_OBJECT;
00185 }
00186
00187 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00188 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00189 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00190 multiCopyFlag = 1;
00191 } else {
00192 multiCopyFlag = 0;
00193 }
00194
00195
00196 if (multiCopyFlag) {
00197 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00198 accessPerm, 0);
00199 } else {
00200
00201
00202 status = getDataObjInfo( rsComm, dataObjInp, &dataObjInfoHead, accessPerm, 1 );
00203 }
00204
00205 if (status < 0) {
00206 rodsLog (LOG_NOTICE,
00207 "rsDataObjRepl: getDataObjInfo for %s", dataObjInp->objPath);
00208 return (status);
00209 }
00210
00211 if (getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00212 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, 0 );
00213 if (status < 0) {
00214 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication update.", __FUNCTION__);
00215 return status;
00216 }
00217
00218
00219 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead, oldDataObjInfoHead, transStat, NULL);
00220
00221 if (status >= 0 && outDataObjInfo != NULL) {
00222 *outDataObjInfo = *oldDataObjInfoHead;
00223 outDataObjInfo->next = NULL;
00224 } else {
00225 if(status < 0) {
00226 rodsLog(LOG_NOTICE, "%s - Failed to update replica.", __FUNCTION__);
00227 }
00228 }
00229
00230 freeAllDataObjInfo (dataObjInfoHead);
00231 freeAllDataObjInfo (oldDataObjInfoHead);
00232
00233 return status;
00234 }
00235
00236
00237 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, multiCopyFlag);
00238
00239 if (status < 0) {
00240 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00241 return status;
00242 }
00243
00244 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00245
00246 backupFlag = 1;
00247 multiCopyFlag = 0;
00248 } else {
00249 backupFlag = 0;
00250 }
00251
00252 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00253 allFlag = 1;
00254 } else {
00255 allFlag = 0;
00256 }
00257
00258 if( backupFlag == 0 && allFlag == 1 &&
00259 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00260 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00261
00262 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, dataObjInfoHead->rescGroupName );
00263 }
00264
00265
00266 dataObjInp->oprType = REPLICATE_OPR;
00267 status = getRescGrpForCreate( rsComm, dataObjInp, &myRescGrpInfo );
00268 if (status < 0) {
00269 rodsLog(LOG_NOTICE, "%s - Failed to get a resource group for create.", __FUNCTION__);
00270 return status;
00271 }
00272
00273 if (multiCopyFlag == 0 ) {
00274
00275
00276
00277
00278
00279
00280
00281 status = resolveSingleReplCopy( &dataObjInfoHead, &oldDataObjInfoHead,
00282 &myRescGrpInfo, &destDataObjInfo,
00283 &dataObjInp->condInput );
00284
00285 if (status == HAVE_GOOD_COPY) {
00286
00287
00288 #if 0 // JMC - legacy resource :: we no longer deal with the cache here
00289 dataObjInfo_t *cacheDataObjInfo = NULL;
00290 dataObjInfo_t *compDataObjInfo = NULL;
00291
00292
00293 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL &&
00294
00295
00296 #if 0
00297 getDataObjByClass (dataObjInfoHead, CACHE_CL, &cacheDataObjInfo)
00298 >= 0 && cacheDataObjInfo != destDataObjInfo) [[
00299 #else
00300 getDataObjByClass( dataObjInfoHead, COMPOUND_CL, &compDataObjInfo ) >= 0 &&
00301 strlen (compDataObjInfo->rescGroupName) > 0 &&
00302 getCacheDataInfoForRepl (rsComm, dataObjInfoHead, NULL,compDataObjInfo, &cacheDataObjInfo) >= 0 ) [[
00303 #endif
00304
00305
00306 int status1 = trimDataObjInfo (rsComm, cacheDataObjInfo);
00307 if (status1 < 0) [[
00308 rodsLog (LOG_NOTICE,
00309 "_rsDataObjRepl: trimDataObjInfo for %s",
00310 dataObjInp->objPath);
00311 ]]
00312 ]]
00313 #endif // JMC - legacy resource
00314
00315 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00316
00317 *outDataObjInfo = *destDataObjInfo;
00318 outDataObjInfo->next = NULL;
00319 }
00320
00321
00322
00323 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00324 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00325 ( myRescGrpInfo->status < 0 ) ) {
00326 status = myRescGrpInfo->status;
00327
00328 } else {
00329 status = 0;
00330 }
00331
00332
00333
00334 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00335 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00336 ( myRescGrpInfo->status < 0 ) ) {
00337 status = myRescGrpInfo->status;
00338
00339 } else {
00340 status = 0;
00341 }
00342
00343 freeAllDataObjInfo (dataObjInfoHead);
00344 freeAllDataObjInfo (oldDataObjInfoHead);
00345 freeAllDataObjInfo (destDataObjInfo);
00346 freeAllRescGrpInfo (myRescGrpInfo);
00347 return status;
00348 } else if (status < 0) {
00349 freeAllDataObjInfo (dataObjInfoHead);
00350 freeAllDataObjInfo (oldDataObjInfoHead);
00351 freeAllDataObjInfo (destDataObjInfo);
00352 freeAllRescGrpInfo (myRescGrpInfo);
00353 rodsLog(LOG_NOTICE, "%s - Failed to resolve a single replication copy.", __FUNCTION__);
00354 return status;
00355 }
00356
00357
00358 }
00359
00360
00361 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00362 if (status < 0) return status;
00363
00364
00365
00366 if (destDataObjInfo != NULL) {
00367 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00368 destDataObjInfo, transStat, oldDataObjInfoHead);
00369 if (status >= 0) {
00370 if (outDataObjInfo != NULL) {
00371 *outDataObjInfo = *destDataObjInfo;
00372 outDataObjInfo->next = NULL;
00373 }
00374 if (allFlag == 0) {
00375 freeAllDataObjInfo (dataObjInfoHead);
00376 freeAllDataObjInfo (oldDataObjInfoHead);
00377 freeAllDataObjInfo (destDataObjInfo);
00378 freeAllRescGrpInfo (myRescGrpInfo);
00379 return 0;
00380 } else {
00381
00382
00383 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00384 destDataObjInfo = NULL;
00385 }
00386 } else {
00387 savedStatus = status;
00388 }
00389 }
00390
00391 if (myRescGrpInfo != NULL) {
00392
00393 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00394 myRescGrpInfo, transStat, oldDataObjInfoHead,
00395 outDataObjInfo);
00396 if (status < 0) savedStatus = status;
00397 }
00398
00399 freeAllDataObjInfo (dataObjInfoHead);
00400 freeAllDataObjInfo (oldDataObjInfoHead);
00401 freeAllRescGrpInfo (myRescGrpInfo);
00402
00403 return (savedStatus);
00404 }
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418 int
00419 _rsDataObjReplUpdate( rsComm_t* rsComm, dataObjInp_t* dataObjInp,
00420 dataObjInfo_t* srcDataObjInfoHead, dataObjInfo_t* destDataObjInfoHead,
00421 transferStat_t* transStat, dataObjInfo_t* oldDataObjInfo ) {
00422 dataObjInfo_t *destDataObjInfo = 0;
00423 dataObjInfo_t *srcDataObjInfo = 0;
00424 int status;
00425 int allFlag;
00426 int savedStatus = 0;
00427 int replCnt = 0;
00428
00429 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00430 allFlag = 1;
00431 } else {
00432 allFlag = 0;
00433 }
00434
00435 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00436 destDataObjInfo = destDataObjInfoHead;
00437 while (destDataObjInfo != NULL) {
00438 if (destDataObjInfo->dataId == 0) {
00439 destDataObjInfo = destDataObjInfo->next;
00440 continue;
00441 }
00442
00443 #if 0 // JMC - legacy resource
00444
00445 if (getRescClass (destDataObjInfo->rescInfo) == COMPOUND_CL) {
00446
00447 if ((status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
00448 srcDataObjInfoHead, destDataObjInfoHead, destDataObjInfo,
00449 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00450 return status;
00451 }
00452 status = _rsDataObjReplS (rsComm, dataObjInp,
00453 srcDataObjInfo, NULL, "", destDataObjInfo, 1);
00454 } else
00455 #endif // JMC - legacy resource
00456 {
00457 srcDataObjInfo = srcDataObjInfoHead;
00458 while (srcDataObjInfo != NULL) {
00459
00460 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo, NULL, "", destDataObjInfo, 1 );
00461
00462 if (status >= 0) {
00463 break;
00464 }
00465 srcDataObjInfo = srcDataObjInfo->next;
00466 }
00467 }
00468 if (status >= 0) {
00469 transStat->numThreads = dataObjInp->numThreads;
00470 if (allFlag == 0) {
00471 return 0;
00472 }
00473 } else {
00474 savedStatus = status;
00475 replCnt ++;
00476 }
00477 destDataObjInfo = destDataObjInfo->next;
00478 }
00479
00480 return savedStatus;
00481 }
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495 int
00496 _rsDataObjReplNewCopy (rsComm_t *rsComm,
00497 dataObjInp_t *dataObjInp,
00498 dataObjInfo_t *srcDataObjInfoHead,
00499 rescGrpInfo_t *destRescGrpInfo,
00500 transferStat_t *transStat,
00501 dataObjInfo_t *oldDataObjInfo,
00502 dataObjInfo_t *outDataObjInfo)
00503 {
00504 dataObjInfo_t *srcDataObjInfo;
00505 rescGrpInfo_t *tmpRescGrpInfo;
00506 rescInfo_t *tmpRescInfo;
00507 int status;
00508 int allFlag;
00509 int savedStatus = 0;
00510
00511 #if 0
00512 rescInfo_t *compRescInfo = NULL;
00513 rescInfo_t *cacheRescInfo = NULL;
00514 #endif
00515
00516 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00517 allFlag = 1;
00518 } else {
00519 allFlag = 0;
00520 }
00521 #if 0 // JMC - legacy resource
00522
00523
00524
00525
00526 if( allFlag == 1 && destRescGrpInfo != NULL &&
00527 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00528 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00529 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00530 }
00531 #endif // JMC - legacy resource
00532
00533
00534 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00535 tmpRescGrpInfo = destRescGrpInfo;
00536 while (tmpRescGrpInfo != NULL) {
00537 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00538 #if 0 // JMC - legacy resource
00539
00540
00541 if (tmpRescInfo == cacheRescInfo) {
00542
00543
00544 tmpRescGrpInfo = tmpRescGrpInfo->next;
00545 continue;
00546 }
00547
00548 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00549
00550 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00551 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00552 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00553 return status;
00554 }
00555
00556 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00557 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00558 } else {
00559 #endif // JMC - legacy resource
00560
00561
00562 {
00563 srcDataObjInfo = srcDataObjInfoHead;
00564 while (srcDataObjInfo != NULL) {
00565 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00566 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00567 outDataObjInfo, 0);
00568 if (status >= 0) {
00569 break;
00570 } else {
00571 savedStatus = status;
00572 }
00573 srcDataObjInfo = srcDataObjInfo->next;
00574 }
00575 }
00576 if (status >= 0) {
00577 transStat->numThreads = dataObjInp->numThreads;
00578 if (allFlag == 0) {
00579 return 0;
00580 }
00581 } else {
00582 savedStatus = status;
00583 }
00584 tmpRescGrpInfo = tmpRescGrpInfo->next;
00585 }
00586
00587 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00588
00589 return destRescGrpInfo->status;
00590 } else {
00591 return (savedStatus);
00592 }
00593 }
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608 int
00609 _rsDataObjReplS (rsComm_t *rsComm,
00610 dataObjInp_t *dataObjInp,
00611 dataObjInfo_t *srcDataObjInfo,
00612 rescInfo_t *destRescInfo,
00613 char *rescGroupName,
00614 dataObjInfo_t *destDataObjInfo,
00615 int updateFlag)
00616 {
00617 int status, status1;
00618 int l1descInx;
00619 openedDataObjInp_t dataObjCloseInp;
00620 dataObjInfo_t *myDestDataObjInfo;
00621 l1descInx = dataObjOpenForRepl( rsComm, dataObjInp, srcDataObjInfo, destRescInfo,
00622 rescGroupName, destDataObjInfo, updateFlag );
00623 if (l1descInx < 0) {
00624 return (l1descInx);
00625 }
00626
00627 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00628 status = l3DataStageSync (rsComm, l1descInx);
00629 } else if( L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00630 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF ) {
00631 status = l3DataCopySingleBuf (rsComm, l1descInx);
00632 } else {
00633 status = dataObjCopy( rsComm, l1descInx );
00634 }
00635
00636 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00637
00638 dataObjCloseInp.l1descInx = l1descInx;
00639
00640 L1desc[l1descInx].oprStatus = status;
00641 if (status >= 0) {
00642 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00643 }
00644
00645 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00646
00647 if (destDataObjInfo != NULL) {
00648 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00649 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00650 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00651 } else {
00652
00653 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00654 }
00655 }
00656
00657 freeDataObjInfo (myDestDataObjInfo);
00658
00659 if (status < 0) {
00660 return status;
00661 } else if (status1 < 0) {
00662 return status1;
00663 } else {
00664 return (status);
00665 }
00666 }
00667
00668
00669
00670
00671 int
00672 dataObjOpenForRepl (rsComm_t *rsComm,
00673 dataObjInp_t *dataObjInp,
00674 dataObjInfo_t *inpSrcDataObjInfo,
00675 rescInfo_t *destRescInfo,
00676 char *rescGroupName,
00677 dataObjInfo_t *inpDestDataObjInfo,
00678 int updateFlag)
00679 {
00680 dataObjInfo_t *myDestDataObjInfo = 0, *srcDataObjInfo = NULL;
00681 rescInfo_t *myDestRescInfo = 0;
00682 int destL1descInx = 0;
00683 int srcL1descInx = 0;
00684 int status = 0;
00685 int replStatus = 0;
00686
00687
00688
00689 dataObjInfo_t *cacheDataObjInfo = NULL;
00690 dataObjInp_t dest_inp, myDataObjInp, *l1DataObjInp = 0;
00691 if (destRescInfo == NULL) {
00692 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00693 } else {
00694 myDestRescInfo = destRescInfo;
00695 }
00696
00697 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00698 return SYS_RESC_IS_DOWN;
00699 }
00700
00701 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00702 return SYS_RESC_IS_DOWN;
00703 }
00704
00705 #if 0 // JMC - legacy resource
00706 destRescClass = getRescClass (myDestRescInfo);
00707 #endif // JMC - legacy resource
00708
00709
00710
00711 #if 0 // JMC - legacy resource
00712 if (srcRescClass == COMPOUND_CL) {
00713 rescGrpInfo_t *myRescGrpInfo;
00714 if (destRescClass == CACHE_CL &&
00715 isRescsInSameGrp (rsComm, myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00716 &myRescGrpInfo)) {
00717
00718 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00719 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00720 myRescGrpInfo->rescGroupName, NAME_LEN);
00721 }
00722 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00723 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00724 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00725 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00726 cacheDataObjInfo);
00727 if (status < 0) { free( cacheDataObjInfo ); return status; }
00728
00729 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00730 }
00731 }
00732
00733 #endif // JMC - legacy resource
00734 if (cacheDataObjInfo == NULL) {
00735 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00736 *srcDataObjInfo = *inpSrcDataObjInfo;
00737 } else {
00738 srcDataObjInfo = cacheDataObjInfo;
00739 }
00740
00741 if( NULL == srcDataObjInfo ) {
00742 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00743 return -1;
00744 }
00745
00746 myDataObjInp = *dataObjInp;
00747 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00748
00749 destL1descInx = allocL1desc ();
00750
00751 if (destL1descInx < 0) return destL1descInx;
00752
00753
00754
00755
00756 std::string op_name;
00757 memset (&dest_inp, 0, sizeof (dest_inp));
00758 memset (&dest_inp.condInput, 0, sizeof (dest_inp.condInput));
00759
00760 strncpy( dest_inp.objPath, dataObjInp->objPath, MAX_NAME_LEN );
00761 addKeyVal( &(dest_inp.condInput), RESC_NAME_KW, myDestRescInfo->rescName );
00762
00763 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00764 if (updateFlag > 0) {
00765
00766
00767 op_name = eirods::EIRODS_OPEN_OPERATION;
00768
00769
00770 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00771 rodsLog( LOG_ERROR, "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00772 srcDataObjInfo->objPath);
00773 return (SYS_UPDATE_REPL_INFO_ERR);
00774 }
00775
00776 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00777 *myDestDataObjInfo = *inpDestDataObjInfo;
00778 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00779 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00780 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00781 } else {
00782
00783
00784 op_name = eirods::EIRODS_CREATE_OPERATION;
00785
00786 initDataObjInfoForRepl( rsComm, myDestDataObjInfo, srcDataObjInfo,
00787 destRescInfo, rescGroupName);
00788 replStatus = srcDataObjInfo->replStatus;
00789 }
00790
00791
00792
00793 std::string hier;
00794 char* dst_hier_str = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00795 if( 0 == dst_hier_str ) {
00796 eirods::error ret = eirods::resolve_resource_hierarchy( op_name, rsComm, &dest_inp, hier );
00797 if( !ret.ok() ) {
00798 std::stringstream msg;
00799 msg << "dataObjOpenForRepl :: failed in eirods::resolve_resource_hierarchy for [";
00800 msg << dest_inp.objPath << "]";
00801 eirods::log( PASSMSG( msg.str(), ret ) );
00802 return ret.code();
00803 }
00804
00805 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, hier.c_str() );
00806
00807 } else {
00808 hier = dst_hier_str;
00809 }
00810
00811
00812
00813 rstrcpy(myDestDataObjInfo->rescHier, hier.c_str(), MAX_NAME_LEN);
00814 addKeyVal( &(myDataObjInp.condInput), RESC_HIER_STR_KW, hier.c_str() );
00815 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo, replStatus, srcDataObjInfo->dataSize);
00816 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00817 if (l1DataObjInp->oprType == PHYMV_OPR) {
00818 L1desc[destL1descInx].oprType = PHYMV_DEST;
00819 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00820 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00821 } else {
00822 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00823 }
00824
00825 #if 0 // JMC - legacy resource
00826 if (destRescClass == COMPOUND_CL) {
00827 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00828 }
00829 else if (srcRescClass == COMPOUND_CL) {
00830 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00831 }
00832 #endif // JMC - legacy resource
00833
00834
00835 char* src_hier_str = 0;
00836 if (srcDataObjInfo != NULL && srcDataObjInfo->rescHier != NULL) {
00837 src_hier_str = srcDataObjInfo->rescHier;
00838 }
00839
00840 l1DataObjInp->numThreads = dataObjInp->numThreads =
00841 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00842
00843 &dataObjInp->condInput, dst_hier_str, src_hier_str );
00844
00845 if( l1DataObjInp->numThreads > 0 &&
00846 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00847 if (updateFlag > 0) {
00848 status = dataOpen (rsComm, destL1descInx);
00849 } else {
00850 status = getFilePathName (rsComm, myDestDataObjInfo,L1desc[destL1descInx].dataObjInp);
00851 if (status >= 0)
00852 status = dataCreate (rsComm, destL1descInx);
00853 }
00854
00855 if (status < 0) {
00856 freeL1desc (destL1descInx);
00857 return (status);
00858 }
00859 } else {
00860 if (updateFlag == 0) {
00861 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00862 if (status < 0) {
00863 freeL1desc (destL1descInx);
00864 return (status);
00865 }
00866 }
00867 }
00868
00869 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00870
00871 *inpDestDataObjInfo = *myDestDataObjInfo;
00872 inpDestDataObjInfo->next = NULL;
00873 }
00874
00875
00876 rstrcpy(srcDataObjInfo->rescHier, inpSrcDataObjInfo->rescHier, MAX_NAME_LEN);
00877 srcL1descInx = allocL1desc ();
00878 if (srcL1descInx < 0) return srcL1descInx;
00879 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo, srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00880 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00881 l1DataObjInp->numThreads = dataObjInp->numThreads;
00882 if (l1DataObjInp->oprType == PHYMV_OPR) {
00883 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00884 } else {
00885 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00886 }
00887
00888 if( L1desc[destL1descInx].stageFlag == SYNC_DEST &&
00889 getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL) {
00890 L1desc[srcL1descInx].purgeCacheFlag = 1;
00891 }
00892
00893
00894 if( l1DataObjInp->numThreads > 0 &&
00895 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00896 openedDataObjInp_t dataObjCloseInp;
00897
00898 l1DataObjInp->openFlags = O_RDONLY;
00899 status = dataOpen (rsComm, srcL1descInx);
00900 if (status < 0) {
00901 freeL1desc (srcL1descInx);
00902 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00903 dataObjCloseInp.l1descInx = destL1descInx;
00904 rsDataObjClose (rsComm, &dataObjCloseInp);
00905 return (status);
00906 }
00907 }
00908
00909 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00910
00911 return (destL1descInx);
00912 }
00913
00914 int
00915 dataObjCopy (rsComm_t *rsComm, int l1descInx)
00916 {
00917 int srcL1descInx, destL1descInx;
00918 int srcL3descInx, destL3descInx;
00919 int status;
00920 portalOprOut_t *portalOprOut = NULL;
00921 dataCopyInp_t dataCopyInp;
00922 dataOprInp_t *dataOprInp;
00923 int srcRemoteFlag, destRemoteFlag;
00924
00925 bzero (&dataCopyInp, sizeof (dataCopyInp));
00926 dataOprInp = &dataCopyInp.dataOprInp;
00927 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00928 destL1descInx = l1descInx;
00929
00930 srcL3descInx = L1desc[srcL1descInx].l3descInx;
00931 destL3descInx = L1desc[destL1descInx].l3descInx;
00932
00933 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
00934 srcRemoteFlag = REMOTE_ZONE_HOST;
00935 } else {
00936 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
00937 }
00938
00939 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
00940 destRemoteFlag = REMOTE_ZONE_HOST;
00941 } else {
00942 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
00943 }
00944
00945 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
00946 destRemoteFlag != REMOTE_ZONE_HOST &&
00947 FileDesc[srcL3descInx].rodsServerHost ==
00948 FileDesc[destL3descInx].rodsServerHost) {
00949
00950 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
00951
00952 dataCopyInp.portalOprOut.numThreads =
00953 dataCopyInp.dataOprInp.numThreads;
00954 if (srcRemoteFlag == LOCAL_HOST) {
00955 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00956 }
00957
00958 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
00959 destRemoteFlag == REMOTE_ZONE_HOST) {
00960 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
00961
00962 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00963
00964
00965 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
00966 if (status < 0 || NULL == portalOprOut ) {
00967 rodsLog (LOG_NOTICE,
00968 "dataObjCopy: preProcParaPut error for %s",
00969 L1desc[srcL1descInx].dataObjInfo->objPath);
00970 return (status);
00971 }
00972 dataCopyInp.portalOprOut = *portalOprOut;
00973 } else {
00974 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
00975 }
00976 if (srcRemoteFlag == LOCAL_HOST)
00977 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00978 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
00979 srcRemoteFlag == REMOTE_ZONE_HOST) {
00980
00981 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
00982
00983 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00984
00985 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
00986 if (status < 0 || NULL == portalOprOut ) {
00987 rodsLog (LOG_NOTICE,
00988 "dataObjCopy: preProcParaGet error for %s",
00989 L1desc[srcL1descInx].dataObjInfo->objPath);
00990 return (status);
00991 }
00992 dataCopyInp.portalOprOut = *portalOprOut;
00993 } else {
00994 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
00995 }
00996 if (destRemoteFlag == LOCAL_HOST)
00997 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00998 } else {
00999
01000 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01001
01002
01003 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01004 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01005
01006 if (status < 0 || NULL == portalOprOut ) {
01007 rodsLog (LOG_NOTICE,
01008 "dataObjCopy: preProcParaGet error for %s",
01009 L1desc[srcL1descInx].dataObjInfo->objPath);
01010 return (status);
01011 }
01012 dataCopyInp.portalOprOut = *portalOprOut;
01013 } else {
01014 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01015 }
01016 }
01017
01018 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01019 NO_CHK_COPY_LEN_KW) != NULL) {
01020
01021 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
01022 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
01023 L1desc[l1descInx].dataObjInp->numThreads =
01024 L1desc[srcL1descInx].dataObjInp->numThreads =
01025 dataCopyInp.portalOprOut.numThreads = 1;
01026 }
01027 }
01028 status = rsDataCopy (rsComm, &dataCopyInp);
01029
01030 if (status >= 0 && portalOprOut != NULL &&
01031 L1desc[l1descInx].dataObjInp != NULL) {
01032
01033 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
01034 }
01035 if (portalOprOut != NULL) free (portalOprOut);
01036 clearKeyVal (&dataOprInp->condInput);
01037
01038 return (status);
01039 }
01040
01041 int
01042 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
01043 {
01044 bytesBuf_t dataBBuf;
01045 int bytesRead, bytesWritten;
01046 int srcL1descInx;
01047
01048 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01049 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01050
01051 if (L1desc[srcL1descInx].dataSize < 0) {
01052 rodsLog (LOG_ERROR,
01053 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
01054 L1desc[srcL1descInx].dataSize,
01055 L1desc[srcL1descInx].dataObjInfo->objPath);
01056 return (SYS_COPY_LEN_ERR);
01057 } else if (L1desc[srcL1descInx].dataSize == 0) {
01058 bytesRead = 0;
01059 } else {
01060 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
01061 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
01062 }
01063
01064 if (bytesRead < 0) {
01065 free( dataBBuf.buf );
01066 return (bytesRead);
01067 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01068 NO_CHK_COPY_LEN_KW) != NULL) {
01069
01070 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
01071 = bytesRead;
01072 }
01073
01074 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
01075
01076 L1desc[l1descInx].bytesWritten = bytesWritten;
01077
01078 if (dataBBuf.buf != NULL) {
01079 free (dataBBuf.buf);
01080 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01081 }
01082
01083 if (bytesWritten != bytesRead) {
01084 if (bytesWritten >= 0) {
01085 rodsLog (LOG_NOTICE,
01086 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
01087 bytesRead, bytesWritten);
01088 return (SYS_COPY_LEN_ERR);
01089 } else {
01090 return (bytesWritten);
01091 }
01092 }
01093
01094
01095 return (0);
01096 }
01097
01098 int
01099 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
01100 {
01101 bytesBuf_t dataBBuf;
01102 int srcL1descInx;
01103 int status = 0;
01104
01105 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01106
01107 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01108 if (L1desc[srcL1descInx].dataSize < 0 &&
01109 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
01110 rodsLog (LOG_ERROR,
01111 "l3DataStageSync: dataSize %lld for %s is negative",
01112 L1desc[srcL1descInx].dataSize,
01113 L1desc[srcL1descInx].dataObjInfo->objPath);
01114 return (SYS_COPY_LEN_ERR);
01115 } else if (L1desc[srcL1descInx].dataSize > 0 ||
01116 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
01117 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01118
01119 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01120 } else {
01121
01122 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01123 }
01124 }
01125
01126 if (status < 0) {
01127 L1desc[l1descInx].bytesWritten = -1;
01128 } else {
01129 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01130 L1desc[srcL1descInx].dataSize;
01131 }
01132
01133 return (status);
01134 }
01135
01136 int
01137 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01138 {
01139 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01140
01141 fileStageSyncInp_t fileSyncToArchInp;
01142 dataObjInp_t *dataObjInp;
01143 int status;
01144 char *outFileName = NULL;
01145 dataObjInfo_t tmpDataObjInfo;
01146
01147 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01148 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01149
01150 #if 0 // JMC legacy resource
01151 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01152 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01153
01154 switch (RescTypeDef[rescTypeInx].rescCat) {
01155 case FILE_CAT:
01156
01157 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01158 #endif // JMC legacy resource
01159
01160 int dst_create_path = 0;
01161 eirods::error err = eirods::get_resource_property< int >( destDataObjInfo->rescInfo->rescName,
01162 "create_path", dst_create_path );
01163 if( !err.ok() ) {
01164 eirods::log( PASS( false, -1, "l3FileSync - failed.", err ) );
01165 }
01166
01167 if( CREATE_PATH == dst_create_path ) {
01168
01169 status = chkOrphanFile ( rsComm, destDataObjInfo->filePath, destDataObjInfo->rescName, &tmpDataObjInfo );
01170 if (status == 0 && tmpDataObjInfo.dataId != destDataObjInfo->dataId) {
01171
01172 snprintf (destDataObjInfo->filePath, MAX_NAME_LEN,
01173 "%s.%-d", destDataObjInfo->filePath, (int) random());
01174 }
01175 }
01176
01177 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01178 dataObjInp = L1desc[destL1descInx].dataObjInp;
01179 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01180 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01181 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01182
01183
01184
01185 std::string location;
01186 eirods::error ret = eirods::get_loc_for_hier_string( srcDataObjInfo->rescHier, location );
01187 if( !ret.ok() ) {
01188 eirods::log( PASSMSG( "l3FileSycn - failed in get_loc_for_hier_String", ret ) );
01189 return -1;
01190 }
01191
01192 rstrcpy( fileSyncToArchInp.addr.hostAddr, location.c_str(), NAME_LEN );
01193
01194
01195 rstrcpy( fileSyncToArchInp.filename, destDataObjInfo->filePath, MAX_NAME_LEN);
01196 rstrcpy( fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath, MAX_NAME_LEN);
01197
01198 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01199 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp, &outFileName);
01200
01201 if (status >= 0 &&
01202
01203 NO_CREATE_PATH == dst_create_path &&
01204 outFileName != NULL) {
01205
01206 rstrcpy (destDataObjInfo->filePath, outFileName, MAX_NAME_LEN);
01207 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01208 free (outFileName);
01209 }
01210 #if 0 // JMC legacy resource
01211 break;
01212 default:
01213 rodsLog (LOG_ERROR,
01214 "l3FileSync: rescCat type %d is not recognized",
01215 RescTypeDef[rescTypeInx].rescCat);
01216 status = SYS_INVALID_RESC_TYPE;
01217 break;
01218 }
01219 #endif // JMC legacy resource
01220 return (status);
01221 }
01222
01223 int
01224 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01225 {
01226 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01227 int mode, status;
01228
01229 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01230 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01231
01232 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01233
01234 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01235
01236 return status;
01237 }
01238
01239 int
01240 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01241 dataObjInfo_t *destDataObjInfo, int mode)
01242 {
01243
01244 fileStageSyncInp_t fileSyncToArchInp;
01245 int status;
01246
01247 #if 0 // JMC legacy resource
01248 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01249 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01250
01251
01252 switch (RescTypeDef[rescTypeInx].rescCat) {
01253 case FILE_CAT:
01254 #endif // JMC legacy resource
01255 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01256 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01257 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01258 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01259
01260 rstrcpy (fileSyncToArchInp.addr.hostAddr,
01261 destDataObjInfo->rescInfo->rescLoc, NAME_LEN);
01262
01263 rstrcpy( fileSyncToArchInp.cacheFilename, destDataObjInfo->filePath,
01264 MAX_NAME_LEN);
01265 rstrcpy( fileSyncToArchInp.filename, srcDataObjInfo->filePath,
01266 MAX_NAME_LEN);
01267 fileSyncToArchInp.mode = mode;
01268 status = rsFileStageToCache (rsComm, &fileSyncToArchInp);
01269
01270 #if 0 // JMC legacy resource
01271 break;
01272 default:
01273 rodsLog (LOG_ERROR,
01274 "l3FileStage: rescCat type %d is not recognized",
01275 RescTypeDef[rescTypeInx].rescCat);
01276 status = SYS_INVALID_RESC_TYPE;
01277 break;
01278 }
01279 #endif // JMC legacy resource
01280 return (status);
01281 }
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291 int
01292 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01293 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01294 {
01295 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01296 transferStat_t transStat;
01297 dataObjInp_t dataObjInp;
01298 char tmpStr[NAME_LEN];
01299 int status;
01300
01301 dataObjInfoHead = *srcDataObjInfoHead;
01302 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01303 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01304 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01305 memset (&transStat, 0, sizeof (transStat));
01306
01307 if (flagStr != NULL) {
01308 if (strstr (flagStr, ALL_KW) != NULL) {
01309 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01310 }
01311 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01312 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01313 }
01314 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01315 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01316 }
01317 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01318 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01319 }
01320 }
01321
01322 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01323 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01324 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01325 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01326
01327 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01328 myDataObjInfo);
01329
01330
01331 clearKeyVal (&dataObjInp.condInput);
01332 if (status >= 0) {
01333 status = 1;
01334
01335 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01336 } else {
01337 freeAllDataObjInfo (myDataObjInfo);
01338 }
01339
01340 return status;
01341 }
01342
01343 #if 0 // JMC - legacy resource
01344 int
01345 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01346 dataObjInfo_t *outCacheObjInfo)
01347 {
01348 int status;
01349 rescInfo_t *cacheResc;
01350 transferStat_t transStat;
01351 dataObjInp_t dataObjInp;
01352 char tmpStr[NAME_LEN];
01353
01354 #if 0 // JMC - legacy resource
01355 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01356 #endif // JMC - legacy resource
01357
01358 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,compObjInfo->rescInfo, &cacheResc);
01359 if (status < 0) {
01360 rodsLog (LOG_ERROR,
01361 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01362 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01363 return status;
01364 }
01365 if (outCacheObjInfo != NULL)
01366 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01367 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01368 memset (&transStat, 0, sizeof (transStat));
01369
01370 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01371 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01372 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01373 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01374
01375 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01376 outCacheObjInfo);
01377
01378 clearKeyVal (&dataObjInp.condInput);
01379 return status;
01380 }
01381
01382
01383
01384
01385
01386
01387 int
01388 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01389 {
01390 int status;
01391 dataObjInfo_t *outCacheObjInfo;
01392
01393 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01394 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01395 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01396 outCacheObjInfo);
01397
01398 if (status < 0) {
01399
01400 if (outCacheObjInfo->dataId > 0) {
01401
01402 if( requeDataObjInfoByReplNum (compObjInfoHead,
01403 outCacheObjInfo->replNum) == 0) {
01404
01405 status = 0;
01406 }
01407 }
01408 free (outCacheObjInfo);
01409 } else {
01410 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01411 }
01412 return status;
01413 }
01414
01415 int
01416 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01417 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01418 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01419 {
01420 int status = 0;
01421 rescInfo_t *cacheResc;
01422 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01423 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01424 int updateFlag;
01425
01426 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01427 cacheResc = oldDataObjInfo->rescInfo;
01428 updateFlag = 1;
01429 } else {
01430
01431 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01432 compObjInfo->rescInfo, &cacheResc);
01433 if (status < 0) {
01434 rodsLog (LOG_ERROR,
01435 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01436 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01437 return status;
01438 }
01439 updateFlag = 0;
01440 }
01441
01442 if (outDestDataObjInfo == NULL) {
01443 destDataObjInfo = tmpDestDataObjInfo;
01444 } else {
01445 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01446 if (tmpDestDataObjInfo == NULL) {
01447 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01448 } else {
01449 *destDataObjInfo = *tmpDestDataObjInfo;
01450 }
01451 }
01452 srcDataObjInfo = srcDataObjInfoHead;
01453 while (srcDataObjInfo != NULL) {
01454 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01455 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01456 if (status >= 0) {
01457 if (outDestDataObjInfo != NULL)
01458 *outDestDataObjInfo = destDataObjInfo;
01459 break;
01460 }
01461 srcDataObjInfo = srcDataObjInfo->next;
01462 }
01463
01464 return status;
01465 }
01466 #endif // JMC - legacy resource
01467 int
01468 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01469 {
01470 int status;
01471 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01472 rescInfo_t *cacheResc;
01473 dataObjInp_t dataObjInp;
01474 dataObjInfo_t *cacheObjInfo;
01475
01476 #if 0 // JMC - legacy resource
01477 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01478 #endif // JMC - legacy resource
01479
01480 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01481 &cacheResc);
01482
01483 if (status < 0) return status;
01484
01485
01486 bzero (&dataObjInp, sizeof (dataObjInp));
01487 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01488 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01489 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01490 clearKeyVal (&dataObjInp.condInput);
01491 if (status < 0) {
01492 rodsLog (LOG_ERROR,
01493 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01494 dataObjInp.objPath, status);
01495 return status;
01496 }
01497
01498 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01499
01500
01501 return status;
01502 }
01503
01504 int
01505 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01506 rescInfo_t **outCacheResc)
01507 {
01508 dataObjInfo_t *bunfileObjInfoHead;
01509 dataObjInp_t dataObjInp;
01510 int status;
01511
01512
01513 bzero (&dataObjInp, sizeof (dataObjInp));
01514 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01515
01516 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01517 if (status < 0) {
01518 rodsLog (LOG_ERROR,
01519 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01520 dataObjInp.objPath, status);
01521 return status;
01522 }
01523 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead,
01524 outCacheResc, 0);
01525
01526 freeAllDataObjInfo (bunfileObjInfoHead);
01527
01528 return status;
01529 }
01530
01531 int
01532 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead,
01533 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01534 {
01535 int status;
01536
01537 dataObjInp_t dataObjInp;
01538
01539 bzero (&dataObjInp, sizeof (dataObjInp));
01540 bzero (&dataObjInp.condInput, sizeof (dataObjInp.condInput));
01541 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01542 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, NULL, 0);
01543
01544 addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, (*bunfileObjInfoHead)->rescHier );
01545 if (status < 0) return status;
01546
01547 #if 0 // JMC - legacy resource
01548 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01549
01550 status = getCacheRescInGrp (rsComm,
01551 (*bunfileObjInfoHead)->rescGroupName,
01552 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01553 if (status < 0) {
01554 rodsLog (LOG_ERROR,
01555 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01556 (*bunfileObjInfoHead)->rescGroupName,
01557 (*bunfileObjInfoHead)->objPath, status);
01558 return status;
01559 }
01560 if (outCacheResc != NULL)
01561 *outCacheResc = cacheResc;
01562
01563
01564 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01565 cacheResc->rescName, SU_CLIENT_USER_KW);
01566 if (status < 0) {
01567 rodsLog (LOG_ERROR,
01568 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01569 (*bunfileObjInfoHead)->objPath, status);
01570 return status;
01571 }
01572 } else
01573 #endif // JMC - legacy resource
01574
01575 if (outCacheResc != NULL)
01576 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01577
01578 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01579 (*bunfileObjInfoHead)->filePath);
01580 if (rmBunCopyFlag > 0) {
01581 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01582 }
01583 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01584 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01585 (*bunfileObjInfoHead)->dataType);
01586 }
01587 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01588 (*bunfileObjInfoHead)->rescInfo);
01589
01590 return status;
01591 }
01592
01593 #if 0 // JMC - legacy resource
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609 int
01610 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01611 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01612 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01613 dataObjInfo_t **outDataObjInfo)
01614 {
01615 int status;
01616
01617 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01618 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01619
01620 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01621 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01622 outDataObjInfo);
01623 if (status < 0) {
01624 rodsLog (LOG_ERROR,
01625 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01626 srcDataObjInfoHead->objPath, status);
01627 return status;
01628 }
01629
01630
01631 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01632 }
01633 return status;
01634 }
01635
01636 int
01637 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01638 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01639 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01640 dataObjInfo_t **outDataObjInfo)
01641 {
01642 int status;
01643 dataObjInfo_t compDataObjInfo;
01644
01645
01646 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01647 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01648 NAME_LEN);
01649 compDataObjInfo.rescInfo = compRescGrpInfo->rescInfo;
01650 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01651 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01652 oldDataObjInfo, outDataObjInfo);
01653
01654 return status;
01655 }
01656
01657 #endif // JMC - legacy resource