ext_storage.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from contextlib import closing
  5. from datetime import datetime, timedelta
  6. from typing import Union
  7. import boto3
  8. from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
  9. from botocore.exceptions import ClientError
  10. from flask import Flask
  11. class Storage:
  12. def __init__(self):
  13. self.storage_type = None
  14. self.bucket_name = None
  15. self.client = None
  16. self.folder = None
  17. def init_app(self, app: Flask):
  18. self.storage_type = app.config.get('STORAGE_TYPE')
  19. if self.storage_type == 's3':
  20. self.bucket_name = app.config.get('S3_BUCKET_NAME')
  21. self.client = boto3.client(
  22. 's3',
  23. aws_secret_access_key=app.config.get('S3_SECRET_KEY'),
  24. aws_access_key_id=app.config.get('S3_ACCESS_KEY'),
  25. endpoint_url=app.config.get('S3_ENDPOINT'),
  26. region_name=app.config.get('S3_REGION')
  27. )
  28. elif self.storage_type == 'azure-blob':
  29. self.bucket_name = app.config.get('AZURE_BLOB_CONTAINER_NAME')
  30. sas_token = generate_account_sas(
  31. account_name=app.config.get('AZURE_BLOB_ACCOUNT_NAME'),
  32. account_key=app.config.get('AZURE_BLOB_ACCOUNT_KEY'),
  33. resource_types=ResourceTypes(service=True, container=True, object=True),
  34. permission=AccountSasPermissions(read=True, write=True, delete=True, list=True, add=True, create=True),
  35. expiry=datetime.utcnow() + timedelta(hours=1)
  36. )
  37. self.client = BlobServiceClient(account_url=app.config.get('AZURE_BLOB_ACCOUNT_URL'),
  38. credential=sas_token)
  39. else:
  40. self.folder = app.config.get('STORAGE_LOCAL_PATH')
  41. if not os.path.isabs(self.folder):
  42. self.folder = os.path.join(app.root_path, self.folder)
  43. def save(self, filename, data):
  44. if self.storage_type == 's3':
  45. self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
  46. elif self.storage_type == 'azure-blob':
  47. blob_container = self.client.get_container_client(container=self.bucket_name)
  48. blob_container.upload_blob(filename, data)
  49. else:
  50. if not self.folder or self.folder.endswith('/'):
  51. filename = self.folder + filename
  52. else:
  53. filename = self.folder + '/' + filename
  54. folder = os.path.dirname(filename)
  55. os.makedirs(folder, exist_ok=True)
  56. with open(os.path.join(os.getcwd(), filename), "wb") as f:
  57. f.write(data)
  58. def load(self, filename: str, stream: bool = False) -> Union[bytes, Generator]:
  59. if stream:
  60. return self.load_stream(filename)
  61. else:
  62. return self.load_once(filename)
  63. def load_once(self, filename: str) -> bytes:
  64. if self.storage_type == 's3':
  65. try:
  66. with closing(self.client) as client:
  67. data = client.get_object(Bucket=self.bucket_name, Key=filename)['Body'].read()
  68. except ClientError as ex:
  69. if ex.response['Error']['Code'] == 'NoSuchKey':
  70. raise FileNotFoundError("File not found")
  71. else:
  72. raise
  73. elif self.storage_type == 'azure-blob':
  74. blob = self.client.get_container_client(container=self.bucket_name)
  75. blob = blob.get_blob_client(blob=filename)
  76. data = blob.download_blob().readall()
  77. else:
  78. if not self.folder or self.folder.endswith('/'):
  79. filename = self.folder + filename
  80. else:
  81. filename = self.folder + '/' + filename
  82. if not os.path.exists(filename):
  83. raise FileNotFoundError("File not found")
  84. with open(filename, "rb") as f:
  85. data = f.read()
  86. return data
  87. def load_stream(self, filename: str) -> Generator:
  88. def generate(filename: str = filename) -> Generator:
  89. if self.storage_type == 's3':
  90. try:
  91. with closing(self.client) as client:
  92. response = client.get_object(Bucket=self.bucket_name, Key=filename)
  93. for chunk in response['Body'].iter_chunks():
  94. yield chunk
  95. except ClientError as ex:
  96. if ex.response['Error']['Code'] == 'NoSuchKey':
  97. raise FileNotFoundError("File not found")
  98. else:
  99. raise
  100. elif self.storage_type == 'azure-blob':
  101. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  102. with closing(blob.download_blob()) as blob_stream:
  103. while chunk := blob_stream.readall(4096):
  104. yield chunk
  105. else:
  106. if not self.folder or self.folder.endswith('/'):
  107. filename = self.folder + filename
  108. else:
  109. filename = self.folder + '/' + filename
  110. if not os.path.exists(filename):
  111. raise FileNotFoundError("File not found")
  112. with open(filename, "rb") as f:
  113. while chunk := f.read(4096): # Read in chunks of 4KB
  114. yield chunk
  115. return generate()
  116. def download(self, filename, target_filepath):
  117. if self.storage_type == 's3':
  118. with closing(self.client) as client:
  119. client.download_file(self.bucket_name, filename, target_filepath)
  120. elif self.storage_type == 'azure-blob':
  121. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  122. with open(target_filepath, "wb") as my_blob:
  123. blob_data = blob.download_blob()
  124. blob_data.readinto(my_blob)
  125. else:
  126. if not self.folder or self.folder.endswith('/'):
  127. filename = self.folder + filename
  128. else:
  129. filename = self.folder + '/' + filename
  130. if not os.path.exists(filename):
  131. raise FileNotFoundError("File not found")
  132. shutil.copyfile(filename, target_filepath)
  133. def exists(self, filename):
  134. if self.storage_type == 's3':
  135. with closing(self.client) as client:
  136. try:
  137. client.head_object(Bucket=self.bucket_name, Key=filename)
  138. return True
  139. except:
  140. return False
  141. elif self.storage_type == 'azure-blob':
  142. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  143. return blob.exists()
  144. else:
  145. if not self.folder or self.folder.endswith('/'):
  146. filename = self.folder + filename
  147. else:
  148. filename = self.folder + '/' + filename
  149. return os.path.exists(filename)
  150. storage = Storage()
  151. def init_app(app: Flask):
  152. storage.init_app(app)