1. 前言
安裝使用Python SDK可以幫助開發者快速接入并使用天翼云的云日志服務相關功能。
2. 使用條件
2.1. 先決條件
用戶需要具備以下條件才能夠使用LTS SDK Python 版本:
1、購買并訂閱了天翼云的云日志服務,并創建了日志項目和日志單元,獲取到相應編碼(logProject、logUnit)。
2、已獲取AccessKey 和 SecretKey。
3、已安裝Python3.6或以上版本。
2.2. 下載及安裝
下載ctyun_lts_python_sdk.zip壓縮包,放到相應位置后并解壓。“ctyun_lts_python_sdk”目錄中“example”為SDK的使用示例代碼。
在“ctyun_lts_python_sdk”目錄下執行標準 python 包安裝命令:
# python setup.py install
安裝命令會安裝SDK所需要的依賴包,并將整個SDK作為一個python包安裝到您的python環境中。其中會同步安裝lz4、six、protobuf>=3.5.2、requests。
如果上述命令安裝不成功,也可以手動逐步安裝這四個python包。
pip install requests==2.27.1
pip install lz4==3.1.10
pip install six
pip install protobuf==3.5.2
之后可以通過一下命令查看是否安裝成功,安裝后的python包名為ctyun-lts-python-sdk。
pip list # 成功安裝會出現 ctyun-lts-python-sdk
如果修改了SDK的源碼,并希望重新安裝使用,可以先卸載再安裝。
pip uninstall ctyun-lts-python-sdk
python setup.py install
安裝完python包之后,只需要引入lts的包就可以使用SDK的功能了。
from lts import LogItem, get_current_timestamp, LogClient, LogException
然后運行程序,以sample_putlogs舉例:
python sample_putlogs.py
3. SDK使用設置
3.1. 基本設置
使用 SDK訪問 LTS 的服務,需要設置正確的 AccessKey、SecretKey 和服務端 Endpoint,所有的服務可以使用同一 key 憑證來進行訪問,但不同的服務需要使用不同的 endpoint 進行訪問,詳情參考天翼云官網-SDK接入概述。在調用前SDK,需要已知以下參數:
1、云日志服務訪問地址。詳情請查看訪問地址(Endpoint)。
2、key憑證:accessKey和secretKey 。詳情請查看如何獲取訪問密鑰(AK/SK)。
3、日志項目編碼:logProject,在使用SDK前,需要確保您有至少一個已經存在的日志項目,日志項目就是您要將日志上傳到的地方。
4、日志單元編碼:logUnit,在使用SDK前,需要確保日志項目中有至少一個已經存在的日志單元。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | 域名 | 是 |
| accessKey | string | AccessKey,簡稱ak | 是 |
| secretKey | string | SecretKey ,簡稱sk | 是 |
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
目前通過SDK將日志上傳有兩種上傳形式:同步上傳和異步批量上傳。
1、同步上傳:當調用日志上傳接口時,sdk會立即進行網絡請求調用,并返回發送結果。這種方式結構簡單,可用于發送頻率不高的場景。
2、異步批量上傳:當調用日志上傳接口時,后臺線程會將日志進行累積,當達到發送條件時,會進行一次合并發送。對于需要頻繁調用發送接口的場景,這種方式性能更卓越,更高效。
示例代碼:同步上傳
from lts import LogItem, get_current_timestamp, LogClient, LogException, ClientConfig
def main():
access_key = "your accessKey"
secret_key = "your secretKey"
log_project = "log project Code"
log_unit = "log unit Code"
endpoint = "endpoint"
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3) # 獲取當前的時間戳,單位納秒
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area", 3.1415926)
log_item.labels_push_back("user_tag", "string")
log_num = 10
for i in range(0, log_num):
log_items.append(log_item)
try:
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
log_client = LogClient(client_config)
for i in range(0, 100): # send 100 times
log_response = log_client.put_logs(log_project, log_unit, log_items)
log_response.log_print_body()
except LogException as ex:
print(ex)
if __name__ == "__main__":
? ?main()
示例代碼:異步批量上傳
import time
from asynchronous.producer import Producer
from asynchronous.producer_config import ProducerConfig
from lts import LogItem, get_current_timestamp, LogException, ClientConfig
def main():
access_key = "your accessKey"
secret_key = "your secretKey"
log_project = "log project Code"
log_unit = "log unit Code"
endpoint = "endpoint"
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3)
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area", 3.1415926)
log_item.labels_push_back("user_tag", "string")
log_num = 10
for i in range(0, log_num):
log_items.append(log_item)
producer_config = ProducerConfig()
producer = Producer(producer_config)
try:
client_config = ClientConfig(endpoint, access_key, secret_key, log_project)
producer.build_clients(client_config)
producer.start()
for i in range(0, 100): # send 100 times
producer.send_logs_callback(log_project, log_unit, log_items)
time.sleep(5)
for i in range(0, 100): # send 100 times
producer.send_logs_callback(log_project, log_unit, log_items)
time.sleep(5)
# 關閉producer 發送
producer.safe_close()
except LogException as ex:
print(ex)
4. 服務代碼示例-同步上傳
同步上傳每調用一次發送日志的方法,就會進行一次https請求發送日志,然后返回日志發送結果。同步上傳對于需求簡單,發送頻率不高,發送量不大的場景比較適用。雖然是同步發送,但是可以將多條日志合在一起,一次性發送,減少API請求次數。
4.1. 關于Client的操作
4.1.1. ClientConfig()
此操作是初始化一個client的默認配置,里面包含如下參數,通過這份配置就可以去初始化一個client。
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | 域名 | 是 |
| access_key | string | 用戶信息憑證,簡稱ak | 是 |
| secret_key | string | 用戶信息憑證,簡稱sk | 是 |
| log_project | string | 日志項目編碼 | 是 |
| request_timeout | int | http請求超時時間,默認30s | 否 |
| compress_type | string | 日志壓縮算法,默認“lz4” | 否 |
| user_agent | string | lts-sdk-python/{sdk版本信息} | 否 |
| retries | int | 日志上傳失敗的重試次數 | 否 |
| no_retry_status_code | list | 不進行重試的響應狀態碼 | 否 |
示例代碼:初始化ClientConfig
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
# 可對默認配置進行變更
client_config.request_timeout = 30
Client_config.retries = 3
...
4.1.2. LogClient()
此操作是初始化Client。用戶使用client_config配置就可以去初始化一個Client。Client類似一個日志服務的處理器,它對API進行了有效的封裝處理,只需要調用特定接口就可以使用對應功能。初始化Client之后,其包含的配置信息如下:
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | client_config 內的參數 | 是 |
| ... | String | client_config 內的其他參數,見上表 | 是 |
| security_token | tuple | ak/sk生成的用戶臨時憑證信息{token字符串,過期時間} | 否 |
| user_agent | string | lts-sdk-python/{sdk版本信息} | 否 |
| session | Session | requests.Session(),用于http請求連接的復用 | 否 |
| lock | Lock | threading.Lock() | 否 |
示例代碼:初始化Client
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
log_client = LogClient(client_config)
4.2. 關于臨時憑證Token的操作
4.2.1. init_security_token()
此操作是為client注入security_token信息,這一步需要使用ak和sk信息換取臨時憑證security_token,其中包含了token隨機串和過期時間兩個參數。這一步需要去訪問CTIAM的api接口,調用api接口,傳入ak/sk/endpoint信息,返回token信息。
Security_token信息如下:
| 參數 | 類型 | 描述 |
|---|---|---|
| token | string | token 隨機串 |
| expire_time | int | 過期時間,默認30分鐘 |
獲取security_token這一步在Client 初始化時會自動調用。用戶默認可以不用進行這一步操作。
示例代碼:為client注入Token信息
def _init_security_token(self):
? token,expire_time=
self._ak_sk_to_token(self.get_access_key(),self.get_secret_key())
? token_tuple = (token, expire_time)
4.3. 關于Log的操作
4.3.1. log_items.append(log_item)
此操作用于生成待上傳的日志,日志上傳只能上傳LogItem格式的日志,log_items是一個數組類型,里面包含若干條LogItem日志,格式如下:
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| log_items | []LogItem | LogItem格式的數組,將多份日志組合起來發送 | 是 |
配置一個或多個log_item,然后放入log_items下的數組,作為參數傳遞,這就是要傳入的日志信息,LogItem類型需要的參數如下。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| log_timestamp | int | 時間戳,單位納秒 | 是 |
| origin_msg | string | 原始日志內容 | 是 |
| contents | dict | 日志內容,分詞后的內容 | 否 |
| labels | dict | 自定義標簽 | 否 |
注意:其中contents和labels的key的長度不超過64字符,僅支持數字、字母、下劃線、連字符(-)、點(.),且必須以字母開頭。value類型最好使用字符串(string)和數字類型(int,double),其他類型建議先轉為字符串類型,并且value值不能為空或空字符串。
示例代碼:組裝生成10條日志
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3)
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("contentInt", curr_time)
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area:", 3.1415926)
log_item.contents_push_back("unit_id", 123145)
log_item.labels_push_back("user_tag", "string")
for i in range(1, 10):
log_items.append(log_item)
當然,對于dict 類型的contents 和labels 類型,也可以用set方法進行賦值,減少push_back()操作。
contents = {"contentInt": 123456,"contentString": "string content"}
log_item.set_contents(contents)
4.4. 關于日志上傳的操作
4.4.1. put_logs()
此操作用于日志上傳服務,需要傳入的參數有三個,分別是log_project(日志項目編碼),unit_code(日志單元編碼),log_items(要上傳的日志)。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| log_project | string | 日志項目編碼 | 是 |
| unit_code | string | 日志單元編碼 | 是 |
| log_items | []LogItem | 日志信息 | 是 |
示例代碼:上傳日志
try:
? log_client = LogClient(endpoint=endpoint, access_key=ak, secret_key=sk)
? log_response = log_client.put_logs(log_project, unit_code, log_items)
? log_response.log_print_body()
except LogException as ex:
? print(ex)
resp_body有如下種類:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| statusCode | int | 返回碼取值范圍:0-正常、-1:嚴重錯誤其他自定義 | |
| message | string | 狀態描述 | SUCCESS |
| error | string | 參考錯誤編碼列表 |
日志服務相關錯誤編碼(部分):
| statusCode | error | message |
|---|---|---|
| -1 | LTS_8000 | 請求失敗,請稍候重試,或提交工單反饋 |
| -1 | LTS_8001 | 內容不合法,無法解析 |
| -1 | LTS_8004 | 日志內容包含的日志必須小于[x] MB和[y]條 |
| -1 | LTS_8006 | 日志內容解壓失敗 |
| -1 | LTS_8007 | Token失效,請重新獲取 |
| -1 | LTS_8009 | 無云日志服務產品實例,請先開通云日志服務 |
| -1 | LTS_8010 | 日志項目不存在 |
| -1 | LTS_8011 | 日志單元不存在 |
| -1 | LTS_8013 | 在1個日志項目下,寫入流量最大限制:200MB/s |
| -1 | LTS_8014 | 在1個日志項目下,寫入次數最大限制:1000次/s |
| -1 | LTS_8015 | 在1個日志單元下,寫入流量最大限制:100MB/s |
| -1 | LTS_8016 | 在1個日志單元下,寫入次數最大限制:500次/s |
| -1 | LTS_18000 | 調用ITIAM的接口失敗 |
5. 服務代碼-異步上傳
異步上傳是為了解決同步上傳無法高頻異步發送等問題所增加的模塊。原理是會開啟多個線程,當調用日志發送接口后,會立刻返回,而內部的線程會將日志數據緩存合并,最后進行批量發送。
5.1. 關于Producer的操作
5.1.1. ProducerConfig()
此操作是初始化一個producer的默認配置,里面包含如下參數,通過這份配置就可以去初始化一個producer。
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| default_batch_size | int | 每批次發送的日志的容量,默認512KB,最大5MB | 是 |
| default_batch_count | int | 每批次發送的日志的條數,默認4096條,最大40960條 | 是 |
| default_linger_ms | int | 批量發送時日志留存時間,默認2000ms,最小100ms | 是 |
| default_worker_count | int | 日志發送的最大線程數,默認8個線程 | 是 |
示例代碼:初始化ProducerConfig()配置
producer_config = ProducerConfig()
# 可對默認配置進行變更
producer.max_batch_size = 512*1024
producer.max_max_batch_count = 4096
...
5.1.2. Producer()
此操作是初始化producer,producer是一個異步任務的啟動器,里面封裝了異步任務的邏輯操作。使用producer_config 就可對其進行初始化。
示例代碼:初始化Producer
producer = Producer(producer_config)
5.1.3. build_client()
此操作是根據client_config配置初始化client,其中logProject屬性是client的唯一標識,不同的logProject會構建不同的client,使用不同的配置就可以構建多個client,每個client負責該project項目下的日志發送任務。
示例代碼:構建兩個client
client_config = ClientConfig(endpoint, ak, sk, log_project)
client_config2 = ClientConfig(endpoint, ak, sk, log_project2)
producer.build_clients(client_config)
producer.build_clients(client_config2)
5.1.4. start()
此操作是啟動producer,使用start()方法后,producer內部的一些異步線程就會啟動,之后就可以進行日志上傳等任務了。
producer.start()
5.1.5. safe_close()
此操作是用于關閉producer。當不再需要發送數據或當前進程即將終止時,關閉producer是必要的步驟,以確保producer中緩存的所有數據都能得到妥善處理。當前,producer提供了兩種關閉模式:安全關閉和有限關閉。
安全關閉模式確保在關閉producer之前,所有緩存的數據都已完成處理,所有相關線程都已關閉,并且所有注冊的回調函數都已執行完畢。一旦producer被安全關閉,緩存的批次數據會立即得到處理。如果回調函數沒有被阻塞,close方法通常能夠迅速返回。
producer.safe_close()
有限關閉模式適用于那些可能存在阻塞回調函數的場景,但您又希望close方法能在指定的時間內返回。為此,可以使用close(long timeoutMs)方法,并指定一個超時時間。如果超過了指定的timeoutMs時間后producer仍未完全關閉,該方法將拋出一個Exception異常,這意味著可能還有部分緩存的數據未及時處理就被丟棄,同時用戶注冊的回調函數也可能不會被執行。
producer.close(2000)
5.2 異步上傳操作
5.2.1 send_logs()
此操作是將日志發送到后臺的日志累加器隊列中,然后立刻返回。累加器的狀態達到可發送條件時(日志量達到閾值或者等待時間達到閾值),后臺任務的線程將里面的日志進行打包批量發送。發送的日志可以是單條,也可以是多條,同種這種發送方式不會返回響應結果,所以可能失敗也可能成功。
示例代碼:日志上傳發送,無回調結果
# 發送單條日志
producer.send_logs(log_project, log_unit, log_item)
# 發送多條日志,log_items 是 log_item 的隊列
producer.send_logs(log_project, log_unit, log_items)
5.2.2 send_logs_callback()
此操作是將日志異步批量發送,但會返回發送的響應結果,有利于檢測當前的發送狀況。
producer.send_logs_callback(log_project, log_unit, log_item)
producer.send_logs_callback(log_project, log_unit, log_items)
關于回調函數,可以通過修改io_work.py 中的callback()方法來進行自定義。回調結果是異步非阻塞的。
def callback(self, future):
? result = future.result()
? print(f"response: {result[0]}, send count : {result[1]}")
? self.logger.info(f"response: {result[0]}, send count : {result[1]}")
?
# 回調結果:
response: {"statusCode":0,"message":"SUCCESS","error":""}, send count : 3070