Python調用示例
更新時間 2023-12-07 17:42:01
最近更新時間: 2023-12-07 17:42:01
分享文章
#-*- coding: utf8 -*-
import requests
import json
import hashlib
import base64
import hmac
import datetime
import uuid
# HTTP verbs understood by execute()
METHOD_GET = "GET"
METHOD_POST = "POST"
# accessKey issued on the official console (placeholder value)
AK = "3d98d123d633xxxxxxxxxxx5a60d4bb3"
# securityKey issued on the official console (placeholder value)
SK = "173615ae4177xxxxxxxxxxxd17b1c3ed"
def hmac_sha256(secret, data):
    """
    Compute the raw HMAC-SHA256 digest of ``data`` keyed with ``secret``.

    Text arguments are encoded as UTF-8 before hashing, because
    ``bytearray(text)`` without an encoding raises TypeError for
    Python 3 ``str`` and Python 2 ``unicode``. Byte strings and
    bytearrays are used unchanged.
    :param secret: str/bytes/bytearray the HMAC key
    :param data: str/bytes/bytearray the message to authenticate
    :return: the raw (binary) digest
    """
    if not isinstance(secret, (bytes, bytearray)):
        secret = secret.encode('utf-8')
    if not isinstance(data, (bytes, bytearray)):
        data = data.encode('utf-8')
    return hmac.new(bytearray(secret), bytearray(data), digestmod=hashlib.sha256).digest()
def base64_of_hmac(data):
    """
    Base64-encode a raw digest.
    :param data: byte string to encode
    :return: the value produced by base64.b64encode for ``data``
    """
    encoded = base64.b64encode(data)
    return encoded
def get_request_uuid():
    """
    Generate a new request identifier.
    :return: str a version-1 UUID in its canonical text form
    """
    request_id = uuid.uuid1()
    return str(request_id)
def get_sorted_str(data):
    """
    Arrange parameters into the string form used for signing.

    Keys are sorted in ascending order and each pair is rendered as
    ``key=value``; the pairs are joined with ``&``.
    :param data: dict the parameters to arrange
    :return: str
    """
    ordered = sorted(data.items(), key=lambda pair: pair[0])
    return '&'.join('%s=%s' % (name, value) for name, value in ordered)
def build_sign(query_params, body_params, eop_date, request_uuid):
    """
    Compute the authorization header value for a request.
    :param query_params: dict parameters sent in the query string (GET parameters)
    :param body_params: dict parameters sent as the JSON body (POST parameters)
    :param eop_date: str request time, formatted as '%Y%m%dT%H%M%SZ'
    :param request_uuid: str request id, signed together with eop_date
    :return: str '<AK> Headers=ctyun-eop-request-id;eop-date Signature=<base64 signature>'
    """
    # The body is always serialised, so an empty dict is digested as '{}'
    # rather than as an empty string.
    body_str = json.dumps(body_params)
    body_digest = hashlib.sha256(body_str.encode('utf-8')).hexdigest()
    # The two request headers that take part in the signature
    header_str = 'ctyun-eop-request-id:%s\neop-date:%s\n' % (request_uuid, eop_date)
    # Parameters carried in the URL (GET parameters), sorted by key
    query_str = get_sorted_str(query_params)
    signature_str = '%s\n%s\n%s' % (header_str, query_str, body_digest)
    print_log(repr('signature_str is: %s' % signature_str))
    # Date part of the timestamp (everything before the 'T')
    sign_date = eop_date.split('T')[0]
    # Derive the signing key: SK -> request time -> AK -> date
    k_time = hmac_sha256(SK, eop_date)
    k_ak = hmac_sha256(k_time, AK)
    k_date = hmac_sha256(k_ak, sign_date)
    signature_base64 = base64_of_hmac(hmac_sha256(k_date, signature_str))
    # On Python 3 b64encode returns bytes, which '%s' would render as
    # "b'...'" and corrupt the header; normalise to the native str type.
    # On Python 2 the value is already str and is left untouched.
    if not isinstance(signature_base64, str):
        signature_base64 = signature_base64.decode('ascii')
    # Build the value of the authorization request header
    sign_header = '%s Headers=ctyun-eop-request-id;eop-date Signature=%s' % (AK, signature_base64)
    return sign_header
def get_sign_headers(query_params, body):
    """
    Build the three request headers used for authentication.
    :param query_params: dict parameters sent in the query string (GET parameters)
    :param body: dict parameters sent as the request body (POST parameters)
    :return: dict with the keys 'eop-date', 'ctyun-eop-request-id' and 'Eop-Authorization'
    """
    # NOTE(review): this formats the machine's local time and appends a
    # literal 'Z'; confirm which timezone the service expects.
    eop_date = datetime.datetime.now().strftime('%Y%m%dT%H%M%SZ')
    request_uuid = get_request_uuid()
    authorization = build_sign(
        query_params=query_params,
        body_params=body,
        eop_date=eop_date,
        request_uuid=request_uuid,
    )
    # The three headers used for authentication
    return {
        'eop-date': eop_date,
        'ctyun-eop-request-id': request_uuid,
        'Eop-Authorization': authorization,
    }
def get(url, params=None, header_params=None, get_body=None):
    """
    Send a signed GET request.
    :param url: str request URL
    :param params: dict query-string parameters
    :param header_params: dict extra request headers
    :param get_body: dict optional JSON body sent with the GET request
    :return: the value returned by execute()
    """
    return execute(
        url,
        method=METHOD_GET,
        params=params,
        header_params=header_params,
        get_body=get_body,
    )
def post(url, params=None, header_params=None):
    """
    Send a signed POST request.
    :param url: str request URL
    :param params: dict parameters sent as the JSON body
    :param header_params: dict extra request headers
    :return: the value returned by execute()
    """
    return execute(
        url,
        method=METHOD_POST,
        params=params,
        header_params=header_params,
    )
def execute(url, method, params=None, header_params=None, get_body=None):
    """
    Sign a request, send it with ``requests`` and log the exchange.
    :param url: str request URL
    :param method: str METHOD_GET sends a GET; any other value sends a POST
    :param params: dict query-string parameters for GET, JSON body for POST
    :param header_params: dict extra headers; they are applied after the
        signing headers and override them on a key clash
    :param get_body: dict JSON body sent with a GET request
    :return: the response object returned by requests
    """
    get_body = get_body or {}
    params = params or {}
    header_params = header_params or {}
    is_get = method == METHOD_GET
    # What gets signed: GET signs params as the query string and get_body as
    # the body; everything else signs params as the body with no query string.
    if is_get:
        query_params, body = params, get_body
    else:
        query_params, body = {}, params
    headers = get_sign_headers(query_params, body)
    headers.update(header_params)
    request_log = (
        (u'url: %s', url),
        (u'請求方式: %s', method),
        (u'請求頭: %s', headers),
        (u'請求參數: %s', params),
    )
    for template, value in request_log:
        print_log(template % value)
    # NOTE(review): verify=False turns off TLS certificate verification.
    if is_get:
        res = requests.get(url, params=params, headers=headers, verify=False, json=get_body)
    else:
        res = requests.post(url, json=params, headers=headers, verify=False)
    print_log(u'返回狀態碼: %s' % res.status_code)
    print_log(u'返回: %s' % res.text)
    return res
def print_log(log_info):
    """
    Print a message prefixed with the current local timestamp.

    The output line has the form '[<timestamp>]: <message>'.
    :param log_info: the message to print
    :return: None
    """
    now = datetime.datetime.now()
    log_info = u'[%s]: %s' % (str(now), log_info)
    # Call form of print: valid on Python 3 and, for a single argument,
    # equivalent to the print statement on Python 2.
    print(log_info)
# Example of an operation (write) call.
# The URL must carry an explicit scheme: requests rejects a scheme-less
# '//host/path' URL with MissingSchema.
post("https://ctcdr-global.ctapi.daliqc.cn/v4/disaster/open-service-disaster",
     params={
         "regionID": "41f64827xxxxxxxxxffa3a5deb5d15d"
     })
# Example of a query (read) call
get("https://ctcdr-global.ctapi.daliqc.cn/v4/disaster/info-pair-disaster",
    params={
        "regionID": "81f772xxxxxxxxxxx307d5b"
    })