大文件分段上傳
更新時間 2024-10-20 23:51:31
最近更新時間: 2024-10-20 23:51:31
分享文章
本文幫助您了解對象存儲大文件分段上傳的最佳實踐。
前提條件
- 開通對象存儲服務。
- 在天翼云對象存儲控制臺獲取Access Key、Secret Key和外網訪問控制的endpoint。
- 安裝Python環境,下載對應版本的ZOS官方Python SDK (本文以sdk_python3.X.tar為例),請參見開發者文檔。
環境配置
- 解壓sdk_python3.X.tar,在解壓出的sdk文件夾下,打開cmd導入需要的包。
pip3 install -r requirements.txt - 把sdk文件夾中的service-2.sns.sdk-extras.json和service-2.s3.sdk-extras.json文件放到“C:\Users\你的用戶名.aws\models\s3\2006-03-01”文件夾(如果不存在該文件夾,可自行創建)下。
分段上傳
操作場景
- 大型文件上傳:當需要上傳大型文件時,使用分段上傳可以更高效地完成任務。相比于一次性上傳整個文件,分段上傳允許在網絡故障或其他原因導致上傳中斷時,只需要重新傳輸中斷的片段,而不需要重新上傳整個文件。
- 網絡不穩定環境:在網絡連接不穩定或帶寬有限的環境中,分段上傳可以降低因網絡中斷或超時導致整個文件上傳失敗的風險。當上傳過程中發生中斷,只需要重新上傳中斷的片段,而不需要重新上傳整個文件。
- 文件大小不確定:可以在需要上傳的文件大小還不確定的情況下開始上傳,這種場景在視頻監控等行業應用中比較常見。
操作步驟
- 創建py文件,并引入boto3包的session模塊。
from boto3.sessionimport Session - 配置用于訪問對象存儲服務的憑證Access Key、Secret Key和外網訪問地址endpoint。
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key url = "此處輸入你的endpoint" # 這里輸入你的endpoint - 獲取對象存儲服務的操作客戶端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url) - 開啟分塊上傳,獲取uploadID。
# 在’’中填入要上傳對象的桶名(必須已存在)和對象名。 dict_cmp = s3_client.create_multipart_upload(Bucket='?', Key='?') uploadID = dict_cmp['UploadId'] # 記錄uploadID,用于后續上傳分塊。 - 分塊讀取要上傳的文件。
# 設置列表,用于收集文件的所有分塊 chunks = [] # 分塊讀取文件 with open("此處輸入你的文件地址", 'rb') as f: while True: chunk = f.read(5 * 1024 * 1024) # 此處輸入分塊大小,本示例是5M,分塊大小最低為5M if not chunk: break chunks.append(chunk) - 上傳所有分塊并記錄各個分塊的ETag和編號。
# 設置列表,用于記錄上傳分塊的ETag和編號。 parts = [] i = 0 for chunk in chunks: i += 1 # 上傳分塊 dict_up = s3_client.upload_part(Bucket='?', Body=chunk, Key='?', PartNumber=i, UploadId=uploadID) # 記錄該分塊的ETag和編號 part = { 'ETag': dict_up['ETag'], 'PartNumber': i, } parts.append(part) - 完成所有分塊的拼接,存入ZOS。
# 完成所有分塊拼接后,上傳成功。 s3_client.complete_multipart_upload( Bucket='?', Key='?', MultipartUpload={ 'Parts': parts }, UploadId=uploadID, )
整體代碼
from boto3.session import Session
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key
secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key
url = "此處輸入你的endpoint" # 這里輸入你的endpoint
key = '?' # 傳入桶內后的文件名
bucketName = '?' # 要傳入文件的桶名
try:
session = Session(access_key, secret_key)
s3_client = session.client('s3', endpoint_url=url)
# 開啟分塊上傳,獲取uploadID
dict_cmp = s3_client.create_multipart_upload(Bucket=bucketName, Key=key)
uploadID = dict_cmp['UploadId']#記錄uploadID,用于后續上傳分塊。
filePos = "此處輸入文件路徑"
size = 5 * 1024 * 1024 #此處輸入分塊大小,本示例是5M,分塊大小最低為5M
# 設置列表,用于收集文件的所有分塊
chunks = []
# 分塊讀取文件
with open(filePos, 'rb') as f:
while True:
chunk = f.read(size)
if not chunk:
break
chunks.append(chunk)
# 設置列表,用于收集上傳分塊的ETag和編號。
parts = []
i = 0
for chunk in chunks:
i += 1
# 上傳分塊
dict_up = s3_client.upload_part(Bucket=bucketName, Body=chunk,
Key=key, PartNumber=i,
UploadId=uploadID)
# 記錄該分塊的ETag和編號
part = {
'ETag': dict_up['ETag'],
'PartNumber': i,
}
parts.append(part)
# 完成整個拼接傳入
s3_client.complete_multipart_upload(
Bucket=bucketName,
Key=key,
MultipartUpload={
'Parts': parts
},
UploadId=uploadID,
)
print('文件上傳成功!')
except BaseException:
print("上傳異常,請檢查各個參數后重試。")
finally:
# 關閉文件流
f.close()
追加上傳
操作場景
通過普通上傳創建的對象,用戶無法在原對象上進行追加寫操作,如果對象內容發生了改變,只能重新上傳同名對象來進行修改。這在日志、視頻監控等數據復寫較頻繁的場景下使用不方便。所以可以通過追加上傳的方式來只上傳增加部分的內容,增強擴展性,提高文件的上傳效率。
操作步驟
-
創建py文件,并引入boto3包的session模塊。
from boto3.sessionimport Session -
配置用于訪問對象存儲服務的憑證Access Key、Secret Key和外網訪問地址endpoint。
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key url = "此處輸入你的endpoint" # 這里輸入你的endpoint -
獲取對象存儲服務的操作客戶端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url) -
讀取文件并記錄文件md5值。
with open('輸入要上傳文件的路徑', 'rb') as file: data = file.read() md5 = hashlib.md5(data).digest() md5 = base64.b64encode(md5) -
上傳對象。
(1)首次上傳
s3_client.put_object(Bucket='輸入要傳入的桶名', Metadata=dict(m1='m1'), Body=data, Key='輸入存入后對象的鍵值', ContentMD5=str(md5, 'utf-8'), Append=True,# 開啟追加上傳 AppendPosition=0)# 指定追加上傳開始的位置(2)追加上傳
# 獲取需要追加上傳的對象信息 response = s3_client.head_object(Bucket="bucketblocks", Key="mac.txt") # 獲取當前對象長度 appendPos = response['ContentLength'] s3_client.put_object(Bucket='輸入要傳入的桶名', Metadata=dict(m1='m1'), Body=data, Key='輸入存入后對象的鍵值', ContentMD5=str(md5, 'utf-8'), Append=True,# 開啟追加上傳 AppendPosition=appendPos)# 指定追加上傳開始的位置
整體代碼
from boto3.session import Session
import hashlib
import base64
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key
secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key
url = "此處輸入你的endpoint" # 這里輸入你的endpoint
# 上傳文件桶的桶名
bname = '輸入要傳入的桶名'
# 文件在桶內存儲的key值
key = '輸入存入后對象的鍵值'
# 追加上傳開始的位置,默認為0
appendPos = 0
try:
# 讀取文件并記錄文件md5值
with open('輸入要上傳文件的路徑', 'rb') as file:
data = file.read()
md5 = hashlib.md5(data).digest()
md5 = base64.b64encode(md5)
# 獲取對象存儲服務的操作客戶端
session = Session(access_key, secret_key)
s3_client = session.client("s3", endpoint_url=url)
# 獲取桶內所有對象信息
response = s3_client.list_objects(Bucket=bname)
# 遍歷所有對象鍵值
for obj in response['Contents']:
# 桶中存在與待上傳對象同key的對象,需要判斷該對象能否進行追加上傳
if obj['Key'] == key:
res = s3_client.head_object(Bucket=bname, Key=key)
if res['ResponseMetadata']['HTTPHeaders']['x-rgw-object-type'] != 'Appendable':
print('該對象不是Appendable類型,無法進行追加上傳.')
raise Exception
#桶內對象可以進行追加上傳,更新追加上傳開始的位置
appendPos = res['ContentLength']
# 完成追加上傳
s3_client.put_object(Bucket=bname,
Metadata=dict(m1='m1'),
Body=data,
Key=key,
ContentMD5=str(md5, 'utf-8'),
Append=True,
AppendPosition=appendPos)
if appendPos == 0:
print('初次上傳成功.')
else:
print('追加上傳成功.')
except BaseException:
print("參數有問題,請修改后重試.")
finally:
# 關閉文件流
file.close()
斷點續傳
操作場景
- 大文件上傳:當需要上傳大文件時,如視頻、備份文件或軟件包等,由于上傳時間較長,在上傳過程中可能會遇到各種問題,例如網絡中斷、上傳工具崩潰或用戶手動中斷等。斷點續傳的操作場景允許用戶在上傳中斷后,能夠從中斷點繼續上傳,而不需要重新上傳整個文件。
- 網絡不穩定:在網絡環境不穩定的情況下,如移動網絡或低帶寬連接,大文件的上傳過程可能會中斷或超時。通過斷點續傳,在寬帶不穩定的情況下,上傳仍可恢復并繼續進行。
- 長時間上傳:某些上傳任務可能需要較長的時間才能完成,這可能會增加上傳過程中意外中斷的風險。斷點續傳可以將上傳任務分段處理,并保存上傳進度信息,以便在上傳中斷后能夠恢復并繼續上傳。
- 客戶端或服務端故障:在上傳過程中,當客戶端或服務端發生故障時,斷點續傳允許用戶重新連接并從中斷點繼續上傳,而不會對上傳任務造成重大影響。
操作步驟
- 創建py文件,并引入boto3包的session模塊。
from boto3.sessionimport Session - 配置用于訪問對象存儲服務的憑證Access Key、Secret Key和外網訪問地址endpoint。
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key url = "此處輸入你的endpoint" # 這里輸入你的endpoint - 獲取對象存儲服務的操作客戶端。
session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url) - 查詢桶中還沒有完成或放棄的分塊上傳,獲取需要續傳對象的UploadId和Key。
response = s3_client.list_multipart_uploads(Bucket='輸入對象傳入的桶名') uploadInfo = response['Uploads'] - 根據上一步得到的UploadId和Key來獲取當前對象已經上傳完成的分段信息。
response = s3_client.list_parts(Bucket='輸入對象傳入的桶名', Key='Key值', UploadId='UploadId值') finishedPartsInfo = response['Parts'] # 設置列表,用于記錄當前已經完成上傳分塊的編號。 finishedPartNumbers = [] # 記錄分塊的大小。 partSize = 0 # 設置列表,用于收集上傳分塊的ETag和編號。 parts = [] for finishedPart in finishedPartsInfo: finishedPartNumbers.append(finishedPart['PartNumber']) if partSize < finishedPart['Size']: partSize = finishedPart['Size'] part = { 'ETag': finishedPart['ETag'], 'PartNumber': finishedPart['PartNumber'] } parts.append(part) - 上傳對象中未完成的分段,并記錄各分段的ETag信息。
# 設置列表,用于收集文件的所有分塊 chunks = [] # 分塊讀取文件 with open("此處輸入你的文件地址", 'rb') as f: while True: chunk = f.read(5*1024*1024) # 此處輸入分塊大小,本示例是5M,分塊大小最低為5M if not chunk: break chunks.append(chunk) i = 0 for chunk in chunks: i += 1 # 當該分塊已經完成上傳時,跳過。 if i in finishedPartNumbers: continue # 上傳分塊 response = s3_client.upload_part(Bucket=bucketName, Body=chunk, Key=key, PartNumber=i, UploadId='UploadId值') # 記錄該分塊的ETag和編號 part = { 'ETag': response['ETag'], 'PartNumber': i, } parts.append(part) - 完成所有分塊的拼接,存入ZOS。
# 完成所有分塊拼接后,上傳成功。 s3_client.complete_multipart_upload( Bucket=bucketName, Key=key, MultipartUpload={ 'Parts': parts }, UploadId=uploadID, )
整體代碼
from boto3.session import Session
access_key = "此處輸入你的Access Key" # 這里輸入你的Access Key
secret_key = "此處輸入你的Secret Key" # 這里輸入你的Secret Key
url = "此處輸入你的endpoint" # 這里輸入你的endpoint
bucketName = '?' # 要傳入文件的桶名
filePos = "?" # 待上傳文件所在位置
try:
session = Session(access_key, secret_key)
s3_client = session.client('s3', endpoint_url=url)
# 查詢桶中還沒有完成或放棄的分塊上傳,獲取需要續傳對象的UploadId和Key。
response = s3_client.list_multipart_uploads(Bucket=bucketName)
uploadInfo = response['Uploads']
if uploadInfo is None:
print(bucketName + '桶內沒有需要斷點上傳的對象。')
raise BaseException
# 獲取需要續傳的對象的UploadId和Key,這里以第一個對象為例
uploadId = uploadInfo[0]['UploadId']
key = uploadInfo[0]['Key']
# 獲取已經完成上傳的分塊信息
response = s3_client.list_parts(Bucket=bucketName, Key=key, UploadId=uploadId)
finishedPartsInfo = response['Parts']
# 設置列表,用于記錄當前已經完成上傳分塊的編號。
finishedPartNumbers = []
# 記錄分塊的大小。
partSize = 0
# 設置列表,用于收集上傳分塊的ETag和編號。
parts = []
for finishedPart in finishedPartsInfo:
finishedPartNumbers.append(finishedPart['PartNumber'])
if partSize < finishedPart['Size']:
partSize = finishedPart['Size']
part = {
'ETag': finishedPart['ETag'],
'PartNumber': finishedPart['PartNumber']
}
parts.append(part)
# 設置列表,用于收集文件的所有分塊
chunks = []
# 分塊讀取文件
with open(filePos, 'rb') as f:
while True:
chunk = f.read(partSize) # 此處輸入分塊大小,本示例是5M,分塊大小最低為5M
if not chunk:
break
chunks.append(chunk)
i = 0
for chunk in chunks:
i += 1
# 當該分塊已經完成上傳時,跳過。
if i in finishedPartNumbers:
continue
# 上傳分塊
response = s3_client.upload_part(Bucket=bucketName, Body=chunk,
Key=key, PartNumber=i,
UploadId=uploadId)
# 記錄該分塊的ETag和編號
part = {
'ETag': response['ETag'],
'PartNumber': i,
}
parts.append(part)
# 完成整個拼接傳入
s3_client.complete_multipart_upload(
Bucket=bucketName,
Key=key,
MultipartUpload={
'Parts': parts
},
UploadId=uploadId
)
print('文件上傳成功!')
except BaseException:
print("上傳異常,請檢查各個參數后重試。")
finally:
f.close()