00001
00002
00003
00004
00005
00006
00007
00008 #include "hpssFileDriver.h"
00009 #include "hpss_rpc.h"
00010 #include "pdata.h"
00011 #include "rsGlobalExtern.h"
00012 #include "miscServerFunct.h"
00013
/* Head of the linked list of class-of-service entries; queCos() inserts
 * into it and getHpssCos() walks it. NULL while the list is empty. */
00014 struct hpssCosDef *HpssCosHead = NULL;
/* Default class of service. Holds COS_NOT_INIT until initHpssCos() has run,
 * after which it is either a configured COS id or NO_DEF_COS. */
00015 int HpssDefCos = COS_NOT_INIT;
/* Set by initHpssAuth(); when nonzero, initHpssAuth() returns immediately. */
00016 int HpssAuthFlag = 0;
/* HPSS client API configuration read and written back by initHpssAuth(). */
00017 api_config_t Api_config;
00018
00019 int
00020 hpssFileUnlink (rsComm_t *rsComm, char *filename)
00021 {
00022 int status;
00023
00024 if ((status = initHpssAuth ()) < 0) return status;
00025
00026 status = hpss_Unlink (filename);
00027
00028 if (status < 0) {
00029 status = HPSS_FILE_UNLINK_ERR + status;
00030 rodsLog (LOG_ERROR, "hpssFileUnlink: unlink of %s error, status = %d",
00031 filename, status);
00032 }
00033
00034 return (status);
00035 }
00036
00037 int
00038 hpssFileStat (rsComm_t *rsComm, char *filename, struct stat *statbuf)
00039 {
00040 int status;
00041 hpss_stat_t hpssstat;
00042
00043 if ((status = initHpssAuth ()) < 0) return status;
00044 status = hpss_Stat (filename, &hpssstat);
00045
00046 if (status < 0) {
00047 status = HPSS_FILE_STAT_ERR + status;
00048 rodsLog (LOG_ERROR, "hpssFileStat: stat of %s error, status = %d",
00049 filename, status);
00050 } else {
00051 hpssStatToStat (&hpssstat, statbuf);
00052 }
00053
00054 return (status);
00055 }
00056
00057 int
00058 hpssFileMkdir (rsComm_t *rsComm, char *filename, int mode)
00059 {
00060 int status;
00061
00062 if ((status = initHpssAuth ()) < 0) return status;
00063 status = hpss_Mkdir (filename, mode);
00064
00065 if (status < 0) {
00066 if (status != HPSS_EEXIST)
00067 rodsLog (LOG_NOTICE,
00068 "hpssFileMkdir: mkdir of %s error, status = %d",
00069 filename, status);
00070 status = HPSS_FILE_MKDIR_ERR + status;
00071 }
00072
00073 return (status);
00074 }
00075
00076 int
00077 hpssFileChmod (rsComm_t *rsComm, char *filename, int mode)
00078 {
00079 int status;
00080
00081 if ((status = initHpssAuth ()) < 0) return status;
00082 status = hpss_Chmod (filename, mode);
00083
00084 if (status < 0) {
00085 status = HPSS_FILE_CHMOD_ERR + status;
00086 rodsLog (LOG_ERROR,
00087 "hpssFileChmod: chmod of %s error, status = %d",
00088 filename, status);
00089 }
00090
00091 return (status);
00092 }
00093
00094 int
00095 hpssFileRmdir (rsComm_t *rsComm, char *filename)
00096 {
00097 int status;
00098
00099 if ((status = initHpssAuth ()) < 0) return status;
00100 status = hpss_Rmdir (filename);
00101
00102 if (status < 0) {
00103 status = HPSS_FILE_RMDIR_ERR + status;
00104 rodsLog (LOG_ERROR,
00105 "hpssFileRmdir: rmdir of %s error, status = %d",
00106 filename, status);
00107 }
00108
00109 return (status);
00110 }
00111
00112 int
00113 hpssFileOpendir (rsComm_t *rsComm, char *dirname, void **outDirPtr)
00114 {
00115 int status;
00116 int *intPtr;
00117
00118 if ((status = initHpssAuth ()) < 0) return status;
00119 status = hpss_Opendir (dirname);
00120 if (status < 0) {
00121 status = HPSS_FILE_OPENDIR_ERR + status;
00122 rodsLog (LOG_ERROR,
00123 "hpssFileOpendir: opendir of %s error, status = %d",
00124 dirname, status);
00125 return status;
00126 }
00127
00128 intPtr = (int *) malloc (sizeof (int));
00129 *intPtr = status;
00130 *outDirPtr = (void *) intPtr;
00131 return (0);
00132 }
00133
00134 int
00135 hpssFileClosedir (rsComm_t *rsComm, void *dirPtr)
00136 {
00137 int status;
00138
00139 status = hpss_Closedir (* (int *) dirPtr);
00140
00141 if (status < 0) {
00142 status = HPSS_FILE_CLOSEDIR_ERR + status;
00143 rodsLog (LOG_ERROR,
00144 "hpssFileClosedir: closedir error, status = %d", status);
00145 }
00146 return (status);
00147 }
00148
00149 int
00150 hpssFileReaddir (rsComm_t *rsComm, void *dirPtr, struct dirent *direntPtr)
00151 {
00152 int status;
00153 int dirdesc;
00154
00155 dirdesc = *(int *) dirPtr;
00156 status = hpss_Readdir (dirdesc, (hpss_dirent_t *) direntPtr);
00157
00158 if (status < 0) {
00159 status = HPSS_FILE_READDIR_ERR + status;
00160 rodsLog (LOG_ERROR,
00161 "hpssFileReaddir: readdir error, status = %d", status);
00162 }
00163 return status;
00164 }
00165
00166 int
00167 hpssFileRename (rsComm_t *rsComm, char *oldFileName, char *newFileName)
00168 {
00169 int status;
00170
00171 if ((status = initHpssAuth ()) < 0) return status;
00172
00173 status = hpss_Rename (oldFileName, newFileName);
00174
00175 if (status < 0) {
00176 status = HPSS_FILE_RENAME_ERR + status;
00177 rodsLog (LOG_ERROR,
00178 "hpssFileRename: rename error, status = %d\n",
00179 status);
00180 }
00181
00182 return (status);
00183 }
00184
00185 rodsLong_t
00186 hpssFileGetFsFreeSpace (rsComm_t *rsComm, char *path, int flag)
00187 {
00188 int space = LARGE_SPACE;
00189 return (space * 1024 * 1024);
00190 }
00191
00192
00193
00194
00195
00196
00197
00198 int
00199 hpssStageToCache (rsComm_t *rsComm, fileDriverType_t cacheFileType,
00200 int mode, int flags, char *filename,
00201 char *cacheFilename, rodsLong_t dataSize,
00202 keyValPair_t *condInput)
00203 {
00204 int status;
00205
00206 struct stat statbuf;
00207 rodsLong_t mySize;
00208
00209 if ((status = initHpssAuth ()) < 0) return status;
00210
00211 status = hpssFileStat (NULL, filename, &statbuf);
00212
00213 if (status < 0 || (statbuf.st_mode & S_IFREG) == 0) {
00214 status = HPSS_FILE_STAT_ERR + status;
00215 rodsLog (LOG_ERROR, "hpssStageToCache: stat of %s error, status = %d",
00216 filename, status);
00217 return status;
00218 }
00219
00220
00221 if (dataSize > 0 && dataSize != statbuf.st_size) {
00222 rodsLog (LOG_ERROR,
00223 "hpssStageToCache: %s inp dataSize %lld does not match size %lld",
00224 filename, dataSize, statbuf.st_size);
00225 return SYS_COPY_LEN_ERR;
00226 }
00227 mySize = statbuf.st_size;
00228
00229 if (dataSize > MAX_SZ_FOR_SINGLE_BUF) {
00230 status = paraHpssGet (filename, cacheFilename, mode, flags, mySize);
00231 } else {
00232 status = seqHpssGet (filename, cacheFilename, mode, flags, mySize);
00233 }
00234 return status;
00235
00236 }
00237
00238
00239
00240
00241
00242
00243
00244 int
00245 hpssSyncToArch (rsComm_t *rsComm, fileDriverType_t cacheFileType,
00246 int mode, int flags, char *filename,
00247 char *cacheFilename, rodsLong_t dataSize, keyValPair_t *condInput)
00248 {
00249 int status;
00250 struct stat statbuf;
00251
00252 if ((status = initHpssAuth ()) < 0) return status;
00253
00254 status = stat (cacheFilename, &statbuf);
00255
00256 if (status < 0) {
00257 status = UNIX_FILE_STAT_ERR - errno;
00258 rodsLog (LOG_ERROR, "hpssSyncToArch: stat of %s error, status = %d",
00259 cacheFilename, status);
00260 return status;
00261 }
00262
00263 if ((statbuf.st_mode & S_IFREG) == 0) {
00264 status = UNIX_FILE_STAT_ERR - errno;
00265 rodsLog (LOG_ERROR, "hpssSyncToArch: %s is not a file, status = %d",
00266 cacheFilename, status);
00267 return status;
00268 }
00269
00270 if (dataSize > 0 && dataSize != statbuf.st_size) {
00271 rodsLog (LOG_ERROR,
00272 "hpssSyncToArch: %s inp size %lld does not match actual size %lld",
00273 cacheFilename, dataSize, statbuf.st_size);
00274 return SYS_COPY_LEN_ERR;
00275 }
00276 dataSize = statbuf.st_size;
00277
00278 if (dataSize > MAX_SZ_FOR_SINGLE_BUF) {
00279 status = paraHpssPut (cacheFilename, filename, mode, flags, dataSize);
00280 } else {
00281 status = seqHpssPut (cacheFilename, filename, mode, flags, dataSize);
00282 }
00283
00284 return status;
00285 }
00286
00287 int
00288 seqHpssPut (char *srcUnixFile, char *destHpssFile, int mode, int flags,
00289 rodsLong_t mySize)
00290 {
00291 int srcFd, destFd;
00292 char myBuf[TRANS_BUF_SZ];
00293 rodsLong_t bytesCopied = 0;
00294 int bytesRead;
00295 int status;
00296
00297 srcFd = open (srcUnixFile, O_RDONLY, 0);
00298 if (srcFd < 0) {
00299 status = UNIX_FILE_OPEN_ERR - errno;
00300 rodsLog (LOG_ERROR,
00301 "seqHpssPut: open error for srcUnixFile %s, status = %d",
00302 srcUnixFile, status);
00303 return status;
00304 }
00305
00306 destFd = hpssOpenForWrite (destHpssFile, mode, flags, mySize);
00307 if (destFd < 0) {
00308 close (srcFd);
00309 return destFd;
00310 }
00311
00312 while ((bytesRead = read (srcFd, (void *) myBuf, TRANS_BUF_SZ)) > 0) {
00313 int left, written;
00314 char *bufptr;
00315
00316 left = bytesRead;
00317 bufptr = myBuf;
00318 while (left > 0) {
00319 written = hpss_Write (destFd, bufptr, left);
00320 if (written < 0) {
00321 status = HPSS_FILE_WRITE_ERR + written;
00322 rodsLog (LOG_ERROR,
00323 "seqHpssPut: hpss_Write err for %s, status = %d",
00324 destHpssFile, status);
00325 close (srcFd);
00326 hpss_Close (destFd);
00327 return status;
00328 }
00329 left -= written;
00330 bufptr += written;
00331 }
00332 bytesCopied += bytesRead;
00333 }
00334
00335 if (mySize != bytesCopied) {
00336 rodsLog (LOG_ERROR,
00337 "seqHpssPut: %s bytesCopied %lld does not match actual size %lld",
00338 srcUnixFile, bytesCopied, mySize);
00339 status = SYS_COPY_LEN_ERR;
00340 } else {
00341 status = 0;
00342 }
00343 close (srcFd);
00344 hpss_Close (destFd);
00345
00346 return status;
00347 }
00348
00349 int
00350 seqHpssGet (char *srcHpssFile, char *destUnixFile, int mode, int flags,
00351 rodsLong_t dataSize)
00352 {
00353 int srcFd, destFd;
00354 char myBuf[TRANS_BUF_SZ];
00355 rodsLong_t bytesCopied = 0;
00356 int bytesRead, bytesWritten;
00357 int status;
00358
00359 srcFd = hpssOpenForRead (srcHpssFile, O_RDONLY);
00360 if (srcFd < 0) {
00361 status = HPSS_FILE_OPEN_ERR + srcFd;
00362 rodsLog (LOG_ERROR,
00363 "seqHpssGet: hpssOpenForRead error for srcHpssFile %s, status = %d",
00364 srcHpssFile, status);
00365 return status;
00366 }
00367
00368 destFd = open (destUnixFile, O_WRONLY | O_CREAT | O_TRUNC, mode);
00369 if (destFd < 0) {
00370 status = UNIX_FILE_OPEN_ERR - errno;
00371 rodsLog (LOG_ERROR,
00372 "hpssFileCopy: open error for destUnixFile %s, status = %d",
00373 destUnixFile, status);
00374 hpss_Close (srcFd);
00375 return status;
00376 }
00377
00378 while ((bytesRead = hpss_Read (srcFd, (void *) myBuf, TRANS_BUF_SZ)) > 0) {
00379 bytesWritten = write (destFd, (void *) myBuf, bytesRead);
00380 if (bytesWritten <= 0) {
00381 status = UNIX_FILE_WRITE_ERR - errno;
00382 rodsLog (LOG_ERROR,
00383 "seqHpssGet: write error for destUnixFile %s, status = %d",
00384 destUnixFile, status);
00385 hpss_Close (srcFd);
00386 close (destFd);
00387 return status;
00388 }
00389 bytesCopied += bytesWritten;
00390 }
00391
00392 if (dataSize != bytesCopied) {
00393 rodsLog (LOG_ERROR,
00394 "seqHpssPut: %s bytesCopied %lld does not match actual size %lld",
00395 srcHpssFile, bytesCopied, dataSize);
00396 status = SYS_COPY_LEN_ERR;
00397 } else {
00398 status = 0;
00399 }
00400
00401 hpss_Close (srcFd);
00402 close (destFd);
00403
00404 return status;
00405 }
00406
00407 int
00408 hpssOpenForWrite (char *destHpssFile, int mode, int flags, rodsLong_t dataSize)
00409 {
00410 hpss_cos_hints_t HintsIn;
00411 hpss_cos_priorities_t HintsPri;
00412 hpss_cos_hints_t HintsOut;
00413 int myCos;
00414 int destFd;
00415 int myFlags;
00416 int status;
00417
00418 if ((status = initHpssAuth ()) < 0) return status;
00419
00420 myFlags = O_CREAT | O_TRUNC | O_RDWR;
00421 (void) hpss_Umask((mode_t) 0000);
00422 myCos = getHpssCos (dataSize);
00423
00424 if (myCos < 0) {
00425
00426 if (dataSize > 0) {
00427 memset(&HintsIn, 0, sizeof HintsIn);
00428 memset(&HintsPri, 0, sizeof HintsPri);
00429 CONVERT_LONGLONG_TO_U64(dataSize, HintsIn.MinFileSize);
00430 HintsIn.MaxFileSize = HintsIn.MinFileSize;
00431 HintsPri.MaxFileSizePriority = HintsPri.MinFileSizePriority =
00432 REQUIRED_PRIORITY;
00433 destFd = hpss_Open (destHpssFile, myFlags, mode, &HintsIn,
00434 &HintsPri, NULL);
00435 } else {
00436 destFd = hpss_Open (destHpssFile, myFlags, mode, NULL, NULL, NULL);
00437 }
00438 } else {
00439 memset(&HintsIn, 0, sizeof HintsIn);
00440 memset(&HintsPri, 0, sizeof HintsPri);
00441 memset(&HintsOut, 0, sizeof HintsOut);
00442 HintsIn.COSId = myCos;
00443 HintsPri.COSIdPriority = REQUIRED_PRIORITY;
00444 destFd = hpss_Open (destHpssFile, myFlags, mode, &HintsIn ,&HintsPri,
00445 &HintsOut);
00446 }
00447
00448 if (destFd < 0) {
00449 if (destFd != HPSS_ENOENT) {
00450 char *errorText;
00451 errorText = hpss_RPCGetLastErrorText();
00452 if (errorText != NULL) {
00453 rodsLog (LOG_ERROR,
00454 "hpssOpenForWrite: error text = %s", errorText);
00455 }
00456 rodsLog (LOG_ERROR,
00457 "hpssOpenForWrite: hpss_Open error, status = %d\n",
00458 destFd);
00459 }
00460 destFd = HPSS_FILE_OPEN_ERR + destFd;
00461 }
00462 return destFd;
00463 }
00464
00465 int
00466 hpssOpenForRead (char *srcHpssFile, int flags)
00467 {
00468 int srcFd;
00469 int myFlags;
00470
00471 int status;
00472
00473 if ((status = initHpssAuth ()) < 0) return status;
00474
00475 myFlags = O_RDONLY;
00476 srcFd = hpss_Open (srcHpssFile, myFlags, 0, NULL, NULL, NULL);
00477
00478 if (srcFd < 0) {
00479 if (srcFd != HPSS_ENOENT) {
00480 rodsLog (LOG_ERROR,
00481 "hpssOpenForRead: hpss_Open error, status = %d\n", srcFd);
00482 }
00483 srcFd = HPSS_FILE_OPEN_ERR + srcFd;
00484 }
00485 return srcFd;
00486 }
00487
00488 int
00489 hpssStatToStat (hpss_stat_t *hpssstat, struct stat *statbuf)
00490 {
00491
00492 memset (statbuf, 0, sizeof (struct stat));
00493 statbuf->st_dev = hpssstat->st_dev;
00494 statbuf->st_mode = hpssstat->st_mode;
00495 statbuf->st_nlink = hpssstat->st_nlink;
00496 statbuf->st_uid = hpssstat->st_uid;
00497 statbuf->st_gid = hpssstat->st_gid;
00498 statbuf->st_rdev = hpssstat->st_rdev;
00499 CONVERT_U64_TO_LONGLONG(hpssstat->st_size, statbuf->st_size);
00500
00501 statbuf->st_atime = hpssstat->st_atime_n;
00502 statbuf->st_mtime = hpssstat->st_mtime_n;
00503 statbuf->st_ctime = hpssstat->st_ctime_n;
00504 statbuf->st_blksize = hpssstat->st_blksize;
00505 statbuf->st_blocks = hpssstat->st_blocks;
00506 #if defined(aix_platform)
00507 statbuf->st_vfstype = hpssstat->st_vfstype;
00508 statbuf->st_vfs = hpssstat->st_vfs;
00509 statbuf->st_type = hpssstat->st_type;
00510 statbuf->st_flag = hpssstat->st_flag;
00511 #endif
00512 #if defined(aix_platform) || defined(alpha_platform)
00513 statbuf->st_gen = hpssstat->st_gen;
00514 #endif
00515 return (0);
00516 }
00517
00518 int
00519 initHpssAuth ()
00520 {
00521 char hpssUser[MAX_NAME_LEN], hpssAuthInfo[MAX_NAME_LEN];
00522 int status;
00523 hpss_authn_mech_t mech_type;
00524 const char * mech_name;
00525
00526 if (HpssAuthFlag) return 0;
00527
00528 HpssAuthFlag = 1;
00529
00530 if ((status = readHpssAuthInfo (hpssUser, hpssAuthInfo)) < 0) {
00531 rodsLog (LOG_ERROR,
00532 "initHpssAuth: readHpssAuthInfo error. status = %d", status);
00533 return status;
00534 }
00535
00536
00537
00538
00539 status = hpss_GetConfiguration(&Api_config);
00540 if(status != 0) {
00541 rodsLog (LOG_ERROR,
00542 "initHpssAuth: hpss_GetConfiguration error. status = %d", status);
00543 return (status + HPSS_AUTH_ERR);
00544 }
00545
00546 #ifdef HPSS_KRB5_AUTH
00547 Api_config.AuthnMech = hpss_authn_mech_krb5;
00548 #else
00549 Api_config.AuthnMech = hpss_authn_mech_unix;
00550 #endif
00551 Api_config.Flags |= API_USE_CONFIG;
00552
00553 status = hpss_SetConfiguration(&Api_config);
00554 if(status != 0) {
00555 rodsLog (LOG_ERROR,
00556 "initHpssAuth: hpss_SetConfiguration error. status = %d", status);
00557 return (status + HPSS_AUTH_ERR);
00558 }
00559
00560 #ifdef HPSS_KRB5_AUTH
00561 mech_name = "krb5";
00562 #else
00563 mech_name = "unix";
00564 #endif
00565 status = hpss_AuthnMechTypeFromString(mech_name, &mech_type);
00566 if(status != 0) {
00567 rodsLog (LOG_ERROR,
00568 "initHpssAuth: invalid authentication type %s", mech_name);
00569 status = HPSS_AUTH_NOT_SUPPORTED + status;
00570 return status;
00571 }
00572 #ifdef HPSS_UNIX_PASSWD_AUTH
00573 status = hpss_SetLoginCred (hpssUser, mech_type, hpss_rpc_cred_client,
00574 hpss_rpc_auth_type_passwd, hpssAuthInfo);
00575 memset (hpssAuthInfo, 0, MAX_NAME_LEN);
00576 if(status != 0) {
00577 rodsLog (LOG_ERROR,
00578 "initHpssAuth: hpss_SetLoginCred error,stat=%d.hpssUser=%s,errno=%d",
00579 status, hpssUser, errno);
00580 status = HPSS_AUTH_ERR - errno;
00581 return status;
00582 }
00583 #else
00584 status = hpss_SetLoginCred(hpssUser, mech_type, hpss_rpc_cred_client,
00585 hpss_rpc_auth_type_keytab, hpssAuthInfo);
00586 if(status != 0) {
00587 char *errorText;
00588 errorText = hpss_RPCGetLastErrorText();
00589 if (errorText != NULL) {
00590 rodsLog (LOG_ERROR,
00591 "hpss_SetLoginCred: error text = %s", errorText);
00592 }
00593 rodsLog (LOG_ERROR,
00594 "initHpssAuth: hpss_SetLoginCred err,stat=%d.User=%s,Info=%s,err=%d",
00595 status, hpssUser, hpssAuthInfo, errno);
00596 status = HPSS_AUTH_ERR - errno;
00597 return status;
00598 }
00599 #endif
00600 return status;
00601 }
00602
00603 int
00604 readHpssAuthInfo (char *hpssUser, char *hpssAuthInfo)
00605 {
00606 FILE *fptr;
00607 char hpssAuthFile[MAX_NAME_LEN];
00608 char inbuf[MAX_NAME_LEN];
00609 int lineLen, bytesCopied;
00610 int linecnt = 0;
00611
00612 snprintf (hpssAuthFile, MAX_NAME_LEN, "%-s/%-s",
00613 getConfigDir(), HPSS_AUTH_FILE);
00614
00615 fptr = fopen (hpssAuthFile, "r");
00616
00617 if (fptr == NULL) {
00618 rodsLog (LOG_ERROR,
00619 "readHpssAuthInfo: open HPSS_AUTH_FILE file %s err. ernro = %d",
00620 hpssAuthFile, errno);
00621 return (SYS_CONFIG_FILE_ERR);
00622 }
00623 while ((lineLen = getLine (fptr, inbuf, MAX_NAME_LEN)) > 0) {
00624 char *inPtr = inbuf;
00625 if (linecnt == 0) {
00626 while ((bytesCopied = getStrInBuf (&inPtr, hpssUser,
00627 &lineLen, LONG_NAME_LEN)) > 0) {
00628 linecnt ++;
00629 break;
00630 }
00631 } else if (linecnt == 1) {
00632 while ((bytesCopied = getStrInBuf (&inPtr, hpssAuthInfo,
00633 &lineLen, LONG_NAME_LEN)) > 0) {
00634 linecnt ++;
00635 break;
00636 }
00637 }
00638 }
00639 if (linecnt != 2) {
00640 rodsLog (LOG_ERROR,
00641 "readHpssAuthInfo: read %d lines in HPSS_AUTH_FILE file",
00642 linecnt);
00643 return (SYS_CONFIG_FILE_ERR);
00644 }
00645 return 0;
00646 }
00647
00648 int
00649 initHpssCos ()
00650 {
00651 FILE *fptr;
00652 char hpssCosFile[MAX_NAME_LEN];
00653 char inbuf[MAX_NAME_LEN];
00654 char strbuf[LONG_NAME_LEN];
00655 int lineLen, bytesCopied;
00656 int strcnt;
00657 hpssCosDef_t *tmpHpssCos = NULL;
00658
00659 if (HpssDefCos != COS_NOT_INIT) return 0;
00660
00661 snprintf (hpssCosFile, MAX_NAME_LEN, "%-s/%-s",
00662 getConfigDir(), HPSS_COS_CONFIG_FILE);
00663
00664 fptr = fopen (hpssCosFile, "r");
00665
00666 if (fptr == NULL) {
00667 rodsLog (LOG_ERROR,
00668 "initHpssCos: open HPSS_COS_CONFIG_FILE file %s err. ernro = %d",
00669 hpssCosFile, errno);
00670 HpssDefCos = NO_DEF_COS;
00671 return (SYS_CONFIG_FILE_ERR);
00672 }
00673 while ((lineLen = getLine (fptr, inbuf, MAX_NAME_LEN)) > 0) {
00674 char *inPtr = inbuf;
00675 strcnt = 0;
00676 while ((bytesCopied = getStrInBuf (&inPtr, strbuf, &lineLen,
00677 LONG_NAME_LEN)) > 0) {
00678 if (strcnt == 0) {
00679
00680 tmpHpssCos = malloc (sizeof(hpssCosDef_t));
00681 bzero (tmpHpssCos, sizeof sizeof(hpssCosDef_t));
00682 tmpHpssCos->cos = atoi (strbuf);
00683 } else if (strcnt == 1) {
00684 tmpHpssCos->maxSzInKByte = strtoll (strbuf, 0, 0);
00685 } else {
00686 if (strcmp (strbuf, DEF_COS_KW) == 0) {
00687 HpssDefCos = tmpHpssCos->cos;
00688 }
00689 }
00690 strcnt++;
00691 }
00692 if (tmpHpssCos != NULL) {
00693 if (strcnt <= 1) {
00694 rodsLog (LOG_ERROR,
00695 "initHpssCos: input cos %d has no other entries",
00696 tmpHpssCos->cos);
00697 free (tmpHpssCos);
00698 } else {
00699 queCos (tmpHpssCos);
00700 }
00701 tmpHpssCos = NULL;
00702 }
00703 }
00704
00705 if (HpssDefCos == COS_NOT_INIT) HpssDefCos = NO_DEF_COS;
00706
00707 return 0;
00708 }
00709
00710 int
00711 queCos (hpssCosDef_t *myHpssCos)
00712 {
00713 hpssCosDef_t *lastHpssCos = NULL;
00714 hpssCosDef_t *tmpHpssCos;
00715
00716 if (HpssCosHead == NULL) {
00717 HpssCosHead = myHpssCos;
00718 HpssCosHead->next = NULL;
00719 return 0;
00720 }
00721 tmpHpssCos = HpssCosHead;
00722 while (tmpHpssCos != NULL) {
00723 if (tmpHpssCos->maxSzInKByte > myHpssCos->maxSzInKByte)
00724 break;
00725 lastHpssCos = tmpHpssCos;
00726 tmpHpssCos = lastHpssCos->next;
00727 }
00728 if (lastHpssCos == NULL) {
00729 myHpssCos->next = HpssCosHead;
00730 HpssCosHead = myHpssCos;
00731 } else {
00732 myHpssCos->next = lastHpssCos->next;
00733 lastHpssCos->next = myHpssCos;
00734 }
00735 return (0);
00736 }
00737
00738 int
00739 getHpssCos (rodsLong_t fileSize)
00740 {
00741 hpssCosDef_t *tmpHpssCos;
00742 int lastCos = -1;
00743
00744 initHpssCos ();
00745
00746 if (HpssCosHead == NULL || fileSize < 0) return HpssDefCos;
00747
00748 tmpHpssCos = HpssCosHead;
00749
00750 while (tmpHpssCos != NULL) {
00751 if ((fileSize / 1024) < tmpHpssCos->maxSzInKByte)
00752 return (tmpHpssCos->cos);
00753 lastCos = tmpHpssCos->cos;
00754 tmpHpssCos = tmpHpssCos->next;
00755 }
00756 if (lastCos >= 0) return lastCos;
00757 return HpssDefCos;
00758 }
00759
00760 int
00761 paraHpssPut (char *srcUnixFile, char *destHpssFile, int mode, int flags,
00762 rodsLong_t mySize)
00763 {
00764 int destFd;
00765 int status;
00766 hpssSession_t myHpssSession;
00767 hpss_IOD_t iod;
00768 hpss_IOR_t ior;
00769 iod_srcsinkdesc_t srcDesc, sinkDesc;
00770
00771 status = initHpssSession (&myHpssSession, HPSS_PUT_OPR,
00772 srcUnixFile, mySize, mode);
00773 if (status < 0) return status;
00774
00775 destFd = hpssOpenForWrite (destHpssFile, mode, flags, mySize);
00776 if (destFd < 0) {
00777 return destFd;
00778 }
00779
00780 status = createControlSocket (&myHpssSession);
00781 if (status < 0) return status;
00782
00783 pthread_create(&myHpssSession.moverConnManagerThr, pthread_attr_default,
00784 (void *(*)(void *)) moverConnManager,
00785 (void *) &myHpssSession);
00786
00787 #if !defined(solaris_platform)
00788 sched_yield();
00789 #endif
00790 initHpssIodForWrite (&iod, &srcDesc, &sinkDesc, destFd, &myHpssSession);
00791
00792 bzero (&ior, sizeof(ior));
00793 status = hpss_WriteList (&iod, 0, &ior);
00794 if (status != 0) {
00795 if (ior.Status != HPSS_E_NOERROR) {
00796 status = HPSS_WRITE_LIST_ERR + ior.Status;
00797 rodsLog (LOG_ERROR,
00798 "paraHpssPut: hpss_WriteList error for %s. ior ststus = %d",
00799 destHpssFile, status);
00800 myHpssSession.status = status;
00801 } else if (myHpssSession.status == HPSS_E_NOERROR) {
00802 rodsLog (LOG_ERROR,
00803 "paraHpssPut: hpss_WriteList error for %s. ststus = %d",
00804 destHpssFile, status);
00805 myHpssSession.status = HPSS_WRITE_LIST_ERR;
00806 }
00807 }
00808 hpss_Close(destFd);
00809
00810 status = postProcSessionThr (&myHpssSession, destHpssFile);
00811 return status;
00812 }
00813
00814 int
00815 initHpssSession (hpssSession_t *hpssSession, int operation, char *unixFilePath,
00816 rodsLong_t fileSize, int createMode)
00817 {
00818 struct hostent *hostEntry;
00819 char *hostname;
00820 char myhostname[NAME_LEN];
00821
00822 bzero (hpssSession, sizeof (hpssSession_t));
00823 hpssSession->fileSize64 = cast64m (fileSize);
00824 hpssSession->operation = operation;
00825 hpssSession->unixFilePath = unixFilePath;
00826 hpssSession->createFlag = 0;
00827 hpssSession->createMode = createMode;
00828 hpssSession->status = HPSS_E_NOERROR;
00829 hpssSession->requestId = getpid();
00830
00831 if ((hostname = getenv("IRODS_HPSS_HOSTNAME")) == NULL &&
00832 (hostname = getLocalSvrAddr ()) == NULL) {
00833 if (gethostname (myhostname, NAME_LEN) < 0) {
00834 rodsLog (LOG_ERROR,
00835 "initHpssSession: gethostname error, errno=%d", errno);
00836 return (SYS_INVALID_SERVER_HOST - errno);
00837 } else {
00838 hostname = myhostname;
00839 }
00840 }
00841 hostEntry = gethostbyname (hostname);
00842 if (hostEntry == NULL) {
00843 rodsLog (LOG_ERROR,
00844 "initHpssSession: gethostbyname of %s error, errno=%d",
00845 hostname, errno);
00846 return (SYS_INVALID_SERVER_HOST - errno);
00847 }
00848 hpssSession->ipAddr = *((unsigned32 *) hostEntry->h_addr_list[0]);
00849 pthread_mutex_init (&hpssSession->myMutex, pthread_mutexattr_default);
00850 return (0);
00851 }
00852
00853
00854
00855 int
00856 createControlSocket (hpssSession_t *hpssSession)
00857 {
00858 int status, myLen;
00859
00860 hpssSession->mySocket = socket(AF_INET, SOCK_STREAM, 0);
00861 if (hpssSession->mySocket < 0) {
00862 rodsLog (LOG_ERROR,
00863 "createControlSocket: create socket error, errno=%d", errno);
00864 return (SYS_INVALID_SERVER_HOST - errno);
00865 }
00866
00867 bzero (&hpssSession->mySocketAddr, sizeof(struct sockaddr_in));
00868 hpssSession->mySocketAddr.sin_family = AF_INET;
00869 hpssSession->mySocketAddr.sin_addr.s_addr = INADDR_ANY;
00870 hpssSession->mySocketAddr.sin_port = 0;
00871 status = bind (hpssSession->mySocket,
00872 (const struct sockaddr *) &hpssSession->mySocketAddr,
00873 sizeof(struct sockaddr_in));
00874 if (status < 0) {
00875 rodsLog (LOG_ERROR,
00876 "createControlSocket: create socket error, errno=%d", errno);
00877 return (SYS_INVALID_SERVER_HOST - errno);
00878 }
00879
00880 myLen = sizeof (struct sockaddr_in);
00881
00882 status = getsockname (hpssSession->mySocket,
00883 (struct sockaddr *) &hpssSession->mySocketAddr, (socklen_t *) &myLen);
00884
00885 if (status < 0) {
00886 rodsLog (LOG_ERROR,
00887 "createControlSocket: getsockname error, errno=%d", errno);
00888 return (SYS_INVALID_SERVER_HOST - errno);
00889 }
00890
00891 status = listen (hpssSession->mySocket, SOMAXCONN);
00892 if (status < 0) {
00893 rodsLog (LOG_ERROR,
00894 "createControlSocket: listen error, errno=%d", errno);
00895 return (SYS_INVALID_SERVER_HOST - errno);
00896 }
00897
00898 return (0);
00899 }
00900
00901 int
00902 initHpssIodForWrite (hpss_IOD_t *iod, iod_srcsinkdesc_t *srcDesc,
00903 iod_srcsinkdesc_t *sinkDesc, int destFd, hpssSession_t *hpssSession)
00904 {
00905
00906 memset(iod, 0, sizeof(hpss_IOD_t));
00907 memset(srcDesc, 0, sizeof(iod_srcsinkdesc_t));
00908 memset(sinkDesc, 0, sizeof(iod_srcsinkdesc_t));
00909
00910 srcDesc->Flags = HPSS_IOD_XFEROPT_IP;
00911 srcDesc->Flags |= HPSS_IOD_CONTROL_ADDR;
00912
00913
00914 sinkDesc->Offset = srcDesc->Offset = cast64m(0);
00915 sinkDesc->Length = srcDesc->Length = hpssSession->fileSize64;
00916
00917 sinkDesc->SrcSinkAddr.Type = CLIENTFILE_ADDRESS;
00918 sinkDesc->SrcSinkAddr.Addr_u.ClientFileAddr.FileDes = destFd;
00919 sinkDesc->SrcSinkAddr.Addr_u.ClientFileAddr.FileOffset = cast64m(0);
00920 srcDesc->SrcSinkAddr.Type = NET_ADDRESS;
00921 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockTransferID =
00922 cast64m(hpssSession->requestId);
00923 #ifdef HPSS7
00924 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockAddr.Addr.hpss_saddr_u.ipv4_addr =
00925 hpssSession->ipAddr;
00926 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockAddr.Addr.family =
00927 hpssSession->mySocketAddr.sin_family;
00928 #else
00929 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockAddr.addr =
00930 hpssSession->ipAddr;
00931 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockAddr.family =
00932 hpssSession->mySocketAddr.sin_family;
00933 #endif
00934 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockAddr.port =
00935 hpssSession->mySocketAddr.sin_port;
00936 srcDesc->SrcSinkAddr.Addr_u.NetAddr.SockOffset = cast64m(0);
00937 iod->Function = HPSS_IOD_WRITE;
00938 iod->RequestID = hpssSession->requestId;
00939 iod->SrcDescLength = 1;
00940 iod->SinkDescLength = 1;
00941 iod->SrcDescList = srcDesc;
00942 iod->SinkDescList = sinkDesc;
00943
00944 hpssSession->totalBytesMoved64 = cast64m(0);
00945 hpssSession->status = HPSS_E_NOERROR;
00946
00947 return (0);
00948 }
00949
00950 void
00951 moverConnManager (hpssSession_t *hpssSession)
00952 {
00953 int moverSocket;
00954 int i;
00955 struct sockaddr_in socketAddr;
00956 int len;
00957
00958 for (;;) {
00959 len = sizeof(socketAddr);
00960 while ((moverSocket = accept (hpssSession->mySocket,
00961 (struct sockaddr *) &socketAddr, (socklen_t *) &len)) < 0) {
00962 if ((errno != EINTR) && (errno != EAGAIN)) {
00963 rodsLog (LOG_ERROR,
00964 "moverConnManager: socket accept error, errno=%d", errno);
00965 hpssSession->status = SYS_SOCK_ACCEPT_ERR - errno;
00966 return;
00967 }
00968 hpssSession->status = SYS_SOCK_ACCEPT_ERR - errno;
00969 break;
00970 }
00971 if (moverSocket < 0) break;
00972
00973 do {
00974 pthread_mutex_lock(&hpssSession->myMutex);
00975 for (i = 0; i < MAX_HPSS_CONNECTIONS; i++) {
00976 if (!hpssSession->thrInfo[i].active) {
00977 hpssSession->thrInfo[i].active = 1;
00978 hpssSession->thrInfo[i].moverSocket = moverSocket;
00979 hpssSession->thrInfo[i].hpssSession = hpssSession;
00980 hpssSession->thrCnt++;
00981 break;
00982 }
00983 }
00984 pthread_mutex_unlock (&hpssSession->myMutex);
00985 if (i == MAX_HPSS_CONNECTIONS) {
00986 rodsSleep (0, 500000);
00987 }
00988 } while (i == MAX_HPSS_CONNECTIONS);
00989 rodsSetSockOpt (moverSocket, 0);
00990
00991 if (hpssSession->operation == HPSS_GET_OPR) {
00992 pthread_create(&hpssSession->thrInfo[i].threadId,
00993 pthread_attr_default,
00994 (void *(*)(void *)) getMover,
00995 (void *) &hpssSession->thrInfo[i]);
00996 } else if (hpssSession->operation == HPSS_PUT_OPR) {
00997 pthread_create (&hpssSession->thrInfo[i].threadId,
00998 pthread_attr_default,
00999 (void *(*)(void *)) putMover,
01000 (void *) &hpssSession->thrInfo[i]);
01001 }
01002 #if !defined(solaris_platform)
01003 sched_yield();
01004 #endif
01005 }
01006 return;
01007 }
01008
01009 void
01010 getMover (hpssThrInfo_t *thrInfo)
01011 {
01012 int status = 0;
01013 int transferListenSocket = -1;
01014 int transferSocketFd = -1;
01015 int bytesReceived;
01016 initiator_msg_t initMessage, initReply;
01017 initiator_ipaddr_t ipAddr;
01018 completion_msg_t completionMessage;
01019 char buffer[HPSS_BUF_SIZE];
01020 int destFd = -1;
01021 rodsLong_t offset = -1;
01022 rodsLong_t curoffset = 0;
01023 hpssSession_t *mySession = (hpssSession_t *) thrInfo->hpssSession;
01024 int bytesWritten;
01025 int length = 0;
01026
01027 pthread_mutex_lock(&mySession->myMutex);
01028 if (mySession->createFlag == 0) {
01029
01030 destFd = open (mySession->unixFilePath, O_WRONLY | O_CREAT | O_TRUNC,
01031 mySession->createMode);
01032 mySession->createFlag = 1;
01033 } else {
01034 destFd = open (mySession->unixFilePath, O_WRONLY,
01035 mySession->createMode);
01036 }
01037 pthread_mutex_unlock(&mySession->myMutex);
01038
01039 if (destFd < 0) {
01040 mySession->status = UNIX_FILE_OPEN_ERR - errno;
01041 rodsLog (LOG_ERROR,
01042 "getMover: open error for destUnixFile %s, status = %d",
01043 mySession->unixFilePath, mySession->status);
01044 return;
01045 }
01046
01047
01048 while (mySession->status == HPSS_E_NOERROR) {
01049
01050
01051 status = procMoverInitmsg (thrInfo, &initMessage, &initReply);
01052 if (status == HPSS_ECONN) {
01053 break;
01054 } else if (status != HPSS_E_NOERROR) {
01055 mySession->status = status;
01056 continue;
01057 }
01058
01059 status = procTransferListenSocket (thrInfo, &ipAddr,
01060 &transferListenSocket);
01061 if (status != HPSS_E_NOERROR) {
01062 mySession->status = status;
01063 continue;
01064 }
01065
01066
01067
01068
01069 status = procTransferSocketFd (thrInfo, transferListenSocket,
01070 &transferSocketFd);
01071 if (status != HPSS_E_NOERROR) {
01072 mySession->status = status;
01073 continue;
01074 }
01075
01076
01077
01078
01079 status = mover_socket_recv_data(transferSocketFd,
01080 cast64m(mySession->requestId),
01081 initMessage.Offset,
01082 buffer,
01083 low32m(initReply.Length),
01084 &bytesReceived, 1);
01085
01086 if (status <= 0) {
01087 rodsLog (LOG_ERROR,
01088 "getMover: mover_socket_recv_data error, status = %d", status);
01089 mySession->status = HPSS_MOVER_PROT_ERR + status;
01090 continue;
01091 } else {
01092 length = low32m (initReply.Length);
01093 }
01094 CONVERT_U64_TO_LONGLONG(initMessage.Offset, offset);
01095 if (offset != curoffset) {
01096 lseek (destFd, offset, SEEK_SET);
01097 curoffset = offset;
01098 }
01099 bytesWritten = write (destFd, (void *) buffer, length);
01100 if (bytesWritten <= 0) {
01101 mySession->status = UNIX_FILE_WRITE_ERR - errno;
01102 rodsLog (LOG_ERROR,
01103 "getMover: write error for destUnixFile %s, status = %d",
01104 mySession->unixFilePath, mySession->status);
01105 continue;
01106 } else if (bytesWritten != length) {
01107 mySession->status = SYS_COPY_LEN_ERR;
01108 rodsLog (LOG_ERROR,
01109 "getMover: write error for unixFile %s, towrite %d != written %d",
01110 mySession->unixFilePath, length, bytesWritten);
01111 continue;
01112 } else {
01113 curoffset += length;
01114
01115 }
01116
01117
01118
01119 status = mvrprot_recv_compmsg( thrInfo->moverSocket,
01120 &completionMessage);
01121 if (status != HPSS_E_NOERROR) {
01122 rodsLog (LOG_ERROR,
01123 "getMover: mvrprot_recv_compmsg error, status = %d",
01124 status);
01125 mySession->status = HPSS_MOVER_PROT_ERR + status;
01126 continue;
01127 }
01128 pthread_mutex_lock(&mySession->myMutex);
01129 inc64m (mySession->totalBytesMoved64, completionMessage.BytesMoved);
01130 if (ge64m(mySession->totalBytesMoved64, mySession->fileSize64)) {
01131 mySession->status = 0;
01132 pthread_mutex_unlock(&mySession->myMutex);
01133 break;
01134 }
01135 pthread_mutex_unlock(&mySession->myMutex);
01136 }
01137
01138
01139 if (transferSocketFd != -1) (void) close(transferSocketFd);
01140 if (transferListenSocket != -1) (void) close(transferListenSocket);
01141 close (destFd);
01142
01143 pthread_mutex_lock(&mySession->myMutex);
01144 (void) close(thrInfo->moverSocket);
01145 thrInfo->active = 0;
01146 pthread_mutex_unlock(&mySession->myMutex);
01147 return;
01148 }
01149
01150 void
01151 putMover (hpssThrInfo_t *thrInfo)
01152 {
01153 int status = 0;
01154 int transferListenSocket = -1;
01155 int transferSocketFd = -1;
01156 int bytesSent;
01157 initiator_msg_t initMessage, initReply;
01158 initiator_ipaddr_t ipAddr;
01159 completion_msg_t completionMessage;
01160 char buffer[HPSS_BUF_SIZE];
01161 int srcFd;
01162 rodsLong_t offset = -1;
01163 rodsLong_t curoffset = 0;
01164 hpssSession_t *mySession = (hpssSession_t *) thrInfo->hpssSession;
01165 int length = 0;
01166
01167
01168 srcFd = open (mySession->unixFilePath, O_RDONLY, 0);
01169 if (srcFd < 0) {
01170 mySession->status = UNIX_FILE_OPEN_ERR - errno;
01171 rodsLog (LOG_ERROR,
01172 "paraHpssPut: open error for unixFilePath %s, status = %d",
01173 mySession->unixFilePath, mySession->status);
01174 return;
01175 }
01176
01177
01178 while (mySession->status == HPSS_E_NOERROR) {
01179
01180
01181 status = procMoverInitmsg (thrInfo, &initMessage, &initReply);
01182 if (status == HPSS_ECONN) {
01183 break;
01184 } else if (status != HPSS_E_NOERROR) {
01185 mySession->status = status;
01186 continue;
01187 }
01188
01189 status = procTransferListenSocket (thrInfo, &ipAddr,
01190 &transferListenSocket);
01191 if (status != HPSS_E_NOERROR) {
01192 mySession->status = status;
01193 continue;
01194 }
01195
01196
01197
01198
01199
01200 status = procTransferSocketFd (thrInfo, transferListenSocket,
01201 &transferSocketFd);
01202 if (status != HPSS_E_NOERROR) {
01203 mySession->status = status;
01204 continue;
01205 }
01206
01207 CONVERT_U64_TO_LONGLONG(initMessage.Offset, offset);
01208 if (offset != curoffset) {
01209 lseek (srcFd, offset, SEEK_SET);
01210 curoffset = offset;
01211 }
01212 length = low32m (initReply.Length);
01213
01214 status = read (srcFd, buffer, length);
01215
01216 if (status != length) {
01217 rodsLog (LOG_ERROR,
01218 "putMover: bytes read %d != requested %d", status, length);
01219 mySession->status = SYS_COPY_LEN_ERR;
01220 continue;
01221 }
01222
01223
01224
01225
01226 status = mover_socket_send_requested_data(transferSocketFd,
01227 cast64m(mySession->requestId),
01228 initMessage.Offset,
01229 buffer,
01230 low32m(initReply.Length),
01231 &bytesSent, 1);
01232 if (status <= 0) {
01233 rodsLog (LOG_ERROR,
01234 "putMover: mover_socket_send_requested_data error, status = %d",
01235 status);
01236 mySession->status = HPSS_MOVER_PROT_ERR + status;
01237 continue;
01238 } else {
01239 curoffset += length;
01240 }
01241
01242
01243
01244 status = mvrprot_recv_compmsg( thrInfo->moverSocket,
01245 &completionMessage);
01246 if (status != HPSS_E_NOERROR) {
01247 rodsLog (LOG_ERROR,
01248 "putMover: mvrprot_recv_compmsg error, status = %d",
01249 status);
01250 mySession->status = HPSS_MOVER_PROT_ERR + status;
01251 continue;
01252 }
01253
01254 pthread_mutex_lock(&mySession->myMutex);
01255 inc64m (mySession->totalBytesMoved64, completionMessage.BytesMoved);
01256 if (ge64m(mySession->totalBytesMoved64, mySession->fileSize64)) {
01257 mySession->status = 0;
01258 pthread_mutex_unlock(&mySession->myMutex);
01259 break;
01260 }
01261 pthread_mutex_unlock(&mySession->myMutex);
01262 }
01263
01264
01265 if (transferSocketFd != -1) (void) close(transferSocketFd);
01266 if (transferListenSocket != -1) (void) close(transferListenSocket);
01267 close (srcFd);
01268
01269 pthread_mutex_lock(&mySession->myMutex);
01270 (void) close(thrInfo->moverSocket);
01271 thrInfo->active = 0;
01272 pthread_mutex_unlock(&mySession->myMutex);
01273 return;
01274 }
01275
01276 int
01277 procMoverInitmsg (hpssThrInfo_t *thrInfo, initiator_msg_t *initMessage,
01278 initiator_msg_t *initReply)
01279 {
01280 int status;
01281 hpssSession_t *mySession = (hpssSession_t *) thrInfo->hpssSession;
01282
01283
01284
01285
01286 status = mvrprot_recv_initmsg (thrInfo->moverSocket, initMessage);
01287 if (status != HPSS_E_NOERROR) {
01288 if (status == HPSS_ECONN) return HPSS_ECONN;
01289 status = HPSS_MOVER_PROT_ERR + status;
01290 rodsLog (LOG_ERROR,
01291 "procMoverInitmsg: mvrprot_recv_initmsg err, status = %d",
01292 mySession->status);
01293 return status;
01294 }
01295
01296
01297 initReply->Flags = MVRPROT_COMP_REPLY | MVRPROT_ADDR_FOLLOWS;
01298
01299
01300
01301
01302
01303
01304
01305
01306 initReply->Type = initMessage->Type;
01307 initReply->Offset = initMessage->Offset;
01308 if (gt64m(initMessage->Length, cast64m(HPSS_BUF_SIZE)))
01309 initReply->Length = cast64m(HPSS_BUF_SIZE);
01310 else
01311 initReply->Length = initMessage->Length;
01312
01313
01314
01315 status = mvrprot_send_initmsg( thrInfo->moverSocket, initReply);
01316 if (status != HPSS_E_NOERROR) {
01317 status = HPSS_MOVER_PROT_ERR + status;
01318 rodsLog (LOG_ERROR,
01319 "procMoverInitmsg: mvrprot_send_initmsg err, status = %d", status);
01320 return status;
01321 }
01322
01323
01324
01325
01326 if (initMessage->Type != NET_ADDRESS) {
01327 status = HPSS_MOVER_PROT_ERR;
01328 rodsLog (LOG_ERROR,
01329 "procMoverInitmsg: initMessage.Type != NET_ADDRESS");
01330 return status;
01331 }
01332
01333 return HPSS_E_NOERROR;
01334 }
01335
01336 int
01337 procTransferListenSocket (hpssThrInfo_t *thrInfo, initiator_ipaddr_t *ipAddr,
01338 int *transferListenSocket)
01339 {
01340 int status;
01341 struct sockaddr_in transferSocketAddr;
01342 int tmp;
01343 hpssSession_t *mySession = (hpssSession_t *) thrInfo->hpssSession;
01344
01345 if (*transferListenSocket < 0) {
01346 *transferListenSocket = socket(AF_INET, SOCK_STREAM, 0);
01347 if (*transferListenSocket == -1) {
01348 rodsLog (LOG_ERROR,
01349 "procTransferListenSocket: socket error, errno = %d", errno);
01350 status = SYS_SOCK_OPEN_ERR - errno;
01351 return status;
01352 }
01353
01354 (void) memset(&transferSocketAddr, 0, sizeof(transferSocketAddr));
01355 transferSocketAddr.sin_family = AF_INET;
01356 transferSocketAddr.sin_port = 0;
01357
01358
01359
01360 pthread_mutex_lock(&mySession->myMutex);
01361 transferSocketAddr.sin_addr.s_addr = mySession->ipAddr;
01362 pthread_mutex_unlock(&mySession->myMutex);
01363 if (bind(*transferListenSocket,
01364 (const struct sockaddr *) &transferSocketAddr,
01365 sizeof(transferSocketAddr)) == -1) {
01366 rodsLog (LOG_ERROR,
01367 "procTransferListenSocket: socket bind error, errno=%d", errno);
01368 status = SYS_SOCK_BIND_ERR - errno;
01369 return status;;
01370 }
01371 tmp = sizeof(transferSocketAddr);
01372 (void) memset(&transferSocketAddr, 0, sizeof(transferSocketAddr));
01373 if (getsockname(*transferListenSocket,
01374 (struct sockaddr *) & transferSocketAddr,
01375 (socklen_t *) & tmp) == -1) {
01376 rodsLog (LOG_ERROR,
01377 "procTransferListenSocket: getsockname error, errno=%d", errno);
01378 status = SYS_SOCK_OPEN_ERR - errno;
01379 return status;;
01380 }
01381 if (listen(*transferListenSocket, SOMAXCONN) == -1) {
01382 rodsLog (LOG_ERROR,
01383 "procTransferListenSocket: listen error, errno=%d", errno);
01384 status = SYS_SOCK_OPEN_ERR - errno;
01385 return status;;
01386 }
01387
01388 memset(ipAddr, 0, sizeof(ipAddr));
01389 ipAddr->IpAddr.SockTransferID =
01390 cast64m(mySession->requestId);
01391 #ifdef HPSS7
01392 ipAddr->IpAddr.SockAddr.Addr.family = transferSocketAddr.sin_family;
01393 ipAddr->IpAddr.SockAddr.Addr.hpss_saddr_u.ipv4_addr =
01394 transferSocketAddr.sin_addr.s_addr;
01395 #else
01396 ipAddr->IpAddr.SockAddr.family = transferSocketAddr.sin_family;
01397 ipAddr->IpAddr.SockAddr.addr = transferSocketAddr.sin_addr.s_addr;
01398 #endif
01399 ipAddr->IpAddr.SockAddr.port = transferSocketAddr.sin_port;
01400 ipAddr->IpAddr.SockOffset = cast64m(0);
01401 }
01402 status = mvrprot_send_ipaddr(thrInfo->moverSocket, ipAddr);
01403 if (status != HPSS_E_NOERROR) {
01404 rodsLog (LOG_ERROR,
01405 "procTransferListenSocket: mvrprot_send_ipaddr err, errno=%d",errno);
01406 status = HPSS_MOVER_PROT_ERR + status;
01407 }
01408
01409 return status;
01410 }
01411
01412 int
01413 procTransferSocketFd (hpssThrInfo_t *thrInfo, int transferListenSocket,
01414 int *transferSocketFd)
01415 {
01416 int status, tmp;
01417 struct sockaddr_in transferSocketAddr;
01418
01419
01420
01421
01422
01423 if (*transferSocketFd >= 0) return HPSS_E_NOERROR;
01424 tmp = sizeof(transferSocketAddr);
01425 while ((*transferSocketFd = accept(transferListenSocket,
01426 (struct sockaddr *) &transferSocketAddr,
01427 (socklen_t *) & tmp)) < 0) {
01428 if ((errno != EINTR) && (errno != EAGAIN)) {
01429 rodsLog (LOG_ERROR,
01430 "procTransferSocketFd: accept error, errno=%d", errno);
01431 status = SYS_SOCK_OPEN_ERR - errno;
01432 return status;
01433 }
01434 }
01435
01436 if (*transferSocketFd < 0) {
01437
01438 rodsLog (LOG_ERROR,
01439 "procTransferSocketFd: accept error, errno=%d", errno);
01440 status = SYS_SOCK_OPEN_ERR - errno;
01441 return status;
01442 }
01443 rodsSetSockOpt (*transferSocketFd, 0);
01444
01445 return HPSS_E_NOERROR;
01446 }
01447
01448 int
01449 paraHpssGet (char *srcHpssFile, char *destUnixFile, int mode, int flags,
01450 rodsLong_t mySize)
01451 {
01452 int srcFd;
01453 int status;
01454 hpssSession_t myHpssSession;
01455 hpss_IOD_t iod;
01456 hpss_IOR_t ior;
01457 iod_srcsinkdesc_t srcDesc, sinkDesc;
01458 u_signed64 bytesMoved;
01459 u_signed64 gapLength;
01460 int readListFlags = 0;
01461
01462
01463
01464
01465 status = initHpssSession (&myHpssSession, HPSS_GET_OPR,
01466 destUnixFile, mySize, mode);
01467 if (status < 0) return status;
01468
01469 srcFd = hpssOpenForRead (srcHpssFile, O_RDONLY);
01470 if (srcFd < 0) {
01471 return srcFd;
01472 }
01473
01474 status = createControlSocket (&myHpssSession);
01475 if (status < 0) return status;
01476
01477 pthread_create(&myHpssSession.moverConnManagerThr, pthread_attr_default,
01478 (void *(*)(void *)) moverConnManager,
01479 (void *) &myHpssSession);
01480
01481 #if !defined(solaris_platform)
01482 sched_yield();
01483 #endif
01484 initHpssIodForRead (&iod, &srcDesc, &sinkDesc, srcFd, &myHpssSession);
01485
01486 gapLength = bytesMoved = cast64m(0);
01487
01488
01489
01490
01491
01492
01493 while (lt64m(add64m(bytesMoved, gapLength), myHpssSession.fileSize64) &&
01494 myHpssSession.status == HPSS_E_NOERROR) {
01495
01496
01497
01498 srcDesc.Offset = sinkDesc.Offset = add64m(bytesMoved, gapLength);
01499 srcDesc.Length = sinkDesc.Length =
01500 sub64m(myHpssSession.fileSize64, srcDesc.Offset);
01501 srcDesc.SrcSinkAddr.Addr_u.ClientFileAddr.FileOffset = srcDesc.Offset;
01502 memset(&ior, 0, sizeof(ior));
01503 status = hpss_ReadList(&iod, readListFlags, &ior);
01504 if (status) {
01505 if (ior.Status != HPSS_E_NOERROR) {
01506 rodsLog (LOG_ERROR,
01507 "paraHpssGet: hpss_ReadList error,status=%d,ior status=%d",
01508 status, ior.Status);
01509 myHpssSession.status = HPSS_READ_LIST_ERR + ior.Status;
01510 } else if (myHpssSession.status == HPSS_E_NOERROR) {
01511 rodsLog (LOG_ERROR,
01512 "paraHpssGet: hpss_ReadList error, status= %d", status);
01513 myHpssSession.status = HPSS_READ_LIST_ERR + status;
01514 }
01515 } else {
01516 inc64m(bytesMoved, ior.SinkReplyList->BytesMoved);
01517
01518
01519
01520 if (ior.Flags & HPSS_IOR_GAPINFO_VALID) {
01521 inc64m (gapLength,
01522 ior.ReqSpecReply->ReqReply_s.ReqReply_u.GapInfo.Length);
01523 }
01524 if (ior.SrcReplyList) free (ior.SrcReplyList);
01525 if (ior.SinkReplyList) free (ior.SinkReplyList);
01526 }
01527 }
01528
01529
01530
01531
01532 status = hpss_Close(srcFd);
01533
01534 status = postProcSessionThr (&myHpssSession, srcHpssFile);
01535 return status;
01536
01537 }
01538
01539
01540
01541
01542 int
01543 initHpssIodForRead (hpss_IOD_t *iod, iod_srcsinkdesc_t *src,
01544 iod_srcsinkdesc_t *sink, int hpssSrcFd, hpssSession_t *hpssSession)
01545 {
01546
01547 memset(iod, 0, sizeof(hpss_IOD_t));
01548 memset(src, 0, sizeof(iod_srcsinkdesc_t));
01549 memset(sink, 0, sizeof(iod_srcsinkdesc_t));
01550
01551 sink->Flags = HPSS_IOD_XFEROPT_IP;
01552 sink->Flags |= HPSS_IOD_CONTROL_ADDR;
01553 src->SrcSinkAddr.Type = CLIENTFILE_ADDRESS;
01554 src->SrcSinkAddr.Addr_u.ClientFileAddr.FileDes = hpssSrcFd;
01555 hpssSession->requestId = getpid();
01556 sink->SrcSinkAddr.Type = NET_ADDRESS;
01557 sink->SrcSinkAddr.Addr_u.NetAddr.SockTransferID =
01558 cast64m(hpssSession->requestId);
01559 #ifdef HPSS7
01560 sink->SrcSinkAddr.Addr_u.NetAddr.SockAddr.Addr.hpss_saddr_u.ipv4_addr =
01561 hpssSession->ipAddr;
01562 sink->SrcSinkAddr.Addr_u.NetAddr.SockAddr.Addr.family =
01563 hpssSession->mySocketAddr.sin_family;;
01564 #else
01565 sink->SrcSinkAddr.Addr_u.NetAddr.SockAddr.addr =
01566 hpssSession->ipAddr;
01567 sink->SrcSinkAddr.Addr_u.NetAddr.SockAddr.family =
01568 hpssSession->mySocketAddr.sin_family;;
01569 #endif
01570 sink->SrcSinkAddr.Addr_u.NetAddr.SockOffset = cast64m(0);
01571 sink->SrcSinkAddr.Addr_u.NetAddr.SockAddr.port =
01572 hpssSession->mySocketAddr.sin_port;
01573 iod->Function = HPSS_IOD_READ;
01574 iod->RequestID = hpssSession->requestId;
01575 iod->SrcDescLength = 1;
01576 iod->SinkDescLength = 1;
01577 iod->SrcDescList = src;
01578 iod->SinkDescList = sink;
01579
01580 hpssSession->totalBytesMoved64 = cast64m(0);
01581 hpssSession->status = HPSS_E_NOERROR;
01582
01583 return (0);
01584 }
01585
01586 int
01587 postProcSessionThr (hpssSession_t *myHpssSession, char *hpssPath)
01588 {
01589 int i;
01590 int status;
01591 int pthreadStatus;
01592
01593
01594
01595 pthread_mutex_lock (&myHpssSession->myMutex);
01596 if (neq64m (myHpssSession->fileSize64, myHpssSession->totalBytesMoved64)) {
01597 #if defined(aix_platform)
01598 struct timespec delay = {1, 0};
01599 #else
01600 int delay = 1;
01601 #endif
01602 pthread_mutex_unlock (&myHpssSession->myMutex);
01603
01604 for (i = 0; i < MAX_HPSS_CONNECTIONS; i++) {
01605 pthread_mutex_lock (&myHpssSession->myMutex);
01606 while (myHpssSession->thrInfo[i].active) {
01607 pthread_mutex_unlock (&myHpssSession->myMutex);
01608 #if defined(aix_platform)
01609 (void) pthread_delay_np (&delay);
01610 #else
01611 rodsSleep (delay, 0);
01612 #endif
01613 pthread_mutex_lock (&myHpssSession->myMutex);
01614 }
01615 pthread_mutex_unlock (&myHpssSession->myMutex);
01616 }
01617 }
01618 pthread_mutex_unlock(&myHpssSession->myMutex);
01619
01620
01621
01622
01623 (void) pthread_cancel (myHpssSession->moverConnManagerThr);
01624
01625 for (i = 0; i < myHpssSession->thrCnt; i++) {
01626 if (myHpssSession->thrInfo[i].threadId != 0)
01627 (void) pthread_join (myHpssSession->thrInfo[i].threadId, NULL);
01628 }
01629
01630 #if !defined (solaris_platform)
01631
01632 if (myHpssSession->moverConnManagerThr != 0)
01633 (void) pthread_join (myHpssSession->moverConnManagerThr,
01634 (void **) &pthreadStatus);
01635 #endif
01636
01637 if (myHpssSession->status == HPSS_E_NOERROR ||
01638 myHpssSession->status == HPSS_ECONN) {
01639 rodsLong_t bytesWritten;
01640 rodsLong_t mySize;
01641
01642 CONVERT_U64_TO_LONGLONG(myHpssSession->totalBytesMoved64, bytesWritten);
01643 CONVERT_U64_TO_LONGLONG(myHpssSession->fileSize64, mySize);
01644 if (bytesWritten != mySize) {
01645 rodsLog (LOG_ERROR,
01646 "postProcSessionThr: %s transfer size %lld != file size %lld",
01647 hpssPath, bytesWritten, mySize);
01648 status = SYS_COPY_LEN_ERR;
01649 } else {
01650 status = 0;
01651 }
01652 } else {
01653 status = myHpssSession->status;
01654 }
01655 return status;
01656
01657
01658 return HPSS_E_NOERROR;
01659 }