00001
00002
00003
00004
00005
00006
00007
00008 #include "s3FileDriver.h"
00009 #include "rsGlobalExtern.h"
00010
/* Set to 1 by myS3Init (); once set, myS3Init () returns 0 without doing
 * any further work. */
00011 static int S3Initialized = 0;
/* Access key id and secret access key used to fill in every S3BucketContext
 * in this file; populated by myS3Init () from the environment or by
 * readS3AuthInfo () from the auth file. */
00012 s3Auth_t S3Auth;
00013
00014
00015
00016 int
00017 s3FileUnlink (rsComm_t *rsComm, char *s3ObjName)
00018 {
00019 int status;
00020 char key[MAX_NAME_LEN], myBucket[MAX_NAME_LEN];
00021 callback_data_t data;
00022 S3BucketContext bucketContext;
00023
00024 bzero (&data, sizeof (data));
00025
00026 if ((status = parseS3Path (s3ObjName, myBucket, key)) < 0) return status;
00027
00028
00029 if ((status = myS3Init ()) != S3StatusOK) return (status);
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 S3ResponseHandler responseHandler = {
00043 0, &responseCompleteCallback
00044 };
00045
00046 S3_delete_object(&bucketContext, key, 0, &responseHandler, &data);
00047
00048 if (data.status != S3StatusOK) {
00049 status = myS3Error (data.status, S3_FILE_UNLINK_ERR);
00050 } else {
00051 status = 0;
00052 }
00053
00054
00055 return (status);
00056 }
00057
00058 int
00059 s3FileStat (rsComm_t *rsComm, char *filename, struct stat *statbuf)
00060 {
00061 int status;
00062 char key[MAX_NAME_LEN], myBucket[MAX_NAME_LEN];
00063 s3Stat_t s3Stat;
00064 int len;
00065
00066 len = strlen (filename);
00067
00068 bzero (statbuf, sizeof (struct stat));
00069
00070 if (filename[len - 1] == '/') {
00071
00072 statbuf->st_mode = S_IFDIR;
00073 return 0;
00074 }
00075
00076 if ((status = parseS3Path (filename, myBucket, key)) < 0) return status;
00077 status = list_bucket (myBucket, key, NULL, NULL, 1, 1, &s3Stat);
00078
00079 if (status == 0) {
00080 statbuf->st_mode = S_IFREG;
00081 statbuf->st_nlink = 1;
00082 statbuf->st_uid = getuid ();
00083 statbuf->st_gid = getgid ();
00084 statbuf->st_atime = statbuf->st_mtime = statbuf->st_ctime =
00085 s3Stat.lastModified;
00086 statbuf->st_size = s3Stat.size;
00087 }
00088
00089 return (status);
00090 }
00091
00092 int
00093 s3FileMkdir (rsComm_t *rsComm, char *filename, int mode)
00094 {
00095 int status;
00096
00097 status = 0;
00098
00099 return (status);
00100 }
00101
00102 int
00103 s3FileChmod (rsComm_t *rsComm, char *filename, int mode)
00104 {
00105 int status;
00106
00107 status = 0;
00108
00109 return (status);
00110 }
00111
00112 int
00113 s3FileRmdir (rsComm_t *rsComm, char *filename)
00114 {
00115 int status;
00116
00117 status = 0;
00118
00119 return (status);
00120 }
00121
00122
00123
00124
00125 int
00126 s3FileRename (rsComm_t *rsComm, char *oldFileName, char *newFileName)
00127 {
00128 int status;
00129
00130 status = copyS3Obj (oldFileName, newFileName);
00131 if (status < 0) {
00132 rodsLog (LOG_ERROR, "s3FileRename: copyS3Obj of %s error, status = %d",
00133 oldFileName, status);
00134 return status;
00135 }
00136 status = s3FileUnlink (rsComm, oldFileName);
00137 if (status < 0) {
00138 rodsLog (LOG_ERROR,
00139 "s3FileRename: s3FileUnlink of %s error, status = %d",
00140 oldFileName, status);
00141 }
00142 return status;
00143 }
00144
00145 rodsLong_t
00146 s3FileGetFsFreeSpace (rsComm_t *rsComm, char *path, int flag)
00147 {
00148 int space = LARGE_SPACE;
00149 return (space * 1024 * 1024);
00150 }
00151
00152
00153
00154
00155
00156
00157
00158 int
00159 s3StageToCache (rsComm_t *rsComm, fileDriverType_t cacheFileType,
00160 int mode, int flags, char *s3ObjName,
00161 char *cacheFilename, rodsLong_t dataSize,
00162 keyValPair_t *condInput)
00163 {
00164 int status;
00165 struct stat statbuf;
00166 rodsLong_t mySize;
00167
00168 status = s3FileStat (rsComm, s3ObjName, &statbuf);
00169
00170 if (status < 0 || (statbuf.st_mode & S_IFREG) == 0) {
00171 rodsLog (LOG_ERROR, "s3StageToCache: stat of %s error, status = %d",
00172 s3ObjName, status);
00173 return status;
00174 }
00175
00176 if (dataSize > 0 && dataSize != statbuf.st_size) {
00177 rodsLog (LOG_ERROR,
00178 "s3StageToCache: %s inp dataSize %lld does not match size %lld",
00179 s3ObjName, dataSize, statbuf.st_size);
00180 return SYS_COPY_LEN_ERR;
00181 }
00182 mySize = statbuf.st_size;
00183
00184 status = getFileFromS3 (cacheFilename, s3ObjName, mySize);
00185
00186 return status;
00187 }
00188
00189
00190
00191
00192
00193
00194
00195 int
00196 s3SyncToArch (rsComm_t *rsComm, fileDriverType_t cacheFileType,
00197 int mode, int flags, char *s3ObjName,
00198 char *cacheFilename, rodsLong_t dataSize, keyValPair_t *condInput)
00199 {
00200 int status;
00201 struct stat statbuf;
00202
00203 status = stat (cacheFilename, &statbuf);
00204
00205 if (status < 0) {
00206 status = UNIX_FILE_STAT_ERR - errno;
00207 rodsLog (LOG_ERROR, "s3SyncToArch: stat of %s error, status = %d",
00208 cacheFilename, status);
00209 return status;
00210 }
00211
00212 if ((statbuf.st_mode & S_IFREG) == 0) {
00213 status = UNIX_FILE_STAT_ERR - errno;
00214 rodsLog (LOG_ERROR, "s3SyncToArch: %s is not a file, status = %d",
00215 cacheFilename, status);
00216 return status;
00217 }
00218
00219 if (dataSize > 0 && dataSize != statbuf.st_size) {
00220 rodsLog (LOG_ERROR,
00221 "s3SyncToArch: %s inp size %lld does not match actual size %lld",
00222 cacheFilename, dataSize, statbuf.st_size);
00223 return SYS_COPY_LEN_ERR;
00224 }
00225 dataSize = statbuf.st_size;
00226
00227 status = putFileIntoS3 (cacheFilename, s3ObjName, dataSize);
00228
00229 return status;
00230 }
00231
00232 void
00233 responseCompleteCallback (S3Status status, const S3ErrorDetails *error,
00234 void *callbackData)
00235 {
00236 int i;
00237 callback_data_t *data = (callback_data_t *) callbackData;
00238
00239 data->status = status;
00240
00241 if (error) {
00242 if (error->message) {
00243 rodsLog (LOG_NOTICE,
00244 "responseCompleteCallback: Message: %s", error->message);
00245 }
00246 if (error->resource) {
00247 rodsLog (LOG_NOTICE,
00248 "responseCompleteCallback: resource: %s", error->resource);
00249 }
00250 if (error->furtherDetails) {
00251 rodsLog (LOG_NOTICE,
00252 "responseCompleteCallback: furtherDetails: %s",
00253 error->furtherDetails);
00254 }
00255
00256 if (error->extraDetailsCount) {
00257 rodsLog (LOG_NOTICE,
00258 "responseCompleteCallback: extraDetails");
00259
00260 for (i = 0; i < error->extraDetailsCount; i++) {
00261 printf(" %s: %s\n",
00262 error->extraDetails[i].name,
00263 error->extraDetails[i].value);
00264 }
00265 }
00266 }
00267 }
00268
00269 S3Status
00270 responsePropertiesCallback(const S3ResponseProperties *properties,
00271 void *callbackData)
00272 {
00273
00274 return S3StatusOK;
00275 }
00276
00277 int
00278 putObjectDataCallback (int bufferSize, char *buffer, void *callbackData)
00279 {
00280 callback_data_t *data =
00281 (callback_data_t *) callbackData;
00282 int ret = 0;
00283
00284 if (data->contentLength) {
00285 int length = ((data->contentLength > (unsigned) bufferSize) ?
00286 (unsigned) bufferSize : data->contentLength);
00287 ret = fread (buffer, 1, length, data->fd);
00288 }
00289 data->contentLength -= ret;
00290 return ret;
00291 }
00292
00293 int
00294 putFileIntoS3 (char *fileName, char *s3ObjName, rodsLong_t fileSize)
00295 {
00296
00297 #if 0
00298 S3Status status;
00299 #else
00300 int status;
00301 #endif
00302 char key[MAX_NAME_LEN], myBucket[MAX_NAME_LEN];
00303 callback_data_t data;
00304 S3BucketContext bucketContext;
00305
00306 bzero (&data, sizeof (data));
00307
00308 if ((status = parseS3Path (s3ObjName, myBucket, key)) < 0) return status;
00309
00310 data.fd = fopen (fileName, "r");
00311 if (data.fd == NULL) {
00312 status = UNIX_FILE_OPEN_ERR - errno;
00313 rodsLog (LOG_ERROR,
00314 "putFileIntoS3: open error for fileName %s, status = %d",
00315 fileName, status);
00316 return status;
00317 }
00318
00319 data.contentLength = data.originalContentLength = fileSize;
00320
00321 if ((status = myS3Init ()) != S3StatusOK) return (status);
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333 bzero (&bucketContext, sizeof (bucketContext));
00334 bucketContext.bucketName = myBucket;
00335 bucketContext.protocol = S3ProtocolHTTPS;
00336 bucketContext.uriStyle = S3UriStylePath;
00337 bucketContext.accessKeyId = S3Auth.accessKeyId;
00338 bucketContext.secretAccessKey = S3Auth.secretAccessKey;
00339
00340 S3PutObjectHandler putObjectHandler = {
00341 { &responsePropertiesCallback, &responseCompleteCallback },
00342 &putObjectDataCallback
00343 };
00344
00345 S3_put_object(&bucketContext, key, fileSize, NULL, 0,
00346 &putObjectHandler, &data);
00347 if (data.status != S3StatusOK) {
00348 status = myS3Error (data.status, S3_PUT_ERROR);
00349 }
00350
00351
00352 fclose (data.fd);
00353 return (status);
00354 }
00355
00356 int
00357 myS3Init (void)
00358 {
00359 int status = -1;
00360 char *tmpPtr;
00361
00362 if (S3Initialized) return 0;
00363
00364 S3Initialized = 1;
00365 #ifdef libs3_3_1_4 // JMC - backport 4810
00366 if ((status = S3_initialize ("s3", S3_INIT_ALL)) != S3StatusOK) {
00367 #else
00368 if ((status = S3_initialize ("s3", S3_INIT_ALL, NULL)) != S3StatusOK) {
00369 #endif
00370 status = myS3Error (status, S3_INIT_ERROR);
00371 }
00372
00373 bzero (&S3Auth, sizeof (S3Auth));
00374
00375 if ((tmpPtr = getenv("S3_ACCESS_KEY_ID")) != NULL) {
00376 rstrcpy (S3Auth.accessKeyId, tmpPtr, MAX_NAME_LEN);
00377 if ((tmpPtr = getenv("S3_SECRET_ACCESS_KEY")) != NULL) {
00378 rstrcpy (S3Auth.secretAccessKey, tmpPtr, MAX_NAME_LEN);
00379 return 0;
00380 }
00381 }
00382
00383 if ((status = readS3AuthInfo ()) < 0) {
00384 rodsLog (LOG_ERROR,
00385 "initHpssAuth: readHpssAuthInfo error. status = %d", status);
00386 return status;
00387 }
00388
00389 return status;
00390 }
00391
00392 int
00393 readS3AuthInfo (void)
00394 {
00395 FILE *fptr;
00396 char s3AuthFile[MAX_NAME_LEN];
00397 char inbuf[MAX_NAME_LEN];
00398 int lineLen, bytesCopied;
00399 int linecnt = 0;
00400
00401 snprintf (s3AuthFile, MAX_NAME_LEN, "%-s/%-s",
00402 getConfigDir(), S3_AUTH_FILE);
00403
00404 fptr = fopen (s3AuthFile, "r");
00405
00406 if (fptr == NULL) {
00407 rodsLog (LOG_ERROR,
00408 "readS3AuthInfo: open S3_AUTH_FILE file %s err. ernro = %d",
00409 s3AuthFile, errno);
00410 return (SYS_CONFIG_FILE_ERR);
00411 }
00412 while ((lineLen = getLine (fptr, inbuf, MAX_NAME_LEN)) > 0) {
00413 char *inPtr = inbuf;
00414 if (linecnt == 0) {
00415 while ((bytesCopied = getStrInBuf (&inPtr,
00416 S3Auth.accessKeyId, &lineLen, LONG_NAME_LEN)) > 0) {
00417 linecnt ++;
00418 break;
00419 }
00420 } else if (linecnt == 1) {
00421 while ((bytesCopied = getStrInBuf (&inPtr,
00422 S3Auth.secretAccessKey, &lineLen, LONG_NAME_LEN)) > 0) {
00423 linecnt ++;
00424 break;
00425 }
00426 }
00427 }
00428 if (linecnt != 2) {
00429 rodsLog (LOG_ERROR,
00430 "readS3AuthInfo: read %d lines in S3_AUTH_FILE file",
00431 linecnt);
00432 return (SYS_CONFIG_FILE_ERR);
00433 }
00434 return 0;
00435 }
00436
00437 int
00438 myS3Error (int status, int irodsErrorCode)
00439 {
00440 if (status < 0) return status;
00441
00442 rodsLogError (LOG_ERROR, irodsErrorCode,
00443 "myS3Error: error:%s", S3_get_status_name((S3Status) status));
00444 return (irodsErrorCode - status);
00445 }
00446
00447 int
00448 parseS3Path (char *s3ObjName, char *bucket, char *key)
00449 {
00450 char tmpPath[MAX_NAME_LEN];
00451 char *tmpBucket, *tmpKey;
00452
00453 rstrcpy (tmpPath, s3ObjName, MAX_NAME_LEN);
00454
00455 if (tmpPath[0] == '/') {
00456 tmpBucket = tmpPath + 1;
00457 } else {
00458 tmpBucket = tmpPath;
00459 }
00460 tmpKey = (char *) strchr (tmpBucket, '/');
00461 if (tmpKey == NULL) {
00462 rodsLog (LOG_ERROR,
00463 "putFileIntoS3: problem parsing %s", s3ObjName);
00464 return SYS_INVALID_FILE_PATH;
00465 }
00466 *tmpKey = '\0';
00467 tmpKey++;
00468 rstrcpy (bucket, tmpBucket, MAX_NAME_LEN);
00469 rstrcpy (key, tmpKey, MAX_NAME_LEN);
00470
00471 return 0;
00472 }
00473
00474 int
00475 list_bucket(const char *bucketName, const char *prefix, const char *marker,
00476 const char *delimiter, int maxkeys, int allDetails, s3Stat_t *s3Stat)
00477 {
00478 int status;
00479 callback_data_t data;
00480 S3BucketContext bucketContext;
00481
00482 if ((status = myS3Init ()) != S3StatusOK) return (status);
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493 bzero (&bucketContext, sizeof (bucketContext));
00494 bucketContext.bucketName = bucketName;
00495 bucketContext.protocol = S3ProtocolHTTPS;
00496 bucketContext.uriStyle = S3UriStylePath;
00497 bucketContext.accessKeyId = S3Auth.accessKeyId;
00498 bucketContext.secretAccessKey = S3Auth.secretAccessKey;
00499
00500 S3ListBucketHandler listBucketHandler = {
00501 { &responsePropertiesCallback, &responseCompleteCallback },
00502 &listBucketCallback
00503 };
00504
00505 #if 0
00506 if (marker != NULL)
00507 snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
00508 else
00509 data.nextMarker[0] = '\0';
00510 #endif
00511 data.keyCount = 0;
00512 data.allDetails = allDetails;
00513
00514 S3_list_bucket(&bucketContext, prefix, marker,
00515 delimiter, maxkeys, 0, &listBucketHandler, &data);
00516
00517 if (data.keyCount > 0) {
00518 *s3Stat = data.s3Stat;
00519 status = 0;
00520 } else {
00521 status = myS3Error (data.status, S3_FILE_STAT_ERR);
00522 }
00523
00524
00525 return status;
00526 }
00527
00528 S3Status
00529 listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,
00530 const S3ListBucketContent *contents, int commonPrefixesCount,
00531 const char **commonPrefixes, void *callbackData)
00532 {
00533 callback_data_t *data =
00534 (callback_data_t *) callbackData;
00535
00536 if (contentsCount <= 0) {
00537 data->keyCount = 0;
00538 return S3StatusOK;
00539 } else if (contentsCount > 1) {
00540 rodsLog (LOG_ERROR,
00541 "listBucketCallback: contentsCount %d > 1 for %s",
00542 contentsCount, contents->key);
00543 }
00544 data->keyCount = contentsCount;
00545 data->s3Stat.size = contents->size;
00546 data->s3Stat.lastModified = contents->lastModified;
00547 rstrcpy (data->s3Stat.key, (char *) contents->key, MAX_NAME_LEN);
00548
00549 return S3StatusOK;
00550 }
00551
00552 int
00553 getFileFromS3 (char *fileName, char *s3ObjName, rodsLong_t fileSize)
00554 {
00555 #if 0
00556 S3Status status;
00557 #else
00558 int status;
00559 #endif
00560 char key[MAX_NAME_LEN], myBucket[MAX_NAME_LEN];
00561 callback_data_t data;
00562 S3BucketContext bucketContext;
00563
00564 bzero (&data, sizeof (data));
00565
00566 if ((status = parseS3Path (s3ObjName, myBucket, key)) < 0) return status;
00567
00568 data.fd = fopen (fileName, "w+");
00569 if (data.fd == NULL) {
00570 status = UNIX_FILE_OPEN_ERR - errno;
00571 rodsLog (LOG_ERROR,
00572 "getFileFromS3: open error for fileName %s, status = %d",
00573 fileName, status);
00574 return status;
00575 }
00576
00577 data.contentLength = data.originalContentLength = fileSize;
00578
00579 if ((status = myS3Init ()) != S3StatusOK) return (status);
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590 bzero (&bucketContext, sizeof (bucketContext));
00591 bucketContext.bucketName = myBucket;
00592 bucketContext.protocol = S3ProtocolHTTPS;
00593 bucketContext.uriStyle = S3UriStylePath;
00594 bucketContext.accessKeyId = S3Auth.accessKeyId;
00595 bucketContext.secretAccessKey = S3Auth.secretAccessKey;
00596
00597 S3GetObjectHandler getObjectHandler = {
00598 { &responsePropertiesCallback, &responseCompleteCallback },
00599 &getObjectDataCallback
00600 };
00601
00602 S3_get_object (&bucketContext, key, NULL, 0, fileSize, 0,
00603 &getObjectHandler, &data);
00604 if (data.status != S3StatusOK) {
00605 status = myS3Error (data.status, S3_GET_ERROR);
00606 }
00607
00608
00609 fclose (data.fd);
00610 return (status);
00611 }
00612
00613 S3Status getObjectDataCallback(int bufferSize, const char *buffer,
00614 void *callbackData)
00615 {
00616 callback_data_t *data =
00617 (callback_data_t *) callbackData;
00618
00619 size_t wrote = fwrite(buffer, 1, bufferSize, data->fd);
00620
00621 return ((wrote < (size_t) bufferSize) ?
00622 S3StatusAbortedByCallback : S3StatusOK);
00623 }
00624
00625 int
00626 copyS3Obj (char *srcObj, char *destObj)
00627 {
00628 #if 0
00629 S3Status status;
00630 #else
00631 int status;
00632 #endif
00633 char srcKey[MAX_NAME_LEN], srcBucket[MAX_NAME_LEN];
00634 char destKey[MAX_NAME_LEN], destBucket[MAX_NAME_LEN];
00635 callback_data_t data;
00636 int64_t lastModified;
00637 char eTag[256];
00638 S3BucketContext bucketContext;
00639
00640
00641 bzero (&data, sizeof (data));
00642
00643 if ((status = parseS3Path (srcObj, srcBucket, srcKey)) < 0) return status;
00644 if ((status = parseS3Path (destObj, destBucket, destKey)) < 0)
00645 return status;
00646
00647 if ((status = myS3Init ()) != S3StatusOK) return (status);
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657 bzero (&bucketContext, sizeof (bucketContext));
00658 bucketContext.bucketName = srcBucket;
00659 bucketContext.protocol = S3ProtocolHTTPS;
00660 bucketContext.uriStyle = S3UriStylePath;
00661 bucketContext.accessKeyId = S3Auth.accessKeyId;
00662 bucketContext.secretAccessKey = S3Auth.secretAccessKey;
00663
00664
00665 S3ResponseHandler responseHandler = {
00666 &responsePropertiesCallback,
00667 &responseCompleteCallback
00668 };
00669
00670 S3_copy_object(&bucketContext, srcKey, destBucket,
00671 destKey, NULL, &lastModified, sizeof(eTag), eTag, 0,
00672 &responseHandler, &data);
00673 if (data.status != S3StatusOK) {
00674 status = myS3Error (data.status, S3_FILE_COPY_ERR);
00675 }
00676
00677
00678 return (status);
00679 }
00680