aws_s3_storage.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import logging
  2. from collections.abc import Generator
  3. from contextlib import closing
  4. import boto3
  5. from botocore.client import Config
  6. from botocore.exceptions import ClientError
  7. from flask import Flask
  8. from extensions.storage.base_storage import BaseStorage
  9. logger = logging.getLogger(__name__)
  10. class AwsS3Storage(BaseStorage):
  11. """Implementation for Amazon Web Services S3 storage."""
  12. def __init__(self, app: Flask):
  13. super().__init__(app)
  14. app_config = self.app.config
  15. self.bucket_name = app_config.get("S3_BUCKET_NAME")
  16. if app_config.get("S3_USE_AWS_MANAGED_IAM"):
  17. logger.info("Using AWS managed IAM role for S3")
  18. session = boto3.Session()
  19. region_name = app_config.get("S3_REGION")
  20. self.client = session.client(service_name="s3", region_name=region_name)
  21. else:
  22. logger.info("Using ak and sk for S3")
  23. self.client = boto3.client(
  24. "s3",
  25. aws_secret_access_key=app_config.get("S3_SECRET_KEY"),
  26. aws_access_key_id=app_config.get("S3_ACCESS_KEY"),
  27. endpoint_url=app_config.get("S3_ENDPOINT"),
  28. region_name=app_config.get("S3_REGION"),
  29. config=Config(s3={"addressing_style": app_config.get("S3_ADDRESS_STYLE")}),
  30. )
  31. # create bucket
  32. try:
  33. self.client.head_bucket(Bucket=self.bucket_name)
  34. except ClientError as e:
  35. # if bucket not exists, create it
  36. if e.response["Error"]["Code"] == "404":
  37. self.client.create_bucket(Bucket=self.bucket_name)
  38. # if bucket is not accessible, pass, maybe the bucket is existing but not accessible
  39. elif e.response["Error"]["Code"] == "403":
  40. pass
  41. else:
  42. # other error, raise exception
  43. raise
  44. def save(self, filename, data):
  45. self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
  46. def load_once(self, filename: str) -> bytes:
  47. try:
  48. with closing(self.client) as client:
  49. data = client.get_object(Bucket=self.bucket_name, Key=filename)["Body"].read()
  50. except ClientError as ex:
  51. if ex.response["Error"]["Code"] == "NoSuchKey":
  52. raise FileNotFoundError("File not found")
  53. else:
  54. raise
  55. return data
  56. def load_stream(self, filename: str) -> Generator:
  57. def generate(filename: str = filename) -> Generator:
  58. try:
  59. with closing(self.client) as client:
  60. response = client.get_object(Bucket=self.bucket_name, Key=filename)
  61. yield from response["Body"].iter_chunks()
  62. except ClientError as ex:
  63. if ex.response["Error"]["Code"] == "NoSuchKey":
  64. raise FileNotFoundError("File not found")
  65. else:
  66. raise
  67. return generate()
  68. def download(self, filename, target_filepath):
  69. with closing(self.client) as client:
  70. client.download_file(self.bucket_name, filename, target_filepath)
  71. def exists(self, filename):
  72. with closing(self.client) as client:
  73. try:
  74. client.head_object(Bucket=self.bucket_name, Key=filename)
  75. return True
  76. except:
  77. return False
  78. def delete(self, filename):
  79. self.client.delete_object(Bucket=self.bucket_name, Key=filename)