00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "dataObjRepl.h"
00011 #include "dataObjOpr.h"
00012 #include "dataObjCreate.h"
00013 #include "dataObjOpen.h"
00014 #include "dataObjPut.h"
00015 #include "dataObjGet.h"
00016 #include "rodsLog.h"
00017 #include "objMetaOpr.h"
00018 #include "physPath.h"
00019 #include "specColl.h"
00020 #include "resource.h"
00021 #include "reGlobalsExtern.h"
00022 #include "reDefines.h"
00023 #include "reSysDataObjOpr.h"
00024 #include "getRemoteZoneResc.h"
00025 #include "l3FileGetSingleBuf.h"
00026 #include "l3FilePutSingleBuf.h"
00027 #include "fileSyncToArch.h"
00028 #include "fileStageToCache.h"
00029 #include "unbunAndRegPhyBunfile.h"
00030 #include "dataObjTrim.h"
00031 #include "dataObjLock.h"
00032
00033
00034
00035 #include "eirods_resource_backport.h"
00036 #include "eirods_resource_redirect.h"
00037 #include "eirods_log.h"
00038
00039
00040 int
00041 rsDataObjRepl250 (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00042 transStat_t **transStat)
00043 {
00044 int status;
00045 transferStat_t *transferStat = NULL;
00046
00047 status = rsDataObjRepl (rsComm, dataObjInp, &transferStat);
00048
00049 if (transStat != NULL && status >= 0 && transferStat != NULL) {
00050 *transStat = (transStat_t *) malloc (sizeof (transStat_t));
00051 (*transStat)->numThreads = transferStat->numThreads;
00052 (*transStat)->bytesWritten = transferStat->bytesWritten;
00053 free (transferStat);
00054 }
00055 return status;
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066 int
00067 rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00068 transferStat_t **transStat)
00069 {
00070
00071 int status;
00072 int remoteFlag;
00073 rodsServerHost_t *rodsServerHost;
00074 dataObjInfo_t *dataObjInfo = NULL;
00075 char* lockType = NULL;
00076 int lockFd = -1;
00077
00078 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00079 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00080
00081 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00082
00083 if (rsComm->proxyUser.authInfo.authFlag < REMOTE_PRIV_USER_AUTH) {
00084 return(CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00085 }
00086 }
00087
00088 status = resolvePathInSpecColl (rsComm, dataObjInp->objPath,
00089 READ_COLL_PERM, 0, &dataObjInfo);
00090
00091 if (status == DATA_OBJ_T) {
00092 if (dataObjInfo != NULL && dataObjInfo->specColl != NULL) {
00093 if (dataObjInfo->specColl->collClass == LINKED_COLL) {
00094 rstrcpy (dataObjInp->objPath, dataObjInfo->objPath,
00095 MAX_NAME_LEN);
00096 freeAllDataObjInfo (dataObjInfo);
00097 } else {
00098 freeAllDataObjInfo (dataObjInfo);
00099 return (SYS_REG_OBJ_IN_SPEC_COLL);
00100 }
00101 }
00102 }
00103
00104 remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
00105 REMOTE_OPEN);
00106
00107 if (remoteFlag < 0) {
00108 return (remoteFlag);
00109 } else if (remoteFlag == REMOTE_HOST) {
00110 status = _rcDataObjRepl (rodsServerHost->conn, dataObjInp,
00111 transStat);
00112 return status;
00113 }
00114
00115
00116
00117 std::string hier;
00118 char* tmp_hier = getValByKey( &dataObjInp->condInput, RESC_HIER_STR_KW );
00119 if( 0 == tmp_hier ) {
00120
00121 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00122
00123 eirods::error ret = eirods::resolve_resource_hierarchy( eirods::EIRODS_OPEN_OPERATION,
00124 rsComm, dataObjInp, hier );
00125 if( !ret.ok() ) {
00126 std::stringstream msg;
00127 msg << "failed in eirods::resolve_resource_hierarchy for [";
00128 msg << dataObjInp->objPath << "]";
00129 eirods::log( PASSMSG( msg.str(), ret ) );
00130 return ret.code();
00131 }
00132
00133 addKeyVal( &dataObjInp->condInput, RESC_HIER_STR_KW, hier.c_str() );
00134 } else {
00135 hier = tmp_hier;
00136 }
00137
00138
00139
00140 *transStat = (transferStat_t*)malloc (sizeof (transferStat_t));
00141 memset (*transStat, 0, sizeof (transferStat_t));
00142
00143
00144 lockType = getValByKey (&dataObjInp->condInput, LOCK_TYPE_KW);
00145 if (lockType != NULL) {
00146 lockFd = rsDataObjLock (rsComm, dataObjInp);
00147 if (lockFd >= 0) {
00148
00149 rmKeyVal (&dataObjInp->condInput, LOCK_TYPE_KW);
00150 } else {
00151 rodsLogError (LOG_ERROR, lockFd,
00152 "rsDataObjRepl: rsDataObjLock error for %s. lockType = %s",
00153 dataObjInp->objPath, lockType);
00154 return lockFd;
00155 }
00156 }
00157
00158
00159 status = _rsDataObjRepl (rsComm, dataObjInp, *transStat, NULL);
00160 if(status < 0) {
00161 rodsLog(LOG_NOTICE, "%s - Failed to replicate data object.", __FUNCTION__);
00162 }
00163
00164 if (lockFd > 0) rsDataObjUnlock (rsComm, dataObjInp, lockFd);
00165
00166 return (status);
00167 }
00168
00169 int
00170 _rsDataObjRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
00171 transferStat_t *transStat, dataObjInfo_t *outDataObjInfo)
00172 {
00173 int status;
00174 dataObjInfo_t *dataObjInfoHead = NULL;
00175 dataObjInfo_t *oldDataObjInfoHead = NULL;
00176 dataObjInfo_t *destDataObjInfo = NULL;
00177 rescGrpInfo_t *myRescGrpInfo = NULL;
00178 ruleExecInfo_t rei;
00179 int multiCopyFlag;
00180 char *accessPerm;
00181 int backupFlag;
00182 int allFlag;
00183 int savedStatus = 0;
00184 if (getValByKey (&dataObjInp->condInput, SU_CLIENT_USER_KW) != NULL) {
00185 accessPerm = NULL;
00186 } else if (getValByKey (&dataObjInp->condInput, IRODS_ADMIN_KW) != NULL) {
00187 if (rsComm->clientUser.authInfo.authFlag < LOCAL_PRIV_USER_AUTH) {
00188 return (CAT_INSUFFICIENT_PRIVILEGE_LEVEL);
00189 }
00190 accessPerm = NULL;
00191 } else {
00192 accessPerm = ACCESS_READ_OBJECT;
00193 }
00194
00195
00196 initReiWithDataObjInp (&rei, rsComm, dataObjInp);
00197 status = applyRule ("acSetMultiReplPerResc", NULL, &rei, NO_SAVE_REI);
00198 if (strcmp (rei.statusStr, MULTI_COPIES_PER_RESC) == 0) {
00199 multiCopyFlag = 1;
00200 } else {
00201 multiCopyFlag = 0;
00202 }
00203
00204
00205 if (multiCopyFlag) {
00206 status = getDataObjInfo (rsComm, dataObjInp, &dataObjInfoHead,
00207 accessPerm, 0);
00208 } else {
00209
00210
00211 status = getDataObjInfo( rsComm, dataObjInp, &dataObjInfoHead, accessPerm, 1 );
00212 }
00213
00214 if (status < 0) {
00215 rodsLog (LOG_NOTICE,
00216 "%s: getDataObjInfo for %s failed", __FUNCTION__, dataObjInp->objPath);
00217 return (status);
00218 }
00219
00220 if (getValByKey (&dataObjInp->condInput, UPDATE_REPL_KW) != NULL) {
00221 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, 0 );
00222 if (status < 0) {
00223 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication update.", __FUNCTION__);
00224 return status;
00225 }
00226
00227 status = _rsDataObjReplUpdate (rsComm, dataObjInp, dataObjInfoHead, oldDataObjInfoHead, transStat, NULL);
00228
00229 if (status >= 0 && outDataObjInfo != NULL) {
00230 *outDataObjInfo = *oldDataObjInfoHead;
00231 outDataObjInfo->next = NULL;
00232 } else {
00233 if(status < 0) {
00234 rodsLog(LOG_NOTICE, "%s - Failed to update replica.", __FUNCTION__);
00235 }
00236 }
00237
00238 freeAllDataObjInfo (dataObjInfoHead);
00239 freeAllDataObjInfo (oldDataObjInfoHead);
00240
00241 return status;
00242 }
00243
00244
00245 status = sortObjInfoForRepl( &dataObjInfoHead, &oldDataObjInfoHead, multiCopyFlag);
00246
00247 if (status < 0) {
00248 rodsLog(LOG_NOTICE, "%s - Failed to sort objects for replication.", __FUNCTION__);
00249 return status;
00250 }
00251
00252 if (getValByKey (&dataObjInp->condInput, BACKUP_RESC_NAME_KW) != NULL) {
00253
00254 backupFlag = 1;
00255 multiCopyFlag = 0;
00256 } else {
00257 backupFlag = 0;
00258 }
00259
00260 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00261 allFlag = 1;
00262 } else {
00263 allFlag = 0;
00264 }
00265
00266 if( backupFlag == 0 && allFlag == 1 &&
00267 getValByKey (&dataObjInp->condInput, DEST_RESC_NAME_KW) == NULL &&
00268 dataObjInfoHead != NULL && dataObjInfoHead->rescGroupName[0] != '\0') {
00269
00270 addKeyVal( &dataObjInp->condInput, DEST_RESC_NAME_KW, dataObjInfoHead->rescGroupName );
00271 }
00272
00273
00274 dataObjInp->oprType = REPLICATE_OPR;
00275 status = getRescGrpForCreate( rsComm, dataObjInp, &myRescGrpInfo );
00276 if (status < 0) {
00277 rodsLog(LOG_NOTICE, "%s - Failed to get a resource group for create.", __FUNCTION__);
00278 return status;
00279 }
00280
00281 if (multiCopyFlag == 0 ) {
00282
00283
00284
00285
00286
00287
00288
00289 status = resolveSingleReplCopy( &dataObjInfoHead, &oldDataObjInfoHead,
00290 &myRescGrpInfo, &destDataObjInfo,
00291 &dataObjInp->condInput );
00292
00293 if (status == HAVE_GOOD_COPY) {
00294
00295
00296 #if 0 // JMC - legacy resource :: we no longer deal with the cache here
00297 dataObjInfo_t *cacheDataObjInfo = NULL;
00298 dataObjInfo_t *compDataObjInfo = NULL;
00299
00300
00301 if( getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL &&
00302
00303
00304 #if 0
00305 getDataObjByClass (dataObjInfoHead, CACHE_CL, &cacheDataObjInfo)
00306 >= 0 && cacheDataObjInfo != destDataObjInfo) [[
00307 #else
00308 getDataObjByClass( dataObjInfoHead, COMPOUND_CL, &compDataObjInfo ) >= 0 &&
00309 strlen (compDataObjInfo->rescGroupName) > 0 &&
00310 getCacheDataInfoForRepl (rsComm, dataObjInfoHead, NULL,compDataObjInfo, &cacheDataObjInfo) >= 0 ) [[
00311 #endif
00312
00313
00314 int status1 = trimDataObjInfo (rsComm, cacheDataObjInfo);
00315 if (status1 < 0) [[
00316 rodsLog (LOG_NOTICE,
00317 "_rsDataObjRepl: trimDataObjInfo for %s",
00318 dataObjInp->objPath);
00319 ]]
00320 ]]
00321 #endif // JMC - legacy resource
00322
00323 if (outDataObjInfo != NULL && destDataObjInfo != NULL) {
00324
00325 *outDataObjInfo = *destDataObjInfo;
00326 outDataObjInfo->next = NULL;
00327 }
00328
00329
00330
00331 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00332 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00333 ( myRescGrpInfo->status < 0 ) ) {
00334 status = myRescGrpInfo->status;
00335
00336 } else {
00337 status = 0;
00338 }
00339
00340
00341
00342 if( backupFlag == 0 && myRescGrpInfo != NULL &&
00343 ( allFlag == 1 || myRescGrpInfo->next == NULL ) &&
00344 ( myRescGrpInfo->status < 0 ) ) {
00345 status = myRescGrpInfo->status;
00346
00347 } else {
00348 status = 0;
00349 }
00350
00351 freeAllDataObjInfo (dataObjInfoHead);
00352 freeAllDataObjInfo (oldDataObjInfoHead);
00353 freeAllDataObjInfo (destDataObjInfo);
00354 freeAllRescGrpInfo (myRescGrpInfo);
00355 return status;
00356 } else if (status < 0) {
00357 freeAllDataObjInfo (dataObjInfoHead);
00358 freeAllDataObjInfo (oldDataObjInfoHead);
00359 freeAllDataObjInfo (destDataObjInfo);
00360 freeAllRescGrpInfo (myRescGrpInfo);
00361 rodsLog(LOG_NOTICE, "%s - Failed to resolve a single replication copy.", __FUNCTION__);
00362 return status;
00363 }
00364
00365
00366 }
00367
00368
00369 status = applyPreprocRuleForOpen (rsComm, dataObjInp, &dataObjInfoHead);
00370 if (status < 0) return status;
00371
00372
00373
00374 if (destDataObjInfo != NULL) {
00375 status = _rsDataObjReplUpdate( rsComm, dataObjInp, dataObjInfoHead,
00376 destDataObjInfo, transStat, oldDataObjInfoHead);
00377 if (status >= 0) {
00378 if (outDataObjInfo != NULL) {
00379 *outDataObjInfo = *destDataObjInfo;
00380 outDataObjInfo->next = NULL;
00381 }
00382 if (allFlag == 0) {
00383 freeAllDataObjInfo (dataObjInfoHead);
00384 freeAllDataObjInfo (oldDataObjInfoHead);
00385 freeAllDataObjInfo (destDataObjInfo);
00386 freeAllRescGrpInfo (myRescGrpInfo);
00387 return 0;
00388 } else {
00389
00390
00391 queDataObjInfo (&dataObjInfoHead, destDataObjInfo, 0, 1);
00392 destDataObjInfo = NULL;
00393 }
00394 } else {
00395 savedStatus = status;
00396 }
00397 }
00398
00399 if (myRescGrpInfo != NULL) {
00400
00401 status = _rsDataObjReplNewCopy( rsComm, dataObjInp, dataObjInfoHead,
00402 myRescGrpInfo, transStat, oldDataObjInfoHead,
00403 outDataObjInfo);
00404 if (status < 0) savedStatus = status;
00405 }
00406
00407 freeAllDataObjInfo (dataObjInfoHead);
00408 freeAllDataObjInfo (oldDataObjInfoHead);
00409 freeAllRescGrpInfo (myRescGrpInfo);
00410
00411 return (savedStatus);
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426 int
00427 _rsDataObjReplUpdate( rsComm_t* rsComm, dataObjInp_t* dataObjInp,
00428 dataObjInfo_t* srcDataObjInfoHead, dataObjInfo_t* destDataObjInfoHead,
00429 transferStat_t* transStat, dataObjInfo_t* oldDataObjInfo ) {
00430 dataObjInfo_t *destDataObjInfo = 0;
00431 dataObjInfo_t *srcDataObjInfo = 0;
00432 int status;
00433 int allFlag;
00434 int savedStatus = 0;
00435 int replCnt = 0;
00436
00437 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00438 allFlag = 1;
00439 } else {
00440 allFlag = 0;
00441 }
00442
00443 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00444 destDataObjInfo = destDataObjInfoHead;
00445 while (destDataObjInfo != NULL) {
00446 if (destDataObjInfo->dataId == 0) {
00447 destDataObjInfo = destDataObjInfo->next;
00448 continue;
00449 }
00450
00451 #if 0 // JMC - legacy resource
00452
00453 if (getRescClass (destDataObjInfo->rescInfo) == COMPOUND_CL) {
00454
00455 if ((status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
00456 srcDataObjInfoHead, destDataObjInfoHead, destDataObjInfo,
00457 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00458 return status;
00459 }
00460 status = _rsDataObjReplS (rsComm, dataObjInp,
00461 srcDataObjInfo, NULL, "", destDataObjInfo, 1);
00462 } else
00463 #endif // JMC - legacy resource
00464 {
00465 srcDataObjInfo = srcDataObjInfoHead;
00466 while (srcDataObjInfo != NULL) {
00467
00468 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo, NULL, "", destDataObjInfo, 1 );
00469
00470 if (status >= 0) {
00471 break;
00472 }
00473 srcDataObjInfo = srcDataObjInfo->next;
00474 }
00475 }
00476 if (status >= 0) {
00477 transStat->numThreads = dataObjInp->numThreads;
00478 if (allFlag == 0) {
00479 return 0;
00480 }
00481 } else {
00482 savedStatus = status;
00483 replCnt ++;
00484 }
00485 destDataObjInfo = destDataObjInfo->next;
00486 }
00487
00488 return savedStatus;
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503 int
00504 _rsDataObjReplNewCopy (rsComm_t *rsComm,
00505 dataObjInp_t *dataObjInp,
00506 dataObjInfo_t *srcDataObjInfoHead,
00507 rescGrpInfo_t *destRescGrpInfo,
00508 transferStat_t *transStat,
00509 dataObjInfo_t *oldDataObjInfo,
00510 dataObjInfo_t *outDataObjInfo)
00511 {
00512 dataObjInfo_t *srcDataObjInfo;
00513 rescGrpInfo_t *tmpRescGrpInfo;
00514 rescInfo_t *tmpRescInfo;
00515 int status;
00516 int allFlag;
00517 int savedStatus = 0;
00518
00519 #if 0
00520 rescInfo_t *compRescInfo = NULL;
00521 rescInfo_t *cacheRescInfo = NULL;
00522 #endif
00523
00524 if (getValByKey (&dataObjInp->condInput, ALL_KW) != NULL) {
00525 allFlag = 1;
00526 } else {
00527 allFlag = 0;
00528 }
00529 #if 0 // JMC - legacy resource
00530
00531
00532
00533
00534 if( allFlag == 1 && destRescGrpInfo != NULL &&
00535 strlen(destRescGrpInfo->rescGroupName) > 0 &&
00536 getRescGrpClass (destRescGrpInfo, &compRescInfo) == COMPOUND_CL) {
00537 getCacheRescInGrp (rsComm, destRescGrpInfo->rescGroupName,compRescInfo, &cacheRescInfo);
00538 }
00539 #endif // JMC - legacy resource
00540
00541
00542 transStat->bytesWritten = srcDataObjInfoHead->dataSize;
00543 tmpRescGrpInfo = destRescGrpInfo;
00544 while (tmpRescGrpInfo != NULL) {
00545 tmpRescInfo = tmpRescGrpInfo->rescInfo;
00546 #if 0 // JMC - legacy resource
00547
00548
00549 if (tmpRescInfo == cacheRescInfo) {
00550
00551
00552 tmpRescGrpInfo = tmpRescGrpInfo->next;
00553 continue;
00554 }
00555
00556 if (getRescClass (tmpRescInfo) == COMPOUND_CL) {
00557
00558 if ((status = getCacheDataInfoOfCompResc (rsComm, dataObjInp,
00559 srcDataObjInfoHead, NULL, tmpRescGrpInfo,
00560 oldDataObjInfo, &srcDataObjInfo)) < 0) {
00561 return status;
00562 }
00563
00564 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
00565 tmpRescInfo, tmpRescGrpInfo->rescGroupName, outDataObjInfo, 0);
00566 } else {
00567 #endif // JMC - legacy resource
00568
00569
00570 {
00571 srcDataObjInfo = srcDataObjInfoHead;
00572 while (srcDataObjInfo != NULL) {
00573 status = _rsDataObjReplS( rsComm, dataObjInp, srcDataObjInfo,
00574 tmpRescInfo, tmpRescGrpInfo->rescGroupName,
00575 outDataObjInfo, 0);
00576 if (status >= 0) {
00577 break;
00578 } else {
00579 savedStatus = status;
00580 }
00581 srcDataObjInfo = srcDataObjInfo->next;
00582 }
00583 }
00584 if (status >= 0) {
00585 transStat->numThreads = dataObjInp->numThreads;
00586 if (allFlag == 0) {
00587 return 0;
00588 }
00589 } else {
00590 savedStatus = status;
00591 }
00592 tmpRescGrpInfo = tmpRescGrpInfo->next;
00593 }
00594
00595 if (savedStatus == 0 && destRescGrpInfo->status < 0) {
00596
00597 return destRescGrpInfo->status;
00598 } else {
00599 return (savedStatus);
00600 }
00601 }
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616 int
00617 _rsDataObjReplS (rsComm_t *rsComm,
00618 dataObjInp_t *dataObjInp,
00619 dataObjInfo_t *srcDataObjInfo,
00620 rescInfo_t *destRescInfo,
00621 char *rescGroupName,
00622 dataObjInfo_t *destDataObjInfo,
00623 int updateFlag)
00624 {
00625 int status, status1;
00626 int l1descInx;
00627 openedDataObjInp_t dataObjCloseInp;
00628 dataObjInfo_t *myDestDataObjInfo;
00629 l1descInx = dataObjOpenForRepl( rsComm, dataObjInp, srcDataObjInfo, destRescInfo,
00630 rescGroupName, destDataObjInfo, updateFlag );
00631 if (l1descInx < 0) {
00632 return (l1descInx);
00633 }
00634
00635 if (L1desc[l1descInx].stageFlag != NO_STAGING) {
00636 status = l3DataStageSync (rsComm, l1descInx);
00637 } else if( L1desc[l1descInx].dataObjInp->numThreads == 0 &&
00638 L1desc[l1descInx].dataObjInfo->dataSize <= MAX_SZ_FOR_SINGLE_BUF ) {
00639 status = l3DataCopySingleBuf (rsComm, l1descInx);
00640 } else {
00641 status = dataObjCopy( rsComm, l1descInx );
00642 }
00643
00644 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00645
00646 dataObjCloseInp.l1descInx = l1descInx;
00647
00648 L1desc[l1descInx].oprStatus = status;
00649 if (status >= 0) {
00650 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataObjInfo->dataSize;
00651 }
00652
00653 status1 = irsDataObjClose (rsComm, &dataObjCloseInp, &myDestDataObjInfo);
00654
00655 if (destDataObjInfo != NULL) {
00656 if (destDataObjInfo->dataId <= 0 && myDestDataObjInfo != NULL) {
00657 destDataObjInfo->dataId = myDestDataObjInfo->dataId;
00658 destDataObjInfo->replNum = myDestDataObjInfo->replNum;
00659 } else {
00660
00661 destDataObjInfo->dataSize = myDestDataObjInfo->dataSize;
00662 }
00663 }
00664
00665 freeDataObjInfo (myDestDataObjInfo);
00666
00667 if (status < 0) {
00668 return status;
00669 } else if (status1 < 0) {
00670 return status1;
00671 } else {
00672 return (status);
00673 }
00674 }
00675
00676
00677
00678
00679 int
00680 dataObjOpenForRepl (rsComm_t *rsComm,
00681 dataObjInp_t *dataObjInp,
00682 dataObjInfo_t *inpSrcDataObjInfo,
00683 rescInfo_t *destRescInfo,
00684 char *rescGroupName,
00685 dataObjInfo_t *inpDestDataObjInfo,
00686 int updateFlag)
00687 {
00688 dataObjInfo_t *myDestDataObjInfo = 0, *srcDataObjInfo = NULL;
00689 rescInfo_t *myDestRescInfo = 0;
00690 int destL1descInx = 0;
00691 int srcL1descInx = 0;
00692 int status = 0;
00693 int replStatus = 0;
00694
00695
00696
00697 dataObjInfo_t *cacheDataObjInfo = NULL;
00698 dataObjInp_t dest_inp, myDataObjInp, *l1DataObjInp = 0;
00699 if (destRescInfo == NULL) {
00700 myDestRescInfo = inpDestDataObjInfo->rescInfo;
00701 } else {
00702 myDestRescInfo = destRescInfo;
00703 }
00704
00705 if (inpSrcDataObjInfo->rescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00706 return SYS_RESC_IS_DOWN;
00707 }
00708
00709 if (myDestRescInfo->rescStatus == INT_RESC_STATUS_DOWN) {
00710 return SYS_RESC_IS_DOWN;
00711 }
00712
00713 #if 0 // JMC - legacy resource
00714 destRescClass = getRescClass (myDestRescInfo);
00715 #endif // JMC - legacy resource
00716
00717
00718
00719 #if 0 // JMC - legacy resource
00720 if (srcRescClass == COMPOUND_CL) {
00721 rescGrpInfo_t *myRescGrpInfo;
00722 if (destRescClass == CACHE_CL &&
00723 isRescsInSameGrp (rsComm, myDestRescInfo->rescName, inpSrcDataObjInfo->rescInfo->rescName,
00724 &myRescGrpInfo)) {
00725
00726 if (strlen (inpSrcDataObjInfo->rescGroupName) == 0) {
00727 rstrcpy (inpSrcDataObjInfo->rescGroupName,
00728 myRescGrpInfo->rescGroupName, NAME_LEN);
00729 }
00730 } else if (getRescInGrp (rsComm, myDestRescInfo->rescName,
00731 inpSrcDataObjInfo->rescGroupName, NULL) < 0) {
00732 cacheDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00733 status = stageDataFromCompToCache (rsComm, inpSrcDataObjInfo,
00734 cacheDataObjInfo);
00735 if (status < 0) { free( cacheDataObjInfo ); return status; }
00736
00737 srcRescClass = getRescClass (cacheDataObjInfo->rescInfo);
00738 }
00739 }
00740
00741 #endif // JMC - legacy resource
00742 if (cacheDataObjInfo == NULL) {
00743 srcDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00744 *srcDataObjInfo = *inpSrcDataObjInfo;
00745 srcDataObjInfo->rescInfo = new rescInfo_t;
00746 memcpy( srcDataObjInfo->rescInfo, inpSrcDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00747 } else {
00748 srcDataObjInfo = cacheDataObjInfo;
00749 }
00750
00751 if( NULL == srcDataObjInfo ) {
00752 rodsLog( LOG_ERROR, "dataObjOpenForRepl - srcDataObjInfo is NULL" );
00753 return -1;
00754 }
00755
00756 myDataObjInp = *dataObjInp;
00757 myDataObjInp.dataSize = inpSrcDataObjInfo->dataSize;
00758
00759 destL1descInx = allocL1desc ();
00760
00761 if (destL1descInx < 0) return destL1descInx;
00762
00763
00764
00765
00766 std::string op_name;
00767 memset (&dest_inp, 0, sizeof (dest_inp));
00768 memset (&dest_inp.condInput, 0, sizeof (dest_inp.condInput));
00769
00770 strncpy( dest_inp.objPath, dataObjInp->objPath, MAX_NAME_LEN );
00771 addKeyVal( &(dest_inp.condInput), RESC_NAME_KW, myDestRescInfo->rescName );
00772
00773 myDestDataObjInfo = (dataObjInfo_t*)calloc (1, sizeof (dataObjInfo_t));
00774 if (updateFlag > 0) {
00775
00776
00777 op_name = eirods::EIRODS_OPEN_OPERATION;
00778
00779
00780 if(inpDestDataObjInfo == NULL || inpDestDataObjInfo->dataId <= 0) {
00781 rodsLog( LOG_ERROR, "dataObjOpenForRepl: dataId of %s copy to be updated not defined",
00782 srcDataObjInfo->objPath);
00783 return (SYS_UPDATE_REPL_INFO_ERR);
00784 }
00785
00786 inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
00787 *myDestDataObjInfo = *inpDestDataObjInfo;
00788 myDestDataObjInfo->rescInfo = new rescInfo_t;
00789 memcpy( myDestDataObjInfo->rescInfo, inpDestDataObjInfo->rescInfo, sizeof( rescInfo_t ) );
00790 replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
00791 addKeyVal (&myDataObjInp.condInput, FORCE_FLAG_KW, "");
00792 myDataObjInp.openFlags |= (O_TRUNC | O_WRONLY);
00793 } else {
00794
00795
00796 op_name = eirods::EIRODS_CREATE_OPERATION;
00797
00798 initDataObjInfoForRepl( rsComm, myDestDataObjInfo, srcDataObjInfo,
00799 destRescInfo, rescGroupName);
00800 replStatus = srcDataObjInfo->replStatus;
00801 }
00802
00803
00804
00805 std::string hier;
00806 char* dst_hier_str = getValByKey( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW );
00807 if( 0 == dst_hier_str ) {
00808
00809 addKeyVal(&dataObjInp->condInput, IN_REPL_KW, "");
00810
00811 eirods::error ret = eirods::resolve_resource_hierarchy( op_name, rsComm, &dest_inp, hier );
00812 if( !ret.ok() ) {
00813 std::stringstream msg;
00814 msg << "failed in eirods::resolve_resource_hierarchy for [";
00815 msg << dest_inp.objPath << "]";
00816 eirods::log( PASSMSG( msg.str(), ret ) );
00817 return ret.code();
00818 }
00819
00820 addKeyVal( &dataObjInp->condInput, DEST_RESC_HIER_STR_KW, hier.c_str() );
00821
00822 } else {
00823 hier = dst_hier_str;
00824 }
00825
00826
00827
00828
00829 rstrcpy(myDestDataObjInfo->rescHier, hier.c_str(), MAX_NAME_LEN);
00830 addKeyVal( &(myDataObjInp.condInput), RESC_HIER_STR_KW, hier.c_str() );
00831 fillL1desc (destL1descInx, &myDataObjInp, myDestDataObjInfo, replStatus, srcDataObjInfo->dataSize);
00832 l1DataObjInp = L1desc[destL1descInx].dataObjInp;
00833 if (l1DataObjInp->oprType == PHYMV_OPR) {
00834 L1desc[destL1descInx].oprType = PHYMV_DEST;
00835 myDestDataObjInfo->replNum = srcDataObjInfo->replNum;
00836 myDestDataObjInfo->dataId = srcDataObjInfo->dataId;
00837 } else {
00838 L1desc[destL1descInx].oprType = REPLICATE_DEST;
00839 }
00840
00841 #if 0 // JMC - legacy resource
00842 if (destRescClass == COMPOUND_CL) {
00843 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00844 }
00845 else if (srcRescClass == COMPOUND_CL) {
00846 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00847 }
00848 #else // JMC - legacy resource
00849
00850
00851
00852
00853 char* stage_kw = getValByKey( &dataObjInp->condInput, STAGE_OBJ_KW );
00854 char* sync_kw = getValByKey( &dataObjInp->condInput, SYNC_OBJ_KW );
00855 if( stage_kw ) {
00856 L1desc[destL1descInx].stageFlag = STAGE_SRC;
00857 } else if( sync_kw ) {
00858 L1desc[destL1descInx].stageFlag = SYNC_DEST;
00859 } else {
00860
00861
00862 }
00863
00864 #endif // JMC - legacy resource
00865
00866
00867
00868 char* src_hier_str = 0;
00869 if (srcDataObjInfo != NULL && srcDataObjInfo->rescHier != NULL) {
00870 src_hier_str = srcDataObjInfo->rescHier;
00871 }
00872
00873 l1DataObjInp->numThreads = dataObjInp->numThreads =
00874 getNumThreads( rsComm, l1DataObjInp->dataSize, l1DataObjInp->numThreads,
00875
00876 &dataObjInp->condInput, dst_hier_str, src_hier_str );
00877
00878 if( l1DataObjInp->numThreads > 0 &&
00879 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00880 if (updateFlag > 0) {
00881 status = dataOpen (rsComm, destL1descInx);
00882 } else {
00883 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00884 if (status >= 0)
00885 status = dataCreate (rsComm, destL1descInx);
00886 }
00887
00888 if (status < 0) {
00889 freeL1desc (destL1descInx);
00890 return (status);
00891 }
00892 } else {
00893 if (updateFlag == 0) {
00894 status = getFilePathName (rsComm, myDestDataObjInfo, L1desc[destL1descInx].dataObjInp);
00895 if (status < 0) {
00896 freeL1desc (destL1descInx);
00897 return (status);
00898 }
00899 }
00900 }
00901
00902 if (inpDestDataObjInfo != NULL && updateFlag == 0) {
00903
00904 *inpDestDataObjInfo = *myDestDataObjInfo;
00905 inpDestDataObjInfo->next = NULL;
00906 }
00907
00908
00909 rstrcpy(srcDataObjInfo->rescHier, inpSrcDataObjInfo->rescHier, MAX_NAME_LEN);
00910 srcL1descInx = allocL1desc ();
00911 if (srcL1descInx < 0) return srcL1descInx;
00912 fillL1desc (srcL1descInx, &myDataObjInp, srcDataObjInfo, srcDataObjInfo->replStatus, srcDataObjInfo->dataSize);
00913 l1DataObjInp = L1desc[srcL1descInx].dataObjInp;
00914 l1DataObjInp->numThreads = dataObjInp->numThreads;
00915 if (l1DataObjInp->oprType == PHYMV_OPR) {
00916 L1desc[srcL1descInx].oprType = PHYMV_SRC;
00917 } else {
00918 L1desc[srcL1descInx].oprType = REPLICATE_SRC;
00919 }
00920
00921 if( L1desc[destL1descInx].stageFlag == SYNC_DEST &&
00922 getValByKey (&dataObjInp->condInput, PURGE_CACHE_KW) != NULL) {
00923 L1desc[srcL1descInx].purgeCacheFlag = 1;
00924 }
00925
00926
00927 if( l1DataObjInp->numThreads > 0 &&
00928 L1desc[destL1descInx].stageFlag == NO_STAGING) {
00929 openedDataObjInp_t dataObjCloseInp;
00930
00931 l1DataObjInp->openFlags = O_RDONLY;
00932 status = dataOpen (rsComm, srcL1descInx);
00933 if (status < 0) {
00934 freeL1desc (srcL1descInx);
00935 memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
00936 dataObjCloseInp.l1descInx = destL1descInx;
00937 rsDataObjClose (rsComm, &dataObjCloseInp);
00938 return (status);
00939 }
00940 }
00941
00942 L1desc[destL1descInx].srcL1descInx = srcL1descInx;
00943
00944 return (destL1descInx);
00945 }
00946
00947 int
00948 dataObjCopy (rsComm_t *rsComm, int l1descInx)
00949 {
00950 int srcL1descInx, destL1descInx;
00951 int srcL3descInx, destL3descInx;
00952 int status;
00953 portalOprOut_t *portalOprOut = NULL;
00954 dataCopyInp_t dataCopyInp;
00955 dataOprInp_t *dataOprInp;
00956 int srcRemoteFlag, destRemoteFlag;
00957
00958 bzero (&dataCopyInp, sizeof (dataCopyInp));
00959 dataOprInp = &dataCopyInp.dataOprInp;
00960 srcL1descInx = L1desc[l1descInx].srcL1descInx;
00961 destL1descInx = l1descInx;
00962
00963 srcL3descInx = L1desc[srcL1descInx].l3descInx;
00964 destL3descInx = L1desc[destL1descInx].l3descInx;
00965
00966 if (L1desc[srcL1descInx].remoteZoneHost != NULL) {
00967 srcRemoteFlag = REMOTE_ZONE_HOST;
00968 } else {
00969 srcRemoteFlag = FileDesc[srcL3descInx].rodsServerHost->localFlag;
00970 }
00971
00972 if (L1desc[destL1descInx].remoteZoneHost != NULL) {
00973 destRemoteFlag = REMOTE_ZONE_HOST;
00974 } else {
00975 destRemoteFlag = FileDesc[destL3descInx].rodsServerHost->localFlag;
00976 }
00977
00978 if (srcRemoteFlag != REMOTE_ZONE_HOST &&
00979 destRemoteFlag != REMOTE_ZONE_HOST &&
00980 FileDesc[srcL3descInx].rodsServerHost ==
00981 FileDesc[destL3descInx].rodsServerHost) {
00982
00983 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, SAME_HOST_COPY_OPR);
00984
00985 dataCopyInp.portalOprOut.numThreads =
00986 dataCopyInp.dataOprInp.numThreads;
00987 if (srcRemoteFlag == LOCAL_HOST) {
00988 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
00989 }
00990
00991 } else if ((srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) ||
00992 destRemoteFlag == REMOTE_ZONE_HOST) {
00993 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_REM_OPR);
00994
00995 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
00996
00997
00998 status = preProcParaPut (rsComm, destL1descInx, &portalOprOut);
00999 if (status < 0 || NULL == portalOprOut ) {
01000 rodsLog (LOG_NOTICE,
01001 "dataObjCopy: preProcParaPut error for %s",
01002 L1desc[srcL1descInx].dataObjInfo->objPath);
01003 return (status);
01004 }
01005 dataCopyInp.portalOprOut = *portalOprOut;
01006 } else {
01007 dataCopyInp.portalOprOut.l1descInx = destL1descInx;
01008 }
01009 if (srcRemoteFlag == LOCAL_HOST)
01010 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01011 } else if ((srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) ||
01012 srcRemoteFlag == REMOTE_ZONE_HOST) {
01013
01014 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01015
01016 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01017
01018 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01019 if (status < 0 || NULL == portalOprOut ) {
01020 rodsLog (LOG_NOTICE,
01021 "dataObjCopy: preProcParaGet error for %s",
01022 L1desc[srcL1descInx].dataObjInfo->objPath);
01023 return (status);
01024 }
01025 dataCopyInp.portalOprOut = *portalOprOut;
01026 } else {
01027 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01028 }
01029 if (destRemoteFlag == LOCAL_HOST)
01030 addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
01031 } else {
01032
01033 initDataOprInp (&dataCopyInp.dataOprInp, l1descInx, COPY_TO_LOCAL_OPR);
01034
01035
01036 if (L1desc[l1descInx].dataObjInp->numThreads > 0) {
01037 status = preProcParaGet (rsComm, srcL1descInx, &portalOprOut);
01038
01039 if (status < 0 || NULL == portalOprOut ) {
01040 rodsLog (LOG_NOTICE,
01041 "dataObjCopy: preProcParaGet error for %s",
01042 L1desc[srcL1descInx].dataObjInfo->objPath);
01043 return (status);
01044 }
01045 dataCopyInp.portalOprOut = *portalOprOut;
01046 } else {
01047 dataCopyInp.portalOprOut.l1descInx = srcL1descInx;
01048 }
01049 }
01050
01051 if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01052 NO_CHK_COPY_LEN_KW) != NULL) {
01053
01054 addKeyVal (&dataOprInp->condInput, NO_CHK_COPY_LEN_KW, "");
01055 if (L1desc[l1descInx].dataObjInp->numThreads > 1) {
01056 L1desc[l1descInx].dataObjInp->numThreads =
01057 L1desc[srcL1descInx].dataObjInp->numThreads =
01058 dataCopyInp.portalOprOut.numThreads = 1;
01059 }
01060 }
01061 status = rsDataCopy (rsComm, &dataCopyInp);
01062
01063 if (status >= 0 && portalOprOut != NULL &&
01064 L1desc[l1descInx].dataObjInp != NULL) {
01065
01066 L1desc[l1descInx].dataObjInp->numThreads = portalOprOut->numThreads;
01067 }
01068 if (portalOprOut != NULL) free (portalOprOut);
01069 clearKeyVal (&dataOprInp->condInput);
01070
01071 return (status);
01072 }
01073
01074 int
01075 l3DataCopySingleBuf (rsComm_t *rsComm, int l1descInx)
01076 {
01077 bytesBuf_t dataBBuf;
01078 int bytesRead, bytesWritten;
01079 int srcL1descInx;
01080
01081 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01082 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01083
01084 if (L1desc[srcL1descInx].dataSize < 0) {
01085 rodsLog (LOG_ERROR,
01086 "l3DataCopySingleBuf: dataSize %lld for %s is negative",
01087 L1desc[srcL1descInx].dataSize,
01088 L1desc[srcL1descInx].dataObjInfo->objPath);
01089 return (SYS_COPY_LEN_ERR);
01090 } else if (L1desc[srcL1descInx].dataSize == 0) {
01091 bytesRead = 0;
01092 } else {
01093 dataBBuf.buf = malloc (L1desc[srcL1descInx].dataSize);
01094 bytesRead = rsL3FileGetSingleBuf (rsComm, &srcL1descInx, &dataBBuf);
01095 }
01096
01097 if (bytesRead < 0) {
01098 free( dataBBuf.buf );
01099 return (bytesRead);
01100 } else if (getValByKey (&L1desc[l1descInx].dataObjInp->condInput,
01101 NO_CHK_COPY_LEN_KW) != NULL) {
01102
01103 L1desc[l1descInx].dataSize = L1desc[l1descInx].dataObjInp->dataSize
01104 = bytesRead;
01105 }
01106
01107 bytesWritten = rsL3FilePutSingleBuf (rsComm, &l1descInx, &dataBBuf);
01108
01109 L1desc[l1descInx].bytesWritten = bytesWritten;
01110
01111 if (dataBBuf.buf != NULL) {
01112 free (dataBBuf.buf);
01113 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01114 }
01115
01116 if (bytesWritten != bytesRead) {
01117 if (bytesWritten >= 0) {
01118 rodsLog (LOG_NOTICE,
01119 "l3DataCopySingleBuf: l3FilePut error, towrite %d, written %d",
01120 bytesRead, bytesWritten);
01121 return (SYS_COPY_LEN_ERR);
01122 } else {
01123 return (bytesWritten);
01124 }
01125 }
01126
01127
01128 return (0);
01129 }
01130
01131 int
01132 l3DataStageSync (rsComm_t *rsComm, int l1descInx)
01133 {
01134 bytesBuf_t dataBBuf;
01135 int srcL1descInx;
01136 int status = 0;
01137
01138 memset (&dataBBuf, 0, sizeof (bytesBuf_t));
01139
01140 srcL1descInx = L1desc[l1descInx].srcL1descInx;
01141 if (L1desc[srcL1descInx].dataSize < 0 &&
01142 L1desc[srcL1descInx].dataSize != UNKNOWN_FILE_SZ) {
01143 rodsLog (LOG_ERROR,
01144 "l3DataStageSync: dataSize %lld for %s is negative",
01145 L1desc[srcL1descInx].dataSize,
01146 L1desc[srcL1descInx].dataObjInfo->objPath);
01147 return (SYS_COPY_LEN_ERR);
01148 } else if (L1desc[srcL1descInx].dataSize > 0 ||
01149 L1desc[srcL1descInx].dataSize == UNKNOWN_FILE_SZ) {
01150 if (L1desc[l1descInx].stageFlag == SYNC_DEST) {
01151
01152 status = l3FileSync (rsComm, srcL1descInx, l1descInx);
01153 } else {
01154
01155 status = l3FileStage (rsComm, srcL1descInx, l1descInx);
01156 }
01157 }
01158
01159 if (status < 0) {
01160 L1desc[l1descInx].bytesWritten = -1;
01161 } else {
01162 L1desc[l1descInx].bytesWritten = L1desc[l1descInx].dataSize =
01163 L1desc[srcL1descInx].dataSize;
01164 }
01165
01166 return (status);
01167 }
01168
01169 int
01170 l3FileSync (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01171 {
01172 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01173
01174 fileStageSyncInp_t fileSyncToArchInp;
01175 dataObjInp_t *dataObjInp;
01176 int status;
01177 dataObjInfo_t tmpDataObjInfo;
01178
01179 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01180 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01181
01182 #if 0 // JMC legacy resource
01183 rescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01184 cacheRescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01185
01186 switch (RescTypeDef[rescTypeInx].rescCat) {
01187 case FILE_CAT:
01188
01189 if (RescTypeDef[rescTypeInx].createPathFlag == CREATE_PATH) {
01190 #endif // JMC legacy resource
01191
01192 int dst_create_path = 0;
01193 eirods::error err = eirods::get_resource_property< int >( destDataObjInfo->rescInfo->rescName,
01194 eirods::RESOURCE_CREATE_PATH, dst_create_path );
01195 if( !err.ok() ) {
01196 eirods::log( PASS( err ) );
01197 }
01198
01199 if( CREATE_PATH == dst_create_path ) {
01200
01201 status = chkOrphanFile ( rsComm, destDataObjInfo->filePath, destDataObjInfo->rescName, &tmpDataObjInfo );
01202 if (status == 0 && tmpDataObjInfo.dataId != destDataObjInfo->dataId) {
01203
01204 char tmp_str[ MAX_NAME_LEN ];
01205 snprintf( tmp_str, MAX_NAME_LEN, "%s.%-d", destDataObjInfo->filePath, (int) random());
01206 strncpy( destDataObjInfo->filePath, tmp_str, MAX_NAME_LEN );
01207 }
01208 }
01209
01210 memset (&fileSyncToArchInp, 0, sizeof (fileSyncToArchInp));
01211 dataObjInp = L1desc[destL1descInx].dataObjInp;
01212 fileSyncToArchInp.dataSize = srcDataObjInfo->dataSize;
01213 fileSyncToArchInp.fileType = static_cast< fileDriverType_t >( -1 );
01214 fileSyncToArchInp.cacheFileType = static_cast< fileDriverType_t >( -1 );
01215
01216
01217
01218 std::string location;
01219 eirods::error ret = eirods::get_loc_for_hier_string( srcDataObjInfo->rescHier, location );
01220 if( !ret.ok() ) {
01221 eirods::log( PASSMSG( "failed in get_loc_for_hier_String", ret ) );
01222 return -1;
01223 }
01224
01225 rstrcpy( fileSyncToArchInp.addr.hostAddr, location.c_str(), NAME_LEN );
01226
01227
01228 rstrcpy( fileSyncToArchInp.filename, destDataObjInfo->filePath, MAX_NAME_LEN );
01229 rstrcpy( fileSyncToArchInp.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01230 rstrcpy( fileSyncToArchInp.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01231 rstrcpy( fileSyncToArchInp.cacheFilename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01232
01233 fileSyncToArchInp.mode = getFileMode (dataObjInp);
01234 status = rsFileSyncToArch (rsComm, &fileSyncToArchInp);
01235
01236 if (status >= 0 &&
01237 CREATE_PATH == dst_create_path &&
01238 fileSyncToArchInp.filename != NULL) {
01239
01240
01241 rstrcpy (destDataObjInfo->filePath, fileSyncToArchInp.filename, MAX_NAME_LEN);
01242 L1desc[destL1descInx].replStatus |= FILE_PATH_HAS_CHG;
01243 }
01244 #if 0 // JMC legacy resource
01245 break;
01246 default:
01247 rodsLog (LOG_ERROR,
01248 "l3FileSync: rescCat type %d is not recognized",
01249 RescTypeDef[rescTypeInx].rescCat);
01250 status = SYS_INVALID_RESC_TYPE;
01251 break;
01252 }
01253 #endif // JMC legacy resource
01254 return (status);
01255 }
01256
01257 int
01258 l3FileStage (rsComm_t *rsComm, int srcL1descInx, int destL1descInx)
01259 {
01260 dataObjInfo_t *srcDataObjInfo, *destDataObjInfo;
01261 int mode, status;
01262
01263 srcDataObjInfo = L1desc[srcL1descInx].dataObjInfo;
01264 destDataObjInfo = L1desc[destL1descInx].dataObjInfo;
01265
01266 mode = getFileMode (L1desc[destL1descInx].dataObjInp);
01267
01268 status = _l3FileStage (rsComm, srcDataObjInfo, destDataObjInfo, mode);
01269
01270 return status;
01271 }
01272
01273 int
01274 _l3FileStage (rsComm_t *rsComm, dataObjInfo_t *srcDataObjInfo,
01275 dataObjInfo_t *destDataObjInfo, int mode)
01276 {
01277
01278 fileStageSyncInp_t file_stage;
01279 int status;
01280
01281 #if 0 // JMC legacy resource
01282 rescTypeInx = srcDataObjInfo->rescInfo->rescTypeInx;
01283 cacheRescTypeInx = destDataObjInfo->rescInfo->rescTypeInx;
01284
01285
01286 switch (RescTypeDef[rescTypeInx].rescCat) {
01287 case FILE_CAT:
01288 #endif // JMC legacy resource
01289 memset (&file_stage, 0, sizeof (file_stage));
01290 file_stage.dataSize = srcDataObjInfo->dataSize;
01291 file_stage.fileType = static_cast< fileDriverType_t >( -1 );
01292 file_stage.cacheFileType = static_cast< fileDriverType_t >( -1 );
01293
01294 rstrcpy( file_stage.addr.hostAddr,
01295 destDataObjInfo->rescInfo->rescLoc, NAME_LEN );
01296
01297 rstrcpy( file_stage.cacheFilename, destDataObjInfo->filePath, MAX_NAME_LEN );
01298 rstrcpy( file_stage.filename, srcDataObjInfo->filePath, MAX_NAME_LEN );
01299 rstrcpy( file_stage.rescHier, destDataObjInfo->rescHier, MAX_NAME_LEN );
01300 rstrcpy( file_stage.objPath, srcDataObjInfo->objPath, MAX_NAME_LEN );
01301 file_stage.mode = mode;
01302 status = rsFileStageToCache( rsComm, &file_stage );
01303 #if 0 // JMC legacy resource
01304 break;
01305 default:
01306 rodsLog (LOG_ERROR,
01307 "l3FileStage: rescCat type %d is not recognized",
01308 RescTypeDef[rescTypeInx].rescCat);
01309 status = SYS_INVALID_RESC_TYPE;
01310 break;
01311 }
01312 #endif // JMC legacy resource
01313 return (status);
01314 }
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324 int
01325 rsReplAndRequeDataObjInfo (rsComm_t *rsComm,
01326 dataObjInfo_t **srcDataObjInfoHead, char *destRescName, char *flagStr)
01327 {
01328 dataObjInfo_t *dataObjInfoHead, *myDataObjInfo;
01329 transferStat_t transStat;
01330 dataObjInp_t dataObjInp;
01331 char tmpStr[NAME_LEN];
01332 int status;
01333
01334 dataObjInfoHead = *srcDataObjInfoHead;
01335 myDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01336 memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
01337 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01338 memset (&transStat, 0, sizeof (transStat));
01339
01340 if (flagStr != NULL) {
01341 if (strstr (flagStr, ALL_KW) != NULL) {
01342 addKeyVal (&dataObjInp.condInput, ALL_KW, "");
01343 }
01344 if (strstr (flagStr, RBUDP_TRANSFER_KW) != NULL) {
01345 addKeyVal (&dataObjInp.condInput, RBUDP_TRANSFER_KW, "");
01346 }
01347 if (strstr (flagStr, SU_CLIENT_USER_KW) != NULL) {
01348 addKeyVal (&dataObjInp.condInput, SU_CLIENT_USER_KW, "");
01349 }
01350 if (strstr (flagStr, UPDATE_REPL_KW) != NULL) {
01351 addKeyVal (&dataObjInp.condInput, UPDATE_REPL_KW, "");
01352 }
01353 }
01354
01355 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01356 snprintf (tmpStr, NAME_LEN, "%d", dataObjInfoHead->replNum);
01357 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01358 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, destRescName);
01359
01360 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01361 myDataObjInfo);
01362
01363
01364 clearKeyVal (&dataObjInp.condInput);
01365 if (status >= 0) {
01366 status = 1;
01367
01368 queDataObjInfo (srcDataObjInfoHead, myDataObjInfo, 0, 1);
01369 } else {
01370 freeAllDataObjInfo (myDataObjInfo);
01371 }
01372
01373 return status;
01374 }
01375
01376 #if 0 // JMC - legacy resource
01377 int
01378 stageDataFromCompToCache (rsComm_t *rsComm, dataObjInfo_t *compObjInfo,
01379 dataObjInfo_t *outCacheObjInfo)
01380 {
01381 int status;
01382 rescInfo_t *cacheResc;
01383 transferStat_t transStat;
01384 dataObjInp_t dataObjInp;
01385 char tmpStr[NAME_LEN];
01386
01387 #if 0 // JMC - legacy resource
01388 if (getRescClass (compObjInfo->rescInfo) != COMPOUND_CL) return 0;
01389 #endif // JMC - legacy resource
01390
01391 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,compObjInfo->rescInfo, &cacheResc);
01392 if (status < 0) {
01393 rodsLog (LOG_ERROR,
01394 "stageDataFromCompToCache: getCacheRescInGrp %s failed for %s stat=%d",
01395 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01396 return status;
01397 }
01398 if (outCacheObjInfo != NULL)
01399 memset (outCacheObjInfo, 0, sizeof (dataObjInfo_t));
01400 memset (&dataObjInp, 0, sizeof (dataObjInp_t));
01401 memset (&transStat, 0, sizeof (transStat));
01402
01403 rstrcpy (dataObjInp.objPath, compObjInfo->objPath, MAX_NAME_LEN);
01404 snprintf (tmpStr, NAME_LEN, "%d", compObjInfo->replNum);
01405 addKeyVal (&dataObjInp.condInput, REPL_NUM_KW, tmpStr);
01406 addKeyVal (&dataObjInp.condInput, DEST_RESC_NAME_KW, cacheResc->rescName);
01407
01408 status = _rsDataObjRepl (rsComm, &dataObjInp, &transStat,
01409 outCacheObjInfo);
01410
01411 clearKeyVal (&dataObjInp.condInput);
01412 return status;
01413 }
01414
01415
01416
01417
01418
01419
01420 int
01421 stageAndRequeDataToCache (rsComm_t *rsComm, dataObjInfo_t **compObjInfoHead)
01422 {
01423 int status;
01424 dataObjInfo_t *outCacheObjInfo;
01425
01426 outCacheObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01427 bzero (outCacheObjInfo, sizeof (dataObjInfo_t));
01428 status = stageDataFromCompToCache (rsComm, *compObjInfoHead,
01429 outCacheObjInfo);
01430
01431 if (status < 0) {
01432
01433 if (outCacheObjInfo->dataId > 0) {
01434
01435 if( requeDataObjInfoByReplNum (compObjInfoHead,
01436 outCacheObjInfo->replNum) == 0) {
01437
01438 status = 0;
01439 }
01440 }
01441 free (outCacheObjInfo);
01442 } else {
01443 queDataObjInfo (compObjInfoHead, outCacheObjInfo, 0, 1);
01444 }
01445 return status;
01446 }
01447
01448 int
01449 replToCacheRescOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01450 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *compObjInfo,
01451 dataObjInfo_t *oldDataObjInfo, dataObjInfo_t **outDestDataObjInfo)
01452 {
01453 int status = 0;
01454 rescInfo_t *cacheResc;
01455 dataObjInfo_t *destDataObjInfo, *srcDataObjInfo;
01456 dataObjInfo_t *tmpDestDataObjInfo = NULL;
01457 int updateFlag;
01458
01459 if ((status = getCacheDataInfoForRepl (rsComm, oldDataObjInfo, NULL, compObjInfo, &tmpDestDataObjInfo)) >= 0) {
01460 cacheResc = oldDataObjInfo->rescInfo;
01461 updateFlag = 1;
01462 } else {
01463
01464 status = getCacheRescInGrp (rsComm, compObjInfo->rescGroupName,
01465 compObjInfo->rescInfo, &cacheResc);
01466 if (status < 0) {
01467 rodsLog (LOG_ERROR,
01468 "replToCacheRescOfCompObj:getCacheRescInGrp %s err for %s stat=%d",
01469 compObjInfo->rescGroupName, compObjInfo->objPath, status);
01470 return status;
01471 }
01472 updateFlag = 0;
01473 }
01474
01475 if (outDestDataObjInfo == NULL) {
01476 destDataObjInfo = tmpDestDataObjInfo;
01477 } else {
01478 destDataObjInfo = (dataObjInfo_t*)malloc (sizeof (dataObjInfo_t));
01479 if (tmpDestDataObjInfo == NULL) {
01480 bzero (destDataObjInfo, sizeof (dataObjInfo_t));
01481 } else {
01482 *destDataObjInfo = *tmpDestDataObjInfo;
01483 }
01484 }
01485 srcDataObjInfo = srcDataObjInfoHead;
01486 while (srcDataObjInfo != NULL) {
01487 status = _rsDataObjReplS (rsComm, dataObjInp, srcDataObjInfo,
01488 cacheResc, compObjInfo->rescGroupName, destDataObjInfo, updateFlag);
01489 if (status >= 0) {
01490 if (outDestDataObjInfo != NULL)
01491 *outDestDataObjInfo = destDataObjInfo;
01492 break;
01493 }
01494 srcDataObjInfo = srcDataObjInfo->next;
01495 }
01496
01497 return status;
01498 }
01499 #endif // JMC - legacy resource
01500 int
01501 stageBundledData (rsComm_t *rsComm, dataObjInfo_t **subfileObjInfoHead)
01502 {
01503 int status;
01504 dataObjInfo_t *dataObjInfoHead = *subfileObjInfoHead;
01505 rescInfo_t *cacheResc;
01506 dataObjInp_t dataObjInp;
01507 dataObjInfo_t *cacheObjInfo;
01508
01509 #if 0 // JMC - legacy resource
01510 if (getRescClass (dataObjInfoHead->rescInfo) != BUNDLE_CL) return 0;
01511 #endif // JMC - legacy resource
01512
01513 status = unbunAndStageBunfileObj (rsComm, dataObjInfoHead->filePath,
01514 &cacheResc);
01515
01516 if (status < 0) return status;
01517
01518
01519 bzero (&dataObjInp, sizeof (dataObjInp));
01520 rstrcpy (dataObjInp.objPath, dataObjInfoHead->objPath, MAX_NAME_LEN);
01521 addKeyVal (&dataObjInp.condInput, RESC_NAME_KW, cacheResc->rescName);
01522 status = getDataObjInfo (rsComm, &dataObjInp, &cacheObjInfo, NULL, 0);
01523 clearKeyVal (&dataObjInp.condInput);
01524 if (status < 0) {
01525 rodsLog (LOG_ERROR,
01526 "unbunAndStageBunfileObj: getDataObjInfo of subfile %s failed.stat=%d",
01527 dataObjInp.objPath, status);
01528 return status;
01529 }
01530
01531 queDataObjInfo (subfileObjInfoHead, cacheObjInfo, 0, 1);
01532
01533
01534 return status;
01535 }
01536
01537 int
01538 unbunAndStageBunfileObj (rsComm_t *rsComm, char *bunfileObjPath,
01539 rescInfo_t **outCacheResc)
01540 {
01541 dataObjInfo_t *bunfileObjInfoHead;
01542 dataObjInp_t dataObjInp;
01543 int status;
01544
01545
01546 bzero (&dataObjInp, sizeof (dataObjInp));
01547 rstrcpy (dataObjInp.objPath, bunfileObjPath, MAX_NAME_LEN);
01548
01549 status = getDataObjInfo (rsComm, &dataObjInp, &bunfileObjInfoHead, NULL, 1);
01550 if (status < 0) {
01551 rodsLog (LOG_ERROR,
01552 "unbunAndStageBunfileObj: getDataObjInfo of bunfile %s failed.stat=%d",
01553 dataObjInp.objPath, status);
01554 return status;
01555 }
01556 status = _unbunAndStageBunfileObj (rsComm, &bunfileObjInfoHead, &dataObjInp.condInput,
01557 outCacheResc, 0);
01558
01559 freeAllDataObjInfo (bunfileObjInfoHead);
01560
01561 return status;
01562 }
01563
01564 int
01565 _unbunAndStageBunfileObj (rsComm_t *rsComm, dataObjInfo_t **bunfileObjInfoHead, keyValPair_t *condInput,
01566 rescInfo_t **outCacheResc, int rmBunCopyFlag)
01567 {
01568 int status;
01569
01570 dataObjInp_t dataObjInp;
01571
01572 bzero (&dataObjInp, sizeof (dataObjInp));
01573 bzero (&dataObjInp.condInput, sizeof (dataObjInp.condInput));
01574 rstrcpy (dataObjInp.objPath, (*bunfileObjInfoHead)->objPath, MAX_NAME_LEN);
01575 status = sortObjInfoForOpen (rsComm, bunfileObjInfoHead, condInput, 0);
01576
01577 addKeyVal( &dataObjInp.condInput, RESC_HIER_STR_KW, (*bunfileObjInfoHead)->rescHier );
01578 if (status < 0) return status;
01579
01580 #if 0 // JMC - legacy resource
01581 if (getRescClass ((*bunfileObjInfoHead)->rescInfo) != CACHE_CL) {
01582
01583 status = getCacheRescInGrp (rsComm,
01584 (*bunfileObjInfoHead)->rescGroupName,
01585 (*bunfileObjInfoHead)->rescInfo, &cacheResc);
01586 if (status < 0) {
01587 rodsLog (LOG_ERROR,
01588 "unbunAndStageBunfileObj:getCacheRescInGrp %s err for %s stat=%d",
01589 (*bunfileObjInfoHead)->rescGroupName,
01590 (*bunfileObjInfoHead)->objPath, status);
01591 return status;
01592 }
01593 if (outCacheResc != NULL)
01594 *outCacheResc = cacheResc;
01595
01596
01597 status = rsReplAndRequeDataObjInfo (rsComm, bunfileObjInfoHead,
01598 cacheResc->rescName, SU_CLIENT_USER_KW);
01599 if (status < 0) {
01600 rodsLog (LOG_ERROR,
01601 "unbunAndStageBunfileObj:rsReplAndRequeDataObjInfo %s err stat=%d",
01602 (*bunfileObjInfoHead)->objPath, status);
01603 return status;
01604 }
01605 } else
01606 #endif // JMC - legacy resource
01607
01608 if (outCacheResc != NULL)
01609 *outCacheResc = (*bunfileObjInfoHead)->rescInfo;
01610
01611 addKeyVal (&dataObjInp.condInput, BUN_FILE_PATH_KW,
01612 (*bunfileObjInfoHead)->filePath);
01613 if (rmBunCopyFlag > 0) {
01614 addKeyVal (&dataObjInp.condInput, RM_BUN_COPY_KW, "");
01615 }
01616 if (strlen ((*bunfileObjInfoHead)->dataType) > 0) {
01617 addKeyVal (&dataObjInp.condInput, DATA_TYPE_KW,
01618 (*bunfileObjInfoHead)->dataType);
01619 }
01620 status = _rsUnbunAndRegPhyBunfile (rsComm, &dataObjInp,
01621 (*bunfileObjInfoHead)->rescInfo);
01622
01623 return status;
01624 }
01625
01626 #if 0 // JMC - legacy resource
01627
01628
01629
01630
01631
01632
01633
01634
01635
01636
01637
01638
01639
01640
01641
01642 int
01643 getCacheDataInfoOfCompObj (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01644 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01645 dataObjInfo_t *compDataObjInfo, dataObjInfo_t *oldDataObjInfo,
01646 dataObjInfo_t **outDataObjInfo)
01647 {
01648 int status;
01649
01650 if ((status = getCacheDataInfoForRepl (rsComm, srcDataObjInfoHead,
01651 destDataObjInfoHead, compDataObjInfo, outDataObjInfo)) < 0) {
01652
01653 status = replToCacheRescOfCompObj (rsComm, dataObjInp,
01654 srcDataObjInfoHead, compDataObjInfo, oldDataObjInfo,
01655 outDataObjInfo);
01656 if (status < 0) {
01657 rodsLog (LOG_ERROR,
01658 "_rsDataObjRepl: replToCacheRescOfCompObj of %s error stat=%d",
01659 srcDataObjInfoHead->objPath, status);
01660 return status;
01661 }
01662
01663
01664 queDataObjInfo (&srcDataObjInfoHead, *outDataObjInfo, 1, 0);
01665 }
01666 return status;
01667 }
01668
01669 int
01670 getCacheDataInfoOfCompResc (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
01671 dataObjInfo_t *srcDataObjInfoHead, dataObjInfo_t *destDataObjInfoHead,
01672 rescGrpInfo_t *compRescGrpInfo, dataObjInfo_t *oldDataObjInfo,
01673 dataObjInfo_t **outDataObjInfo)
01674 {
01675 int status;
01676 dataObjInfo_t compDataObjInfo;
01677
01678
01679 bzero (&compDataObjInfo, sizeof (compDataObjInfo));
01680 rstrcpy (compDataObjInfo.rescGroupName, compRescGrpInfo->rescGroupName,
01681 NAME_LEN);
01682 compDataObjInfo.rescInfo=compRescGrpInfo->rescInfo;
01683 status = getCacheDataInfoOfCompObj (rsComm, dataObjInp,
01684 srcDataObjInfoHead, destDataObjInfoHead, &compDataObjInfo,
01685 oldDataObjInfo, outDataObjInfo);
01686
01687 return status;
01688 }
01689
01690 #endif // JMC - legacy resource